RangePartitioning does not yield correct results #1906

Open
@mbutrovich

Description

Describe the bug

As noted by @Kontinuation, the RangePartitioning implementation that was just merged has a bug, which is most obvious when range-partitioning pre-sorted data:

This implementation of RangePartitioning may be incorrect. RangePartitioning should partition the input DataFrame into partitions with consecutive and non-overlapping ranges; this requires scanning the entire DataFrame to obtain the range bounds of each partition before performing the actual shuffle writing.
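To make the expected behavior concrete, here is a minimal pure-Python sketch of the technique the quote describes: derive global cut points from a sample of the whole dataset first, then assign every row against those shared bounds. All names (`compute_bounds`, `partition_id`) are my own for illustration; this is not Spark's or Comet's actual code.

```python
import bisect
import random

random.seed(42)  # deterministic for this sketch

def compute_bounds(sample, num_partitions):
    # Derive (num_partitions - 1) global cut points from a sorted sample,
    # so every writer assigns rows against the same boundaries.
    s = sorted(sample)
    step = len(s) / num_partitions
    return [s[int(step * i)] for i in range(1, num_partitions)]

def partition_id(value, bounds):
    # bisect keeps the assignment monotonic in `value`, which is what
    # makes the resulting ranges consecutive and non-overlapping.
    return bisect.bisect_left(bounds, value)

# Toy stand-in for spark.range(0, 100000)
data = list(range(100000))
bounds = compute_bounds(random.sample(data, 1000), 10)

parts = {}
for v in data:
    parts.setdefault(partition_id(v, bounds), []).append(v)

# Each partition's max is strictly below the next partition's min.
for i in range(9):
    assert max(parts[i]) < min(parts[i + 1])
```

If instead each task computed bounds only from its own local batches (which is what the overlapping Comet output below suggests), the per-task boundaries would differ and the global ranges would overlap.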

Here is the PySpark code to illustrate the difference between the behavior of Comet and Vanilla Spark.

spark.range(0, 100000).write.format("parquet").mode("overwrite").save("range-partitioning")

df = spark.read.parquet("range-partitioning")
df_range_partitioned = df.repartitionByRange(10, "id")

df_range_partitioned.explain()

# Show the min and max id of each partition
def get_partition_bounds(idx, iterator):
    # Avoid shadowing the built-in min/max
    min_id = None
    max_id = None
    for row in iterator:
        if min_id is None or row.id < min_id:
            min_id = row.id
        if max_id is None or row.id > max_id:
            max_id = row.id
    yield idx, min_id, max_id

partition_bounds = df_range_partitioned.rdd.mapPartitionsWithIndex(get_partition_bounds).collect()

# Print the results
for partition_id, min_id, max_id in sorted(partition_bounds):
    print(f"Partition {partition_id}: min_id={min_id}, max_id={max_id}")

Comet:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- CometExchange rangepartitioning(id#17L ASC NULLS FIRST, 10), REPARTITION_BY_NUM, CometNativeShuffle, [plan_id=173]
   +- CometScan parquet [id#17L] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/path/to/range-p..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>

Partition 0: min_id=0, max_id=90799
Partition 1: min_id=753, max_id=91680
Partition 2: min_id=1527, max_id=92520
Partition 3: min_id=2399, max_id=93284
Partition 4: min_id=3274, max_id=94123
Partition 5: min_id=4053, max_id=94844
Partition 6: min_id=4851, max_id=95671
Partition 7: min_id=5738, max_id=96522
Partition 8: min_id=6571, max_id=97335
Partition 9: min_id=7408, max_id=99999

Spark:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange rangepartitioning(id#20L ASC NULLS FIRST, 10), REPARTITION_BY_NUM, [plan_id=197]
   +- FileScan parquet [id#20L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/path/to/range-p..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>

Partition 0: min_id=0, max_id=9974
Partition 1: min_id=9975, max_id=19981
Partition 2: min_id=19982, max_id=29993
Partition 3: min_id=29994, max_id=39997
Partition 4: min_id=39998, max_id=49959
Partition 5: min_id=49960, max_id=59995
Partition 6: min_id=59996, max_id=69898
Partition 7: min_id=69899, max_id=79970
Partition 8: min_id=79971, max_id=89976
Partition 9: min_id=89977, max_id=99999
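A quick sanity check (my own addition, not part of the original report): treat the printed `(partition, min, max)` triples as data and assert that each partition's max stays below the next partition's min. Even the first two partitions of each output are enough to show the difference; the Comet excerpt fails the check, the Spark excerpt passes.

```python
def ranges_are_disjoint(bounds):
    # bounds: (partition_id, min_id, max_id) tuples as collected above;
    # skip empty partitions (min_id is None), then compare consecutive ranges.
    ordered = sorted(b for b in bounds if b[1] is not None)
    return all(prev[2] < curr[1] for prev, curr in zip(ordered, ordered[1:]))

# First two partitions copied from each output above
comet = [(0, 0, 90799), (1, 753, 91680)]
spark = [(0, 0, 9974), (1, 9975, 19981)]

print(ranges_are_disjoint(comet))  # False: partition 0's range overlaps partition 1's
print(ranges_are_disjoint(spark))  # True: consecutive, non-overlapping
```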

Labels

bug: Something isn't working
