Race condition on Table.scan with limit #542

@bigluck

Description

Apache Iceberg version

0.6.0 (latest release)

Please describe the bug 🐞

I'm facing a race condition when calling table.scan in my code. For some reason, the scan finishes before collecting the final table, so to_arrow() returns an empty table even though the data files contain rows.

This is my code:

from pyiceberg.table import StaticTable
from pyiceberg.expressions import AlwaysTrue


table = StaticTable.from_metadata(
    metadata_location='s3a://my_s3_bucket/iceberg/taxi_fhvhv_7eba066f-7498-4a8a-b932-bf2dbcc938fd/meta...',
)
res = table.scan(
    selected_fields=("*",),
    row_filter=AlwaysTrue(),
    limit=10_000,
)

print('A >>', res)

a_res = res.to_arrow()
print('B >>', a_res)
print('C >>', a_res.num_rows)

Which returns:

root@45daaeef1ce1:/# python test.py
A >> <pyiceberg.table.DataScan object at 0x7fece38ec6d0>
B >> pyarrow.Table
hvfhs_license_num: string
dispatching_base_num: string
originating_base_num: string
request_datetime: timestamp[us, tz=UTC]
on_scene_datetime: timestamp[us, tz=UTC]
pickup_datetime: timestamp[us, tz=UTC]
dropoff_datetime: timestamp[us, tz=UTC]
PULocationID: int64
DOLocationID: int64
trip_miles: double
trip_time: int64
base_passenger_fare: double
tolls: double
bcf: double
sales_tax: double
congestion_surcharge: double
airport_fee: int32
tips: double
driver_pay: double
shared_request_flag: string
shared_match_flag: string
access_a_ride_flag: string
wav_request_flag: string
wav_match_flag: string
----
hvfhs_license_num: []
dispatching_base_num: []
originating_base_num: []
request_datetime: []
on_scene_datetime: []
pickup_datetime: []
dropoff_datetime: []
PULocationID: []
DOLocationID: []
trip_miles: []
...
C >> 0

I think the problem happens here:

row_counts.append(len(arrow_table))
return to_requested_schema(projected_schema, file_project_schema, arrow_table)

The code appends to the row_counts list before returning the table. If multiple tasks are running concurrently, the next task that starts executing the _task_to_table function returns None because of this early-exit check:

def _task_to_table(
    fs: FileSystem,
    task: FileScanTask,
    bound_row_filter: BooleanExpression,
    projected_schema: Schema,
    projected_field_ids: Set[int],
    positional_deletes: Optional[List[ChunkedArray]],
    case_sensitive: bool,
    row_counts: List[int],
    limit: Optional[int] = None,
    name_mapping: Optional[NameMapping] = None,
) -> Optional[pa.Table]:
    if limit and sum(row_counts) >= limit:
        return None

I think this happens because the original task, the one that holds the data, is still processing the table content here:

return to_requested_schema(projected_schema, file_project_schema, arrow_table)

So I suppose that the task that returned None completes before the task that actually holds the table content. Indeed, the completed_futures list contains only the future that returned None, which causes the code to return an empty table:

https://github.com/apache/iceberg-python/blob/6989b92c2d449beb9fe4817c64f619ea5bfc81dc/pyiceberg/io/pyarrow.py#L1111C1-L1116C18

This is the content of the completed_futures and tables variables in the project_table function:

>>>>> completed_futures SortedKeyList([<Future at 0x7fa7e4b97fd0 state=finished returned NoneType>], key=<function project_table.<locals>.<lambda> at 0x7fa90316c4a0>)
>>>>> tables []
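To make the suspected interleaving concrete, here is a minimal, self-contained sketch of the same pattern using only the standard library. It is not pyiceberg code: task_a, task_b, LIMIT, and the two threading.Event gates are hypothetical, added only to force the ordering described above (task A updates the shared row_counts before its result is ready, and task B then sees the limit as satisfied and returns None). Plain Python lists stand in for pa.Table.

```python
import concurrent.futures
import threading
from typing import List, Optional

LIMIT = 3  # hypothetical limit, plays the role of limit=10_000


def task_a(row_counts: List[int], appended: threading.Event, release: threading.Event) -> Optional[list]:
    rows = [1, 2, 3]              # pretend we read 3 rows from a data file
    row_counts.append(len(rows))  # counter is updated before the result is ready
    appended.set()                # let task_b run its early-exit check now
    release.wait()                # simulate a slow to_requested_schema(...) call
    return rows


def task_b(row_counts: List[int], appended: threading.Event) -> Optional[list]:
    appended.wait()               # check the limit only after task_a has appended
    if sum(row_counts) >= LIMIT:
        return None               # limit looks satisfied, so this file is skipped
    return [4, 5, 6]


def scan() -> list:
    row_counts: List[int] = []
    appended, release = threading.Event(), threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        futures = [
            pool.submit(task_a, row_counts, appended, release),
            pool.submit(task_b, row_counts, appended),
        ]
        completed = []
        for future in concurrent.futures.as_completed(futures):
            completed.append(future)
            if sum(row_counts) >= LIMIT:
                break             # stops here: only task_b (returning None) has finished
        release.set()             # let task_a finish so the pool can shut down
    # same shape as the final assembly: drop futures whose result is falsy
    return [row for f in completed if f.result() for row in f.result()]
```

Calling scan() returns [] even though task A read 3 rows, mirroring the C >> 0 output above: the loop stops on the first completed future, and that future holds None.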

And after adding a debug print to the loop:

    # for consistent ordering, we need to maintain future order
    futures_index = {f: i for i, f in enumerate(futures)}
    completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
    for future in concurrent.futures.as_completed(futures):
        completed_futures.add(future)

        # stop early if limit is satisfied
        if limit is not None and sum(row_counts) >= limit:
            print('>>>>> ', limit, sum(row_counts), future.result())
            break

I got:

>>>>>  10000 10000 None
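One possible direction, sketched here as an assumption and not as the upstream fix: base the early-stop decision on rows that finished futures have actually returned, instead of the shared row_counts counter, which is updated before a task's result is available. collect_until_limit and the list-returning tasks are hypothetical stand-ins for project_table and _task_to_table.

```python
import concurrent.futures
from typing import Callable, List, Optional

# Hypothetical task type: returns rows, or None when it skipped its file.
Task = Callable[[], Optional[List[int]]]


def collect_until_limit(tasks: List[Task], limit: int) -> List[int]:
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(task) for task in tasks]
        # remember submission order, like futures_index in project_table
        order = {future: i for i, future in enumerate(futures)}
        finished = []
        rows_collected = 0
        for future in concurrent.futures.as_completed(futures):
            finished.append(future)
            result = future.result()
            if result:
                rows_collected += len(result)
            # stop only once the rows we actually hold reach the limit
            if rows_collected >= limit:
                break
        # cancel tasks that have not started; running ones finish during shutdown
        for future in futures:
            future.cancel()
    # restore submission order among the finished futures
    finished.sort(key=order.__getitem__)
    rows = [row for future in finished if future.result() for row in future.result()]
    return rows[:limit]
```

For example, with tasks returning None, [1, 2, 3], and [4, 5] and a limit of 4, the loop keeps waiting past the None result and returns [1, 2, 3, 4]. Tasks may still return None from the early check in _task_to_table; in this sketch that is harmless, because the loop no longer relies on that counter to decide when to stop.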
