Describe the bug
As noted by @Kontinuation, the RangePartitioning implementation that was just merged has a bug, which is most obvious when range-partitioning pre-sorted data:
> This implementation of RangePartitioning may be incorrect. RangePartitioning should partition the input DataFrame into partitions with consecutive and non-overlapping ranges, this requires scanning the entire DataFrame to obtain the ranges of each partition before performing the actual shuffle writing.
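For reference, a correct range partitioner fixes its split points up front (Spark does this by sampling the input), then routes each row by binary search over those bounds, so the resulting partitions are consecutive and non-overlapping by construction. A minimal plain-Python sketch of the idea (the function names here are illustrative, not Spark's actual API):

```python
import bisect
import random

def compute_range_bounds(keys, num_partitions, sample_size=1000):
    # Sample the key set and pick evenly spaced split points from the
    # sorted sample, so partitions cover consecutive, non-overlapping ranges.
    sample = sorted(random.sample(keys, min(sample_size, len(keys))))
    step = len(sample) / num_partitions
    return [sample[int(i * step)] for i in range(1, num_partitions)]

def partition_for(key, bounds):
    # Binary search: rows with key <= bounds[i] (and key > bounds[i-1])
    # land in partition i; rows above the last bound land in the last partition.
    return bisect.bisect_left(bounds, key)

keys = list(range(100000))
bounds = compute_range_bounds(keys, 10)
assignments = {k: partition_for(k, bounds) for k in keys}
```

The key point is the up-front pass to compute `bounds`: without it, a partitioner can only hash or round-robin rows, which is what produces the overlapping ranges seen below.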
Here is the PySpark code illustrating the difference in behavior between Comet and vanilla Spark:
```python
spark.range(0, 100000).write.format("parquet").mode("overwrite").save("range-partitioning")
df = spark.read.parquet("range-partitioning")
df_range_partitioned = df.repartitionByRange(10, "id")
df_range_partitioned.explain()

# Show the min and max of each partition's range
def get_partition_bounds(idx, iterator):
    min_id = None
    max_id = None
    for row in iterator:
        if min_id is None or row.id < min_id:
            min_id = row.id
        if max_id is None or row.id > max_id:
            max_id = row.id
    yield idx, min_id, max_id

partition_bounds = df_range_partitioned.rdd.mapPartitionsWithIndex(get_partition_bounds).collect()

# Print the results
for partition_id, min_id, max_id in sorted(partition_bounds):
    print(f"Partition {partition_id}: min_id={min_id}, max_id={max_id}")
```

Comet:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- CometExchange rangepartitioning(id#17L ASC NULLS FIRST, 10), REPARTITION_BY_NUM, CometNativeShuffle, [plan_id=173]
   +- CometScan parquet [id#17L] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/path/to/range-p..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>

Partition 0: min_id=0, max_id=90799
Partition 1: min_id=753, max_id=91680
Partition 2: min_id=1527, max_id=92520
Partition 3: min_id=2399, max_id=93284
Partition 4: min_id=3274, max_id=94123
Partition 5: min_id=4053, max_id=94844
Partition 6: min_id=4851, max_id=95671
Partition 7: min_id=5738, max_id=96522
Partition 8: min_id=6571, max_id=97335
Partition 9: min_id=7408, max_id=99999
```
Spark:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange rangepartitioning(id#20L ASC NULLS FIRST, 10), REPARTITION_BY_NUM, [plan_id=197]
   +- FileScan parquet [id#20L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/path/to/range-p..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>

Partition 0: min_id=0, max_id=9974
Partition 1: min_id=9975, max_id=19981
Partition 2: min_id=19982, max_id=29993
Partition 3: min_id=29994, max_id=39997
Partition 4: min_id=39998, max_id=49959
Partition 5: min_id=49960, max_id=59995
Partition 6: min_id=59996, max_id=69898
Partition 7: min_id=69899, max_id=79970
Partition 8: min_id=79971, max_id=89976
Partition 9: min_id=89977, max_id=99999
```
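The Spark output satisfies the range-partitioning invariant (each partition's max is strictly below the next partition's min), while the Comet output does not: Comet's partition 0 spans 0–90799, overlapping every other partition. A quick check of the invariant over the first few printed bounds:

```python
def is_range_partitioned(bounds):
    # bounds: list of (partition_id, min_id, max_id), sorted by partition_id.
    # Ranges must be consecutive and non-overlapping:
    # each partition's max must be below the next partition's min.
    return all(bounds[i][2] < bounds[i + 1][1] for i in range(len(bounds) - 1))

comet = [(0, 0, 90799), (1, 753, 91680), (2, 1527, 92520)]   # first three Comet partitions
spark = [(0, 0, 9974), (1, 9975, 19981), (2, 19982, 29993)]  # first three Spark partitions

print(is_range_partitioned(comet))  # False: partition 0's max overlaps partition 1's range
print(is_range_partitioned(spark))  # True
```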