Skip to content

Commit c1cb060

Browse files
author
James William Pye
committed
Add basic CopyManager documentation.
1 parent 67a7429 commit c1cb060

3 files changed

Lines changed: 229 additions & 0 deletions

File tree

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
##
2+
# .documentation.copyman
3+
##
4+
__doc__ = open(__file__[:__file__.rfind('.')] + '.txt').read()
5+
__docformat__ = 'reStructuredText'
6+
if __name__ == '__main__':
7+
help(__name__)
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
.. _pg_copyman:
2+
3+
***************
4+
Copy Management
5+
***************
6+
7+
The `postgresql.copyman` module provides a way to quickly move COPY data coming
8+
from one connection to many connections. Alternatively, it can also be sourced
9+
by arbitrary iterators and target arbitrary callables.
10+
11+
Statement execution methods offer a way for running COPY operations
12+
with iterators, but the cost of allocating objects for each row is too
13+
significant for transferring gigabytes of COPY data from one connection to
14+
another. The interfaces available on statement objects are primarily intended to
15+
be used when transferring COPY data to and from arbitrary Python
16+
interfaces.
17+
18+
Direct connection-to-connection COPY operations can be performed using the
19+
high-level `postgresql.copyman.COPY` function::
20+
21+
>>> from postgresql.copyman import COPY
22+
>>> send_stmt = source.prepare("COPY (SELECT i FROM generate_series(1, 1000000) AS g(i)) TO STDOUT")
23+
>>> destination.execute("CREATE TEMP TABLE loading_table (i int8)")
24+
>>> receive_stmt = destination.prepare("COPY loading_table FROM STDIN")
25+
>>> total_rows, total_bytes = COPY(send_stmt, receive_stmt)
26+
27+
However, if more control is needed, the `postgresql.copyman.CopyManager` class
28+
should be used directly.
29+
30+
31+
Managers
32+
========
33+
34+
The `postgresql.copyman.CopyManager` class manages the Producer and the
35+
Receivers involved in a COPY operation. Normally,
36+
`postgresql.copyman.StatementProducer` and
37+
`postgresql.copyman.StatementReceiver` instances. Naturally, a Producer is the
38+
object that produces the COPY data to be given to the manager's Receivers.
39+
40+
Using a CopyManager directly means that there is a need for more control over
41+
the operation. The Manager is both a context manager and an iterator. The
42+
context manager interfaces handle initialization and finalization, and the
43+
iterator provides an event loop emitting information about the amount of
44+
COPY data copied this cycle. Normal usage takes the form::
45+
46+
>>> from postgresql import copyman
47+
>>> send_stmt = source.prepare("COPY (SELECT i FROM generate_series(1, 1000000) AS g(i)) TO STDOUT")
48+
>>> destination.execute("CREATE TEMP TABLE loading_table (i int8)")
49+
>>> receive_stmt = destination.prepare("COPY loading_table FROM STDIN")
50+
>>> producer = copyman.StatementProducer(send_stmt)
51+
>>> receiver = copyman.StatementReceiver(receive_stmt)
52+
>>>
53+
>>> with source.xact(), destination.xact():
54+
... with copyman.CopyManager(producer, receiver) as copy:
55+
... for num_messages, num_bytes in copy:
56+
... update_rate(num_bytes)
57+
58+
The use of the context manager is necessary for ensuring that connection state
59+
is properly restored at the end of the COPY.
60+
61+
As an alternative to a for-loop inside a with-statement block, the `run` method
62+
can be called to perform the operation::
63+
64+
>>> with source.xact(), destination.xact():
65+
... copyman.CopyManager(producer, receiver).run()
66+
67+
68+
CopyManager Interface Points
69+
----------------------------
70+
71+
Primarily, the `postgresql.copyman.CopyManager` provides a context manager and
72+
an iterator for controlling the COPY operation.
73+
74+
``CopyManager.run()``
75+
Perform the entire COPY operation.
76+
77+
``CopyManager.__enter__()``
78+
Start the COPY operation. Connections taking part in the COPY should **not**
79+
be used until ``__exit__`` is ran.
80+
81+
``CopyManager.__exit__(typ, val, tb)``
82+
Finish, abort, or fail the COPY operation. Aborts in the case of an incomplete
83+
COPY or an unidentified exception, and fails in the case of an untrapped
84+
fault.
85+
86+
``CopyManager.__iter__()``
87+
Returns the CopyManager instance.
88+
89+
``CopyManager.__next__()``
90+
Transfer the next chunk of COPY data to the receivers. Yields a tuple
91+
consisting of the number of messages and bytes transferred. Raises
92+
`StopIteration` when complete.
93+
94+
``CopyManager.reconcile(faulted_receiver)``
95+
Reconcile a faulted receiver. When a receiver faults, it will no longer
96+
be in the receiver set. This method is used to signal to the manager that the
97+
problem has been cleared up, and the receiver is again ready to receive.
98+
99+
100+
Faults
101+
======
102+
103+
The CopyManager generalizes some exceptions that occur during transfer. While
104+
inside the context manager, `postgresql.copyman.Fault` may be raised if a
105+
Receiver raises an exception. The Manager assumes the Fault is fatal to a
106+
Receiver, and immediately removes it from the set of target receivers.
107+
Additionally, if the Fault goes untrapped, the copy will be aborted.
108+
109+
The Fault exception references the Manager that raised the exception, and the
110+
actual exceptions that occurred, associated with the Receiver that caused them::
111+
112+
>>> from postgresql import copyman
113+
>>> send_stmt = source.prepare("COPY (SELECT i FROM generate_series(1, 1000000) AS g(i)) TO STDOUT")
114+
>>> destination.execute("CREATE TEMP TABLE loading_table (i int8)")
115+
>>> receive_stmt = destination.prepare("COPY loading_table FROM STDIN")
116+
>>> producer = copyman.StatementProducer(send_stmt)
117+
>>> receiver = copyman.StatementReceiver(receive_stmt)
118+
>>>
119+
>>> with source.xact(), destination.xact():
120+
... with copyman.CopyManager(producer, receiver) as copy:
121+
... while copy.receivers:
122+
... try:
123+
... for num_messages, num_bytes in copy:
124+
... update_rate(num_bytes)
125+
... except copyman.Fault as cf:
126+
... original_exception = cf.faults[receiver]
127+
... if unknown_failure(original_exception):
128+
... ...
129+
... raise
130+
131+
132+
Fault Properties
133+
----------------
134+
135+
The following attributes exist on `postgresql.copyman.Fault` instances:
136+
137+
``Fault.manager``
138+
The `postgresql.copyman.CopyManager` instance that raised the exception; the
139+
same manager that caught the fault.
140+
141+
``Fault.faults``
142+
A dictionary mapping the Receiver to the exception that occurred. The Manager
143+
will give processing to every Receiver, so only one Fault will occur per
144+
transfer cycle.
145+
146+
Reconciliation
147+
--------------
148+
149+
When a Fault occurs, it is possible that it was not fatal. In such cases the
150+
`postgresql.copyman.CopyManager.reconcile` method can be used to reintroduce the
151+
Receiver to the Manager's set. That is, when a Fault occurs, the Manager
152+
immediately removes the Receiver so that the COPY operation can continue.
153+
154+
Faults should be trapped from within the Manager's context::
155+
156+
>>> import socket
157+
>>> from postgresql import copyman
158+
>>> send_stmt = source.prepare("COPY (SELECT i FROM generate_series(1, 1000000) AS g(i)) TO STDOUT")
159+
>>> destination.execute("CREATE TEMP TABLE loading_table (i int8)")
160+
>>> receive_stmt = destination.prepare("COPY loading_table FROM STDIN")
161+
>>> producer = copyman.StatementProducer(send_stmt)
162+
>>> receiver = copyman.StatementReceiver(receive_stmt)
163+
>>>
164+
>>> with source.xact(), destination.xact():
165+
... with copyman.CopyManager(producer, receiver) as copy:
166+
... while copy.receivers:
167+
... try:
168+
... for num_messages, num_bytes in copy:
169+
... update_rate(num_bytes)
170+
... except copyman.Fault as cf:
171+
... if isinstance(cf.faults[receiver], socket.timeout):
172+
... copy.reconcile(receiver)
173+
... else:
174+
... raise
175+
176+
Recovering from Faults does add significant complexity to a COPY operation,
177+
so, often, it's best to avoid conditions in which reconciliable Faults may
178+
occur.
179+
180+
181+
Terminology
182+
===========
183+
184+
The following terms are regularly used to describe the implementation and
185+
processes of the `postgresql.copyman` module:
186+
187+
Manager
188+
The object used to manage data coming from a Producer and being given to the
189+
Receivers. It also manages the necessary initialization and finalization steps
190+
required by those factors.
191+
192+
Producer
193+
The object used to produce the COPY data to be given to the Receivers. The
194+
source.
195+
196+
Receiver
197+
An object that consumes COPY data. A target.
198+
199+
Fault
200+
Specifically, the `postgresql.copyman.Fault` exception. A Fault is raised
201+
when a Receiver raises an exception.
202+
203+
Reconciliation
204+
Generally, the steps performed by the "reconcile" method on
205+
`postgresql.copyman.CopyManager` instances. More precisely, the
206+
necessary steps for a Receiver's reintroduction into the COPY operation after
207+
a Fault.
208+
209+
Realignment
210+
The process of providing compensating data to the receivers so that the
211+
connection will be on a message boundary. Occurs when the COPY operation is
212+
aborted.
213+
214+
Aborted Copy
215+
An aborted copy is a COPY operation that terminated prematurely. This happens
216+
if a CopyManager's for-loop is terminated early by breaking or by an
217+
unidentified exception being raised.
218+
219+
Failed Copy
220+
A failed copy is an aborted COPY operation that was
221+
*terminated due to a fault*, or a producer failure.

sphinx-src/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ Contents
1616

1717
admin
1818
driver
19+
copyman
1920
lib
2021
clientparameters
2122
gotchas

0 commit comments

Comments
 (0)