Apache Iceberg version
0.6.0 (latest release)
Please describe the bug 🐞
I'm hitting a race condition when calling table.scan in my code. For some reason, the scan returns before the final table has been collected.
This is my code:
from pyiceberg.table import StaticTable
from pyiceberg.expressions import AlwaysTrue

table = StaticTable.from_metadata(
    metadata_location='s3a://my_s3_bucket/iceberg/taxi_fhvhv_7eba066f-7498-4a8a-b932-bf2dbcc938fd/meta...',
)
res = table.scan(
    selected_fields=("*",),
    row_filter=AlwaysTrue(),
    limit=10_000,
)
print('A >>', res)
a_res = res.to_arrow()
print('B >>', a_res)
print('C >>', a_res.num_rows)
Which returns:
root@45daaeef1ce1:/# python test.py
A >> <pyiceberg.table.DataScan object at 0x7fece38ec6d0>
B >> pyarrow.Table
hvfhs_license_num: string
dispatching_base_num: string
originating_base_num: string
request_datetime: timestamp[us, tz=UTC]
on_scene_datetime: timestamp[us, tz=UTC]
pickup_datetime: timestamp[us, tz=UTC]
dropoff_datetime: timestamp[us, tz=UTC]
PULocationID: int64
DOLocationID: int64
trip_miles: double
trip_time: int64
base_passenger_fare: double
tolls: double
bcf: double
sales_tax: double
congestion_surcharge: double
airport_fee: int32
tips: double
driver_pay: double
shared_request_flag: string
shared_match_flag: string
access_a_ride_flag: string
wav_request_flag: string
wav_match_flag: string
----
hvfhs_license_num: []
dispatching_base_num: []
originating_base_num: []
request_datetime: []
on_scene_datetime: []
pickup_datetime: []
dropoff_datetime: []
PULocationID: []
DOLocationID: []
trip_miles: []
...
C >> 0
I think the problem happens here (pyiceberg/io/pyarrow.py, lines 1021 to 1023 in 6989b92):
    row_counts.append(len(arrow_table))

    return to_requested_schema(projected_schema, file_project_schema, arrow_table)
The code appends to the row_counts list before returning the table. If multiple tasks are running concurrently, the next task that starts executing the _task_to_table function returns None because of this check (pyiceberg/io/pyarrow.py, lines 941 to 954 in 6989b92):
def _task_to_table(
    fs: FileSystem,
    task: FileScanTask,
    bound_row_filter: BooleanExpression,
    projected_schema: Schema,
    projected_field_ids: Set[int],
    positional_deletes: Optional[List[ChunkedArray]],
    case_sensitive: bool,
    row_counts: List[int],
    limit: Optional[int] = None,
    name_mapping: Optional[NameMapping] = None,
) -> Optional[pa.Table]:
    if limit and sum(row_counts) >= limit:
        return None
I think it happens because the original task, the one with the data, is still processing the content of the table here (pyiceberg/io/pyarrow.py, line 1023 in 6989b92):
    return to_requested_schema(projected_schema, file_project_schema, arrow_table)
So I suppose the task that returned None completes before the real task with the table content. The completed_futures list then contains only the future that returned None, which causes the code to return an empty table:
https://github.com/apache/iceberg-python/blob/6989b92c2d449beb9fe4817c64f619ea5bfc81dc/pyiceberg/io/pyarrow.py#L1111C1-L1116C18
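To make the suspected sequence easier to follow, here is a minimal sketch that reproduces the same ordering without pyiceberg. It is not the library's code: `run_scan`, `task_with_data`, `late_task` and the two `threading.Event` objects are hypothetical names, plain lists stand in for Arrow tables, and the events only force the interleaving I believe happens (the row count is published while the first task is still busy).

```python
import concurrent.futures
import threading
from typing import List, Optional

LIMIT = 3  # stands in for the `limit` passed to table.scan


def run_scan() -> List[List[int]]:
    row_counts: List[int] = []          # shared by all workers, like row_counts in project_table
    counted = threading.Event()         # set once the first task has published its row count
    collector_done = threading.Event()  # keeps the first task "busy" until the collector has stopped

    def task_with_data() -> Optional[List[int]]:
        rows = [1, 2, 3]
        row_counts.append(len(rows))    # the count becomes visible first...
        counted.set()
        collector_done.wait(timeout=5)  # ...while the simulated schema conversion is still running
        return rows

    def late_task() -> Optional[List[int]]:
        counted.wait(timeout=5)
        if sum(row_counts) >= LIMIT:    # same kind of early-exit check as in _task_to_table
            return None
        return [4, 5, 6]

    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(task_with_data), pool.submit(late_task)]
        completed = []
        for future in concurrent.futures.as_completed(futures):
            completed.append(future)
            # stop early if the limit looks satisfied, based on the shared counter
            if sum(row_counts) >= LIMIT:
                break
        tables = [f.result() for f in completed if f.result() is not None]
        collector_done.set()            # let the first task finish so the pool can shut down
    return tables
```

In this sketch `run_scan()` returns an empty list even though one task produced three rows, because the only future the collector saw before breaking is the one that returned None.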
These are the contents of the completed_futures and tables variables in the project_table function:
>>>>> completed_futures SortedKeyList([<Future at 0x7fa7e4b97fd0 state=finished returned NoneType>], key=<function project_table.<locals>.<lambda> at 0x7fa90316c4a0>)
>>>>> tables []
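One possible way to avoid the early break, as a sketch only and not a tested patch for pyiceberg: let the collector count the rows it has actually received instead of trusting the shared row_counts. `collect_rows` is a hypothetical helper, plain lists stand in for Arrow tables, and the ordering logic from project_table (the SortedList) is left out for brevity.

```python
from concurrent.futures import Future, as_completed
from typing import List, Optional


def collect_rows(futures: List["Future[Optional[List[int]]]"], limit: int) -> List[List[int]]:
    """Gather task results until the rows actually received reach `limit`."""
    tables: List[List[int]] = []
    collected = 0
    for future in as_completed(futures):
        result = future.result()
        if result is None:
            # A task that skipped its work contributes no rows, so it must not end the loop.
            continue
        tables.append(result)
        collected += len(result)
        # Stop only once the collector itself holds enough rows.
        if collected >= limit:
            break
    return tables
```

With one future that returned None and one that returned three rows, `collect_rows(futures, limit=3)` keeps waiting for the second future instead of stopping on the first.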
And by modifying the loop with:
# for consistent ordering, we need to maintain future order
futures_index = {f: i for i, f in enumerate(futures)}
completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
for future in concurrent.futures.as_completed(futures):
    completed_futures.add(future)
    # stop early if limit is satisfied
    if limit is not None and sum(row_counts) >= limit:
        print('>>>>> ', limit, sum(row_counts), future.result())
        break
I got: