Skip to content

Commit 622e279

Browse files
author
James William Pye
committed
Add tests for reconciliation.
s/reconciliation/realignment/ in most cases. The terminology is now: realignment deals with connection state, and reconciliation deals with recovering from [receiver] faults. Change load_chunks to use a SynchronizeMessage instead of a Flush; apparently, in COPY failures, a sync is performed regardless, so xact3.Instruction won't be expecting that. Arguably, using a sync here is preferable, but Flush had some nice properties when working outside of a transaction block: if one failed, they all failed. This change was necessary in order to handle realignment in COPY fail cases.
1 parent 9b94cea commit 622e279

5 files changed

Lines changed: 227 additions & 27 deletions

File tree

postgresql/copyman.py

Lines changed: 105 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,30 @@ def __init__(self, manager, receivers):
5353
self.manager = manager
5454
self.receivers = receivers
5555

56+
def __str__(self):
57+
return "{0} receivers faulted".format(len(self.receivers))
58+
59+
class CopyFail(Fault):
60+
"""
61+
Exception thrown by the CopyManager when the COPY failed.
62+
"""
63+
def __init__(self, manager, reason, faults = None):
64+
self.manager = manager
65+
self.reason = reason
66+
self.faults = faults or {}
67+
68+
def __str__(self):
69+
return self.reason
70+
71+
class NoReceivers(CopyFail):
72+
"""
73+
Exception thrown by the CopyManager when the COPY failed due to all the
74+
receivers faulting out.
75+
"""
76+
reason = 'no receivers remained after fault'
77+
def __init__(self, manager):
78+
self.manager = manager
79+
5680
# The identifier for PQv3 copy data.
5781
PROTOCOL_PQv3 = "PQv3"
5882
# The identifier for iterables of copy data sequences.
@@ -98,7 +122,8 @@ def NoTransformation(arg):
98122
(PROTOCOL_PQv3, PROTOCOL_PQv3) : lambda: NoTransformation,
99123
}
100124

101-
# Used to manage the conversion of COPY data.
125+
# Used to manage the conversions of COPY data.
126+
# Notably, chunks -> PQv3 or PQv3 -> chunks.
102127
class CopyTransformer(object):
103128
__slots__ = ('current', 'transformers', 'get')
104129
def __init__(self, source_protocol, target_protocols):
@@ -256,8 +281,13 @@ def __init__(self):
256281
self.total_bytes = 0
257282

258283
@abstractmethod
259-
def reconcile(self):
284+
def realign(self):
260285
"""
286+
Method implemented by producers that emit COPY data that is not
287+
guaranteed to be aligned.
288+
289+
This is only necessary in failure cases where receivers still need more
290+
data to complete the message.
261291
"""
262292

263293
@abstractmethod
@@ -288,8 +318,8 @@ class NullProducer(Producer):
288318
_e_factors = ()
289319
protocol = PROTOCOL_NULL
290320

291-
def reconcile(self):
292-
# Never needs to reconcile.
321+
def realign(self):
322+
# Never needs to be realigned.
293323
pass
294324

295325
def __next__(self):
@@ -304,8 +334,8 @@ def __init__(self, iterator):
304334
self.__next__ = self.iterator.__next__
305335
super().__init__()
306336

307-
def reconcile(self):
308-
# Never needs to reconcile; data is emitted on message boundaries.
337+
def realign(self):
338+
# Never needs to realign; data is emitted on message boundaries.
309339
pass
310340

311341
def __next__(self, next = next):
@@ -336,7 +366,7 @@ def recover(self, view):
336366
##
337367
# When a COPY is interrupted, this can be used to accommodate
338368
# the original state machine to identify the message boundaries.
339-
def reconcile(self):
369+
def realign(self):
340370
s = self._state
341371

342372
if s is None:
@@ -356,6 +386,8 @@ def reconcile(self):
356386
# Don't include the already sent parts.
357387
buf = header[len(self._state.size_fragment):]
358388
bodylen = ulong_unpack(header) - 4
389+
# This will often cause an invalid copy data error,
390+
# but it doesn't matter much because we will issue a copy fail.
359391
buf += b'\x00' * bodylen
360392
for_receivers = buf
361393
elif s.remaining_bytes > 0:
@@ -483,6 +515,12 @@ def __exit__(self, typ, val, tb):
483515
db = self.statement.database
484516
if not db.closed and self._chunks._xact is not None:
485517
db.interrupt()
518+
if db.pq.xact:
519+
try:
520+
db._pq_complete()
521+
except Exception:
522+
# Let the copy manager indicate the failure.
523+
pass
486524
super().__exit__(typ, val, tb)
487525

488526
class NullReceiver(Receiver):
@@ -538,7 +576,7 @@ def __init__(self, statement, *parameters):
538576
self.xact = None
539577
super().__init__(statement.database.pq.socket.send,)
540578

541-
# A bit of a hack...
579+
# XXX: A bit of a hack...
542580
# This is actually a good indication that statements need a .copy()
543581
# execution method for producing a "Copy" cursor that reads or writes.
544582
class WireReady(BaseException):
@@ -563,17 +601,31 @@ def __exit__(self, typ, val, tb):
563601
if self.xact is None:
564602
# Nothing to do.
565603
return super().__exit__(typ, val, tb)
604+
566605
if self.view:
567-
# The reconciled producer emitted the necessary
606+
# The realigned producer emitted the necessary
568607
# data for message boundary alignment.
608+
#
609+
# In this case, we unconditionally fail.
569610
pq = self.statement.database.pq
570-
pq.message_data = (pq.message_data or b'') + bytes(self.view)
611+
# There shouldn't be any message_data, atm.
612+
pq.message_data = bytes(self.view)
613+
self.statement.database._pq_complete()
614+
# It is possible for a non-alignment view to exist in cases of
615+
# faults. However, exit should *not* be called in those cases.
616+
##
571617
elif typ is None:
618+
# Success.
572619
self.xact.messages = self.xact.CopyDoneSequence
573620
self.statement.database._pq_complete()
621+
# Find the complete message for command and count.
574622
for x in self.xact.messages_received():
575623
if getattr(x, 'type', None) == Complete.type:
576624
self._complete_message = x
625+
elif issubclass(typ, Exception):
626+
# Likely raises.
627+
self.statement.database._pq_complete()
628+
577629
return super().__exit__(typ, val, tb)
578630

579631
def count(self):
@@ -645,31 +697,69 @@ def __enter__(self):
645697
return self
646698

647699
def __exit__(self, typ, val, tb):
700+
##
701+
# Exiting the CopyManager is a fairly complex operation.
702+
#
703+
# In cases of failure, re-alignment may need to happen
704+
# for when the receivers are not on a message boundary.
705+
##
648706
if typ is not None and not issubclass(typ, Exception):
649707
# Don't recover on interrupts.
650708
return
651709

652-
self.producer.reconcile()
710+
# Does nothing if the COPY was successful.
711+
self.producer.realign()
653712
try:
713+
# If the producer is not aligned to a message boundary,
714+
# it can emit completion data that will put the receivers
715+
# back on track.
716+
# This last service call will move that data onto the receivers.
654717
self.service_producer()
655718
except StopIteration:
656-
# No [state] reconciliation needed.
719+
# No re-alignment needed.
657720
pass
658721

659-
# Now exit after reconciliation.
660722
self.producer.__exit__(typ, val, tb)
723+
724+
# No receivers? It wasn't a success.
725+
if not self.receivers:
726+
if typ is NoReceivers:
727+
raise
728+
raise NoReceivers(self)
729+
730+
exit_faults = {}
661731
for x in self.receivers:
662-
x.__exit__(typ, val, tb)
732+
try:
733+
x.__exit__(typ, val, tb)
734+
except Exception:
735+
exit_faults[x] = sys.exc_info()
736+
if exit_faults:
737+
raise CopyFail(self, "could not exit receivers", exit_faults)
738+
739+
if typ:
740+
raise CopyFail(self, "exception occurred during COPY operation")
741+
742+
def reconcile(self, r):
743+
"""
744+
Reconcile a receiver that faulted.
663745
664-
def reconcile_receiver(self, r):
746+
This method should be used to add back a receiver that failed to
747+
complete its write operation, but is capable of completing the
748+
operation at this time.
749+
"""
665750
if r.protocol not in self.protocols:
666751
raise RuntimeError("cannot add new receivers to copy operations")
667752
# XXX: Assumes that it did not fail during receive().
668753
r()
754+
# Okay, add it back.
669755
self.receivers.add(r)
670756

671757
def service_producer(self):
672758
# Setup current data.
759+
if not self.receivers:
760+
# No receivers to take the data.
761+
raise NoReceivers(self)
762+
673763
try:
674764
nextdata = next(self.producer)
675765
except StopIteration:

postgresql/driver/pq3.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1177,7 +1177,7 @@ def _load_copy_chunks(self, chunks):
11771177
(), (), (),
11781178
),
11791179
element.Execute(b'', 1),
1180-
element.FlushMessage,
1180+
element.SynchronizeMessage,
11811181
),
11821182
asynchook = self.database._receive_async
11831183
)

postgresql/protocol/xact3.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -490,10 +490,11 @@ def standard_put(self, messages,
490490
# No path for message type, could be a protocol error.
491491
if x[0] == ERROR_TYPE:
492492
em = ERROR_PARSE(x[1])
493+
# Is it fatal?
493494
self.fatal = fatal = em[b'S'].upper() != b'ERROR'
494495
self.error_message = em
495496
if fatal is True:
496-
# can't sync up if it's fatal.
497+
# Can't sync up if the session is closed.
497498
self.state = Complete
498499
return count
499500
# Error occurred, so sync up with backend if
@@ -502,6 +503,7 @@ def standard_put(self, messages,
502503
if cmd.type not in (
503504
element.Function.type, element.Query.type
504505
):
506+
# Adjust the offset forward until the Sync message is found.
505507
for offset in range(offset, NCOMMANDS):
506508
if COMMANDS[offset] is element.SynchronizeMessage:
507509
break
@@ -591,17 +593,23 @@ def standard_put(self, messages,
591593
# switched to an optimized processor.
592594
last = processed[-1]
593595
if last.__class__ is bytes:
596+
# Fast path for COPY data, 'd' messages.
594597
self.state = (Receiving, self.put_copydata)
595598
elif last.__class__ is tuple:
599+
# Fast path for Tuples, 'D' messages.
596600
self.state = (Receiving, self.put_tupledata)
597-
elif last.type == element.CopyToBegin.type:
598-
self.state = (Receiving, self.put_copydata)
599601
elif last.type == element.CopyFromBegin.type:
602+
# In this case, the commands that were sent past the
603+
# message starting the COPY need to be re-issued
604+
# once the COPY is complete. PG cleared its buffer.
600605
self.CopyFailSequence = (self.CopyFailMessage,) + \
601606
self.commands[offset+1:]
602607
self.CopyDoneSequence = (element.CopyDoneMessage,) + \
603608
self.commands[offset+1:]
604609
self.state = (Sending, self.sent_from_stdin)
610+
elif last.type == element.CopyToBegin.type:
611+
# Should be seeing COPY data soon.
612+
self.state = (Receiving, self.put_copydata)
605613
return count
606614

607615
def put_copydata(self, messages):

0 commit comments

Comments
 (0)