Skip to content

Commit 516e54b

Browse files
author
James William Pye
committed
Implement load_rows and load_chunks to replace load().
Added: itertools.chunk for supporting load_rows() via load_chunks(). Notably, this change cleaned up the implmentation of COPY TO STDIN and bulk statement execution. closes #1010602
1 parent f8e065e commit 516e54b

6 files changed

Lines changed: 143 additions & 104 deletions

File tree

postgresql/api.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,7 @@ def first(self, *parameters) -> "'First' object that is returned by the query":
534534
"""
535535

536536
@abstractmethod
537-
def load(self,
537+
def load_rows(self,
538538
iterable : "A iterable of tuples to execute the statement with"
539539
):
540540
"""
@@ -548,8 +548,29 @@ def load(self,
548548
... q(*i)
549549
550550
Its purpose is to allow the implementation to take advantage of the
551-
knowledge that a series of parameters are to be loaded and subsequently
552-
optimize the operation.
551+
knowledge that a series of parameters are to be loaded so that the
552+
operation can be optimized.
553+
"""
554+
555+
@abstractmethod
556+
def load_chunks(self,
557+
iterable : "A iterable of chunks of tuples to execute the statement with"
558+
):
559+
"""
560+
Given an iterable, `iterable`, feed the produced parameters of the chunks
561+
produced by the iterable to the query. This is a bulk-loading interface
562+
for parameterized queries.
563+
564+
Effectively, it is equivalent to:
565+
566+
>>> ps = db.prepare(...)
567+
>>> for c in iterable:
568+
... for i in c:
569+
... q(*i)
570+
571+
Its purpose is to allow the implementation to take advantage of the
572+
knowledge that a series of chunks of parameters are to be loaded so
573+
that the operation can be optimized.
553574
"""
554575

555576
@abstractmethod

postgresql/documentation/driver.py

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -435,11 +435,11 @@
435435
Prepared statement objects have a few execution methods:
436436
437437
``ps(*parameters)``
438-
As shown before, statement objects can be simply invoked like a function to get
438+
As shown before, statement objects can be invoked like a function to get
439439
the statement's results.
440440
441441
``ps.rows(*parameters)``
442-
Return a simple iterator to all the rows produced by the statement. This
442+
Return a iterator to all the rows produced by the statement. This
443443
method will stream rows on demand, so it is ideal for situations where
444444
each individual row in a large result-set must be processed.
445445
@@ -496,6 +496,20 @@
496496
``ps.close()``
497497
Close the statement inhibiting further use.
498498
499+
``ps.load_rows(collections.Iterable(parameters))``
500+
Given an iterable producing parameters, execute the statement for each
501+
iteration. Always returns `None`.
502+
503+
``ps.load_chunks(collections.Iterable(collections.Iterable(parameters)))``
504+
Given an iterable of iterables producing parameters, execute the statement
505+
for each parameter produced. However, send the all execution commands with
506+
the corresponding parameters of each chunk before reading any results.
507+
Always returns `None`. This access point is designed to be used in conjunction
508+
with ``ps.chunks()`` for transferring rows from one connection to another with
509+
great efficiency::
510+
511+
>>> dst.prepare(...).load_chunks(src.prepare(...).chunks())
512+
499513
500514
Statement Metadata
501515
------------------
@@ -649,23 +663,24 @@
649663
650664
Using the call interface is fine for making a single insert, but when multiple
651665
records need to be inserted, it's not the most efficient means to load data. For
652-
multiple records, the ``ps.load([...])`` provides an efficient way to load large
653-
quantities of structured data::
666+
multiple records, the ``ps.load_rows([...])`` provides an efficient way to load
667+
large quantities of structured data::
654668
655669
>>> from datetime import date
656-
>>> mkemp.load([
670+
>>> mkemp.load_rows([
657671
... ("Jack Johnson", "85000", date(1962, 11, 23), date(1990, 3, 5)),
658672
... ("Debra McGuffer", "52000", date(1973, 3, 4), date(2002, 1, 14)),
659673
... ("Barbara Smith", "86000", date(1965, 2, 24), date(2005, 7, 19)),
660674
... ])
661675
662-
While small, the above illustrates the ``ps.load()`` method taking an iterable of
663-
tuples that provides parameters for the each execution of the statement.
676+
While small, the above illustrates the ``ps.load_rows()`` method taking an
677+
iterable of tuples that provides parameters for the each execution of the
678+
statement.
664679
665-
Load is also used to support ``COPY ... FROM STDIN`` statements::
680+
``load_rows`` is also used to support ``COPY ... FROM STDIN`` statements::
666681
667682
>>> copy_emps_in = db.prepare("COPY employee FROM STDIN")
668-
>>> copy_emps_in.load([
683+
>>> copy_emps_in.load_rows([
669684
... b'Emp Name1\t72000\t1970-2-01\t1980-10-22\n',
670685
... b'Emp Name2\t62000\t1968-9-11\t1985-11-1\n',
671686
... b'Emp Name3\t62000\t1968-9-11\t1985-11-1\n',
@@ -694,8 +709,8 @@
694709
In situations where other actions are invoked during a ``COPY FROM STDIN``, a
695710
COPY failure error will occur. The driver manages the connection state in such
696711
a way that will purposefully cause the error as the COPY was inappropriately
697-
interrupted. This not usually a problem as the ``load(...)`` method must
698-
complete the COPY command before returning.
712+
interrupted. This not usually a problem as ``load_rows(...)`` and
713+
``load_chunks(...)`` methods must complete the COPY command before returning.
699714
700715
Copy data is always transferred using ``bytes`` objects. Even in cases where the
701716
COPY is not in ``BINARY`` mode. Any needed encoding transformations *must* be
@@ -722,9 +737,9 @@
722737
<lots of data>
723738
724739
``COPY FROM STDIN`` commands are supported via
725-
`postgresql.api.PreparedStatement.load`. Each invocation to ``load``
726-
is a single invocation of COPY. ``load`` takes an iterable of COPY lines
727-
to send to the server::
740+
`postgresql.api.PreparedStatement.load_rows`. Each invocation to
741+
``load_rows`` is a single invocation of COPY. ``load_rows`` takes an iterable of
742+
COPY lines to send to the server::
728743
729744
>>> db.execute("""
730745
... CREATE TABLE sample_copy (
@@ -733,21 +748,20 @@
733748
... );
734749
... """)
735750
>>> copyin = db.prepare('COPY sample_copy FROM STDIN')
736-
>>> copyin.load([
751+
>>> copyin.load_rows([
737752
... b'123\tone twenty three\n',
738753
... b'350\ttree fitty\n',
739754
... ])
740755
741-
The ``load()`` method is trained to identify chunk iterators so that direct
742-
transfers from a source database to a destination database could be
743-
made in a streaming fashion::
756+
For direct connection-to-connection COPY, use of ``load_chunks(...)`` is
757+
recommended as it will provide the most efficient transfer method::
744758
745759
>>> copyout = src.prepare('COPY atable TO STDOUT')
746760
>>> copyin = dst.prepare('COPY atable FROM STDIN')
747-
>>> copyin.load(copyout.chunks())
761+
>>> copyin.load_chunks(copyout.chunks())
748762
749763
Specifically, each chunk of row data produced by ``chunks()`` will be written in
750-
full by ``load()`` before getting another chunk to write.
764+
full by ``load_chunks()`` before getting another chunk to write.
751765
752766
753767
Cursors

postgresql/driver/pq3.py

Lines changed: 48 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
from .. import api as pg_api
3535
from ..encodings import aliases as pg_enc_aliases
3636

37-
from ..python.itertools import interlace
37+
from ..python.itertools import interlace, chunk
3838
from ..python.socket import SocketFactory
3939

4040
from ..protocol import xact3 as xact
@@ -1025,26 +1025,20 @@ def first(self, *parameters):
10251025
return None
10261026
return cm.extract_count() or cm.extract_command()
10271027

1028-
def _copy_data_in(self,
1029-
iterable,
1030-
tps : "tuples per *set*" = None,
1031-
):
1028+
def _load_copy_chunks(self, chunks):
10321029
"""
1033-
Given an iterable, execute the COPY ... FROM STDIN statement and
1034-
send the copy lines produced by the iterable to the remote end.
1035-
1036-
`tps` is the number of tuples to buffer prior to giving the data
1037-
to the socket's send.
1030+
Given an chunks of COPY lines, execute the COPY ... FROM STDIN
1031+
statement and send the copy lines produced by the iterable to
1032+
the remote end.
10381033
"""
1039-
tps = tps or 500
10401034
x = xact.Instruction((
10411035
element.Bind(
10421036
b'',
10431037
self._pq_statement_id,
10441038
(), (), (),
10451039
),
10461040
element.Execute(b'', 1),
1047-
element.SynchronizeMessage,
1041+
element.FlushMessage,
10481042
),
10491043
asynchook = self.database._receive_async
10501044
)
@@ -1058,70 +1052,45 @@ def _copy_data_in(self,
10581052
else:
10591053
# Oh, it's not a COPY at all.
10601054
e = pg_exc.OperationError(
1061-
"_copy_data_in() used on a non-COPY FROM STDIN query",
1055+
"_load_copy_chunks() used on a non-COPY FROM STDIN query",
10621056
creator = self
10631057
)
10641058
raise e
10651059

1066-
if isinstance(iterable, Chunks):
1067-
# optimized, each iteration == row sequence
1068-
while x.messages:
1069-
while x.messages is not x.CopyFailSequence:
1070-
self.database._pq_step()
1071-
x.messages = []
1072-
for rows in iterable:
1073-
x.messages.extend([
1074-
element.CopyData(l) for l in rows
1075-
])
1076-
if len(x.messages) > tps:
1077-
break
1078-
else:
1079-
# each iteration == one row
1080-
iterable = iter(iterable)
1081-
while x.messages:
1082-
# Process any messages setup for sending.
1083-
while x.messages is not x.CopyFailSequence:
1084-
self.database._pq_step()
1085-
x.messages = [
1086-
element.CopyData(l) for l in islice(iterable, tps)
1087-
]
1060+
for chunk in chunks:
1061+
x.messages = [element.CopyData(l) for l in chunk]
1062+
while x.messages is not x.CopyFailSequence:
1063+
self.database._pq_step()
10881064
x.messages = x.CopyDoneSequence
10891065
self.database._pq_complete()
1066+
self.database.pq.synchronize()
10901067

1091-
def _load_bulk_tuples(self, tupleseq, tps = None):
1092-
tps = tps or 64
1093-
if isinstance(tupleseq, Chunks):
1094-
tupleseqiter = chain.from_iterable(tupleseq)
1095-
else:
1096-
tupleseqiter = iter(tupleseq)
1068+
def _load_tuple_chunks(self, chunks):
10971069
pte = self._raise_parameter_tuple_error
1098-
last = element.FlushMessage
1070+
last = (element.SynchronizeMessage,)
10991071
try:
1100-
while last is element.FlushMessage:
1101-
c = 0
1102-
xm = []
1103-
for t in tupleseqiter:
1104-
params = pg_typio.process_tuple(
1105-
self._input_io, tuple(t), pte
1106-
)
1107-
xm.extend((
1072+
for chunk in chunks:
1073+
bindings = [
1074+
(
11081075
element.Bind(
11091076
b'',
11101077
self._pq_statement_id,
11111078
self._input_formats,
1112-
params,
1079+
pg_typio.process_tuple(
1080+
self._input_io, tuple(t), pte
1081+
),
11131082
(),
11141083
),
11151084
element.Execute(b'', 1),
1116-
))
1117-
if c == tps:
1118-
break
1119-
c += 1
1120-
else:
1121-
last = element.SynchronizeMessage
1122-
xm.append(last)
1085+
)
1086+
for t in chunk
1087+
]
1088+
bindings.append(last)
11231089
self.database._pq_push(
1124-
xact.Instruction(xm, asynchook = self.database._receive_async),
1090+
xact.Instruction(
1091+
chain.from_iterable(bindings),
1092+
asynchook = self.database._receive_async
1093+
),
11251094
self
11261095
)
11271096
self.database._pq_complete()
@@ -1141,16 +1110,32 @@ def _load_bulk_tuples(self, tupleseq, tps = None):
11411110

11421111
def load(self, iterable, tps = None):
11431112
"""
1144-
Execute the query for each parameter set in `iterable`.
1113+
WARNING: Deprecated, use load_chunks and load_rows instead.
1114+
"""
1115+
if self.closed is None:
1116+
self._fini()
1117+
if isinstance(iterable, Chunks):
1118+
l = self.load_chunks
1119+
else:
1120+
l = self.load_rows
1121+
return l(iterable)
1122+
1123+
def load_chunks(self, chunks):
1124+
"""
1125+
Execute the query for each row-parameter set in `iterable`.
11451126
1146-
In cases of ``COPY ... FROM STDIN``, iterable must be an iterable `bytes`.
1127+
In cases of ``COPY ... FROM STDIN``, iterable must be an iterable of
1128+
sequences of `bytes`.
11471129
"""
11481130
if self.closed is None:
11491131
self._fini()
11501132
if not self._input:
1151-
return self._copy_data_in(iterable, tps = tps)
1133+
return self._load_copy_chunks(chunks)
11521134
else:
1153-
return self._load_bulk_tuples(iterable, tps = tps)
1135+
return self._load_tuple_chunks(chunks)
1136+
1137+
def load_rows(self, rows, chunksize = 256):
1138+
return self.load_chunks(chunk(rows, chunksize))
11541139

11551140
class StoredProcedure(pg_api.StoredProcedure):
11561141
_e_factors = ('database', 'procedure_id')

postgresql/protocol/typio.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,9 @@ def process_tuple(procs, tup, exception_handler):
434434
i = len(procs)
435435
if len(tup) != i:
436436
raise ValueError(
437-
"inconsistent items, %d processors and %d objects"
437+
"inconsistent items, %d processors and %d objects" %(
438+
i, len(tup)
439+
)
438440
)
439441
r = [None] * i
440442
try:

postgresql/python/itertools.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
itertools extensions
77
"""
88
import collections
9-
from itertools import cycle
9+
from itertools import cycle, islice
1010

1111
def interlace(*iters) -> collections.Iterable:
1212
"""
@@ -20,3 +20,21 @@ def interlace(*iters) -> collections.Iterable:
2020
)
2121
"""
2222
return map(next, cycle([iter(x) for x in iters]))
23+
24+
def chunk(iterable, chunksize = 256):
25+
"""
26+
Given an iterable, return an iterable producing chunks of the objects
27+
produced by the given iterable.
28+
29+
chunks([o1,o2,o3,o4], chunksize = 2) -> [
30+
[o1,o2],
31+
[o3,o4],
32+
]
33+
"""
34+
iterable = iter(iterable)
35+
last = ()
36+
lastsize = chunksize
37+
while lastsize == chunksize:
38+
last = list(islice(iterable, chunksize))
39+
lastsize = len(last)
40+
yield last

0 commit comments

Comments
 (0)