# Tunnel
Tunnel is a cross-platform, lightweight, and highly adaptable task execution framework based on C++20 coroutines. You can use it to build task execution engines with complex dependencies, or pipeline execution engines. The idea for this project comes from the execution engine of ClickHouse.
This project has the following features:
- The user's processing logic does not need to deal with scheduling, synchronization, or mutual exclusion. You only need to design a reasonable DAG structure to get multi-core parallel execution;
- Thanks to the powerful customization capabilities of C++20 coroutines, you can easily integrate with other asynchronous systems or network I/O (which means that tunnel can easily be extended into a distributed task execution framework, which is also one of the long-term goals of this project);
- Thanks to the good design and interfaces of async_simple, you can control which Executor each node in the pipeline is scheduled on, which is beneficial for resource isolation and management;
- Parameters can be passed between nodes, although each Pipeline supports only one parameter type. If you need to pass different types of data between different nodes, please use `std::any` or `void *` and perform runtime conversions.
- This project is based on the C++20 standard.
- This project uses bazel as its build system.
- This project is based on async_simple, so first make sure that your compiler (`clang`, `gcc`, `Apple clang`) can compile async_simple.
- This project supports macOS, Linux, and Windows operating systems.
- async_simple
- googletest
- chriskohlhoff/asio
- rigtorp/MPMCQueue
- gflags
- spdlog
Firstly, you need to understand several basic concepts:
- Processor: a `Processor` is the basic unit of scheduling and execution. Each `Processor` holds zero, one, or more `input_port`s and zero, one, or more `output_port`s, but it never holds zero `input_port`s and zero `output_port`s at the same time.
- port: a `port` is the tool for transferring data between `Processor`s, and some `port`s share the same queue. Ports are divided into `input_port`s and `output_port`s: an `input_port` reads data from the queue, and an `output_port` writes data to the queue.
- pipeline: a `pipeline` is composed of multiple `Processor`s. These `Processor`s are connected through queues and form a directed acyclic graph (DAG). A `pipeline` can be sent to an `Executor` for scheduling and execution.
- Executor: the `Executor` concept from async_simple.
The above are the four most basic concepts in this project, followed by some derived concepts:
- Source: a `Source` is a type of `Processor` that has no `input_port`; it is the node that generates data.
- EmptySource: an `EmptySource` is a type of `Source` that only generates an EOF message.
- ChannelSource: a `ChannelSource` is a type of `Source` that reads data from its bound channel (`bind_channel`).
- Sink: a `Sink` is a type of `Processor` that has no `output_port`; it is the node that consumes data.
- DumpSink: a `DumpSink` is a type of `Sink` that reads and discards data.
- ChannelSink: a `ChannelSink` is a type of `Sink` that reads data and writes it to its bound channel.
- TransForm: a `TransForm` is a type of `Processor` that exists only to provide a `Processor` type distinct from `Source` and `Sink`.
- SimpleTransForm: a `SimpleTransForm` is a type of `TransForm` that has exactly one `input_port` and one `output_port`, used to perform simple transformations. Most of the user's logic should be implemented by inheriting from this class (see the sketch after this list).
- NoOpTransform: a `NoOpTransform` is a type of `SimpleTransForm` that is only used as a placeholder.
- Concat: a `Concat` is a type of `Processor` that has one or more `input_port`s and one `output_port`; it can be used for sequential consumption.
- Dispatch: a `Dispatch` is a type of `Processor` that has one `input_port` and one or more `output_port`s; it can be used for partitioning data.
- Filter: a `Filter` is a type of `TransForm` that can be used for filtering.
- Accumulate: an `Accumulate` is a type of `TransForm` that can be used for accumulation.
- Fork: a `Fork` is a type of `Processor` that has one `input_port` and one or more `output_port`s; it can be used for replication.
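As an illustration of the `SimpleTransForm` extension point, here is a minimal sketch of a string-upper-casing transform. It assumes, by analogy with `Source::generate()` and `Sink::consume()` in the Hello World example below, that the override point is a coroutine taking the input value and returning the transformed one; the header path, method name, and exact signature are assumptions and should be checked against the sources in the tunnel directory.

```cpp
#include <algorithm>
#include <cctype>
#include <string>

#include "async_simple/coro/Lazy.h"
#include "tunnel/transform.h"  // assumed header path

using namespace tunnel;

// Hypothetical SimpleTransForm subclass: upper-cases each string flowing
// through its single input_port/output_port pair. The transform() name and
// signature are assumptions, not the verified tunnel API.
class UpperCaseTransform : public SimpleTransForm<std::string> {
 public:
  virtual async_simple::coro::Lazy<std::string> transform(std::string &&value) override {
    std::transform(value.begin(), value.end(), value.begin(),
                   [](unsigned char c) { return std::toupper(c); });
    co_return std::move(value);
  }
};
```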
NOTE: This project does not have a `Merge` node; the merge function is implemented through other means. The reason is that a `Merge` node would require multiple `input_port`s, but we cannot know which `input_port` has data arriving at any given moment, so we would have to suspend and wait on a particular `input_port`, which is unreasonable. This project achieves the same function by letting multiple `port`s share one queue, as detailed in the `Merge` interface of `Pipeline`.
The inheritance relationships among the node types are as follows: types marked in red must be implemented through inheritance, while types marked in blue can be used directly.
## Hello World

Here is a Hello World program:
```cpp
#include <functional>
#include <iostream>
#include <string>

#include "async_simple/coro/SyncAwait.h"
#include "async_simple/executors/SimpleExecutor.h"
#include "tunnel/pipeline.h"
#include "tunnel/sink.h"
#include "tunnel/source.h"

using namespace tunnel;

// A Sink that prints every value it consumes.
class MySink : public Sink<std::string> {
 public:
  virtual async_simple::coro::Lazy<void> consume(std::string &&value) override {
    std::cout << value << std::endl;
    co_return;
  }
};

// A Source that generates a single "hello world" string, then EOF.
class MySource : public Source<std::string> {
 public:
  virtual async_simple::coro::Lazy<std::optional<std::string>> generate() override {
    if (eof == false) {
      eof = true;
      co_return std::string("hello world");
    }
    // An empty optional signals EOF and lets the pipeline stop.
    co_return std::optional<std::string>{};
  }

  bool eof = false;
};

int main() {
  Pipeline<std::string> pipe;
  pipe.AddSource(std::make_unique<MySource>());
  pipe.SetSink(std::make_unique<MySink>());
  async_simple::executors::SimpleExecutor ex(2);
  async_simple::coro::syncAwait(std::move(pipe).Run().via(&ex));
  return 0;
}
```
As you can see, users inherit from certain Processors to implement custom processing logic, compose these Processors into a structure through a Pipeline, and finally execute the Pipeline.

For example, for a Source node, only the `generate()` method needs to be overridden to produce data. Users must ensure that an empty `std::optional<T>{}` representing EOF is eventually returned, otherwise the Pipeline will never stop executing. For Sink nodes, the `consume()` method needs to be overridden to consume data.

For the use of more Processor types, users can read the source files in the tunnel directory.
## About exceptions
If a Processor throws an exception while the pipeline is running, tunnel will either call `std::abort` to terminate the process (when `bind_abort_channel == false`), or catch the exception and pass exit information to the other Processors. A Processor that receives the exit information enters managed mode, in which user logic is no longer invoked: it simply reads data from upstream and discards it, and once all upstream data has been read, it writes EOF information downstream and ends execution.
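To make the trigger concrete, here is a minimal sketch of a Sink whose user logic throws. It reuses the `Sink` interface from the Hello World example above; the comments restate the behavior described in the previous paragraph.

```cpp
#include <stdexcept>
#include <string>

#include "tunnel/sink.h"

using namespace tunnel;

// A Sink whose user logic always throws. With bind_abort_channel == false,
// tunnel may respond by calling std::abort; otherwise the exception is caught
// and the remaining Processors drain their upstream data in managed mode.
class ThrowingSink : public Sink<std::string> {
 public:
  virtual async_simple::coro::Lazy<void> consume(std::string &&value) override {
    throw std::runtime_error("failed while consuming: " + value);
    co_return;  // never reached; keeps this function a coroutine
  }
};
```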
## About expanding the pipeline at runtime
Users can construct and run a new pipeline inside a Processor's processing logic, and connect the data streams of the two pipelines through `ChannelSource` and `ChannelSink`. This feature is useful in certain situations, for example when you need to decide how to handle the remaining data based on data generated while the pipeline is executing.

There is a simple example in `example/embedpipeline.cc`.
## About the Pipeline interface
tunnel assigns a unique ID to each Processor instance; users and tunnel exchange pipeline structure information through these IDs.

The Pipeline API follows the principle that successor nodes may only be added to leaf nodes. Leaf nodes are non-sink nodes whose output_port has not yet been assigned. For example, starting from an empty pipeline (see the sketch after this list):

- First, add a source node (id == 1) through AddSource; the pipeline now has a single leaf node, 1.
- Then, through AddTransform, add a successor transform node (id == 2) to the source node; the leaf node of the pipeline becomes 2.
- Next, add another source node (id == 3) through AddSource; the pipeline now has two leaf nodes, 2 and 3.
- Finally, add a shared sink node (id == 4) as the successor of all current leaf nodes (2 and 3) through SetSink. At this point, no leaf nodes remain in the pipeline. A pipeline without leaf nodes is called complete, and only complete pipelines can be executed.
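A minimal sketch of that four-step sequence, reusing `MySource` and `MySink` from the Hello World example; the `MyTransform` class and the exact `AddTransform` signature are assumptions for illustration:

```cpp
// Builds the pipeline described above; the node IDs in the comments follow
// the numbering used in the list.
Pipeline<std::string> pipe;
pipe.AddSource(std::make_unique<MySource>());        // leaf nodes: {1}
pipe.AddTransform(std::make_unique<MyTransform>());  // leaf nodes: {2}
pipe.AddSource(std::make_unique<MySource>());        // leaf nodes: {2, 3}
pipe.SetSink(std::make_unique<MySink>());            // node 4 consumes 2 and 3
// No leaf nodes remain: the pipeline is complete and can be executed
// as in the Hello World example.
```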
Please read the content in the doc directory and example directory to learn about the API usage of this project.
- Support for more types of nodes [doing]
- Support Pipeline Merge [done]
- Topology detection
- Schedule event collection [doing]
- Support actively stopping execution [done, by throwing an exception]
- Exception handling during execution [done]
- Implementing a high-performance Executor [done]
- Support for extension of Pipeline at runtime [done]
- Support for distributed scheduling (support for network io based on async_simple first)
tunnel is distributed under the Apache License (Version 2.0).