|
| 1 | +.. _pg_copyman: |
| 2 | + |
| 3 | +*************** |
| 4 | +Copy Management |
| 5 | +*************** |
| 6 | + |
| 7 | +The `postgresql.copyman` module provides a way to quickly move COPY data coming |
| 8 | +from one connection to many connections. Alternatively, it can also be sourced |
| 9 | +by arbitrary iterators and target arbitrary callables. |
| 10 | + |
| 11 | +Statement execution methods offer a way for running COPY operations |
| 12 | +with iterators, but the cost of allocating objects for each row is too |
| 13 | +significant for transferring gigabytes of COPY data from one connection to |
| 14 | +another. The interfaces available on statement objects are primarily intended to |
| 15 | +be used when transferring COPY data to and from arbitrary Python |
| 16 | +interfaces. |
| 17 | + |
| 18 | +Direct connection-to-connection COPY operations can be performed using the |
| 19 | +high-level `postgresql.copyman.COPY` function:: |
| 20 | + |
| 21 | + >>> from postgresql.copyman import COPY |
| 22 | + >>> send_stmt = source.prepare("COPY (SELECT i FROM generate_series(1, 1000000) AS g(i)) TO STDOUT") |
| 23 | + >>> destination.execute("CREATE TEMP TABLE loading_table (i int8)") |
| 24 | + >>> receive_stmt = destination.prepare("COPY loading_table FROM STDIN") |
| 25 | + >>> total_rows, total_bytes = COPY(send_stmt, receive_stmt) |
| 26 | + |
| 27 | +However, if more control is needed, the `postgresql.copyman.CopyManager` class |
| 28 | +should be used directly. |
| 29 | + |
| 30 | + |
| 31 | +Managers |
| 32 | +======== |
| 33 | + |
| 34 | +The `postgresql.copyman.CopyManager` class manages the Producer and the |
| 35 | +Receivers involved in a COPY operation. Normally, |
| 36 | +`postgresql.copyman.StatementProducer` and |
| 37 | +`postgresql.copyman.StatementReceiver` instances. Naturally, a Producer is the |
| 38 | +object that produces the COPY data to be given to the manager's Receivers. |
| 39 | + |
| 40 | +Using a CopyManager directly means that there is a need for more control over |
| 41 | +the operation. The Manager is both a context manager and an iterator. The |
| 42 | +context manager interfaces handle initialization and finalization, and the |
| 43 | +iterator provides an event loop emitting information about the amount of |
| 44 | +COPY data copied this cycle. Normal usage takes the form:: |
| 45 | + |
| 46 | + >>> from postgresql import copyman |
| 47 | + >>> send_stmt = source.prepare("COPY (SELECT i FROM generate_series(1, 1000000) AS g(i)) TO STDOUT") |
| 48 | + >>> destination.execute("CREATE TEMP TABLE loading_table (i int8)") |
| 49 | + >>> receive_stmt = destination.prepare("COPY loading_table FROM STDIN") |
| 50 | + >>> producer = copyman.StatementProducer(send_stmt) |
| 51 | + >>> receiver = copyman.StatementReceiver(receive_stmt) |
| 52 | + >>> |
| 53 | + >>> with source.xact(), destination.xact(): |
| 54 | + ... with copyman.CopyManager(producer, receiver) as copy: |
| 55 | + ... for num_messages, num_bytes in copy: |
| 56 | + ... update_rate(num_bytes) |
| 57 | + |
| 58 | +The use of the context manager is necessary for ensuring that connection state |
| 59 | +is properly restored at the end of the COPY. |
| 60 | + |
| 61 | +As an alternative to a for-loop inside a with-statement block, the `run` method |
| 62 | +can be called to perform the operation:: |
| 63 | + |
| 64 | + >>> with source.xact(), destination.xact(): |
| 65 | + ... copyman.CopyManager(producer, receiver).run() |
| 66 | + |
| 67 | + |
| 68 | +CopyManager Interface Points |
| 69 | +---------------------------- |
| 70 | + |
| 71 | +Primarily, the `postgresql.copyman.CopyManager` provides a context manager and |
| 72 | +an iterator for controlling the COPY operation. |
| 73 | + |
| 74 | + ``CopyManager.run()`` |
| 75 | + Perform the entire COPY operation. |
| 76 | + |
| 77 | + ``CopyManager.__enter__()`` |
| 78 | + Start the COPY operation. Connections taking part in the COPY should **not** |
| 79 | + be used until ``__exit__`` is ran. |
| 80 | + |
| 81 | + ``CopyManager.__exit__(typ, val, tb)`` |
| 82 | + Finish, abort, or fail the COPY operation. Aborts in the case of an incomplete |
| 83 | + COPY or an unidentified exception, and fails in the case of an untrapped |
| 84 | + fault. |
| 85 | + |
| 86 | + ``CopyManager.__iter__()`` |
| 87 | + Returns the CopyManager instance. |
| 88 | + |
| 89 | + ``CopyManager.__next__()`` |
| 90 | + Transfer the next chunk of COPY data to the receivers. Yields a tuple |
| 91 | + consisting of the number of messages and bytes transferred. Raises |
| 92 | + `StopIteration` when complete. |
| 93 | + |
| 94 | + ``CopyManager.reconcile(faulted_receiver)`` |
| 95 | + Reconcile a faulted receiver. When a receiver faults, it will no longer |
| 96 | + be in the receiver set. This method is used to signal to the manager that the |
| 97 | + problem has been cleared up, and the receiver is again ready to receive. |
| 98 | + |
| 99 | + |
| 100 | +Faults |
| 101 | +====== |
| 102 | + |
| 103 | +The CopyManager generalizes some exceptions that occur during transfer. While |
| 104 | +inside the context manager, `postgresql.copyman.Fault` may be raised if a |
| 105 | +Receiver raises an exception. The Manager assumes the Fault is fatal to a |
| 106 | +Receiver, and immediately removes it from the set of target receivers. |
| 107 | +Additionally, if the Fault goes untrapped, the copy will be aborted. |
| 108 | + |
| 109 | +The Fault exception references the Manager that raised the exception, and the |
| 110 | +actual exceptions that occurred, associated with the Receiver that caused them:: |
| 111 | + |
| 112 | + >>> from postgresql import copyman |
| 113 | + >>> send_stmt = source.prepare("COPY (SELECT i FROM generate_series(1, 1000000) AS g(i)) TO STDOUT") |
| 114 | + >>> destination.execute("CREATE TEMP TABLE loading_table (i int8)") |
| 115 | + >>> receive_stmt = destination.prepare("COPY loading_table FROM STDIN") |
| 116 | + >>> producer = copyman.StatementProducer(send_stmt) |
| 117 | + >>> receiver = copyman.StatementReceiver(receive_stmt) |
| 118 | + >>> |
| 119 | + >>> with source.xact(), destination.xact(): |
| 120 | + ... with copyman.CopyManager(producer, receiver) as copy: |
| 121 | + ... while copy.receivers: |
| 122 | + ... try: |
| 123 | + ... for num_messages, num_bytes in copy: |
| 124 | + ... update_rate(num_bytes) |
| 125 | + ... except copyman.Fault as cf: |
| 126 | + ... original_exception = cf.faults[receiver] |
| 127 | + ... if unknown_failure(original_exception): |
| 128 | + ... ... |
| 129 | + ... raise |
| 130 | + |
| 131 | + |
| 132 | +Fault Properties |
| 133 | +---------------- |
| 134 | + |
| 135 | +The following attributes exist on `postgresql.copyman.Fault` instances: |
| 136 | + |
| 137 | + ``Fault.manager`` |
| 138 | + The `postgresql.copyman.CopyManager` instance that raised the exception; the |
| 139 | + same manager that caught the fault. |
| 140 | + |
| 141 | + ``Fault.faults`` |
| 142 | + A dictionary mapping the Receiver to the exception that occurred. The Manager |
| 143 | + will give processing to every Receiver, so only one Fault will occur per |
| 144 | + transfer cycle. |
| 145 | + |
| 146 | +Reconciliation |
| 147 | +-------------- |
| 148 | + |
| 149 | +When a Fault occurs, it is possible that it was not fatal. In such cases the |
| 150 | +`postgresql.copyman.CopyManager.reconcile` method can be used to reintroduce the |
| 151 | +Receiver to the Manager's set. That is, when a Fault occurs, the Manager |
| 152 | +immediately removes the Receiver so that the COPY operation can continue. |
| 153 | + |
| 154 | +Faults should be trapped from within the Manager's context:: |
| 155 | + |
| 156 | + >>> import socket |
| 157 | + >>> from postgresql import copyman |
| 158 | + >>> send_stmt = source.prepare("COPY (SELECT i FROM generate_series(1, 1000000) AS g(i)) TO STDOUT") |
| 159 | + >>> destination.execute("CREATE TEMP TABLE loading_table (i int8)") |
| 160 | + >>> receive_stmt = destination.prepare("COPY loading_table FROM STDIN") |
| 161 | + >>> producer = copyman.StatementProducer(send_stmt) |
| 162 | + >>> receiver = copyman.StatementReceiver(receive_stmt) |
| 163 | + >>> |
| 164 | + >>> with source.xact(), destination.xact(): |
| 165 | + ... with copyman.CopyManager(producer, receiver) as copy: |
| 166 | + ... while copy.receivers: |
| 167 | + ... try: |
| 168 | + ... for num_messages, num_bytes in copy: |
| 169 | + ... update_rate(num_bytes) |
| 170 | + ... except copyman.Fault as cf: |
| 171 | + ... if isinstance(cf.faults[receiver], socket.timeout): |
| 172 | + ... copy.reconcile(receiver) |
| 173 | + ... else: |
| 174 | + ... raise |
| 175 | + |
| 176 | +Recovering from Faults does add significant complexity to a COPY operation, |
| 177 | +so, often, it's best to avoid conditions in which reconciliable Faults may |
| 178 | +occur. |
| 179 | + |
| 180 | + |
| 181 | +Terminology |
| 182 | +=========== |
| 183 | + |
| 184 | +The following terms are regularly used to describe the implementation and |
| 185 | +processes of the `postgresql.copyman` module: |
| 186 | + |
| 187 | + Manager |
| 188 | + The object used to manage data coming from a Producer and being given to the |
| 189 | + Receivers. It also manages the necessary initialization and finalization steps |
| 190 | + required by those factors. |
| 191 | + |
| 192 | + Producer |
| 193 | + The object used to produce the COPY data to be given to the Receivers. The |
| 194 | + source. |
| 195 | + |
| 196 | + Receiver |
| 197 | + An object that consumes COPY data. A target. |
| 198 | + |
| 199 | + Fault |
| 200 | + Specifically, the `postgresql.copyman.Fault` exception. A Fault is raised |
| 201 | + when a Receiver raises an exception. |
| 202 | + |
| 203 | + Reconciliation |
| 204 | + Generally, the steps performed by the "reconcile" method on |
| 205 | + `postgresql.copyman.CopyManager` instances. More precisely, the |
| 206 | + necessary steps for a Receiver's reintroduction into the COPY operation after |
| 207 | + a Fault. |
| 208 | + |
| 209 | + Realignment |
| 210 | + The process of providing compensating data to the receivers so that the |
| 211 | + connection will be on a message boundary. Occurs when the COPY operation is |
| 212 | + aborted. |
| 213 | + |
| 214 | + Aborted Copy |
| 215 | + An aborted copy is a COPY operation that terminated prematurely. This happens |
| 216 | + if a CopyManager's for-loop is terminated early by breaking or by an |
| 217 | + unidentified exception being raised. |
| 218 | + |
| 219 | + Failed Copy |
| 220 | + A failed copy is an aborted COPY operation that was |
| 221 | + *terminated due to a fault*, or a producer failure. |
0 commit comments