Skip to content

Commit b38ab48

Browse files
author
James William Pye
committed
Relocate the chunks() iterator.
Based on the assumed use-case of chunks(), it was relocated to statement objects. In all likelihood, chunks would only ever be used to read a cursor from its beginning to its end. It should not be used with scrollable cursors, and it expects to receive a fair amount of data, so WITH HOLD would always be true outside of transactions. Additionally, give it a chunksize keyword argument.
1 parent 0239106 commit b38ab48

5 files changed

Lines changed: 63 additions & 53 deletions

File tree

postgresql/api.py

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ def emit(self):
398398
self.snapshot = self.ife_ancestry_snapshot_text()
399399
self.ife_emit(self)
400400

401-
class CursorChunks(
401+
class Chunks(
402402
collections.Iterator,
403403
collections.Iterable,
404404
):
@@ -559,13 +559,6 @@ def chunksize(self) -> int:
559559
Cursor operation option.
560560
"""
561561

562-
@propertydoc
563-
@abstractproperty
564-
def chunks(self) -> CursorChunks:
565-
"""
566-
Return a chunk iterator to the cursor.
567-
"""
568-
569562
@abstractmethod
570563
def read(self,
571564
quantity : "Number of rows to read" = None,
@@ -729,9 +722,9 @@ def parameter_types(self) -> [type]:
729722

730723
@abstractmethod
731724
def __call__(self,
732-
*args : "Positional Parameters",
725+
*parameters : "Positional Parameters",
733726
with_hold : \
734-
"Whether or not to request 'WITH HOLD'" = True,
727+
"Whether or not to request 'WITH HOLD'" = None,
735728
scroll : \
736729
"Whether or not to request 'SCROLL'" = False,
737730
cursor_id : \
@@ -748,8 +741,26 @@ def __call__(self,
748741
<postgresql.api.Cursor>
749742
"""
750743

744+
@propertydoc
745+
@abstractproperty
746+
def chunks(self, *parameters, chunksize = None) -> Chunks:
747+
"""
748+
Return an iterator producing sequences of rows produced by the cursor
749+
created from the statement bound with the given parameters.
750+
751+
Chunking iterators are *never* scrollable.
752+
753+
Supporting cursors will be WITH HOLD when outside of a transaction.
754+
755+
`chunks` is designed for situations involving large data sets.
756+
757+
Each iteration returns a sequence of rows, *normally* with len(seq) ==
758+
chunksize. If chunksize is unspecified, a default, positive integer will
759+
be filled in.
760+
"""
761+
751762
@abstractmethod
752-
def first(self, *args) -> "'First' object that is yield by the query":
763+
def first(self, *parameters) -> "'First' object that is yield by the query":
753764
"""
754765
Execute the prepared statement with the given arguments as parameters.
755766
If the statement returns rows with multiple columns, return the first

postgresql/documentation/driver.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,13 @@
469469
Naturally, a statement used with ``first()`` should be crafted with these
470470
rules in mind.
471471
472+
``ps.chunks(*parameters, chunksize = 256)``
473+
This access point is designed for situations where rows are being streamed out
474+
quickly. It is a method that returns a ``collections.Iterator`` that produces
475+
*sequences* of rows. The size of the "chunks" produced is *normally* consistent
476+
with the given ``chunksize`` keyword argument. This is the most efficient way
477+
to get rows out of the cursor.
478+
472479
``ps.close()``
473480
Close the statement inhibiting further use.
474481
@@ -703,13 +710,6 @@
703710
When the cursor is scrollable, this seek interface can be used to move the
704711
position of the cursor. See `Scrollable Cursors`_ for more information.
705712
706-
``c.chunks``
707-
This access point is designed for situations where rows are being streamed out
708-
quickly. It is a property that provides an ``collections.Iterator`` that returns
709-
*sequences* of rows. The size of the "chunks" produced is *normally* consistent
710-
with the ``chunksize`` attribute on the cursor object itself. This is
711-
the most efficient way to get rows out of the cursor.
712-
713713
``c.close()``
714714
For cursors opened using ``cursor_from_id()``, this method must be called in
715715
order to ``CLOSE`` the cursor. For cursors created by invoking a prepared

postgresql/driver/pq3.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ def lookup_type_info(self, typid):
178178
def lookup_composite_type_info(self, typid):
179179
return self.database.prepare(CompositeLookup)(typid)
180180

181-
class CursorChunks(pg_api.CursorChunks):
181+
class Chunks(pg_api.Chunks):
182182
cursor = None
183183

184184
def __init__(self, cursor):
@@ -316,10 +316,6 @@ def state(self):
316316
else:
317317
return 'open'
318318

319-
@property
320-
def chunks(self):
321-
return CursorChunks(self)
322-
323319
@property
324320
def column_names(self):
325321
if self._output is not None:
@@ -1302,6 +1298,13 @@ def __call__(self, *parameters, **kw):
13021298
return cursor
13031299
__iter__ = __call__
13041300

1301+
def chunks(self, *parameters, chunksize = 256):
1302+
if chunksize < 1:
1303+
raise ValueError("cannot create chunk iterator with chunksize < 1")
1304+
c = self(*parameters, scroll = False)
1305+
c.chunksize = chunksize
1306+
return Chunks(c)
1307+
13051308
def first(self, *parameters):
13061309
if self.closed is None:
13071310
self._fini()

postgresql/test/test_connect.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ class test_connect(pg_unittest.TestCaseWithCluster):
2424

2525
def __init__(self, *args, **kw):
2626
super().__init__(*args,**kw)
27+
# PostgreSQL 8.4 nixed crypt authentication; only exercise it on older versions.
2728
self.do_crypt = self.cluster.installation.version_info < (8,4)
2829

2930
def configure_cluster(self):

postgresql/test/test_driver.py

Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -684,39 +684,34 @@ def testNoHold(self):
684684
def testChunking(self):
685685
gs = self.db.prepare("SELECT i FROM generate_series(1, 10000) AS g(i)")
686686
self.failUnlessEqual(
687-
list((x[0] for x in chain(*list((gs().chunks))))),
687+
list((x[0] for x in chain.from_iterable(gs.chunks()))),
688688
list(range(1, 10001))
689689
)
690690
# exercise ``for x in chunks: dst.load(x)``
691-
try:
692-
with self.db.connector() as db2:
693-
db2.prepare(
694-
"""
695-
CREATE TABLE chunking AS
696-
SELECT i::text AS t, i::int AS i
697-
FROM generate_series(1, 10000) g(i);
698-
"""
699-
)()
700-
read_chunking = self.db.prepare('select * FROM chunking')
701-
write_chunking = db2.prepare('insert into chunking values ($1, $2)')
702-
out = read_chunking()
703-
out.chunksize = 512
704-
for rows in out.chunks:
705-
write_chunking.load(rows)
706-
self.failUnlessEqual(
707-
self.db.prepare('select count(*) FROM chunking').first(),
708-
20000
709-
)
710-
self.failUnlessEqual(
711-
self.db.prepare('select count(DISTINCT i) FROM chunking').first(),
712-
10000
713-
)
714-
finally:
715-
try:
716-
with self.db.xact():
717-
self.db.execute('DROP TABLE chunking')
718-
except:
719-
pass
691+
with self.db.connector() as db2:
692+
db2.execute(
693+
"""
694+
CREATE TABLE chunking AS
695+
SELECT i::text AS t, i::int AS i
696+
FROM generate_series(1, 10000) g(i);
697+
"""
698+
)
699+
read = self.db.prepare('select * FROM chunking').chunks(chunksize = 256)
700+
write = db2.prepare('insert into chunking values ($1, $2)').load
701+
with db2.xact():
702+
for rows in read:
703+
write(rows)
704+
del read, write
705+
706+
self.failUnlessEqual(
707+
self.db.prepare('select count(*) FROM chunking').first(),
708+
20000
709+
)
710+
self.failUnlessEqual(
711+
self.db.prepare('select count(DISTINCT i) FROM chunking').first(),
712+
10000
713+
)
714+
self.db.execute('DROP TABLE chunking')
720715

721716
def testChunkingInXact(self):
722717
with self.db.xact():

0 commit comments

Comments
 (0)