@@ -108,6 +108,9 @@ def lookup_type_info(self, typid):
108108 def lookup_composite_type_info (self , typid ):
109109 return self .database .sys .lookup_composite (typid )
110110
111+ ##
112+ # This class manages all the functionality used to get
113+ # rows from a PostgreSQL portal/cursor.
111114class Output (object ):
112115 _output = None
113116 _output_io = None
@@ -126,6 +129,7 @@ def _init(self):
126129 """
127130 Bind a cursor based on the configured parameters.
128131 """
132+ # The local initialization for the specific cursor.
129133
130134 def __init__ (self , cursor_id ):
131135 self .cursor_id = cursor_id
@@ -138,6 +142,7 @@ def __init__(self, cursor_id):
138142 if self .cursor_id == ID (self ):
139143 addgarbage = self .database .pq .garbage_cursors .append
140144 typio = self .database .typio
145+ # Callback for closing the cursor on remote end.
141146 self ._del = weakref .ref (
142147 self , lambda x : addgarbage (typio .encode (cursor_id ))
143148 )
@@ -154,6 +159,7 @@ def close(self):
154159 self .database .typio .encode (self .cursor_id )
155160 )
156161 self .closed = True
162+ # Don't need the weakref anymore.
157163 if hasattr (self , '_del' ):
158164 del self ._del
159165
@@ -224,7 +230,6 @@ def _pq_xp_fetch(self, direction, quantity):
224230 )
225231
226232 def _pq_xp_move (self , position , whence ):
227- 'make a command sequence for a MOVE single command'
228233 return (
229234 element .Parse (b'' ,
230235 b'MOVE ' + whence + b' ' + position + b' IN ' + \
@@ -243,24 +248,36 @@ def _process_copy_chunk(self, x):
243248 ]
244249 return x
245250
246- def _process_tuple_chunk_Row (self , x ):
247- """
248- Process the Tuple messages in `x`.
249- """
251+ # Process the element.Tuple message in x for column()
252+ def _process_tuple_chunk_Column (self , x , range = range ):
253+ unpack = self ._output_io [0 ]
254+ # get the raw data for the first column
255+ l = [y [0 ] for y in x ]
256+ # iterate over the range to keep track
257+ # of which item we're processing.
258+ r = range (len (l ))
259+ try :
260+ return [unpack (l [i ]) for i in r ]
261+ except :
262+ try :
263+ i = next (r )
264+ except StopIteration :
265+ i = len (l )
266+ self ._raise_column_tuple_error (self ._output_io , (l [i ],), 0 )
267+
268+ # Process the element.Tuple message in x for rows()
269+ def _process_tuple_chunk_Row (self , x ,
270+ proc = pg_typio .process_chunk ,
271+ from_seq = pg_types .Row .from_sequence ,
272+ ):
250273 return [
251- pg_types .Row .from_sequence (self ._output_attmap , y )
252- for y in pg_typio .process_chunk (
253- self ._output_io , x , self ._raise_column_tuple_error
254- )
274+ from_seq (self ._output_attmap , y )
275+ for y in proc (self ._output_io , x , self ._raise_column_tuple_error )
255276 ]
256277
257- def _process_tuple_chunk (self , x ):
258- """
259- Process the Tuple messages in `x`.
260- """
261- return pg_typio .process_chunk (
262- self ._output_io , x , self ._raise_column_tuple_error
263- )
278+ # Process the elemnt.Tuple messages in `x` for chunks()
279+ def _process_tuple_chunk (self , x , proc = pg_typio .process_chunk ):
280+ return proc (self ._output_io , x , self ._raise_column_tuple_error )
264281
265282 def _raise_column_tuple_error (self , procs , tup , itemnum ):
266283 'for column processing'
@@ -300,16 +317,19 @@ def state(self):
300317 def column_names (self ):
301318 if self ._output is not None :
302319 return list (self .database .typio .decodes (self ._output .keys ()))
320+ # `None` if _output does not exist; not row data
303321
304322 @property
305323 def column_types (self ):
306324 if self ._output is not None :
307325 return [self .database .typio .type_from_oid (x [3 ]) for x in self ._output ]
326+ # `None` if _output does not exist; not row data
308327
309328 @property
310329 def pg_column_types (self ):
311330 if self ._output is not None :
312331 return [x [3 ] for x in self ._output ]
332+ # `None` if _output does not exist; not row data
313333
314334 @property
315335 def sql_column_types (self ):
@@ -332,6 +352,9 @@ def count(self):
332352class Chunks (Output , pg_api .Chunks ):
333353 pass
334354
355+ ##
356+ # FetchAll is a Chunks cursor that gets all the information
357+ # in the cursor.
335358class FetchAll (Chunks ):
336359 _e_factors = ('statement' , 'parameters' ,)
337360 def _e_metas (self ):
@@ -388,21 +411,30 @@ def _init(self,
388411
389412 def __next__ (self ):
390413 x = self ._xact
414+ # self._xact = None; means that the cursor has been exhausted.
391415 if x is None :
392416 raise StopIteration
393417
418+ # Finish the protocol transaction.
394419 while x .state is not xact .Complete and not x .completed :
395420 self .database ._pq_step ()
421+
422+ # fatal is None == no error
423+ # fatal is True == dead connection
424+ # fatal is False == dead transaction
396425 if x .fatal is not None :
397426 self .database ._raise_pq_error (x , controller = self )
398427
428+ # no messages to process?
399429 if not x .completed :
400430 # Transaction has been cleaned out of completed? iterator is done.
401431 self ._xact = None
402432 raise StopIteration
403433
434+ # Get the chunk to be processed.
404435 chunk = x .completed [0 ][1 ]
405436 r = self ._process_chunk (chunk )
437+ # Remove it, it's been processed.
406438 del x .completed [0 ]
407439 return r
408440
@@ -413,9 +445,8 @@ class SingleXactCopy(FetchAll):
413445class SingleXactFetch (FetchAll ):
414446 _expect = element .Tuple .type
415447 _process_chunk_ = FetchAll ._process_tuple_chunk_Row
416- def _process_chunk (self , x ,
417- tuple_type = element .Tuple .type
418- ):
448+
449+ def _process_chunk (self , x , tuple_type = element .Tuple .type ):
419450 return self ._process_chunk_ ((
420451 y for y in x if y .type == tuple_type
421452 ))
@@ -452,16 +483,17 @@ def _init(self):
452483 self ._xact = self ._ins (self ._bind () + self ._command )
453484 self .database ._pq_push (self ._xact , self )
454485
455- def __next__ (self ):
486+ def __next__ (self , tuple_type = element . Tuple . type ):
456487 x = self ._xact
457488 if x is None :
458489 raise StopIteration
459490
460491 if self .database .pq .xact is x :
461492 self .database ._pq_complete ()
462493
494+ # get all the element.Tuple messages
463495 chunk = [
464- y for y in x .messages_received () if y .type == element . Tuple . type
496+ y for y in x .messages_received () if y .type == tuple_type
465497 ]
466498 if len (chunk ) == self .chunksize :
467499 # there may be more, dispatch the request for the next chunk
@@ -471,23 +503,41 @@ def __next__(self):
471503 # it's done.
472504 self ._xact = None
473505 if not chunk :
506+ # chunk is empty, it's done *right* now.
474507 raise StopIteration
475508 chunk = self ._process_chunk (chunk )
476509 return chunk
477510
511+ ##
512+ # The cursor is streamed to the client on demand *inside*
513+ # a single SQL transaction block.
478514class MultiXactInsideBlock (MultiXactStream ):
479515 _bind = MultiXactStream ._pq_xp_bind
480516 def _fetch (self ):
517+ ##
518+ # Use the extended protocol's execute to fetch more.
481519 return self ._pq_xp_execute (self .chunksize ) + \
482520 (element .SynchronizeMessage ,)
483521
522+ ##
523+ # The cursor is streamed to the client on demand *outside* of
524+ # a single SQL transaction block. [DECLARE ... WITH HOLD]
484525class MultiXactOutsideBlock (MultiXactStream ):
485526 _bind = MultiXactStream ._pq_xp_declare
527+
486528 def _fetch (self ):
529+ ##
530+ # Use the extended protocol's execute to fetch more *against*
531+ # an SQL FETCH statement yielding the data in the proper format.
532+ #
533+ # MultiXactOutsideBlock uses DECLARE to create the cursor WITH HOLD.
534+ # When this is done, the cursor is configured to use StringFormat with
535+ # all columns. It's necessary to use FETCH to adjust the formatting.
487536 return self ._pq_xp_fetch (True , self .chunksize ) + \
488537 (element .SynchronizeMessage ,)
538+
489539##
490- # Base Cursor class and cursor creation entry points .
540+ # Cursor is used to manage scrollable cursors .
491541class Cursor (Output , pg_api .Cursor ):
492542 _process_tuple = Output ._process_tuple_chunk_Row
493543 def _e_metas (self ):
@@ -704,6 +754,7 @@ def __init__(self, database, statement_id, string):
704754 addgarbage = database .pq .garbage_statements .append
705755 typio = database .typio
706756 sid = self .statement_id
757+ # Callback for closing the statement on remote end.
707758 self ._del = weakref .ref (
708759 self , lambda x : addgarbage (typio .encode (sid ))
709760 )
@@ -853,6 +904,7 @@ def close(self):
853904 if self .closed is False :
854905 self .database .pq .garbage_statements .append (self ._pq_statement_id )
855906 self .closed = True
907+ # Don't need the weakref anymore.
856908 if hasattr (self , '_del' ):
857909 del self ._del
858910
@@ -987,14 +1039,24 @@ def chunks(self, *parameters):
9871039 if self ._output is None :
9881040 return SingleXactCopy (self , parameters )
9891041 if self .database .pq .state == b'I' :
1042+ # Currently, *not* in a transaction block, so
1043+ # DECLARE the statement WITH HOLD in order to allow
1044+ # access across transactions.
9901045 if self .string is not None :
9911046 return MultiXactOutsideBlock (self , parameters , None )
9921047 else :
993- # statement source unknown, so it can't be DECLARE'd.
1048+ ##
1049+ # Statement source unknown, so it can't be DECLARE'd.
1050+ # This happens when statement_from_id is used.
9941051 return SingleXactFetch (self , parameters )
9951052 else :
9961053 return MultiXactInsideBlock (self , parameters , None )
9971054
1055+ def column (self , * parameters , ** kw ):
1056+ chunks = self .chunks (* parameters , ** kw )
1057+ chunks ._process_chunk = chunks ._process_tuple_chunk_Column
1058+ return chain .from_iterable (chunks )
1059+
9981060 def first (self , * parameters ):
9991061 if self .closed is None :
10001062 self ._fini ()
@@ -1388,15 +1450,11 @@ def getset(self, keys):
13881450 return setmap
13891451
13901452 def keys (self ):
1391- return map (
1392- get0 , self .database .sys .setting_keys ()
1393- )
1453+ return map (get0 , self .database .sys .setting_keys ())
13941454 __iter__ = keys
13951455
13961456 def values (self ):
1397- return map (
1398- get0 , self .database .sys .setting_values ()
1399- )
1457+ return map (get0 , self .database .sys .setting_values ())
14001458
14011459 def items (self ):
14021460 return self .database .sys .setting_items ()
0 commit comments