3434from .. import api as pg_api
3535from ..encodings import aliases as pg_enc_aliases
3636
37- from ..python .itertools import interlace
37+ from ..python .itertools import interlace , chunk
3838from ..python .socket import SocketFactory
3939
4040from ..protocol import xact3 as xact
@@ -1025,26 +1025,20 @@ def first(self, *parameters):
10251025 return None
10261026 return cm .extract_count () or cm .extract_command ()
10271027
1028- def _copy_data_in (self ,
1029- iterable ,
1030- tps : "tuples per *set*" = None ,
1031- ):
1028+ def _load_copy_chunks (self , chunks ):
10321029 """
1033- Given an iterable, execute the COPY ... FROM STDIN statement and
1034- send the copy lines produced by the iterable to the remote end.
1035-
1036- `tps` is the number of tuples to buffer prior to giving the data
1037- to the socket's send.
1030+ Given an chunks of COPY lines, execute the COPY ... FROM STDIN
1031+ statement and send the copy lines produced by the iterable to
1032+ the remote end.
10381033 """
1039- tps = tps or 500
10401034 x = xact .Instruction ((
10411035 element .Bind (
10421036 b'' ,
10431037 self ._pq_statement_id ,
10441038 (), (), (),
10451039 ),
10461040 element .Execute (b'' , 1 ),
1047- element .SynchronizeMessage ,
1041+ element .FlushMessage ,
10481042 ),
10491043 asynchook = self .database ._receive_async
10501044 )
@@ -1058,70 +1052,45 @@ def _copy_data_in(self,
10581052 else :
10591053 # Oh, it's not a COPY at all.
10601054 e = pg_exc .OperationError (
1061- "_copy_data_in () used on a non-COPY FROM STDIN query" ,
1055+ "_load_copy_chunks () used on a non-COPY FROM STDIN query" ,
10621056 creator = self
10631057 )
10641058 raise e
10651059
1066- if isinstance (iterable , Chunks ):
1067- # optimized, each iteration == row sequence
1068- while x .messages :
1069- while x .messages is not x .CopyFailSequence :
1070- self .database ._pq_step ()
1071- x .messages = []
1072- for rows in iterable :
1073- x .messages .extend ([
1074- element .CopyData (l ) for l in rows
1075- ])
1076- if len (x .messages ) > tps :
1077- break
1078- else :
1079- # each iteration == one row
1080- iterable = iter (iterable )
1081- while x .messages :
1082- # Process any messages setup for sending.
1083- while x .messages is not x .CopyFailSequence :
1084- self .database ._pq_step ()
1085- x .messages = [
1086- element .CopyData (l ) for l in islice (iterable , tps )
1087- ]
1060+ for chunk in chunks :
1061+ x .messages = [element .CopyData (l ) for l in chunk ]
1062+ while x .messages is not x .CopyFailSequence :
1063+ self .database ._pq_step ()
10881064 x .messages = x .CopyDoneSequence
10891065 self .database ._pq_complete ()
1066+ self .database .pq .synchronize ()
10901067
1091- def _load_bulk_tuples (self , tupleseq , tps = None ):
1092- tps = tps or 64
1093- if isinstance (tupleseq , Chunks ):
1094- tupleseqiter = chain .from_iterable (tupleseq )
1095- else :
1096- tupleseqiter = iter (tupleseq )
1068+ def _load_tuple_chunks (self , chunks ):
10971069 pte = self ._raise_parameter_tuple_error
1098- last = element .FlushMessage
1070+ last = ( element .SynchronizeMessage ,)
10991071 try :
1100- while last is element .FlushMessage :
1101- c = 0
1102- xm = []
1103- for t in tupleseqiter :
1104- params = pg_typio .process_tuple (
1105- self ._input_io , tuple (t ), pte
1106- )
1107- xm .extend ((
1072+ for chunk in chunks :
1073+ bindings = [
1074+ (
11081075 element .Bind (
11091076 b'' ,
11101077 self ._pq_statement_id ,
11111078 self ._input_formats ,
1112- params ,
1079+ pg_typio .process_tuple (
1080+ self ._input_io , tuple (t ), pte
1081+ ),
11131082 (),
11141083 ),
11151084 element .Execute (b'' , 1 ),
1116- ))
1117- if c == tps :
1118- break
1119- c += 1
1120- else :
1121- last = element .SynchronizeMessage
1122- xm .append (last )
1085+ )
1086+ for t in chunk
1087+ ]
1088+ bindings .append (last )
11231089 self .database ._pq_push (
1124- xact .Instruction (xm , asynchook = self .database ._receive_async ),
1090+ xact .Instruction (
1091+ chain .from_iterable (bindings ),
1092+ asynchook = self .database ._receive_async
1093+ ),
11251094 self
11261095 )
11271096 self .database ._pq_complete ()
@@ -1141,16 +1110,32 @@ def _load_bulk_tuples(self, tupleseq, tps = None):
11411110
11421111 def load (self , iterable , tps = None ):
11431112 """
1144- Execute the query for each parameter set in `iterable`.
1113+ WARNING: Deprecated, use load_chunks and load_rows instead.
1114+ """
1115+ if self .closed is None :
1116+ self ._fini ()
1117+ if isinstance (iterable , Chunks ):
1118+ l = self .load_chunks
1119+ else :
1120+ l = self .load_rows
1121+ return l (iterable )
1122+
1123+ def load_chunks (self , chunks ):
1124+ """
1125+ Execute the query for each row-parameter set in `iterable`.
11451126
1146- In cases of ``COPY ... FROM STDIN``, iterable must be an iterable `bytes`.
1127+ In cases of ``COPY ... FROM STDIN``, iterable must be an iterable of
1128+ sequences of `bytes`.
11471129 """
11481130 if self .closed is None :
11491131 self ._fini ()
11501132 if not self ._input :
1151- return self ._copy_data_in ( iterable , tps = tps )
1133+ return self ._load_copy_chunks ( chunks )
11521134 else :
1153- return self ._load_bulk_tuples (iterable , tps = tps )
1135+ return self ._load_tuple_chunks (chunks )
1136+
1137+ def load_rows (self , rows , chunksize = 256 ):
1138+ return self .load_chunks (chunk (rows , chunksize ))
11541139
11551140class StoredProcedure (pg_api .StoredProcedure ):
11561141 _e_factors = ('database' , 'procedure_id' )
0 commit comments