A high-performance liburing backed event loop for building asynchronous and multi-threaded programs.
The original goal of this library was to learn the new coroutines TS for C++. I found the most difficult part of the coroutine TS is when you want to develop an asynchronous architecture or "executer/runtime" that can handle re-entrant code across possibly different threads. Thus, ZAB was born.
ZAB uses io_uring and liburing as its backend to provide asynchronous system calls. ZAB so far does not try to provide QoL or improvements on the posix system call API's. However, C++ classes are provided to provide resource management and group similar function calls.
Contact: donald.rupin@pm.me
/* An asynchronous function that returns nothing - it can return execution without finishing itself */
zab::async_function<>
your_class::foo();
/* An asynchronous function that can be awaited (can return void too) */
zab::simple_future<bool>
your_class::bar();
/* An asynchronous generator that can return multiple things (can return void too) */
zab::reusable_future<bool>
your_class::baz();
/* Doing asynchronous behavior ( all is non blocking )*/
zab::async_function<>
your_class::example()
{
/* Async function usage */
/* trigger a async function */
/* An async_function function will return execution on its first suspend. */
foo();
/* trigger an async function and get the result */
/* A co_await'ed simple_future will return execution once it has co_return'ed a value. */
auto value = co_await bar();
/* Keep getting values */
/* A co_await'ed reusable_future will return execution once it has co_return'ed or co_yield'ed a value. */
while (auto f = baz(); !f.complete()) {
auto value_2 = co_await f;
}
/* or inbuilt for_each(...) */
co_await zab::for_each(
baz(),
[](auto _value_2){ /* ... */ }
);
/* Async behavior */
/* yield control for a time (2 seconds) and return in default thread */
co_await yield(zab::order::in_seconds(2));
/* yield control and resume in a different thread (thread 2)*/
co_await yield(zab::thread::in(2));
/* or both (but resuming in any thread ) */
co_await yield(zab::order::in_seconds(2), zab::thread::any());
/* pause this function for an arbitrary amount of time */
co_await pause(
[this](auto* _pause_pack) {
/* Can be resumed at any time... in any thread... */
_pause_pack->thread_ = zab::thread::in(1);
unpause(_pause_pack, now());
});
/* concurrently await a series of futures */
auto[result_1, result_2] = co_await wait_for(
bar(),
baz()
);
/* Custom Lambda based suspension points */
int result = co_await suspension_point(
[this, x = 42]<typename T>(T _handle) noexcept
{
if constexpr (is_ready<T>())
{
/* Always suspend! */
return false;
}
else if constexpr (is_suspend<T>())
{
/* Resume strait away! */
engine_->resume(_handle);
}
else if constexpr (is_resume<T>())
{
/* Return x! */
return x;
}
}
);
assert(result == 42);
/* Custom Lambda based stateful suspension points */
/* Allows the suspension point to resume internally */
/* without resuming what is awaiting on it. Useful for */
/* handling automatic retries or partial completions. */
int result_2 = co_await stateful_suspension_point<int>(
[this, x = 42]<typename T>(T _handle) mutable noexcept
{
if constexpr (is_ready<T>())
{
/* Suspend until counter hits 44 */
return x == 44;
}
else if constexpr (is_stateful_suspend<int, T>())
{
/* Resume with data set to x + 1 */
assert(x >= 42 && x < 44>);
_handle->result_ = x + 1;
engine_->resume(_handle->event_);
} else if constexpr (is_notify<int, T>())
{
/* Something resumed us with value _handle */
x = _handle;
/* Return from is_ready<T>() */
return notify_ctl::kReady;
}
else if constexpr (is_resume<T>())
{
/* Return x! */
return x;
}
}
);
assert(result_2 == 44);
/* Observable */
zab::observable<std::string, int> ob(engine_);
auto con = ob.connect();
/* emit a value asynchronously */
ob.async_emit("hello", 4);
/* or emit and wait for all observers to receive */
co_await ob.emit("world", 2);
{
/* Emits are 0 copy, all observers will get the same object */
auto e = co_await con;
const auto&[e_string, e_int] = e.event();
/* Always received in order of emit */
assert(e_string == "hello" && e_int == 4);
/* Deconstruction of objects is guarded by e. Once all */
/* observer destroy e, the event objects are deconstructed */
/* An observable waiting on an emit will wake once all e's */
/* are deconstructed. */
}
/* We can do some non-blocking synchronisation */
/* mutex - for mutual exclusion */
zab::async_mutex mtx(engine_);
{
/* Acquire a scoped lock */
auto lck = co_await mtx;
}
/* binary semaphore - for signalling - created in locked mode */
zab::async_binary_semaphore sem(engine_, false);
/* release the sem */
sem.release();
/* acquire the sem */
co_await sem;
/* Lots more synchronization primitives in the library... */
/* File IO */
zab::async_file<char> file(engine_);
auto success = co_await file.open("test_file.txt", file::Option::kRWTruncate);
std::vector<char> buffer(42, 42);
/* write to file! */
bool success = co_await file.write_to_file(buffer);
if (success)
{
/* Reset position and read from file */
file.position(0);
std::optional<std::vector<char>> data = co_await file.read_file();
}
/* Networking */
/* acceptors or connectors make tcp streams! */
zab::tcp_acceptor acceptor(engine_);
struct sockaddr_storage _address;
socklen_t _length = sizeof(_address);
if (acceptor_.listen(AF_INET, 8080, 10))
{
while (!acceptor_.last_error())
{
auto stream = co_await acceptor_.accept<char>((struct sockaddr*) &_address, &_length);
if (stream) {
std::vector<char> buf(5);
auto amount_wrote = co_await stream->write(buf);
auto amount_read = co_await stream->read(buf);
co_await stream->shutdown();
co_await stream->close();
}
}
}
}