-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathtable_format.py
More file actions
564 lines (451 loc) · 19.7 KB
/
table_format.py
File metadata and controls
564 lines (451 loc) · 19.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
# Copyright 2020 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from abc import ABC, abstractmethod
from enum import Enum
from typing import TYPE_CHECKING, Dict, Optional
if TYPE_CHECKING:
from feast.protos.feast.core.DataFormat_pb2 import TableFormat as TableFormatProto
class TableFormatType(Enum):
    """
    Identifiers for the open table formats supported by this module.

    A member's ``value`` is the lowercase string written to / read from the
    ``format_type`` key of a table format's dictionary representation.
    """

    DELTA = "delta"  # Delta Lake
    ICEBERG = "iceberg"  # Apache Iceberg
    HUDI = "hudi"  # Apache Hudi
class TableFormat(ABC):
    """
    Abstract base class for table formats.

    Table formats encapsulate metadata and configuration specific to different
    table storage formats like Iceberg, Delta Lake, Hudi, etc. They provide
    a unified interface for configuring table-specific properties that are
    used when reading from or writing to these advanced table formats.

    This base class defines the contract that all table format implementations
    must follow, including serialization/deserialization capabilities and
    property management.

    Attributes:
        format_type (TableFormatType): The type of table format (iceberg, delta, hudi).
        properties (Dict[str, str]): Dictionary of format-specific properties. This
            is always a private copy owned by the instance.

    Examples:
        Table formats are typically used with data sources to specify
        advanced table metadata and reading options:

        >>> from feast.table_format import IcebergFormat
        >>> iceberg_format = IcebergFormat(
        ...     catalog="my_catalog",
        ...     namespace="my_namespace"
        ... )
        >>> iceberg_format.set_property("snapshot-id", "123456789")
    """

    def __init__(
        self,
        format_type: "TableFormatType",
        properties: Optional[Dict[str, str]] = None,
    ):
        """
        Initialize the common table format state.

        Args:
            format_type: The table format this instance describes.
            properties: Optional initial format-specific properties. The mapping
                is copied, so later writes made through this instance (by
                ``set_property`` or by subclass constructors) never modify the
                caller's dictionary.
        """
        self.format_type = format_type
        # Copy instead of aliasing: subclass constructors and set_property write
        # into self.properties, which would otherwise mutate the caller's dict.
        self.properties: Dict[str, str] = dict(properties) if properties else {}

    @abstractmethod
    def to_dict(self) -> Dict:
        """Convert table format to dictionary representation"""

    @classmethod
    @abstractmethod
    def from_dict(cls, data: Dict) -> "TableFormat":
        """Create table format from dictionary representation"""

    def get_property(self, key: str, default: Optional[str] = None) -> Optional[str]:
        """
        Get a table format property.

        Args:
            key: Property name.
            default: Value returned when ``key`` is not set.

        Returns:
            The stored value, or ``default`` if the key is absent.
        """
        return self.properties.get(key, default)

    def set_property(self, key: str, value: str) -> None:
        """
        Set (or overwrite) a table format property.

        Args:
            key: Property name.
            value: Property value.
        """
        self.properties[key] = value
class IcebergFormat(TableFormat):
    """
    Apache Iceberg table format configuration.

    Iceberg is an open table format for huge analytic datasets. This class provides
    configuration for Iceberg-specific properties including catalog configuration,
    namespace settings, and table-level properties for reading and writing Iceberg tables.

    Args:
        catalog (Optional[str]): Name of the Iceberg catalog to use. The catalog manages
            table metadata and provides access to tables.
        namespace (Optional[str]): Namespace (schema/database) within the catalog where
            the table is located.
        properties (Optional[Dict[str, str]]): Properties for configuring Iceberg
            catalog and table operations (e.g., warehouse location, snapshot-id,
            as-of-timestamp, file format, compression, partitioning). The mapping
            is copied; the caller's dictionary is never modified.

    Attributes:
        catalog (Optional[str]): The Iceberg catalog name.
        namespace (Optional[str]): The namespace within the catalog.
        properties (Dict[str, str]): Iceberg configuration properties. Non-empty
            ``catalog``/``namespace`` values are also stored here under
            ``iceberg.catalog``/``iceberg.namespace``.

    Examples:
        Basic Iceberg configuration:

        >>> iceberg_format = IcebergFormat(
        ...     catalog="my_catalog",
        ...     namespace="my_database"
        ... )

        Advanced configuration with properties:

        >>> iceberg_format = IcebergFormat(
        ...     catalog="spark_catalog",
        ...     namespace="lakehouse",
        ...     properties={
        ...         "warehouse": "s3://my-bucket/warehouse",
        ...         "catalog-impl": "org.apache.iceberg.spark.SparkCatalog",
        ...         "format-version": "2",
        ...         "write.parquet.compression-codec": "snappy"
        ...     }
        ... )

        Reading from a specific snapshot:

        >>> iceberg_format = IcebergFormat(catalog="my_catalog", namespace="db")
        >>> iceberg_format.set_property("snapshot-id", "123456789")

        Time travel queries:

        >>> iceberg_format.set_property("as-of-timestamp", "1648684800000")
    """

    def __init__(
        self,
        catalog: Optional[str] = None,
        namespace: Optional[str] = None,
        properties: Optional[Dict[str, str]] = None,
    ):
        # Hand the base class a private copy: the derived "iceberg.*" keys set
        # below must never be written into the caller's dictionary (e.g. the
        # "properties" entry of the dict passed to from_dict).
        super().__init__(TableFormatType.ICEBERG, dict(properties or {}))
        self.catalog = catalog
        self.namespace = namespace

        # Mirror catalog and namespace into the property map when they are
        # non-empty (None and "" are skipped).
        if catalog:
            self.properties["iceberg.catalog"] = catalog
        if namespace:
            self.properties["iceberg.namespace"] = namespace

    def to_dict(self) -> Dict:
        """
        Serialize to a plain dictionary (the inverse of ``from_dict``).

        ``properties`` in the result is a copy, so mutating the returned
        dictionary does not change this instance.
        """
        return {
            "format_type": self.format_type.value,
            "catalog": self.catalog,
            "namespace": self.namespace,
            "properties": dict(self.properties),
        }

    @classmethod
    def from_dict(cls, data: Dict) -> "IcebergFormat":
        """
        Build an instance from the dictionary produced by ``to_dict``.

        Missing keys fall back to ``None`` (catalog/namespace) or no properties.
        """
        return cls(
            catalog=data.get("catalog"),
            namespace=data.get("namespace"),
            properties=data.get("properties", {}),
        )

    def to_proto(self) -> "TableFormatProto":
        """
        Convert to protobuf TableFormat message.

        ``None`` catalog/namespace are encoded as empty strings, because proto3
        string fields have no null value.
        """
        from feast.protos.feast.core.DataFormat_pb2 import (
            TableFormat as TableFormatProto,
        )

        iceberg_proto = TableFormatProto.IcebergFormat(
            catalog=self.catalog or "",
            namespace=self.namespace or "",
            properties=self.properties,
        )
        return TableFormatProto(iceberg_format=iceberg_proto)

    @classmethod
    def from_proto(cls, proto: "TableFormatProto") -> "IcebergFormat":
        """
        Create from protobuf TableFormat message.

        Empty catalog/namespace strings are mapped back to ``None``.
        """
        iceberg_proto = proto.iceberg_format
        return cls(
            catalog=iceberg_proto.catalog if iceberg_proto.catalog else None,
            namespace=iceberg_proto.namespace if iceberg_proto.namespace else None,
            properties=dict(iceberg_proto.properties),
        )
class DeltaFormat(TableFormat):
    """
    Delta Lake table format configuration.

    Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark
    and big data workloads. This class provides configuration for Delta-specific properties
    including table properties, checkpoint locations, and versioning options.

    Args:
        checkpoint_location (Optional[str]): Checkpoint location, typically needed by
            streaming jobs. When non-empty it is also stored in ``properties`` under
            ``delta.checkpointLocation``.
        properties (Optional[Dict[str, str]]): Properties for configuring Delta table
            behavior (e.g., auto-optimize, vacuum settings, data skipping). The
            mapping is copied; the caller's dictionary is never modified.

    Attributes:
        checkpoint_location (Optional[str]): Path to checkpoint storage location.
        properties (Dict[str, str]): Delta table configuration properties.

    Examples:
        Basic Delta configuration:

        >>> delta_format = DeltaFormat()

        Configuration with table properties:

        >>> delta_format = DeltaFormat(
        ...     properties={
        ...         "delta.autoOptimize.optimizeWrite": "true",
        ...         "delta.autoOptimize.autoCompact": "true",
        ...         "delta.tuneFileSizesForRewrites": "true"
        ...     }
        ... )

        Streaming configuration with checkpoint:

        >>> delta_format = DeltaFormat(
        ...     checkpoint_location="s3://my-bucket/checkpoints/my_table"
        ... )

        Time travel - reading specific version:

        >>> delta_format = DeltaFormat()
        >>> delta_format.set_property("versionAsOf", "5")

        Time travel - reading at specific timestamp:

        >>> delta_format.set_property("timestampAsOf", "2023-01-01 00:00:00")
    """

    def __init__(
        self,
        checkpoint_location: Optional[str] = None,
        properties: Optional[Dict[str, str]] = None,
    ):
        # Hand the base class a private copy so the derived key set below is
        # never written into the caller's dictionary.
        super().__init__(TableFormatType.DELTA, dict(properties or {}))
        self.checkpoint_location = checkpoint_location

        # Mirror a non-empty checkpoint location into the property map.
        if checkpoint_location:
            self.properties["delta.checkpointLocation"] = checkpoint_location

    def to_dict(self) -> Dict:
        """
        Serialize to a plain dictionary (the inverse of ``from_dict``).

        ``properties`` in the result is a copy, so mutating the returned
        dictionary does not change this instance.
        """
        return {
            "format_type": self.format_type.value,
            "checkpoint_location": self.checkpoint_location,
            "properties": dict(self.properties),
        }

    @classmethod
    def from_dict(cls, data: Dict) -> "DeltaFormat":
        """
        Build an instance from the dictionary produced by ``to_dict``.

        A missing checkpoint location becomes ``None``; missing properties become empty.
        """
        return cls(
            checkpoint_location=data.get("checkpoint_location"),
            properties=data.get("properties", {}),
        )

    def to_proto(self) -> "TableFormatProto":
        """
        Convert to protobuf TableFormat message.

        A ``None`` checkpoint location is encoded as an empty string.
        """
        from feast.protos.feast.core.DataFormat_pb2 import (
            TableFormat as TableFormatProto,
        )

        delta_proto = TableFormatProto.DeltaFormat(
            checkpoint_location=self.checkpoint_location or "",
            properties=self.properties,
        )
        return TableFormatProto(delta_format=delta_proto)

    @classmethod
    def from_proto(cls, proto: "TableFormatProto") -> "DeltaFormat":
        """
        Create from protobuf TableFormat message.

        An empty checkpoint location string is mapped back to ``None``.
        """
        delta_proto = proto.delta_format
        return cls(
            checkpoint_location=delta_proto.checkpoint_location
            if delta_proto.checkpoint_location
            else None,
            properties=dict(delta_proto.properties),
        )
class HudiFormat(TableFormat):
    """
    Apache Hudi table format configuration.

    Apache Hudi is a data management framework used to simplify incremental data processing
    and data pipeline development. This class provides configuration for Hudi-specific
    properties including table type, record keys, and write operations.

    Args:
        table_type (Optional[str]): Type of Hudi table. Options are:
            - "COPY_ON_WRITE": Stores data in columnar format (Parquet) and rewrites entire files
            - "MERGE_ON_READ": Stores data using combination of columnar and row-based formats
        record_key (Optional[str]): Field(s) that uniquely identify a record. Can be a single
            field or comma-separated list for composite keys.
        precombine_field (Optional[str]): Field used to determine the latest version of a record
            when multiple updates exist (usually a timestamp or version field).
        properties (Optional[Dict[str, str]]): Additional Hudi table properties for
            configuring compaction, indexing, and other Hudi features. The mapping is
            copied; the caller's dictionary is never modified.

    Attributes:
        table_type (Optional[str]): The Hudi table type (COPY_ON_WRITE or MERGE_ON_READ).
        record_key (Optional[str]): The record key field(s).
        precombine_field (Optional[str]): The field used for record deduplication.
        properties (Dict[str, str]): Additional Hudi configuration properties. Non-empty
            table_type/record_key/precombine_field values are also stored here under
            their ``hoodie.datasource.write.*`` keys.

    Examples:
        Basic Hudi configuration:

        >>> hudi_format = HudiFormat(
        ...     table_type="COPY_ON_WRITE",
        ...     record_key="user_id",
        ...     precombine_field="timestamp"
        ... )

        Configuration with composite record key:

        >>> hudi_format = HudiFormat(
        ...     table_type="MERGE_ON_READ",
        ...     record_key="user_id,event_type",
        ...     precombine_field="event_timestamp"
        ... )

        Advanced configuration with table properties:

        >>> hudi_format = HudiFormat(
        ...     table_type="COPY_ON_WRITE",
        ...     record_key="id",
        ...     precombine_field="updated_at",
        ...     properties={
        ...         "hoodie.compaction.strategy": "org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy",
        ...         "hoodie.index.type": "BLOOM",
        ...         "hoodie.bloom.index.parallelism": "100"
        ...     }
        ... )

        Reading incremental data:

        >>> hudi_format = HudiFormat(table_type="COPY_ON_WRITE")
        >>> hudi_format.set_property("hoodie.datasource.query.type", "incremental")
        >>> hudi_format.set_property("hoodie.datasource.read.begin.instanttime", "20230101000000")
    """

    def __init__(
        self,
        table_type: Optional[str] = None,  # COPY_ON_WRITE or MERGE_ON_READ
        record_key: Optional[str] = None,
        precombine_field: Optional[str] = None,
        properties: Optional[Dict[str, str]] = None,
    ):
        # Hand the base class a private copy so the derived "hoodie.*" keys set
        # below are never written into the caller's dictionary.
        super().__init__(TableFormatType.HUDI, dict(properties or {}))
        self.table_type = table_type
        self.record_key = record_key
        self.precombine_field = precombine_field

        # Mirror the non-empty constructor arguments into the property map.
        if table_type:
            self.properties["hoodie.datasource.write.table.type"] = table_type
        if record_key:
            self.properties["hoodie.datasource.write.recordkey.field"] = record_key
        if precombine_field:
            self.properties["hoodie.datasource.write.precombine.field"] = (
                precombine_field
            )

    def to_dict(self) -> Dict:
        """
        Serialize to a plain dictionary (the inverse of ``from_dict``).

        ``properties`` in the result is a copy, so mutating the returned
        dictionary does not change this instance.
        """
        return {
            "format_type": self.format_type.value,
            "table_type": self.table_type,
            "record_key": self.record_key,
            "precombine_field": self.precombine_field,
            "properties": dict(self.properties),
        }

    @classmethod
    def from_dict(cls, data: Dict) -> "HudiFormat":
        """
        Build an instance from the dictionary produced by ``to_dict``.

        Missing keys fall back to ``None``, or to no properties.
        """
        return cls(
            table_type=data.get("table_type"),
            record_key=data.get("record_key"),
            precombine_field=data.get("precombine_field"),
            properties=data.get("properties", {}),
        )

    def to_proto(self) -> "TableFormatProto":
        """
        Convert to protobuf TableFormat message.

        ``None`` string fields are encoded as empty strings.
        """
        from feast.protos.feast.core.DataFormat_pb2 import (
            TableFormat as TableFormatProto,
        )

        hudi_proto = TableFormatProto.HudiFormat(
            table_type=self.table_type or "",
            record_key=self.record_key or "",
            precombine_field=self.precombine_field or "",
            properties=self.properties,
        )
        return TableFormatProto(hudi_format=hudi_proto)

    @classmethod
    def from_proto(cls, proto: "TableFormatProto") -> "HudiFormat":
        """
        Create from protobuf TableFormat message.

        Empty string fields are mapped back to ``None``.
        """
        hudi_proto = proto.hudi_format
        return cls(
            table_type=hudi_proto.table_type if hudi_proto.table_type else None,
            record_key=hudi_proto.record_key if hudi_proto.record_key else None,
            precombine_field=hudi_proto.precombine_field
            if hudi_proto.precombine_field
            else None,
            properties=dict(hudi_proto.properties),
        )
def create_table_format(format_type: TableFormatType, **kwargs) -> TableFormat:
    """
    Instantiate the TableFormat subclass that corresponds to ``format_type``.

    All keyword arguments are forwarded unchanged to the chosen subclass's
    constructor.

    Args:
        format_type (TableFormatType): Which table format to build.
        **kwargs: Constructor arguments for the selected format class.

    Returns:
        TableFormat: A new IcebergFormat, DeltaFormat or HudiFormat instance.

    Raises:
        ValueError: If ``format_type`` matches none of the supported formats.

    Examples:
        Create an Iceberg format:

        >>> iceberg_format = create_table_format(
        ...     TableFormatType.ICEBERG,
        ...     catalog="my_catalog",
        ...     namespace="my_db"
        ... )

        Create a Delta format:

        >>> delta_format = create_table_format(
        ...     TableFormatType.DELTA,
        ...     checkpoint_location="s3://bucket/checkpoints"
        ... )
    """
    # Checked in order; the first member equal to format_type wins.
    registry = (
        (TableFormatType.ICEBERG, IcebergFormat),
        (TableFormatType.DELTA, DeltaFormat),
        (TableFormatType.HUDI, HudiFormat),
    )
    for member, format_cls in registry:
        if format_type == member:
            return format_cls(**kwargs)
    raise ValueError(f"Unknown table format type: {format_type}")
def table_format_from_dict(data: Dict) -> TableFormat:
    """
    Create TableFormat instance from dictionary representation.

    This function deserializes a dictionary (for example one produced by a
    format's ``to_dict`` or parsed from JSON) back into the appropriate
    TableFormat instance. The dictionary must contain a 'format_type' field
    that indicates which format class to instantiate.

    Args:
        data (Dict): Dictionary containing table format configuration. Must include
            a 'format_type' field whose value is 'iceberg', 'delta' or 'hudi', or
            the corresponding TableFormatType member.

    Returns:
        TableFormat: An instance of the appropriate TableFormat subclass.

    Raises:
        ValueError: If format_type is not recognized.
        KeyError: If format_type field is missing from data.

    Examples:
        Deserialize an Iceberg format:

        >>> data = {
        ...     "format_type": "iceberg",
        ...     "catalog": "my_catalog",
        ...     "namespace": "my_db"
        ... }
        >>> iceberg_format = table_format_from_dict(data)
    """
    if "format_type" not in data:
        raise KeyError("Missing 'format_type' field in data")

    format_type = data["format_type"]
    # Accept an enum member as well as its string value; to_dict always writes
    # the string, but hand-built dicts may use TableFormatType directly.
    if isinstance(format_type, TableFormatType):
        format_type = format_type.value

    if format_type == TableFormatType.ICEBERG.value:
        return IcebergFormat.from_dict(data)
    elif format_type == TableFormatType.DELTA.value:
        return DeltaFormat.from_dict(data)
    elif format_type == TableFormatType.HUDI.value:
        return HudiFormat.from_dict(data)
    else:
        raise ValueError(f"Unknown table format type: {format_type}")
def table_format_from_json(json_str: str) -> "TableFormat":
    """
    Create TableFormat instance from JSON string.

    This is a convenience function that parses a JSON string and creates
    the appropriate TableFormat instance. Useful for loading table format
    configurations from files or network requests.

    Args:
        json_str (str): JSON string containing a table format configuration object.

    Returns:
        TableFormat: An instance of the appropriate TableFormat subclass.

    Raises:
        json.JSONDecodeError: If the JSON string is invalid.
        ValueError: If the JSON does not decode to an object, or if
            format_type is not recognized.
        KeyError: If format_type field is missing.

    Examples:
        Load from JSON string:

        >>> json_config = '{"format_type": "delta", "checkpoint_location": "s3://bucket/checkpoints"}'
        >>> delta_format = table_format_from_json(json_config)
    """
    data = json.loads(json_str)
    # table_format_from_dict needs a mapping. Without this check, top-level
    # scalars such as "123" or "null" failed with an opaque TypeError, and "[]"
    # with a misleading "missing field" KeyError.
    if not isinstance(data, dict):
        raise ValueError(
            f"Expected a JSON object for table format, got {type(data).__name__}"
        )
    return table_format_from_dict(data)
def table_format_from_proto(proto: "TableFormatProto") -> TableFormat:
    """
    Rebuild a TableFormat from a protobuf TableFormat message.

    The active member of the message's ``format`` oneof selects which
    subclass's ``from_proto`` performs the conversion.

    Args:
        proto: TableFormat protobuf message.

    Returns:
        TableFormat: An IcebergFormat, DeltaFormat or HudiFormat instance.

    Raises:
        ValueError: If no recognized member of the ``format`` oneof is set.
    """
    populated = proto.WhichOneof("format")
    # Maps the oneof field name to the class that knows how to decode it.
    decoders = (
        ("iceberg_format", IcebergFormat),
        ("delta_format", DeltaFormat),
        ("hudi_format", HudiFormat),
    )
    for field_name, format_cls in decoders:
        if populated == field_name:
            return format_cls.from_proto(proto)
    raise ValueError(f"Unknown table format in proto: {populated}")