The Darkest Pipeline is a header-only library for building multi-threaded software pipelines on modern C++.
Built on C++17, it allows static declaration of processing pipelines using an embedded DSL (Domain-Specific Language):
auto world = [](std::string s){ return s + " World!\n"; };
auto print = [](auto msg){ std::cout << msg; };
// Pipeline declaration
auto pipeline = tdp::input<std::string> >> world >> tdp::consumer{print};
// Usage: provide an arbitrary amount of data, let TDP do the rest.
pipeline.input("Hello");
pipeline.input("Salutations");
pipeline.input("Ahoy");
Jump to Overview for a quick overlook at the DSL API. See the examples for use cases and deeper details.
Note: TDP is still under development, so instabilities may exist, and functionality may be missing. See the TODO file for the roadmap. For feature requests or bug reports, submit an issue or contact me on Twitter.
Index:
- Simplified declaration using an embedded DSL
- Launches one thread per pipeline stage, and manages lifetime of all threads
- Starts at construction, stops at destruction. No need for
start()
/stop()
functions! - Structure built at compile time, using stack storage whenever possible
- Provides
std::unique_ptr
andstd::shared_ptr
wrappers for ownership models
- Provides
- Allows direct user input and output or producer/consumer threads (see the types)
- Allows selection of many internal data types, suitable for diverse applications
We can't always parallelize a process. Some algorithms are inherently sequential. Sometimes we depend on blocking I/O.
For example, video capture from a USB camera can't be parallel, neither can displaying that video:
auto frame = camera.capture();
auto output = process(frame);
display(output);
In this example, even if we can parallelize process()
, the throughput of the system is limited by the accumulated latency of all three stages.
Using a pipeline, the latency should be similar, but the throughput is limited by the highest latency among the three stages.
In short, use a pipeline where throughput matters, but it can't be reached by reducing latency.
The Darkest Pipeline is a header-only library. You can copy the contents of the include
folder anywhere into your project!
TDP also utilizes CMake, so it allows:
- Using this repository as a subdirectory, with
add_subdirectory
- You can use this git repository as a submodule!
- Installing, with the CMake install interface
- e.g. on Linux, a
sudo make install
will make it globally available in the system
- e.g. on Linux, a
Then, include tdp/pipeline.hpp
in your code, and enjoy a life of easier pipelining!
auto pipeline = input_type >> [stage1 >> ... stageN >>] >> output_type [/policy] [/wrapper];
tdp::input<Args...>
: User-provided input. Can be provided from main thread withpipeline.input(args...)
.tdp::producer{functor}
: A thread that automatically callsfunctor()
to generate data for the pipeline. Allows control with thepause()
/resume()
interface.
tdp::output
: User-polled output. Can be obtained from main thread withwait_get()
(blocking) or the non-blocking member functiontry_get()
.tdp::consumer{functor}
: A thread that processes the pipeline output and returnsvoid
, removing the output interface from the pipeline.
The additional pipeline stages are functions that operate on input data and return data for the next pipeline stage.
They can be:
- Function pointers/references
- Lambdas
- Function objects, e.g.
std::function
- Pointers to member functions
Execution policies define the internal data structure utilized for communication between stages. TDP currently provides these policies:
tdp::policy::queue
(default): A blocking unbounded queuetdp::policy::triple_buffer
: Utilizes triple-buffering, for applications where the latest value is more important than processing all valuestdp::policy::triple_buffer_lockfree
: A lock-free implementation of triple buffering, for applications with high throughput
By default, a pipeline is constructed on the stack. Due to its internals, it can't be copy-constructed, nor move-constructed.
For applications where move operations are required, or shared ownership is desired, TDP allows using smart pointer wrappers:
tdp::as_unique_ptr
: Returns anstd::unique_ptr
containing the pipelinetdp::as_shared_ptr
: Returns anstd::shared_ptr
containing the pipeline
Copyright © 2020 Joel P. C. Filho
This software is released under the Boost Software License - Version 1.0. Refer to the License file for more details.
The software in the examples
folder is on public domain, released under The Unlicense.