apache/arrow-ballista

Partitioning reasoning in DataFusion and Ballista

mingmwang opened this issue · 5 comments

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

In the current Ballista code base, when generating the distributed plan, Ballista removes any non-hash repartition from the distributed plan.
And in DataFusion, during physical planning, RepartitionExec nodes are added blindly when a hash join or aggregation is encountered, without considering the children's output partitioning.

            match repart.output_partitioning() {
                Partitioning::Hash(_, _) => {
                    let shuffle_writer = create_shuffle_writer(
                        job_id,
                        self.next_stage_id(),
                        children[0].clone(),
                        Some(repart.partitioning().to_owned()),
                    )?;
                    let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
                        shuffle_writer.stage_id(),
                        shuffle_writer.schema(),
                        shuffle_writer.output_partitioning().partition_count(),
                        shuffle_writer
                            .shuffle_output_partitioning()
                            .map(|p| p.partition_count())
                            .unwrap_or_else(|| {
                                shuffle_writer.output_partitioning().partition_count()
                            }),
                    ));
                    stages.push(shuffle_writer);
                    Ok((unresolved_shuffle, stages))
                }
                _ => {
                    // remove any non-hash repartition from the distributed plan
                    Ok((children[0].clone(), stages))
                }
            }

When I looked into Presto's source code, I found that Presto's distributed plan can include both remote exchanges and local exchanges.
Local exchanges benefit intra-stage parallelism, and Presto adds remote and local exchanges only when necessary. I think it is time to introduce more advanced methods to reason about partitioning in a distributed plan, something more powerful than Spark SQL's EnsureRequirements rule.

Incorporating Partitioning and Parallel Plans into the SCOPE Optimizer
http://www.cs.albany.edu/~jhh/courses/readings/zhou10.pdf
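
To illustrate the kind of reasoning being proposed, here is a self-contained sketch with hypothetical types (not the DataFusion or Ballista API): the planner looks at a child's existing output partitioning, inserts an exchange only when the required distribution is not already satisfied, and distinguishes cheap local exchanges from expensive remote shuffles.

#[derive(Clone, Debug)]
enum Partitioning {
    Unknown(usize),
    RoundRobin(usize),
    // hash-partitioned on the named columns
    Hash(Vec<String>, usize),
}

#[derive(Clone, Debug)]
enum Distribution {
    Unspecified,
    SinglePartition,
    // rows with equal values in these columns must be co-located
    HashPartitioned(Vec<String>),
}

#[derive(Debug)]
enum Exchange {
    None,
    // repartition inside a stage (cheap, adds intra-stage parallelism)
    Local(Partitioning),
    // shuffle across stages/executors (expensive)
    Remote(Partitioning),
}

fn partition_count(p: &Partitioning) -> usize {
    match p {
        Partitioning::Unknown(n)
        | Partitioning::RoundRobin(n)
        | Partitioning::Hash(_, n) => *n,
    }
}

// The child already satisfies the requirement if, for example, it is hash
// partitioned on a subset of the required columns, because rows that agree on
// the required columns then agree on the hash columns and stay co-located.
fn satisfies(output: &Partitioning, required: &Distribution) -> bool {
    match required {
        Distribution::Unspecified => true,
        Distribution::SinglePartition => partition_count(output) == 1,
        Distribution::HashPartitioned(req_cols) => match output {
            Partitioning::Hash(cols, _) => {
                !cols.is_empty() && cols.iter().all(|c| req_cols.contains(c))
            }
            _ => false,
        },
    }
}

// Only add an exchange when needed, and prefer a local one when the data does
// not have to move between executors.
fn plan_exchange(
    child: &Partitioning,
    required: &Distribution,
    target_partitions: usize,
) -> Exchange {
    if satisfies(child, required) {
        return Exchange::None;
    }
    match required {
        Distribution::Unspecified => Exchange::None,
        // gathering into a single partition can stay inside the stage
        Distribution::SinglePartition => Exchange::Local(Partitioning::Unknown(1)),
        Distribution::HashPartitioned(cols) => {
            Exchange::Remote(Partitioning::Hash(cols.clone(), target_partitions))
        }
    }
}

A real rule would also need to reason about required sort orders and about when a round-robin local exchange purely for parallelism pays off, which is the kind of reasoning the SCOPE paper above formalizes.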


👍

I would add a couple of things to this discussion:

Currently, partitioning is a bit restrictive in that either the client has to specify the partitioning or we fall back to a single default partitioning. This has several limitations:

  1. Partitioning should be sensitive to cluster capacity. Adding more partitions is probably counter-productive if you don't have enough executor capacity to execute them concurrently; all you are doing is adding scheduling overhead and shuffle exchange cost (a rough heuristic for this is sketched after this list).
  2. The client probably has no information about the cluster capacity (which may itself be dynamic based on auto-scaling rules), nor does it necessarily have any information about how much data is scanned for a given query. So the client lacks the information needed to decide what the appropriate number of partitions is.
  3. Allowing the client to specify the number of partitions can be problematic for multi-tenancy since there is no way to prevent a greedy client from consuming too many cluster resources.
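
For instance (a purely hypothetical heuristic, not existing Ballista or DataFusion code), a capacity-aware planner could derive the partition count from the scheduler's own view of the cluster and from table statistics, rather than from the client:

fn choose_partition_count(
    available_task_slots: usize, // from the scheduler's live executor metadata
    estimated_input_bytes: u64,  // from table/statistics, when available
    target_bytes_per_partition: u64,
) -> usize {
    // enough partitions to keep each one close to the target size...
    let by_size =
        (estimated_input_bytes / target_bytes_per_partition.max(1)).max(1) as usize;
    // ...but never more than the cluster can actually run concurrently, since
    // extra partitions only add scheduling and shuffle-exchange overhead
    by_size.min(available_task_slots.max(1))
}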

There may not be an optimal general solution for all of this, so I think it might be useful to make this functionality pluggable. Something I've been tinkering with is putting the planning behind a trait that allows for custom implementations. Something like:

trait BallistaPlanner {
    fn plan_query_stages<'a>(
        &'a mut self,
        job_id: &'a str,
        execution_plan: Arc<dyn ExecutionPlan>,
    ) -> Result<Vec<Arc<ShuffleWriterExec>>>;
}

trait BallistaPlannerFactory {
    type Output: BallistaPlanner;

    fn new_planner<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
        state: &Arc<SchedulerState<T, U>>,
    ) -> Self::Output;
}

The existing planner could be the default implementation, of course. But this could give us a nice mechanism for experimenting with different implementations as well.
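
For example (hypothetical wiring, assuming the traits above), the scheduler could stay generic over the factory and never name a concrete planner, so alternative implementations can be swapped in without touching the scheduling code:

fn plan_job<F, T, U>(
    state: &Arc<SchedulerState<T, U>>,
    job_id: &str,
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Vec<Arc<ShuffleWriterExec>>>
where
    F: BallistaPlannerFactory,
    T: 'static + AsLogicalPlan,
    U: 'static + AsExecutionPlan,
{
    // build whatever planner the factory provides and split the plan into stages
    let mut planner = F::new_planner(state);
    planner.plan_query_stages(job_id, plan)
}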

I'll add range partitioning to this list as well - currently normal sorts do not run in a parallel / distributed fashion.


Yes, there are currently a couple of gaps in the physical planning phase, and the ExecutionPlan trait needs to be enhanced as well.
Considering the SQL below, ideally it should only need 1 or 2 remote shuffle exchanges.
Today Spark SQL produces 4 shuffle exchanges for it; I'm not sure how many remote shuffles PrestoSQL/Trino produces.

select cntry_id, count(*),
       count(*) over (partition by cntry_id) as w,
       count(*) over (partition by curncy_id) as c,
       count(cntry_desc) over (partition by curncy_id, cntry_desc) as d,
       count(cntry_desc) over (partition by cntry_id, curncy_id, cntry_desc) as e
from dw_countries
group by cre_date, cntry_id, curncy_id, cntry_desc
having cntry_id = 3;

I'm working on an experimental rule for this now, and am also trying to verify a new optimization process; if it proves out, it will be much easier to write new optimization rules. The same approach can be applied to logical optimization rules as well.

Another recent paper is relevant here; see the sections on EXCHANGE PLACEMENT.

https://vldb.org/pvldb/vol15/p936-rajan.pdf

This issue is partially addressed by the new Enforcement rule in DataFusion, so I am closing it.