apache/datafusion-ballista

[Discuss] Ballista Future Direction

thinkharderdev opened this issue · 41 comments

Now that Ballista is its own top-level project, I want to extend a previous discussion around what exactly Ballista is (previous discussion: apache/datafusion#1916).

I believe the consensus from that discussion was that Ballista should be a standalone system, but in practice I think we have adopted somewhat of a “both and” approach, in the sense that we have added several extension points to Ballista (while also providing default implementations that allow you to run Ballista out-of-the-box). I think this is a reasonable direction, and I agree that it is important that Ballista be something you can use “as is”.

That raises the question of which use cases for Ballista we would like to optimize for.

To take a concrete example, I think the current architecture is optimized for batch processing using a straightforward map-reduce implementation. This has many advantages:

  • Scheduling is relatively straightforward.
  • Cluster utilization is efficient since the unit of schedulable work is a single task, so whenever a task slot frees up you can just schedule a pending task on it.
  • It is resilient to task failures since all intermediate results are serialized to disk so recovering from a spurious task failure is just a matter of rescheduling the task.
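A minimal sketch of why task-level scheduling keeps utilization high and makes retries cheap. This is not Ballista's actual scheduler; `Task`, `SlotScheduler`, and its methods are hypothetical names. Pending tasks wait in a FIFO queue and are launched whenever a slot frees up, and a failed task is simply requeued, on the assumption that its inputs (the previous stage's shuffle files) are still on disk.

```rust
use std::collections::VecDeque;

/// Hypothetical unit of schedulable work: one partition of one stage.
#[derive(Debug, Clone, PartialEq)]
pub struct Task {
    pub stage_id: usize,
    pub partition: usize,
}

/// Minimal slot-based scheduler: tasks wait in a FIFO queue and are
/// launched whenever an executor slot is free.
pub struct SlotScheduler {
    free_slots: usize,
    pending: VecDeque<Task>,
}

impl SlotScheduler {
    pub fn new(slots: usize) -> Self {
        Self { free_slots: slots, pending: VecDeque::new() }
    }

    pub fn submit(&mut self, task: Task) {
        self.pending.push_back(task);
    }

    /// Launch as many pending tasks as there are free slots.
    pub fn schedule(&mut self) -> Vec<Task> {
        let mut launched = Vec::new();
        while self.free_slots > 0 {
            match self.pending.pop_front() {
                Some(task) => {
                    self.free_slots -= 1;
                    launched.push(task);
                }
                None => break,
            }
        }
        launched
    }

    /// A task finished successfully: its slot becomes free again.
    pub fn task_completed(&mut self) {
        self.free_slots += 1;
    }

    /// A task failed: free its slot and put it at the front of the queue.
    /// Retrying is safe because its inputs are assumed to still be on disk.
    pub fn task_failed(&mut self, task: Task) {
        self.free_slots += 1;
        self.pending.push_front(task);
    }
}
```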

However, for non-batch-oriented workloads it has some serious drawbacks:

  • Results can only be returned once the entire query finishes. We can’t start streaming results as soon as they are available.
  • Stages are scheduled sequentially so each stage is bound by its slowest partition.
  • Queries that return large result sets can be quite resource intensive, as each partition must serialize its entire shuffle result to disk.
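The "slowest partition" point can be made concrete with a simplified cost model (an assumption for illustration, not a measurement of Ballista): each stage starts only after every partition of the previous stage has finished, and results are returned only when the last stage completes. `staged_query_time` and `staged_time_to_first_result` are hypothetical helpers.

```rust
/// Simplified model of staged (map-reduce) execution: `stages[i][p]` is the
/// runtime of partition `p` in stage `i`, in milliseconds. A stage cannot
/// start until every partition of the previous stage has finished.
pub fn staged_query_time(stages: &[Vec<u64>]) -> u64 {
    stages
        .iter()
        // Each stage is bound by its slowest partition (0 for an empty stage).
        .map(|partitions| partitions.iter().copied().max().unwrap_or(0))
        .sum()
}

/// Under the same model nothing reaches the client until the final stage
/// completes, so time-to-first-result equals the total query time.
pub fn staged_time_to_first_result(stages: &[Vec<u64>]) -> u64 {
    staged_query_time(stages)
}
```

For example, with stage partitions of 100, 120, and 900 ms followed by a stage with partitions of 50 and 60 ms, the model gives 900 + 60 = 960 ms, even though most partitions finish in roughly 100 ms, and the client sees nothing until then.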

There has already been some excellent proof-of-concept work done on a different scheduling paradigm in apache/datafusion#1842 (something my team hopes to help push forward in the near future), but this raises some questions of its own. Namely, when do we use streaming vs. map-reduce execution? In the PoC it is a very simple heuristic, but in real use cases I’m not sure there is a one-size-fits-all solution. In some cases you may want to use only streaming execution and use auto-scaling or dynamic partitioning to make sure each query can be scheduled promptly. Or you may only care about resource utilization and want to disable streaming execution entirely.

This is one particular example, but you can imagine many others.

To bring this back around to some concrete options, I think there are a few different ways we can go:

  1. Ballista is a distributed computing framework which can be customized for many different use cases. There are default implementations available which allow you to use Ballista as a standalone system but the internal implementation is defined in terms of interfaces which allow for customized implementations as needed.
  2. Ballista is a standalone system with limited customization capabilities and is highly optimized for its intended use case. You can plug in certain things (ObjectStore implementations, UDFs, UDAFs, etc.) but the core functionality (scheduler, state management, etc.) is what it is, and if it doesn’t work for your use case then this is not the right solution for you.
  3. Ballista is a standalone system which is highly configurable but not highly extensible. It ships with, for instance, schedulers optimized for different use cases which can be enabled through runtime configuration (or build-time features), but you can’t just plug in your own custom scheduler implementation (without upstreaming it :)).
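To make options 1 and 3 more concrete, here is a hypothetical Rust sketch (`ExecutionMode`, `SchedulerPolicy`, `BuiltinPolicy`, and the config strings are illustrative, not existing Ballista APIs). Option 1 corresponds to an open trait that users implement themselves; option 3 corresponds to a closed enum of built-in policies selected by a runtime configuration value.

```rust
use std::str::FromStr;

/// How stage outputs are handed to consumers (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionMode {
    /// Fully materialize each stage's shuffle output before the next stage runs.
    MapReduce,
    /// Stream record batches between stages as soon as they are produced.
    Streaming,
}

/// Option 1: an open extension point; anyone can plug in their own policy.
pub trait SchedulerPolicy {
    fn choose_mode(&self, num_stages: usize) -> ExecutionMode;
}

/// Option 3: a closed set of built-in policies chosen via configuration.
/// Supporting a new policy means upstreaming a new variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BuiltinPolicy {
    AlwaysMapReduce,
    AlwaysStreaming,
    /// Stream only plans with at most N stages (an arbitrary example heuristic).
    StreamIfAtMost(usize),
}

impl SchedulerPolicy for BuiltinPolicy {
    fn choose_mode(&self, num_stages: usize) -> ExecutionMode {
        match *self {
            BuiltinPolicy::AlwaysMapReduce => ExecutionMode::MapReduce,
            BuiltinPolicy::AlwaysStreaming => ExecutionMode::Streaming,
            BuiltinPolicy::StreamIfAtMost(max) if num_stages <= max => ExecutionMode::Streaming,
            BuiltinPolicy::StreamIfAtMost(_) => ExecutionMode::MapReduce,
        }
    }
}

impl FromStr for BuiltinPolicy {
    type Err = String;

    /// Parse a config value such as "map-reduce", "streaming",
    /// or "stream-if-at-most:3".
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "map-reduce" => Ok(BuiltinPolicy::AlwaysMapReduce),
            "streaming" => Ok(BuiltinPolicy::AlwaysStreaming),
            other => other
                .strip_prefix("stream-if-at-most:")
                .and_then(|n| n.parse::<usize>().ok())
                .map(BuiltinPolicy::StreamIfAtMost)
                .ok_or_else(|| format!("unknown scheduler policy: {other}")),
        }
    }
}
```

Under option 1 a scheduler would accept any `Box<dyn SchedulerPolicy>`; under option 3 it would only accept a `BuiltinPolicy`.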

To be transparent, my team is building a query engine which is sensitive to time-to-first-result latency, so we are very interested in fully streaming execution (and hoping to upstream as much as we can), but we want to make sure that this is in line with the desired direction of Ballista for the rest of the community.

@thinkharderdev Thanks for the nice write-up. Among the three options, I am leaning towards 3 as the most realistic to achieve. That is, the community can focus on making Ballista work really well for a specific set of use cases first, which will hopefully grow the community further.

For streaming vs. batch I don't have a strong opinion at this point; I believe whoever can use it in a real use case should try to drive the project forward.

We are not ready yet to do anything serious, though we have two major use cases in mind:

  1. Replace Spark for batch processing - 100s of billions of rows regularly with mega plans (100s of millions of expressions and projections, and thousands of other typical operators such as joins)
  2. High-concurrency, medium-latency (sub-second) queries over data storage of 100s of billions of rows; result sets are not large

Use case 2 seems similar to your use case. I am curious: when you said fully streaming execution, did you mean something like Flink? I think there is value in supporting operators/algorithms that need to see the entire dataset/partition multiple times (for example, ML), so a hybrid model would be good: for example, the compiler could analyze the query and make parts of it "fully streaming" where possible.

Some other requirements for us are:

  1. easier and cheaper than Spark to operate
  2. natively support k8s: delegate resource and cluster management entirely to k8s
  3. (optional) support other execution engines similar to DataFusion, through DataFrame APIs. This would be hard

@thinkharderdev Thanks for writing it up ❤️.

To be transparent, my team is building a query engine which is sensitive to time-to-first-result latency, so we are very interested in fully streaming execution (and hoping to upstream as much as we can), but we want to make sure that this is in line with the desired direction of Ballista for the rest of the community.

We have the same goal: using Ballista to build a latency-sensitive query system. So we are also very interested in fully streaming execution (avoiding shuffles to disk).

To bring this back around to some concrete options, I think there are a few different ways we can go:

For now, I have no strong opinion about these choices.

I am curious: when you said fully streaming execution, did you mean something like Flink?

Exactly. The current execution model is basically Flink's batch execution, but what we ultimately want (for our use case) is streaming execution with RecordBatch as the event type.
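A toy illustration of streaming execution with batches as the event type: each stage runs on its own thread and forwards every batch downstream as soon as it is produced, so the consumer can read the first result before the input is exhausted. This is only a sketch built on `std::sync::mpsc`; `Batch` is a stand-in for Arrow's `RecordBatch`, and `source` and `filter_stage` are hypothetical helpers, not DataFusion or Ballista APIs.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Stand-in for an Arrow `RecordBatch`: here just a vector of values.
pub type Batch = Vec<i64>;

/// Spawn a source stage that emits batches one at a time.
pub fn source(batches: Vec<Batch>) -> Receiver<Batch> {
    // A bounded channel gives backpressure: the producer blocks once one
    // batch is waiting for the consumer.
    let (tx, rx) = sync_channel(1);
    thread::spawn(move || {
        for batch in batches {
            if tx.send(batch).is_err() {
                break; // downstream hung up (e.g. a LIMIT was satisfied)
            }
        }
    });
    rx
}

/// Spawn a filter stage that forwards each batch as soon as it arrives,
/// instead of waiting for its whole input to be materialized.
pub fn filter_stage(input: Receiver<Batch>, pred: fn(i64) -> bool) -> Receiver<Batch> {
    let (tx, rx) = sync_channel(1);
    thread::spawn(move || {
        for batch in input {
            let out: Batch = batch.into_iter().filter(|&v| pred(v)).collect();
            if tx.send(out).is_err() {
                break;
            }
        }
    });
    rx
}
```

Dropping the final receiver (for example, once a `LIMIT` is satisfied) makes the next `send` in each upstream stage fail, which eventually stops the whole pipeline.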

alamb commented

Somewhat of a tangent to this discussion, but my project (https://github.com/influxdata/influxdb_iox) is also interested in time-to-first-result (aka streaming) execution, as well as the lower resource (memory) usage such execution provides. While we will likely not use Ballista directly, we intend to ensure DataFusion can be used for this type of system.

One concrete example is the work @tustvold has been doing recently to better integrate parquet decoding / IO (so the parquet decoding work can be done in a streaming fashion as well).

Thanks for starting this discussion @thinkharderdev ❤️

With Ballista moving to this new repository I think it is an excellent time to "reboot" the project and assess what we are trying to build here.

I'd like to provide some historical context for how we ended up where we are today:

The original goal with Ballista was essentially "rewrite Apache Spark in Rust" while avoiding an architecture that heavily favors a particular programming language (Scala, in Spark's case). This is why we serialize plans to protobuf format rather than just using Rust's "serde" crate, which would have been much easier. I now hope that we can eventually adopt https://substrait.io/ as the serialization format to make it easier for Ballista to leverage query engines other than DataFusion.

Quite early in the development process, I discovered Apache Arrow and made that a core part of the design as well. This in my mind was another clear advantage over Apache Spark, which is largely row-based.

Obviously, the choice of Rust is another major differentiator with its unique approach to memory management and safety.

I designed Ballista based on my experience of using Apache Spark for SQL/ETL batch jobs.

I would fully support seeing Ballista support both batch and streaming and I think it would be fine for a user to pick one or the other when executing a query and use different APIs for each case. That said, I have not looked into this so there are likely complications that I am not even aware of here. I will start learning more about streaming in Spark and Flink so that I can better contribute to the discussion.

To be transparent, my team is building a query engine which is sensitive to time-to-first-result latency, so we are very interested in fully streaming execution (and hoping to upstream as much as we can), but we want to make sure that this is in line with the desired direction of Ballista for the rest of the community.

I also have major use cases for latency-sensitive, potentially multi-source queries.
It boils down to being able to use it for end-user/interactive applications.

One of the biggest bummers to me about Spark is that its architecture cripples it for latency-sensitive workloads.
I wanted to see what the latency was like for a basic two-DB join query between in-memory databases:

Something like:

SELECT ... FROM db1.foo JOIN db2.bar ON ... LIMIT 1

Using the latest Spark nightly snapshot, this takes 150-200ms on my personal machine.

A significant portion of this is spent on things relevant to multi-node computation but not required for in-memory execution on a single node (serialization, broadcasts, scheduling/coordination).

The codegen + execution time isn't that bad.

Understandably, Spark isn't tailored for this. But there's a lot of great technology in there (Catalyst, Tungsten) that is state-of-the-art for query optimization and performance, and it's a bummer that you can't configure Spark (to my knowledge) for a "local" mode or directly interact with just the pieces you need to manually evaluate expressions/do query optimization.

It would be great if the future of Ballista accommodated this. It opens up interesting possibilities.

Great, thanks for the feedback everyone!

I would summarize the key points as:

  1. Based on this and previous discussions, the goal is not to create yet another general "distributed compute framework"; Ballista should be something that works out of the box.
  2. There is interest from the community in both batch processing (à la Spark) and a latency-sensitive distributed query engine with streaming execution.
  3. Ballista shouldn't be inextricably tied to DataFusion as the underlying compute engine. This isn't really the case right now, but Substrait can be an "engine agnostic" representation of a compute plan in the future, and we shouldn't foreclose that future work with design decisions in the more immediate term.

Please chime in if anyone came to different conclusions :)

I have a PR open (#33) to improve the top-level README to better describe the current state of the project and the future direction, linking to this discussion.

I have also created a PR against "This Week In Rust" (rust-lang/this-week-in-rust#3276) to promote the new repo and hopefully direct more people to contribute to the discussion here.

Hi @thinkharderdev, regarding point 2 (whether to use Ballista for batch processing or latency-sensitive query processing), a few things should be clarified:

  1. How should the Ballista cluster be deployed?
  2. Should the Ballista cluster be long-running?
  3. Should tasks be long-running?
  4. Should data exchange be push-based or pull-based?
  5. Should exchanged data be flushed to disk?

  1. How should the Ballista cluster be deployed?

Since k8s is so common and popular today, I think we should support it natively. Like Spark for batch processing, we can provide a way to deploy Ballista on k8s on demand.

  2. Should the Ballista cluster be long-running?

I prefer running the cluster as a standalone, long-running system. For a long-running system, we have to pay close attention to several aspects, like avoiding memory leaks and managing historical state. Spark, by contrast, has historically focused on batch processing rather than long-running systems: its cluster is usually torn down once the batch job finishes, so it doesn't need to pay much attention to these long-running concerns.
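One concrete instance of the "managing historical state" concern: a long-running scheduler has to evict metadata about finished jobs, or its memory grows without bound. A minimal sketch using a capped ring buffer (`JobHistory` is a hypothetical type, not Ballista's actual state management).

```rust
use std::collections::VecDeque;

/// Keeps only the most recent `capacity` completed job IDs so that a
/// long-running scheduler's memory use stays bounded (hypothetical type).
pub struct JobHistory {
    capacity: usize,
    completed: VecDeque<String>,
}

impl JobHistory {
    pub fn new(capacity: usize) -> Self {
        Self { capacity, completed: VecDeque::with_capacity(capacity) }
    }

    /// Record a finished job, evicting the oldest entry once full.
    pub fn record(&mut self, job_id: impl Into<String>) {
        if self.capacity == 0 {
            return;
        }
        if self.completed.len() == self.capacity {
            self.completed.pop_front();
        }
        self.completed.push_back(job_id.into());
    }

    pub fn len(&self) -> usize {
        self.completed.len()
    }

    pub fn is_empty(&self) -> bool {
        self.completed.is_empty()
    }

    pub fn contains(&self, job_id: &str) -> bool {
        self.completed.iter().any(|j| j == job_id)
    }
}
```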

  3. Should tasks be long-running?

In streaming engines like Flink, tasks are always long-running, which brings its own challenges. My team is not focused on this; we will focus on latency-sensitive interactive queries.

  4. Should data exchange be push-based or pull-based?

The pull-based approach may not be as efficient as the push-based one. However, the push-based approach requires the whole task pipeline topology to be determined before execution, whereas with the pull-based approach (the one Spark employs), AQE (adaptive query execution) can be introduced to change the query plan adaptively during execution. Each approach has its trade-offs, so I propose implementing both.

  • Pull-based: good for ad-hoc queries
  • Push-based: good for latency-sensitive interactive queries or tasks
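A minimal sketch of the difference between the two exchange styles, using plain vectors as stand-ins for record batches (`pull_sum`, `Sink`, `SumSink`, and `push_all` are hypothetical names). In the pull style the consumer drives execution through an iterator, so work happens only on demand; in the push style the producer drives execution and calls into a downstream sink that must be wired up before execution starts.

```rust
/// Pull-based: the consumer asks the producer for batches. Nothing is
/// computed until requested, which leaves room to re-plan between requests.
pub fn pull_sum<I: Iterator<Item = Vec<i64>>>(producer: I, max_batches: usize) -> i64 {
    producer.take(max_batches).flatten().sum()
}

/// Push-based: the producer hands each batch to a downstream sink as soon
/// as it is ready. Who pushes to whom is fixed before execution starts.
pub trait Sink {
    fn push(&mut self, batch: Vec<i64>);
}

/// Example sink that keeps a running total.
pub struct SumSink {
    pub total: i64,
}

impl Sink for SumSink {
    fn push(&mut self, batch: Vec<i64>) {
        self.total += batch.iter().sum::<i64>();
    }
}

/// Drive a push-based exchange: the producer loop owns the control flow.
pub fn push_all<S: Sink>(batches: Vec<Vec<i64>>, sink: &mut S) {
    for batch in batches {
        sink.push(batch);
    }
}
```

Because the pull-based consumer requests only what it needs, taking three batches from a ten-batch producer evaluates just three of them.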

  5. Should exchanged data be flushed to disk?

It also depends. Flushing to disk is good for failure recovery and makes memory management easier. However, it's not efficient. Therefore, I also propose implementing both.

  • Flushing to disk: good for batch processing with pull-based data exchange
  • Not flushing to disk: good for latency-sensitive queries with push-based data exchange
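A minimal sketch of this trade-off, with shuffle output reduced to raw bytes (`ShuffleOutput`, `write_shuffle`, and `read_shuffle` are hypothetical names, not Ballista's shuffle writer). Flushing to a file lets any number of consumer attempts re-read the same data after a failure; keeping it in memory skips the disk round trip but ties the data's lifetime to the executor that produced it.

```rust
use std::fs;
use std::io;
use std::path::PathBuf;

/// Where one shuffle partition's output lives (hypothetical, simplified).
pub enum ShuffleOutput {
    /// In memory: fast, but gone if the producing executor dies, so a failed
    /// consumer cannot be retried without re-running the producer.
    InMemory(Vec<u8>),
    /// On disk: slower, but a failed consumer can be rescheduled and simply
    /// re-read the same file.
    OnDisk(PathBuf),
}

pub fn write_shuffle(data: Vec<u8>, flush_to_disk: bool, path: PathBuf) -> io::Result<ShuffleOutput> {
    if flush_to_disk {
        fs::write(&path, &data)?;
        Ok(ShuffleOutput::OnDisk(path))
    } else {
        Ok(ShuffleOutput::InMemory(data))
    }
}

pub fn read_shuffle(output: &ShuffleOutput) -> io::Result<Vec<u8>> {
    match output {
        ShuffleOutput::InMemory(bytes) => Ok(bytes.clone()),
        ShuffleOutput::OnDisk(path) => fs::read(path),
    }
}
```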

I agree with @yahoNanJing on his answers above. The only thing I would add is that, with respect to deployment, we should be able to support different auto-scaling behaviors for long-running clusters. Push-based, latency-sensitive query execution will probably have stricter auto-scaling demands, since we need to schedule a lot of tasks all at once. I know that this is something my team is very interested in.
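Why push-based execution is more demanding on auto-scaling can be shown with simple arithmetic under a simplified model (our assumption): staged execution needs only enough slots to run its widest stage in a single wave, while pipelined execution needs every stage's tasks running at the same time. `executors_needed` and `executors_for_plan` are hypothetical helpers.

```rust
/// Executors needed to run `concurrent_tasks` at once, given
/// `slots_per_executor` task slots per executor (rounded up).
pub fn executors_needed(concurrent_tasks: usize, slots_per_executor: usize) -> usize {
    assert!(slots_per_executor > 0, "executors must have at least one slot");
    (concurrent_tasks + slots_per_executor - 1) / slots_per_executor
}

/// `stage_tasks[i]` is the number of tasks in stage `i`. Staged execution
/// runs one stage at a time (widest stage in one wave); pipelined execution
/// runs all stages' tasks simultaneously.
pub fn executors_for_plan(stage_tasks: &[usize], slots_per_executor: usize, pipelined: bool) -> usize {
    let concurrent: usize = if pipelined {
        stage_tasks.iter().sum()
    } else {
        stage_tasks.iter().copied().max().unwrap_or(0)
    };
    executors_needed(concurrent, slots_per_executor)
}
```

For stages with 64, 32, and 8 tasks on executors with 16 slots each, staged execution needs 4 executors, while pipelined execution needs all 104 tasks at once, i.e. 7 executors.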

Regarding k8s support mentioned by @yahoNanJing: this is already supported and documented in the user guide (which needs updating to remove the DataFusion parts, and we also need to get it published).

https://github.com/apache/arrow-ballista/blob/master/docs/source/user-guide/distributed/deployment/kubernetes.md

Hi everyone, first, thanks for the great work on DataFusion and Ballista. I am currently reading @andygrove's book on processing engines, and it's pretty interesting: it helps us learn more about how processing engines work and take extra steps toward learning engines such as Spark.

I am not an expert in this area, but I have a few questions in mind, as I want to explore how Ballista answers them and how it can serve as a production-grade data processing engine.

One of the things that I find interesting is the Spark/Redis integration, which boosts Spark performance due to the in-memory nature of Redis. However, I am not sure whether that already holds in the context of Ballista/DataFusion, given the nature of Arrow and especially the integration of the Plasma project (made by the Ray team) into Arrow, so I wonder if you can shed some light on this.

One more thing: how does Ballista compare to engines such as Ray and Dask in general, and could Ballista potentially be a direct competitor to those frameworks?

Finally, as Ballista compares directly to Spark, how do we see the implementation of Ballista ML and graph processing?

Thanks everyone for the great works.

alamb commented

For spark integration, check out https://github.com/blaze-init/blaze by @yjshen

In terms of the current status of the Plasma project, I think you may have to ask on the dev@arrow.apache.org list. I remember some discussions about it previously but am not sure of the outcome.

Hi @ziedbouf. I'm glad you are finding the book helpful! Let me try and answer some of your questions.

I have not used Plasma and there is no support for it in DataFusion/Ballista. I recall a discussion about it being unmaintained, but I could be wrong. Ballista doesn't provide an in-memory cache like Redis, although it would be possible to implement a custom data source to connect to Redis.

Ballista is similar to Spark SQL and Dask-SQL in terms of architecture, so yes, it could be seen as a competitor, although it is not as mature yet. Ballista does not have any streaming, graph, or ML capabilities yet, though. As mentioned in this discussion, some contributors are planning on working on streaming.

My personal view is that we need to get Ballista to the point of maturity where it can run industry-standard benchmarks at scale with performance and scalability at least as good as Spark. With that, and some better docs, the project hopefully starts to gain more traction and more contributors and that would eventually lead to people building ML features perhaps.

The blaze project is really interesting because it leverages the mature Spark scheduler and uses DataFusion for execution.

What are the downsides of Apache Spark? Why should somebody use Ballista?

IMO, from the posts I have read and a talk I watched on YouTube, Spark's memory consumption is huge. Even a hello world in Spark consumes a lot of memory. Reduced memory consumption could be a big advantage for Ballista. I always have to increase the memory of our production Spark jobs, sometimes even to 32GB per executor.

What does Apache Spark do well?

Its integration with the Hadoop Distributed File System is very good. From my perspective this is the big advantage: the computation takes place as close to the data as possible, moving data only when really required. Again, as far as I understand, this is currently not possible with Ballista?

And, as I am a data engineer and doing a lot of analytics, I really like the spark-shell to play around with the data.

I just started using Rust, so maybe I am not the biggest help for implementing features, but I know Spark quite well from a developer perspective, as I use it daily.

A possible direction: keep the data mostly/only in RAM in the cluster, reducing (slow) HDD/SSD reads, and then run the computation on the Arrow in-memory data frames. That may be a niche to fit into.

alamb commented

What does Apache Spark do well?

One major advantage is that it is very mature (so it has many features, documentation, integrations, etc., such as spark-shell).

@andygrove what benefits are you expecting from migrating to Substrait?

I think that Substrait will become the de facto standard for serializing relational algebra, and it has lots of smart people working on it, so I think we will benefit from this work and not have to build all new features ourselves. It will also potentially open up interesting integrations with the rest of the ecosystem.

I created a Google doc where we can collaborate more on this discussion and define the Ballista architecture that we are aiming for. This will provide documentation that we can put in the repo to help everyone understand how everything works and the direction we are moving towards. Contributors welcome as always.

https://docs.google.com/document/d/1Fd44vVmjSD6NuSGaFjHeI9pFSRxOIWmZ6Rv7aZ3S5Sw/edit?usp=sharing

open up interesting integrations with the rest of the ecosystem.

e.g.:

  1. Substrait test suites that show correct before-and-after optimization plans, which we could get for free
  2. Substrait fuzzers for testing
  3. databases in other languages wanting to optimize query plans, being able to send them to DataFusion as Substrait and get back optimized results as Substrait
  4. a double check that we didn't implement anything not in Substrait in our model, and vice versa (i.e. apply nodes)

I also have some thoughts about a unified execution engine, welcome to take a look and comment:

https://www.notion.so/liurenjie1024/A-Cloud-Native-Universal-Execution-Engine-7903dd9eeea143c48049631a2d1cb845

cc @andygrove @mingmwang

Thanks @liurenjie1024. I recently read the F1 paper (https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/41344.pdf) which I found very interesting. It's quite a bit larger in scope than Ballista (it's intended to be more of a full DBMS than a query engine like Ballista), but the query processing section is very interesting. It has some very interesting properties:

  1. The scheduler/planner can dynamically determine whether queries should be distributed or executed centrally on a single executor so you can potentially eliminate the scheduling and shuffle overhead on fast OLTP-style queries.
  2. Distributed queries can be either batched (similar to Ballista now where each stage is fully materialized) or pipelined (where data is streamed between the stages).
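An F1-style decision between centralized and distributed execution could look roughly like the hypothetical heuristic below. The enum, the function name, and the one-million-row threshold are placeholders chosen for illustration, not anything taken from the paper or from Ballista; a real planner would rely on cost estimates and cluster state.

```rust
/// Where and how a query runs (hypothetical).
#[derive(Debug, PartialEq, Eq)]
pub enum QueryMode {
    /// Run the whole plan on one executor: no scheduling or shuffle overhead.
    Centralized,
    /// Distribute, fully materializing each stage (similar to Ballista today).
    DistributedBatched,
    /// Distribute, streaming data between stages.
    DistributedPipelined,
}

/// Placeholder threshold: below this estimated row count, distribution is
/// assumed not to pay for its scheduling and shuffle overhead.
const CENTRALIZED_MAX_ROWS: u64 = 1_000_000;

pub fn choose_query_mode(estimated_rows: u64, latency_sensitive: bool) -> QueryMode {
    if estimated_rows <= CENTRALIZED_MAX_ROWS {
        QueryMode::Centralized
    } else if latency_sensitive {
        QueryMode::DistributedPipelined
    } else {
        QueryMode::DistributedBatched
    }
}
```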

Yes, deciding the execution mode (batched or pipelined) at runtime is an interesting topic, and I believe the first step is to make Ballista flexible enough to support complex job graphs.

Ideally we can implement the bubble execution model, which mixes the pipelined and batch execution models in one engine.
Based on the complexity of the query DAG, some parts can be scheduled and executed in batch mode and other parts in pipelined mode.

http://www.vldb.org/pvldb/vol11/p746-yin.pdf
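A much-simplified sketch of the bubble idea: walk a linear chain of stages in execution order and greedily group consecutive stages while their combined task count fits into the available slots. Stages within a bubble could be gang-scheduled and pipelined, with data materialized at bubble boundaries. This is not the algorithm from the paper (which handles general DAGs and richer resource constraints); `form_bubbles` is a hypothetical helper.

```rust
/// Greedily group a linear chain of stages (execution order, given by task
/// count) into bubbles whose tasks all fit into `slots` at once. Returns the
/// stage indices of each bubble. A stage wider than `slots` gets a bubble of
/// its own (the real model would need a vertical cut, not attempted here).
pub fn form_bubbles(stage_tasks: &[usize], slots: usize) -> Vec<Vec<usize>> {
    let mut bubbles: Vec<Vec<usize>> = Vec::new();
    let mut current: Vec<usize> = Vec::new();
    let mut used = 0;
    for (stage, &tasks) in stage_tasks.iter().enumerate() {
        // Close the current bubble if adding this stage would exceed `slots`;
        // the boundary is where data gets materialized.
        if !current.is_empty() && used + tasks > slots {
            bubbles.push(std::mem::take(&mut current));
            used = 0;
        }
        current.push(stage);
        used += tasks;
    }
    if !current.is_empty() {
        bubbles.push(current);
    }
    bubbles
}
```

With stage widths `[4, 4, 8, 2, 2]` and 10 slots, this yields the bubbles `[0, 1]`, `[2, 3]`, and `[4]`.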

There are huge gaps in DataFusion's physical planning phase, and I think we need to put more effort into addressing those gaps first. Only with an efficient and sophisticated physical planner can we go further.

I also have some thoughts about a unified execution engine, welcome to take a look and comment:

https://www.notion.so/liurenjie1024/A-Cloud-Native-Universal-Execution-Engine-7903dd9eeea143c48049631a2d1cb845

cc @andygrove @mingmwang

How can I get access to the doc ?

I was able to view it. I think you have to be signed into Notion though (to any account)

@thinkharderdev @liurenjie1024
I think the 'Bubble Execution' model is exactly what you would like to achieve. As far as I know, Alibaba's MaxCompute engine has already implemented the 'Bubble Execution' model.

https://alibaba-cloud.medium.com/dag-2-0-the-adaptive-execution-engine-that-supported-150-million-distributed-jobs-in-the-2020-31ffad8bd722

Some questions are still not clear to me as of now, particularly if we plan to bring pipeline-based scheduling (Morsel-Driven Parallelism) into Ballista: then what's the relationship between Bubble, Stage and Pipeline, and how do we divide the DAG into Bubbles, split a Stage into multiple Pipelines, etc.?

I think the 'Bubble Execution' model is exactly what you would like to achieve

Yes, I agree 100%.

then what's the relationship between Bubble, Stage and Pipeline,

I think generally a stage and pipeline would be the same thing. A pipeline breaker would be a map/reduce (aka repartition). A bubble would be a layer on top of that which breaks the graph into streaming subgraphs which are fully materialized at the bubble boundary.
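Under the stage-equals-pipeline view described above, cutting a plan into stages amounts to splitting it at each repartition. A minimal sketch for a linear (single-input) plan listed source-first; `Op` and `split_into_stages` are hypothetical and far simpler than DataFusion's physical plan types.

```rust
/// Simplified operators of a linear, single-input physical plan.
#[derive(Debug, Clone, PartialEq)]
pub enum Op {
    Scan(&'static str),
    Filter,
    Project,
    /// Pipeline breaker: data is shuffled between partitions here.
    Repartition,
    Aggregate,
}

/// Split a plan (listed source-first) into stages, cutting after each
/// `Repartition`: the repartition ends the stage that writes the shuffle,
/// and the next stage starts by reading it.
pub fn split_into_stages(plan: &[Op]) -> Vec<Vec<Op>> {
    let mut stages = Vec::new();
    let mut current = Vec::new();
    for op in plan {
        current.push(op.clone());
        if *op == Op::Repartition {
            stages.push(std::mem::take(&mut current));
        }
    }
    if !current.is_empty() {
        stages.push(current);
    }
    stages
}
```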

cc @mingmwang @thinkharderdev
Sorry for the late reply; I was away on a long vacation.

Some questions are still not clear to me as of now, particularly if we plan to bring pipeline-based scheduling (Morsel-Driven Parallelism) into Ballista: then what's the relationship between Bubble, Stage and Pipeline, and how do we divide the DAG into Bubbles, split a Stage into multiple Pipelines, etc.?

Bubble and stage

My motivation for dividing the DAG into bubbles comes from two cases:

  1. Subplan reuse in complex OLAP queries, e.g. the spool operator in this paper, which materializes results to disk.
  2. The dataflow API, which can persist data explicitly.

The bubble execution paper you mentioned takes into account the resource limitations of query execution and adds vertical cuts when forming a bubble.

The relationship between stages and bubbles is clearly illustrated by this figure from Alibaba's article:
[figure: stages and bubbles, from Alibaba's article]

Stage and pipeline

In fact, my proposal didn't mention bringing morsel-driven execution into Ballista. The key problem with the morsel-driven approach is that it relies on shared state to parallelize each pipeline, which is not practical in distributed execution. Presto uses a morsel-driven approach to execute each stage, i.e. it breaks a stage's plan fragments into pipelines and executes them in parallel. This way it can implement two-level scheduling: each stage has one degree of parallelism and each pipeline has another. They claim this is more flexible, e.g. a query can get more resources when the cluster is idle and fewer when it's busy. I'm not a big fan of this approach and am not sure whether it's the state of the art.
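The second level of the two-level scheduling described above can be sketched as follows: the number of tasks per stage is fixed when the stage is scheduled, while the number of drivers (threads) per pipeline inside a task is derived from currently idle cores. This is a rough illustration under our own assumptions, not Presto's actual algorithm; `drivers_per_pipeline` and `max_drivers` are hypothetical.

```rust
/// Hypothetical second-level parallelism: drivers (threads) per pipeline
/// inside one task, derived from the executor's currently idle cores. It
/// grows when the cluster is idle and shrinks when it is busy.
pub fn drivers_per_pipeline(idle_cores: usize, pipelines_in_task: usize, max_drivers: usize) -> usize {
    if pipelines_in_task == 0 {
        return 0;
    }
    // Share idle cores evenly across pipelines, running at least one driver
    // per pipeline (so progress is always possible) and at most `max_drivers`.
    (idle_cores / pipelines_in_task).clamp(1, max_drivers.max(1))
}
```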

I think pipeline execution is the trend. Presto, ClickHouse, StarRocks, DuckDB, and Velox are all moving in this direction or trying it out.

I'm neutral on this. But if we want to support pipelined execution, the current DataFusion operators can't be reused.

alamb commented

In terms of pipeline execution (at least a push-based, pipelined execution model), I wanted to point out that @tustvold investigated this approach in DataFusion (and figured out a way to reuse the current operators). See apache/datafusion#2226, which added a scheduler behind a feature flag.

Our eventual goal is to support running a plan on 100s of parquet files without having to fetch them all up front (or concurrently). However, we currently have other things blocking this goal, so additional work on the scheduler is on hold for now.

You can find more detail on apache/datafusion#2504

Our eventual goal is to support running a plan on 100s of parquet files without having to fetch them all up front (or concurrently). However, we currently have other things blocking this goal, so additional work on the scheduler is on hold for now.

I'm a little confused here. Avoiding fetching 100s of parquet files is more like an optimizer issue?

alamb commented

What I mean is that now, if you give a DataFusion plan 5000 parquet files (maybe for a SUM()-type query), it will likely try to start reading / decoding all 5000 files concurrently, even if the downstream operators can only consume a small fraction at once. This means the resources (file handles and/or memory buffers) for reading all 5000 are held open during the plan.

It also means if the plan terminates early (e.g. ... LIMIT 10) a large amount of IO will be done / wasted.

What we would like to happen is that a smaller subset is read and fully processed before new ones are opened. Of course, one challenge with doing this is that we still need sufficient parallelism to hide IO latencies.
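The desired behavior (open only a bounded number of files at a time, still read enough in parallel to hide IO latency, and stop early once a `LIMIT` is satisfied) can be sketched with scoped threads. This is not how DataFusion reads parquet; `read_with_limit` is hypothetical, `read_file` is a stand-in closure for opening and decoding one file, and reading in fixed-size waves is a simplification of a sliding window.

```rust
use std::thread;

/// Read `files` in fixed-size waves of at most `max_open` concurrent reads,
/// stopping before opening another wave once `limit` rows are collected.
/// Returns the rows (truncated to `limit`) and how many files were opened.
pub fn read_with_limit<F>(
    files: &[String],
    max_open: usize,
    limit: usize,
    read_file: F,
) -> (Vec<u64>, usize)
where
    F: Fn(&str) -> Vec<u64> + Sync,
{
    // Borrow once so each spawned thread can copy the reference.
    let read_file = &read_file;
    let mut rows: Vec<u64> = Vec::new();
    let mut opened = 0;
    for wave in files.chunks(max_open.max(1)) {
        if rows.len() >= limit {
            break; // LIMIT satisfied: the remaining files are never opened.
        }
        opened += wave.len();
        // Read this wave's files in parallel to hide IO latency.
        let results: Vec<Vec<u64>> = thread::scope(|s| {
            let handles: Vec<_> = wave
                .iter()
                .map(|f| s.spawn(move || read_file(f.as_str())))
                .collect();
            handles.into_iter().map(|h| h.join().unwrap()).collect()
        });
        for r in results {
            rows.extend(r);
        }
    }
    rows.truncate(limit);
    (rows, opened)
}
```

With ten files, waves of two, and `LIMIT 5`, only the first two files are opened when each yields three rows.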

Within a given partition FileStream will still process them sequentially right?

alamb commented

Yes that is my understanding -- maybe the parquet reader has gotten fancier since I last looked at it 🤔 Sorry if I am causing confusion

I have written a more detailed doc here, welcome to comment and discuss
cc @alamb @mingmwang @thinkharderdev