Фреймворк для работы с io_uring, асинхронными вызовами ядра Linux, Wikipedia.
Пример:
#include <aiouring/AIOUring.h>
int main() {
try
{
AIOUring aioUring{};
aioUring.setup();
aioUring.pushTask(aioUring.newTask<MainTask>(&aioUring));
return aioUring.run();
}
catch(std::exception &e)
{
logger::ERROR("Uring: " + std::string(e.what()));
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
Как видно из примера - создается класс AIOUring
, далее производится первичная настройка io_uring, посредствам AIOUring::setup()
, при которой создаются все необходимые для работы сущности, в очередь кладется асинхронная задача MainTask
, аналог main()
как в обычной программе, и в конце запускается обработка очереди io_uring посредствам функции AIOUring::run()
.
Пример MainTask
:
class MainTask final : public AIOUringTask {
public:
explicit MainTask(AIOUring *aioUring)
: aioUring(aioUring) {
config = vsbconfig::getConfig();
}
TaskFuture poll(int io_result) override {
ASYNC_IO;
AWAIT_TASK(tcpListeningTask,
aioUring,
config.port,
config.maxBacklog);
if(TASK_HAS_ERROR(tcpListeningTask)) {
kklogging::ERROR(fmt::format("TCPListeningTask: {}", TASK_ERROR_TEXT(tcpListeningTask)));
HPURING_SHUTDOWN(1);
}
kklogging::WARN("TCPListeningTask completed successfully.");
return TASK_RESULT_NONE();
}
private:
AIOUring *aioUring{nullptr};
vsbconfig::MainConfiguration config{};
TASK_DEF(TCPListeningTask<BalancerAcceptTask>, tcpListeningTask);
};
Как видно из примера:
- Задача должна быть унаследована от абстрактного класса
AIOUringTask
. - Класс должен быть
final
. - Метод poll должен быть переопределён и должен возвращать
TaskFuture
. - Если в методе poll используется вызов других задач или вызов операций io_uring то такой метод должен начинаться с
ASYNC_IO;
. - Задачи, которые вызываются из метода poll должны быть сначала объявлены при помощи
TASK_DEF
, как в примереTASK_DEF(TCPListeningTask<BalancerAcceptTask>, tcpListeningTask);
. - В МЕТОДЕ poll НЕ ДОЛЖНО БЫТЬ НИ ОДНОЙ ПЕРЕММЕННОЙ ОБЪЯВЛЕННОЙ НА СТЕКЕ, ТО ЕСТЬ В САМОМ МЕТОДЕ poll. Это значит, что любые переменные которые нужно будет использовать в методе poll - нужно объявлять непосредственно в классе задачи и инициализировать либо в конструкторе, либо по ходу выполнения кода. То есть сам класс задачи и является контекстом для любой переменной, которая используется в методе poll.
- Параметром метода poll является io_result, в который записывается результат выполнения последней операции из io_uring.
Представляет собой тип tuple std::tuple<std::optional<AIOUringOp>, std::optional<std::any>>;
. Где первый элемент является операцией io_uring, если такая указана, а второй элемент это результат выполнения задачи, то есть тип возвращаемого задачей значения.
По умолчанию тип возвращаемого задачей значения является std::monostate
, и когда при помощи макроса TASK_RESULT_NONE()
задача возвращает значение, то таким значением является std::monostate, то есть пустым типом (Unit type), как void.
Чтобы переопределить возвращаемое значение, то есть, чтобы вернуть, например, строку то нужно начало задачи дополнить следующим определением:
class XTask final : public AIOUringTask {
public:
using TResult = std::string;
...
Таким образом мы сообщаем фреймворку, что данная задача возвращает значение типа std::string
.
TASK_RESULT
- возвращает результат из задачи. Пример:
return TASK_RESULT(tcpSocket);
TASK_RESULT_NONE
- возвращает пустой результат. Пример:
return TASK_RESULT_NONE();
TASK_ERROR
- возвращает ошибку из задачи. Пример:
return TASK_ERROR(fmt::format("Error on tcp connection: {}", uexcept::errnoStr(-socketErrno)));
TASK_HAS_ERROR
- проверяет завершалась ли задача ошибкой. Пример:
if(TASK_HAS_ERROR(tcpListeningTask)) {
kklogging::ERROR(fmt::format("TCPListeningTask: {}", TASK_ERROR_TEXT(tcpListeningTask)));
}
TASK_ERROR_TEXT
- возвращает текст ошибки с которой завершались задача. Пример:
if(TASK_HAS_ERROR(tcpListeningTask)) {
kklogging::ERROR(fmt::format("TCPListeningTask: {}", TASK_ERROR_TEXT(tcpListeningTask)));
}
TASK_HAS_RESULT
- проверяет имеет ли завершенная задача результат выполнения. Пример:
if(!TASK_HAS_RESULT(tcpConnectTask))
{
kklogging::ERROR("No socket on connection.");
return TASK_RESULT_NONE();
}
TASK_RESULT_VALUE
- возвращает результат выпонения задачи. Пример:
targetSocket = TASK_RESULT_VALUE(tcpConnectTask);
TASK_HAS_OPTIONAL_RESULT
- проверяет содержит ли задача опциональный результат. Пример:
if(TASK_HAS_OPTIONAL_RESULT(resolveHostTask)) {
resolveResult = std::move(TASK_OPTIONAL_VALUE(resolveHostTask));
} else {
return TASK_ERROR(fmt::format("No ip address for hostname {}", hostname));
}
TASK_OPTIONAL_VALUE
- возвращает значение опционального результата выполнения задачи. Пример:
if(TASK_HAS_OPTIONAL_RESULT(resolveHostTask)) {
resolveResult = std::move(TASK_OPTIONAL_VALUE(resolveHostTask));
} else {
return TASK_ERROR(fmt::format("No ip address for hostname {}", hostname));
}
Для запуска задачи используются следующие макросы:
AWAIT_TASK
- используется в случаях при которых мы будем заново возвращаться к этому месту используя циклические конструкции, или же задачу мы запускаем в единственном месте метода poll(). Первым параметром идёт имя переменной задачи которая была объявлена при помощи TASK_DEF, следующими параметрами идут параметры к конструтору задачи, либо ничего, если конструктор задачи не использует параметры. Пример:
AWAIT_TASK(findTargetTask, aioUring, requestTokens, urlTokens, queryTokens);
AWAIT_TASKNL
- используется в тех случаях когда задачу необходимо вызвать в нескольких местах вызова poll(), но такой запуск задачи исключает циклическое обращение именно к месту начала выполнения задачи. Например, в логике метода poll может быть несколько мест, где разработчик считает необходимым закрыть сокет. Пример:
AWAIT_TASKNL(sendBalancerResponse, aioUring,
urlTokens, tcpBuffer, clientSocket);
AWAIT_LONG_TASK
- используется в случае если есть код, который выполняется долго по тем или иным причинам, или же используется библиотека, которая не работает через io_uring, обращаясь к ресурсам. Такие задачи выполняются в отдельных потоках, без участия io_uring. В io_uring поступает лишь уведомление о завершении такой задачи. Первым параметром передается уникальное имя, относящееся к данному выову, вторым параметром передаетсяstd::function<void(tf::Executor *executor)>
. tf::Executor - Taskflow. Пример:
AWAIT_LONG_TASK(postgresqlUri, [this](tf::Executor *executor) {
setPostgresqlUriTarget();
});
ASYNC_CONTINUE_TASK
- переносит выполнение программы на начало выполнения задачи, то есть задача выполняется повторно. Пример:
AWAIT_TASK(findTargetTask, aioUring, requestTokens, urlTokens, queryTokens);
if(TASK_HAS_ERROR(findTargetTask)) {
logger:ERROR("task error");
ASYNC_CONTINUE_TASK(findTargetTask);
}
logger:ERROR("task ok");
Если возникает необходимость в конце задачи - очистить какие то ресурсы: закрыть сокеты, файлы и т.п., то можно использовать следующие методы:
- free - это синхронный метод очистки ресурсов задачи, его необходимо переопределить и вместе с ним вызвать такой же метод super класса. Пример:
void free() override {
AIOUringTask::free();
freeaddrinfo(host->ar_result);
::free(reinterpret_cast<void *>(const_cast<
addrinfo *>(host->ar_request)));
::free(host);
}
- finally - это асинхронный метод очистки ресурсов задачи, так же как и метод poll должен вернуть TaskFuture, и к нему применяются те же правила, которые описаны выше для poll(). Пример:
TaskFuture finally(int io_result) override {
ASYNC_IO;
AWAIT_TASKNL(tcpShutAndClose, clientSocket, targetSocket);
return TASK_RESULT_NONE();
}
Оба вышеописанных метода выполняются после полного завершения задачи.
Вся работа с операциями io_uring выполняется внутри задачи посредством следующих макросов:
- AWAIT_OP - Первым параметром идёт название IO метода, которое соответствует доступному в io_uring, вторым параметром идёт название уникальной метки, третьим и последующими параметрами идут уже параметры к самому вызову, если они для такого вызова есть. Пример:
AWAIT_OP(Write, writeTo, tcpTo, buffer.data() + offset, bytesToWrite);
- ASYNC_CONTINUE_OP - возвращает задачу к повторному выполнения участка кода по указанной метке. Пример:
if(newLinePos == std::string_view::npos) {
ASYNC_CONTINUE_OP(readClient);
}
Все операции декларируются и имплементируются в файлах AIOUringOp.h и AIOUringOp.cpp, пример для операции Accept:
AIOUringOp.h
:
static AIOUringOp Accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int flags = 0);
AIOUringOp.cpp
:
AIOUringOp AIOUringOp::Accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
return AIOUringOp {
.submit = [=](io_uring *ring, __u64 ptrTask) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_accept(sqe, fd, addr, addrlen, flags);
sqe->user_data = ptrTask;
}
};
}
- HPURING_SHUTDOWN - данный макрос запускает операцию ShutdownUring и первым параметром передает код завершения приложения (process exit code). Пример:
TaskFuture poll(int io_result) override {
ASYNC_IO;
AWAIT_TASK(xxx);
if(TASK_HAS_ERROR(xxx)) {
kklogging::ERROR("critical errr");
HPURING_SHUTDOWN(1);
}
...
}
- AWAIT_POLL - переносит работу асинхронной функции poll/finally к началу выполнения. Пример:
TaskFuture poll(int io_result) override {
ASYNC_IO;
if(!sockets.empty()) {
socketValue = sockets.front();
if(socketValue >= 0) {
AWAIT_OP(Shutdown, shutSocket, socketValue);
AWAIT_OP(Close, closeSocket, socketValue);
}
sockets.pop_front();
AWAIT_POLL();
}
return TASK_RESULT_NONE();
}
- AWAIT_EVENT - ожидает получения события на eventfd задачи. Пример:
AWAIT_EVENT(*this->getTaskfd().lock());
- EVENT_NOTIFY - синхронно отправляет событие на eventfd. Пример:
EVENT_NOTIFY(eventFd);
- EVENT_NOTIFY_ASYNC - асинхронно отправляет событие на eventfd. Пример:
EVENT_NOTIFY_ASYNC(eventFd);