@@ -53,6 +53,30 @@ def __init__(self, manager, receivers):
5353 self .manager = manager
5454 self .receivers = receivers
5555
56+ def __str__ (self ):
57+ return "{0} receivers faulted" .format (len (self .receivers ))
58+
59+ class CopyFail (Fault ):
60+ """
61+ Exception thrown by the CopyManager when the COPY failed.
62+ """
63+ def __init__ (self , manager , reason , faults = None ):
64+ self .manager = manager
65+ self .reason = reason
66+ self .faults = faults or {}
67+
68+ def __str__ (self ):
69+ return self .reason
70+
71+ class NoReceivers (CopyFail ):
72+ """
73+ Exception thrown by the CopyManager when the COPY failed due to all the
74+ receivers faulting out.
75+ """
76+ reason = 'no receivers remained after fault'
77+ def __init__ (self , manager ):
78+ self .manager = manager
79+
5680# The identifier for PQv3 copy data.
5781PROTOCOL_PQv3 = "PQv3"
5882# The identifier for iterables of copy data sequences.
@@ -98,7 +122,8 @@ def NoTransformation(arg):
98122 (PROTOCOL_PQv3 , PROTOCOL_PQv3 ) : lambda : NoTransformation ,
99123}
100124
101- # Used to manage the conversion of COPY data.
125+ # Used to manage the conversions of COPY data.
126+ # Notably, chunks -> PQv3 or PQv3 -> chunks.
102127class CopyTransformer (object ):
103128 __slots__ = ('current' , 'transformers' , 'get' )
104129 def __init__ (self , source_protocol , target_protocols ):
@@ -256,8 +281,13 @@ def __init__(self):
256281 self .total_bytes = 0
257282
258283 @abstractmethod
259- def reconcile (self ):
284+ def realign (self ):
260285 """
286+ Method implemented by producers that emit COPY data that is not
287+ guaranteed to be aligned.
288+
289+ This is only necessary in failure cases where receivers still need more
290+ data to complete the message.
261291 """
262292
263293 @abstractmethod
@@ -288,8 +318,8 @@ class NullProducer(Producer):
288318 _e_factors = ()
289319 protocol = PROTOCOL_NULL
290320
291- def reconcile (self ):
292- # Never needs to reconcile .
321+ def realign (self ):
322+ # Never needs to realigned .
293323 pass
294324
295325 def __next__ (self ):
@@ -304,8 +334,8 @@ def __init__(self, iterator):
304334 self .__next__ = self .iterator .__next__
305335 super ().__init__ ()
306336
307- def reconcile (self ):
308- # Never needs to reconcile ; data is emitted on message boundaries.
337+ def realign (self ):
338+ # Never needs to realign ; data is emitted on message boundaries.
309339 pass
310340
311341 def __next__ (self , next = next ):
@@ -336,7 +366,7 @@ def recover(self, view):
336366 ##
337367 # When a COPY is interrupted, this can be used to accommodate
338368 # the original state machine to identify the message boundaries.
339- def reconcile (self ):
369+ def realign (self ):
340370 s = self ._state
341371
342372 if s is None :
@@ -356,6 +386,8 @@ def reconcile(self):
356386 # Don't include the already sent parts.
357387 buf = header [len (self ._state .size_fragment ):]
358388 bodylen = ulong_unpack (header ) - 4
389+ # This will often cause an invalid copy data error,
390+ # but it doesn't matter much because we will issue a copy fail.
359391 buf += b'\x00 ' * bodylen
360392 for_receivers = buf
361393 elif s .remaining_bytes > 0 :
@@ -483,6 +515,12 @@ def __exit__(self, typ, val, tb):
483515 db = self .statement .database
484516 if not db .closed and self ._chunks ._xact is not None :
485517 db .interrupt ()
518+ if db .pq .xact :
519+ try :
520+ db ._pq_complete ()
521+ except Exception :
522+ # Let the copy manager indicate the failure.
523+ pass
486524 super ().__exit__ (typ , val , tb )
487525
488526class NullReceiver (Receiver ):
@@ -538,7 +576,7 @@ def __init__(self, statement, *parameters):
538576 self .xact = None
539577 super ().__init__ (statement .database .pq .socket .send ,)
540578
541- # A bit of a hack...
579+ # XXX: A bit of a hack...
542580 # This is actually a good indication that statements need a .copy()
543581 # execution method for producing a "Copy" cursor that reads or writes.
544582 class WireReady (BaseException ):
@@ -563,17 +601,31 @@ def __exit__(self, typ, val, tb):
563601 if self .xact is None :
564602 # Nothing to do.
565603 return super ().__exit__ (typ , val , tb )
604+
566605 if self .view :
567- # The reconciled producer emitted the necessary
606+ # The realigned producer emitted the necessary
568607 # data for message boundary alignment.
608+ #
609+ # In this case, we unconditionally fail.
569610 pq = self .statement .database .pq
570- pq .message_data = (pq .message_data or b'' ) + bytes (self .view )
611+ # There shouldn't be any message_data, atm.
612+ pq .message_data = bytes (self .view )
613+ self .statement .database ._pq_complete ()
614+ # It is possible for a non-alignment view to exist in cases of
615+ # faults. However, exit should *not* be called in those cases.
616+ ##
571617 elif typ is None :
618+ # Success.
572619 self .xact .messages = self .xact .CopyDoneSequence
573620 self .statement .database ._pq_complete ()
621+ # Find the complete message for command and count.
574622 for x in self .xact .messages_received ():
575623 if getattr (x , 'type' , None ) == Complete .type :
576624 self ._complete_message = x
625+ elif issubclass (typ , Exception ):
626+ # Likely raises.
627+ self .statement .database ._pq_complete ()
628+
577629 return super ().__exit__ (typ , val , tb )
578630
579631 def count (self ):
@@ -645,31 +697,69 @@ def __enter__(self):
645697 return self
646698
647699 def __exit__ (self , typ , val , tb ):
700+ ##
701+ # Exiting the CopyManager is a fairly complex operation.
702+ #
703+ # In cases of failure, re-alignment may need to happen
704+ # for when the receivers are not on message boundary.
705+ ##
648706 if typ is not None and not issubclass (typ , Exception ):
649707 # Don't recover on interrupts.
650708 return
651709
652- self .producer .reconcile ()
710+ # Does nothing if the COPY was successful.
711+ self .producer .realign ()
653712 try :
713+ # If the producer is not aligned to a message boundary,
714+ # it can emit completion data that will put the receivers
715+ # back on track.
716+ # This last service call will move that data onto the receivers.
654717 self .service_producer ()
655718 except StopIteration :
656- # No [state] reconciliation needed.
719+ # No re-alignment needed.
657720 pass
658721
659- # Now exit after reconciliation.
660722 self .producer .__exit__ (typ , val , tb )
723+
724+ # No receivers? It wasn't a success.
725+ if not self .receivers :
726+ if typ is NoReceivers :
727+ raise
728+ raise NoReceivers (self )
729+
730+ exit_faults = {}
661731 for x in self .receivers :
662- x .__exit__ (typ , val , tb )
732+ try :
733+ x .__exit__ (typ , val , tb )
734+ except Exception :
735+ exit_faults [x ] = sys .exc_info ()
736+ if exit_faults :
737+ raise CopyFail (self , "could not exit receivers" , exit_faults )
738+
739+ if typ :
740+ raise CopyFail (self , "exception occurred during COPY operation" )
741+
742+ def reconcile (self , r ):
743+ """
744+ Reconcile a receiver that faulted.
663745
664- def reconcile_receiver (self , r ):
746+ This method should be used to add back a receiver that failed to
747+ complete its write operation, but is capable of completing the
748+ operation at this time.
749+ """
665750 if r .protocol not in self .protocols :
666751 raise RuntimeError ("cannot add new receivers to copy operations" )
667752 # XXX: Assumes that is did not fail during receive().
668753 r ()
754+ # Okay, add it back.
669755 self .receivers .add (r )
670756
671757 def service_producer (self ):
672758 # Setup current data.
759+ if not self .receivers :
760+ # No receivers to take the data.
761+ raise NoReceivers (self )
762+
673763 try :
674764 nextdata = next (self .producer )
675765 except StopIteration :
0 commit comments