Skip to content

Commit 20b3532

Browse files
author
James William Pye
committed
Implement the ps.column() execution method.
Also, add a lot of comments around Output/Cursor while at it. closes #18
1 parent cc2a2b2 commit 20b3532

4 files changed

Lines changed: 135 additions & 40 deletions

File tree

postgresql/api.py

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,23 @@ def __call__(self, *parameters : "Positional Parameters") -> ["Row"]:
490490
[...]
491491
"""
492492

493+
@abstractmethod
494+
def chunks(self, *parameters) -> collections.Iterable:
495+
"""
496+
Return an iterator producing sequences of rows produced by the cursor
497+
created from the statement bound with the given parameters.
498+
499+
Chunking iterators are *never* scrollable.
500+
501+
Supporting cursors will be WITH HOLD when outside of a transaction.
502+
503+
`chunks` is designed for the situations involving large data sets.
504+
505+
Each iteration returns sequences of rows *normally* of length(seq) ==
506+
chunksize. If chunksize is unspecified, a default, positive integer will
507+
be filled in.
508+
"""
509+
493510
@abstractmethod
494511
def rows(self, *parameters) -> collections.Iterable:
495512
"""
@@ -498,30 +515,32 @@ def rows(self, *parameters) -> collections.Iterable:
498515
499516
Row iterators are never scrollable.
500517
501-
Supporting cursors will be WITH HOLD when outside of a transaction.
518+
Supporting cursors will be WITH HOLD when outside of a transaction to
519+
allow cross-transaction access.
502520
503521
`rows` is designed for the situations involving large data sets.
504522
505-
Each iteration returns a single row. Arguably, best implemented:
523+
Each iteration returns a single row. Arguably, best implemented::
506524
507525
return itertools.chain.from_iterable(self.chunks(*parameters))
508526
"""
509527

510528
@abstractmethod
511-
def chunks(self, *parameters) -> collections.Iterable:
529+
def column(self, *parameters) -> collections.Iterable:
512530
"""
513-
Return an iterator producing sequences of rows produced by the cursor
514-
created from the statement bound with the given parameters.
531+
Return an iterator producing the values of the first column in
532+
the cursor created from the statement bound with the given parameters.
515533
516-
Chunking iterators are *never* scrollable.
534+
Column iterators are never scrollable.
517535
518-
Supporting cursors will be WITH HOLD when outside of a transaction.
536+
Supporting cursors will be WITH HOLD when outside of a transaction to
537+
allow cross-transaction access.
519538
520-
`chunks` is designed for the situations involving large data sets.
539+
`column` is designed for the situations involving large data sets.
521540
522-
Each iteration returns sequences of rows *normally* of length(seq) ==
523-
chunksize. If chunksize is unspecified, a default, positive integer will
524-
be filled in.
541+
Each iteration returns a single value. `column` is equivalent to::
542+
543+
return map(operator.itemgetter(0), self.rows(*parameters))
525544
"""
526545

527546
@abstractmethod

postgresql/documentation/driver.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,13 @@ Prepared statement objects have a few execution methods:
466466
>>> for table_name, in db.prepare("SELECT table_name FROM information_schema.tables"):
467467
... print(table_name)
468468

469+
``ps.column(*parameters)``
470+
Return an iterator to the first column produced by the statement. This
471+
method will stream values on demand, and *should* only be used with statements
472+
that have a single column; otherwise, bandwidth will ultimately be wasted as
473+
the other columns will be dropped.
474+
*This execution method cannot be used with COPY statements.*
475+
469476
``ps.first(*parameters)``
470477
For simple statements, cursor objects are unnecessary.
471478
Consider the data contained in ``c`` from above, 'hello world!'. To get at this

postgresql/driver/pq3.py

Lines changed: 87 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ def lookup_type_info(self, typid):
108108
def lookup_composite_type_info(self, typid):
109109
return self.database.sys.lookup_composite(typid)
110110

111+
##
112+
# This class manages all the functionality used to get
113+
# rows from a PostgreSQL portal/cursor.
111114
class Output(object):
112115
_output = None
113116
_output_io = None
@@ -126,6 +129,7 @@ def _init(self):
126129
"""
127130
Bind a cursor based on the configured parameters.
128131
"""
132+
# The local initialization for the specific cursor.
129133

130134
def __init__(self, cursor_id):
131135
self.cursor_id = cursor_id
@@ -138,6 +142,7 @@ def __init__(self, cursor_id):
138142
if self.cursor_id == ID(self):
139143
addgarbage = self.database.pq.garbage_cursors.append
140144
typio = self.database.typio
145+
# Callback for closing the cursor on remote end.
141146
self._del = weakref.ref(
142147
self, lambda x: addgarbage(typio.encode(cursor_id))
143148
)
@@ -154,6 +159,7 @@ def close(self):
154159
self.database.typio.encode(self.cursor_id)
155160
)
156161
self.closed = True
162+
# Don't need the weakref anymore.
157163
if hasattr(self, '_del'):
158164
del self._del
159165

@@ -224,7 +230,6 @@ def _pq_xp_fetch(self, direction, quantity):
224230
)
225231

226232
def _pq_xp_move(self, position, whence):
227-
'make a command sequence for a MOVE single command'
228233
return (
229234
element.Parse(b'',
230235
b'MOVE ' + whence + b' ' + position + b' IN ' + \
@@ -243,24 +248,36 @@ def _process_copy_chunk(self, x):
243248
]
244249
return x
245250

246-
def _process_tuple_chunk_Row(self, x):
247-
"""
248-
Process the Tuple messages in `x`.
249-
"""
251+
# Process the element.Tuple message in x for column()
252+
def _process_tuple_chunk_Column(self, x, range = range):
253+
unpack = self._output_io[0]
254+
# get the raw data for the first column
255+
l = [y[0] for y in x]
256+
# iterate over the range to keep track
257+
# of which item we're processing.
258+
r = range(len(l))
259+
try:
260+
return [unpack(l[i]) for i in r]
261+
except:
262+
try:
263+
i = next(r)
264+
except StopIteration:
265+
i = len(l)
266+
self._raise_column_tuple_error(self._output_io, (l[i],), 0)
267+
268+
# Process the element.Tuple message in x for rows()
269+
def _process_tuple_chunk_Row(self, x,
270+
proc = pg_typio.process_chunk,
271+
from_seq = pg_types.Row.from_sequence,
272+
):
250273
return [
251-
pg_types.Row.from_sequence(self._output_attmap, y)
252-
for y in pg_typio.process_chunk(
253-
self._output_io, x, self._raise_column_tuple_error
254-
)
274+
from_seq(self._output_attmap, y)
275+
for y in proc(self._output_io, x, self._raise_column_tuple_error)
255276
]
256277

257-
def _process_tuple_chunk(self, x):
258-
"""
259-
Process the Tuple messages in `x`.
260-
"""
261-
return pg_typio.process_chunk(
262-
self._output_io, x, self._raise_column_tuple_error
263-
)
278+
# Process the element.Tuple messages in `x` for chunks()
279+
def _process_tuple_chunk(self, x, proc = pg_typio.process_chunk):
280+
return proc(self._output_io, x, self._raise_column_tuple_error)
264281

265282
def _raise_column_tuple_error(self, procs, tup, itemnum):
266283
'for column processing'
@@ -300,16 +317,19 @@ def state(self):
300317
def column_names(self):
301318
if self._output is not None:
302319
return list(self.database.typio.decodes(self._output.keys()))
320+
# `None` if _output does not exist; not row data
303321

304322
@property
305323
def column_types(self):
306324
if self._output is not None:
307325
return [self.database.typio.type_from_oid(x[3]) for x in self._output]
326+
# `None` if _output does not exist; not row data
308327

309328
@property
310329
def pg_column_types(self):
311330
if self._output is not None:
312331
return [x[3] for x in self._output]
332+
# `None` if _output does not exist; not row data
313333

314334
@property
315335
def sql_column_types(self):
@@ -332,6 +352,9 @@ def count(self):
332352
class Chunks(Output, pg_api.Chunks):
333353
pass
334354

355+
##
356+
# FetchAll is a Chunks cursor that gets all the information
357+
# in the cursor.
335358
class FetchAll(Chunks):
336359
_e_factors = ('statement', 'parameters',)
337360
def _e_metas(self):
@@ -388,21 +411,30 @@ def _init(self,
388411

389412
def __next__(self):
390413
x = self._xact
414+
# self._xact = None; means that the cursor has been exhausted.
391415
if x is None:
392416
raise StopIteration
393417

418+
# Finish the protocol transaction.
394419
while x.state is not xact.Complete and not x.completed:
395420
self.database._pq_step()
421+
422+
# fatal is None == no error
423+
# fatal is True == dead connection
424+
# fatal is False == dead transaction
396425
if x.fatal is not None:
397426
self.database._raise_pq_error(x, controller = self)
398427

428+
# no messages to process?
399429
if not x.completed:
400430
# Transaction has been cleaned out of completed? iterator is done.
401431
self._xact = None
402432
raise StopIteration
403433

434+
# Get the chunk to be processed.
404435
chunk = x.completed[0][1]
405436
r = self._process_chunk(chunk)
437+
# Remove it, it's been processed.
406438
del x.completed[0]
407439
return r
408440

@@ -413,9 +445,8 @@ class SingleXactCopy(FetchAll):
413445
class SingleXactFetch(FetchAll):
414446
_expect = element.Tuple.type
415447
_process_chunk_ = FetchAll._process_tuple_chunk_Row
416-
def _process_chunk(self, x,
417-
tuple_type = element.Tuple.type
418-
):
448+
449+
def _process_chunk(self, x, tuple_type = element.Tuple.type):
419450
return self._process_chunk_((
420451
y for y in x if y.type == tuple_type
421452
))
@@ -452,16 +483,17 @@ def _init(self):
452483
self._xact = self._ins(self._bind() + self._command)
453484
self.database._pq_push(self._xact, self)
454485

455-
def __next__(self):
486+
def __next__(self, tuple_type = element.Tuple.type):
456487
x = self._xact
457488
if x is None:
458489
raise StopIteration
459490

460491
if self.database.pq.xact is x:
461492
self.database._pq_complete()
462493

494+
# get all the element.Tuple messages
463495
chunk = [
464-
y for y in x.messages_received() if y.type == element.Tuple.type
496+
y for y in x.messages_received() if y.type == tuple_type
465497
]
466498
if len(chunk) == self.chunksize:
467499
# there may be more, dispatch the request for the next chunk
@@ -471,23 +503,41 @@ def __next__(self):
471503
# it's done.
472504
self._xact = None
473505
if not chunk:
506+
# chunk is empty, it's done *right* now.
474507
raise StopIteration
475508
chunk = self._process_chunk(chunk)
476509
return chunk
477510

511+
##
512+
# The cursor is streamed to the client on demand *inside*
513+
# a single SQL transaction block.
478514
class MultiXactInsideBlock(MultiXactStream):
479515
_bind = MultiXactStream._pq_xp_bind
480516
def _fetch(self):
517+
##
518+
# Use the extended protocol's execute to fetch more.
481519
return self._pq_xp_execute(self.chunksize) + \
482520
(element.SynchronizeMessage,)
483521

522+
##
523+
# The cursor is streamed to the client on demand *outside* of
524+
# a single SQL transaction block. [DECLARE ... WITH HOLD]
484525
class MultiXactOutsideBlock(MultiXactStream):
485526
_bind = MultiXactStream._pq_xp_declare
527+
486528
def _fetch(self):
529+
##
530+
# Use the extended protocol's execute to fetch more *against*
531+
# an SQL FETCH statement yielding the data in the proper format.
532+
#
533+
# MultiXactOutsideBlock uses DECLARE to create the cursor WITH HOLD.
534+
# When this is done, the cursor is configured to use StringFormat with
535+
# all columns. It's necessary to use FETCH to adjust the formatting.
487536
return self._pq_xp_fetch(True, self.chunksize) + \
488537
(element.SynchronizeMessage,)
538+
489539
##
490-
# Base Cursor class and cursor creation entry points.
540+
# Cursor is used to manage scrollable cursors.
491541
class Cursor(Output, pg_api.Cursor):
492542
_process_tuple = Output._process_tuple_chunk_Row
493543
def _e_metas(self):
@@ -704,6 +754,7 @@ def __init__(self, database, statement_id, string):
704754
addgarbage = database.pq.garbage_statements.append
705755
typio = database.typio
706756
sid = self.statement_id
757+
# Callback for closing the statement on remote end.
707758
self._del = weakref.ref(
708759
self, lambda x: addgarbage(typio.encode(sid))
709760
)
@@ -853,6 +904,7 @@ def close(self):
853904
if self.closed is False:
854905
self.database.pq.garbage_statements.append(self._pq_statement_id)
855906
self.closed = True
907+
# Don't need the weakref anymore.
856908
if hasattr(self, '_del'):
857909
del self._del
858910

@@ -987,14 +1039,24 @@ def chunks(self, *parameters):
9871039
if self._output is None:
9881040
return SingleXactCopy(self, parameters)
9891041
if self.database.pq.state == b'I':
1042+
# Currently, *not* in a transaction block, so
1043+
# DECLARE the statement WITH HOLD in order to allow
1044+
# access across transactions.
9901045
if self.string is not None:
9911046
return MultiXactOutsideBlock(self, parameters, None)
9921047
else:
993-
# statement source unknown, so it can't be DECLARE'd.
1048+
##
1049+
# Statement source unknown, so it can't be DECLARE'd.
1050+
# This happens when statement_from_id is used.
9941051
return SingleXactFetch(self, parameters)
9951052
else:
9961053
return MultiXactInsideBlock(self, parameters, None)
9971054

1055+
def column(self, *parameters, **kw):
1056+
chunks = self.chunks(*parameters, **kw)
1057+
chunks._process_chunk = chunks._process_tuple_chunk_Column
1058+
return chain.from_iterable(chunks)
1059+
9981060
def first(self, *parameters):
9991061
if self.closed is None:
10001062
self._fini()
@@ -1388,15 +1450,11 @@ def getset(self, keys):
13881450
return setmap
13891451

13901452
def keys(self):
1391-
return map(
1392-
get0, self.database.sys.setting_keys()
1393-
)
1453+
return map(get0, self.database.sys.setting_keys())
13941454
__iter__ = keys
13951455

13961456
def values(self):
1397-
return map(
1398-
get0, self.database.sys.setting_values()
1399-
)
1457+
return map(get0, self.database.sys.setting_values())
14001458

14011459
def items(self):
14021460
return self.database.sys.setting_items()

postgresql/test/test_driver.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,17 @@ def testRowInterface(self):
569569
self.failUnlessEqual(i, row.index_from_key('col' + str(i)))
570570
self.failUnlessEqual('col' + str(i), row.key_from_index(i))
571571

572+
def testColumn(self):
573+
g_i = self.db.prepare('SELECT i FROM generate_series(1,10) as g(i)').column
574+
# ignore the second column.
575+
g_ii = self.db.prepare('SELECT i, i+10 as i2 FROM generate_series(1,10) as g(i)').column
576+
self.failUnlessEqual(tuple(g_i()), tuple(g_ii()))
577+
self.failUnlessEqual(tuple(g_i()), (1,2,3,4,5,6,7,8,9,10))
578+
579+
def testColumnInXact(self):
580+
with self.db.xact():
581+
self.testColumn()
582+
572583
def testStatementFromId(self):
573584
self.db.execute("PREPARE foo AS SELECT 1 AS colname;")
574585
ps = self.db.statement_from_id('foo')

0 commit comments

Comments
 (0)