2020default_buffer_size = 1024 * 10
2121
2222class Fault (Exception ):
23+ pass
24+
25+ class ProducerFault (Fault ):
26+ """
27+ Exception raised when the Producer caused an exception.
28+
29+ Normally, Producer faults are fatal.
30+ """
31+ def __init__ (self , manager ):
32+ self .manager = manager
33+
34+ def __str__ (self ):
35+ return "producer raised exception"
36+
37+ class ReceiverFaults (Fault ):
2338 """
24- Receivers raised exceptions. This happens in cases where a receiver raises
25- an exception. Faults should be trapped if recovery from an exception is
39+ Exception raised when Receivers cause an exception.
40+
41+ Faults should be trapped if recovery from an exception is
2642 possible, or if the failed receiver is optional to the succes of the
2743 operation.
2844
@@ -46,13 +62,20 @@ class CopyFail(Exception):
4662
4763 The 'reason' attribute is a string indicating why it failed.
4864
49- The 'faults ' attribute is a mapping of receivers to exceptions that were
65+ The 'receiver_faults ' attribute is a mapping of receivers to exceptions that were
5066 raised on exit.
67+
68+ The 'producer_fault' attribute specifies if the producer raise an exception
69+ on exit.
5170 """
52- def __init__ (self , manager , reason = None , faults = None ):
71+ def __init__ (self , manager , reason = None ,
72+ receiver_faults = None ,
73+ producer_fault = None ,
74+ ):
5375 self .manager = manager
5476 self .reason = reason
55- self .faults = faults or {}
77+ self .receiver_faults = receiver_faults or {}
78+ self .producer_fault = producer_fault
5679
5780 def __str__ (self ):
5881 return self .reason or 'copy aborted'
@@ -683,27 +706,29 @@ def __exit__(self, typ, val, tb):
683706 # Don't recover on interrupts.
684707 return
685708
686- # Does nothing if the COPY was successful.
687- self .producer .realign ()
709+ profail = None
688710 try :
689- ##
690- # If the producer is not aligned to a message boundary,
691- # it can emit completion data that will put the receivers
692- # back on track.
693- # This last service call will move that data onto the receivers.
694- self ._service_producer ()
695- ##
696- # The receivers need to handle any new data in their __exit__.
697- except StopIteration :
698- # No re-alignment needed.
699- pass
711+ # Does nothing if the COPY was successful.
712+ self .producer .realign ()
713+ try :
714+ ##
715+ # If the producer is not aligned to a message boundary,
716+ # it can emit completion data that will put the receivers
717+ # back on track.
718+ # This last service call will move that data onto the receivers.
719+ self ._service_producer ()
720+ ##
721+ # The receivers need to handle any new data in their __exit__.
722+ except StopIteration :
723+ # No re-alignment needed.
724+ pass
700725
701- self .producer .__exit__ (typ , val , tb )
726+ self .producer .__exit__ (typ , val , tb )
727+ except Exception as profail :
728+ pass
702729
703730 # No receivers? It wasn't a success.
704731 if not self .receivers :
705- if typ is CopyFail :
706- raise
707732 raise CopyFail (self , "no receivers" )
708733
709734 exit_faults = {}
@@ -736,13 +761,15 @@ def _service_producer(self):
736761 # Setup current data.
737762 if not self .receivers :
738763 # No receivers to take the data.
739- raise CopyFail ( self , "no receivers" )
764+ raise StopIteration
740765
741766 try :
742767 nextdata = next (self .producer )
743768 except StopIteration :
744769 # Should be over.
745770 raise
771+ except Exception :
772+ raise ProducerFault (self )
746773
747774 self .transformer (nextdata )
748775
@@ -762,7 +789,7 @@ def _service_receivers(self):
762789 # The CopyManager is eager to continue the operation.
763790 for x in faults :
764791 self .receivers .discard (x )
765- raise Fault (self , faults )
792+ raise ReceiverFaults (self , faults )
766793
767794 # Run the COPY to completion.
768795 def run (self ):
0 commit comments