|
1 | 1 | ## |
2 | | -# driver.pg_type - Standard Database Type I/O |
| 2 | +# .driver.pg_type - Standard Database Type I/O |
3 | 3 | ## |
4 | 4 | from codecs import lookup as lookup_codecs |
5 | 5 | from abc import ABCMeta, abstractmethod |
|
10 | 10 | from ..string import quote_ident |
11 | 11 | from .. import types as pg_types |
12 | 12 | from ..types.io import resolve |
13 | | -from ..types.io.pg_container import record_io_factory, array_io_factory |
14 | 13 | from ..types import Row, Array, oid_to_sql_name, oid_to_name |
| 14 | +from ..python.functools import process_tuple |
| 15 | +from .. import exceptions as pg_exc |
| 16 | +from ..types.io import lib |
15 | 17 |
|
16 | 18 | class TypeIO(object, metaclass = ABCMeta): |
17 | 19 | """ |
@@ -87,6 +89,9 @@ def __init__(self): |
87 | 89 | } |
88 | 90 | self.typinfo = {} |
89 | 91 |
|
| 92 | + def row_type_factory(self, column_names): |
| 93 | + pass |
| 94 | + |
90 | 95 | def sql_type_from_oid(self, oid, qi = quote_ident): |
91 | 96 | if oid in oid_to_sql_name: |
92 | 97 | return oid_to_sql_name[oid] |
@@ -163,20 +168,20 @@ def resolve(self, |
163 | 168 | ) |
164 | 169 | cio.append((pack or self.encode, unpack or self.decode)) |
165 | 170 | i += 1 |
166 | | - self._cache[typid] = typio = record_io_factory( |
| 171 | + self._cache[typid] = typio = self.record_io_factory( |
167 | 172 | cio, typids, attmap, list( |
168 | 173 | map(self.sql_type_from_oid, typids) |
169 | 174 | ), attnames, |
170 | 175 | quote_ident(typnamespace) + '.' + \ |
171 | 176 | quote_ident(typname), |
172 | 177 | ) |
173 | 178 | elif ae_typid is not None: |
174 | | - # Array Type |
| 179 | + # resolve the element type and I/O pair |
175 | 180 | te = self.resolve( |
176 | 181 | int(typelem), |
177 | 182 | from_resolution_of = list(from_resolution_of) + [typid] |
178 | 183 | ) or (None, None) |
179 | | - typio = array_io_factory( |
| 184 | + typio = self.array_io_factory( |
180 | 185 | te[0] or self.encode, |
181 | 186 | te[1] or self.decode, |
182 | 187 | typelem, |
@@ -206,3 +211,132 @@ def identify(self, **identity_mappings): |
206 | 211 | (oid, io if io.__class__ is tuple else io(oid, self)) |
207 | 212 | for oid, io in zip(oids, ios) |
208 | 213 | ]) |
| 214 | + |
| 215 | + def array_parts(self, array, ArrayType = Array): |
| 216 | + if array.__class__ is not ArrayType: |
| 217 | + # Assume the data is a nested list. |
| 218 | + array = ArrayType(array) |
| 219 | + return ( |
| 220 | + array.elements(), |
| 221 | + array.dimensions, |
| 222 | + array.lowerbounds |
| 223 | + ) |
| 224 | + |
| 225 | + def array_from_parts(self, parts, ArrayType = Array): |
| 226 | + elements, dimensions, lowerbounds = parts |
| 227 | + return ArrayType.from_elements( |
| 228 | + elements, |
| 229 | + lowerbounds = lowerbounds, |
| 230 | + upperbounds = [x + lb - 1 for x, lb in zip(dimensions, lowerbounds)] |
| 231 | + ) |
| 232 | + |
| 233 | + ## |
| 234 | + # array_io_factory - build I/O pair for ARRAYs |
| 235 | + ## |
| 236 | + def array_io_factory( |
| 237 | + self, |
| 238 | + pack_element, unpack_element, |
| 239 | + typoid, # array element id |
| 240 | + hasbin_input, hasbin_output, |
| 241 | + array_pack = lib.array_pack, |
| 242 | + array_unpack = lib.array_unpack, |
| 243 | + ): |
| 244 | + packed_typoid = lib.ulong_pack(typoid) |
| 245 | + if hasbin_input: |
| 246 | + def pack_an_array(data, get_parts = self.array_parts): |
| 247 | + elements, dimensions, lowerbounds = get_parts(data) |
| 248 | + return array_pack(( |
| 249 | + 0, # unused flags |
| 250 | + typoid, dimensions, lowerbounds, |
| 251 | + (x if x is None else pack_element(x) for x in elements), |
| 252 | + )) |
| 253 | + else: |
| 254 | + # signals string formatting |
| 255 | + pack_an_array = None |
| 256 | + |
| 257 | + if hasbin_output: |
| 258 | + def unpack_an_array(data, array_from_parts = self.array_from_parts): |
| 259 | + flags, typoid, dims, lbs, elements = array_unpack(data) |
| 260 | + return array_from_parts((map(unpack_element, elements), dims, lbs)) |
| 261 | + else: |
| 262 | + # signals string formatting |
| 263 | + unpack_an_array = None |
| 264 | + |
| 265 | + return (pack_an_array, unpack_an_array, Array) |
| 266 | + |
| 267 | + ## |
| 268 | + # record_io_factory - Build an I/O pair for RECORDs |
| 269 | + ## |
| 270 | + def record_io_factory(self, |
| 271 | + column_io : "sequence (pack,unpack) tuples corresponding to the columns", |
| 272 | + typids : "sequence of type Oids; index must correspond to the composite's", |
| 273 | + attmap : "mapping of column name to index number", |
| 274 | + typnames : "sequence of sql type names in order", |
| 275 | + attnames : "sequence of attribute names in order", |
| 276 | + composite_name : "the name of the composite type", |
| 277 | + get0 = itemgetter(0), |
| 278 | + get1 = itemgetter(1), |
| 279 | + ): |
| 280 | + fpack = tuple(map(get0, column_io)) |
| 281 | + funpack = tuple(map(get1, column_io)) |
| 282 | + |
| 283 | + def raise_pack_tuple_error(procs, tup, itemnum): |
| 284 | + data = repr(tup[itemnum]) |
| 285 | + if len(data) > 80: |
| 286 | + # Be sure not to fill screen with noise. |
| 287 | + data = data[:75] + ' ...' |
| 288 | + raise pg_exc.ColumnError( |
| 289 | + "failed to pack attribute %d, %s::%s, of composite %s for transfer" %( |
| 290 | + itemnum, |
| 291 | + attnames[itemnum], |
| 292 | + typnames[itemnum], |
| 293 | + composite_name, |
| 294 | + ), |
| 295 | + details = { |
| 296 | + 'context': data, |
| 297 | + 'position' : str(itemnum) |
| 298 | + }, |
| 299 | + ) |
| 300 | + |
| 301 | + def raise_unpack_tuple_error(procs, tup, itemnum): |
| 302 | + data = repr(tup[itemnum]) |
| 303 | + if len(data) > 80: |
| 304 | + # Be sure not to fill screen with noise. |
| 305 | + data = data[:75] + ' ...' |
| 306 | + raise pg_exc.ColumnError( |
| 307 | + "failed to unpack attribute %d, %s::%s, of composite %s from wire data" %( |
| 308 | + itemnum, |
| 309 | + attnames[itemnum], |
| 310 | + typnames[itemnum], |
| 311 | + composite_name, |
| 312 | + ), |
| 313 | + details = { |
| 314 | + 'context': data, |
| 315 | + 'position' : str(itemnum), |
| 316 | + }, |
| 317 | + ) |
| 318 | + |
| 319 | + def unpack_a_record(data, |
| 320 | + unpack = lib.record_unpack, |
| 321 | + process_tuple = process_tuple, |
| 322 | + row_from_seq = Row.from_sequence |
| 323 | + ): |
| 324 | + data = tuple([x[1] for x in unpack(data)]) |
| 325 | + return row_from_seq( |
| 326 | + attmap, process_tuple(funpack, data, raise_unpack_tuple_error), |
| 327 | + ) |
| 328 | + |
| 329 | + sorted_atts = sorted(attmap.items(), key = get1) |
| 330 | + def pack_a_record(data, |
| 331 | + pack = lib.record_pack, |
| 332 | + process_tuple = process_tuple, |
| 333 | + ): |
| 334 | + if isinstance(data, dict): |
| 335 | + data = [data.get(k) for k,_ in sorted_atts] |
| 336 | + return pack( |
| 337 | + tuple(zip( |
| 338 | + typids, |
| 339 | + process_tuple(fpack, tuple(data), raise_pack_tuple_error) |
| 340 | + )) |
| 341 | + ) |
| 342 | + return (pack_a_record, unpack_a_record, Row) |
0 commit comments