after async_read resulting in a timeout, calling async_write causes a read access violation when using a strand
Closed this issue · 5 comments
I was attempting to return nice error responses for certain errors. For example i'd like to return a 408 when a timeout occurs. However it seems to also close the socket.
While testing this out I ran into a access violation in boost::asio::detail::strand_executor_service::enqueue
while acquiring a mutex lock and it seems the strand is already destructed (impl
is nullptr
). When not using a strand the next async_write
simply also returns the same error. I've tried different completion tokens but that doesn't change the result. It looks like the strand is cleaned up together with the socket so I guess I either need to:
- somehow extend the lifetime of the strand beyond the socket
- or manually start a timer instead of using
stream.expires_after
so it keeps the socket alive
Is this a bug in either beast or asio or am I using the strands wrong. I used https://www.boost.org/doc/libs/1_86_0/libs/beast/example/advanced/server-flex-awaitable/advanced_server_flex_awaitable.cpp for inspiration.
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <iostream>
#define USE_STRAND 1
#define USE_DEFERRED 1
#ifdef USE_STRAND
using executor_type = boost::asio::strand<boost::asio::io_context::executor_type>;
#else
using executor_type = boost::asio::io_context::executor_type;
#endif
using tcp_acceptor_type = boost::asio::ip::tcp::acceptor::rebind_executor<executor_type>::other;
using tcp_socket_type = boost::asio::ip::tcp::socket::rebind_executor<executor_type>::other;
using tcp_stream_type = boost::beast::tcp_stream::rebind_executor<executor_type>::other;
#ifdef USE_DEFERRED
constexpr auto completion_token = boost::asio::as_tuple(boost::asio::deferred);
#else
constexpr auto completion_token = boost::asio::as_tuple;
#endif
constexpr int num_threads = 1;
boost::beast::http::message_generator simple_result_response(boost::beast::http::status result, unsigned version, bool keep_alive) {
boost::beast::http::response<boost::beast::http::string_body> response{result, version};
response.keep_alive(keep_alive);
response.set(boost::beast::http::field::content_type, "text/plain; charset=UTF-8");
response.body() = boost::beast::http::obsolete_reason(response.result());
response.prepare_payload();
return {std::move(response)};
}
boost::asio::awaitable<void, executor_type> session(tcp_socket_type socket) {
tcp_stream_type stream{std::move(socket)};
boost::beast::flat_buffer buffer;
bool keep_alive;
do {
stream.expires_after(std::chrono::seconds{5});
boost::beast::http::request_parser<boost::beast::http::string_body> request_parser;
{
auto [ec, _] = co_await boost::beast::http::async_read(stream, buffer, request_parser, completion_token);
if (ec.failed()) {
std::println(std::cerr, "error from async_read(): {}", ec.message());
// currently only timeout is handled but one can for example also return a bad_request for boost::beast::http::error::bad_* errors.
if (ec == boost::beast::error::timeout) {
// if async_read errors because of a timeout, the next async_write does too. Except if using a strand in which case it results in a read access violation:
// std::shared_ptr<boost::asio::detail::strand_executor_service::strand_impl>::operator-><boost::asio::detail::strand_executor_service::strand_impl,0>(...) returned nullptr.
auto [ec2, _] = co_await boost::beast::async_write(stream, simple_result_response(boost::beast::http::status::request_timeout, request_parser.get().version(), false), completion_token);
if (ec2.failed()) {
std::println(std::cerr, "error from async_write() while sending error response: {}", ec2.message());
}
}
break;
}
}
auto response = simple_result_response(boost::beast::http::status::not_found, request_parser.get().version(), request_parser.keep_alive());
keep_alive = response.keep_alive();
{
auto [ec, _] = co_await boost::beast::async_write(stream, std::move(response), completion_token);
if (ec.failed()) {
std::println(std::cerr, "error from async_write(): {}", ec.message());
break;
}
}
} while (keep_alive);
if (stream.socket().is_open()) {
stream.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_send);
}
}
boost::asio::awaitable<void, executor_type> listen(boost::asio::ip::tcp::endpoint endpoint) {
auto executor = co_await boost::asio::this_coro::executor;
tcp_acceptor_type tcp_acceptor{executor, boost::asio::ip::tcp::endpoint{boost::asio::ip::tcp::v4(), 80}};
for (;;) {
#ifdef USE_STRAND
auto socket_executor = boost::asio::make_strand(executor.get_inner_executor());
#else
auto socket_executor = executor;
#endif
auto [ec, socket] = co_await tcp_acceptor.async_accept(socket_executor, completion_token);
if (ec.failed()) {
std::println(std::cerr, "error from async_accept(): {}", ec.message());
break;
}
boost::asio::co_spawn(
std::move(socket_executor),
session(std::move(socket)),
[](std::exception_ptr e) {
if (e) {
try {
std::rethrow_exception(e);
} catch(const std::exception& e) {
std::println(std::cerr, "error in session: {}", e.what());
}
}
}
);
}
}
int main() {
boost::asio::io_context io_context{num_threads};
boost::asio::co_spawn(
#ifdef USE_STRAND
boost::asio::make_strand(io_context),
#else
io_context,
#endif
listen({boost::asio::ip::tcp::v4(), 80}),
[](std::exception_ptr e) {
if (e) {
try {
std::rethrow_exception(e);
} catch(const std::exception& e) {
std::println(std::cerr, "error in listener: {}", e.what());
}
}
}
);
std::vector<std::jthread> threads;
if constexpr (num_threads > 1) {
threads.reserve(num_threads - 1);
for (int i = num_threads - 1; i > 0; --i) {
threads.emplace_back([&io_context]() {
io_context.run();
});
}
}
io_context.run();
return EXIT_SUCCESS;
}
You need to update the internal timeout after the fist expiration by calling stream.expires_after
before the next operation. If you want independent control over read and write timeouts, consider using the asio::cancel_after interface.
Thanks @ashtum that seems to work. I must have missed that when reading the docs. Can you point me to it?
Thanks @ashtum that seems to work. I must have missed that when reading the docs. Can you point me to it?
The timer applies collectively to any asynchronous reads or writes initiated after the expiration is set, until the expiration is set again. A call to async_connect counts as both a read and a write.
If a timer expires when the corresponding asynchronous operation is outstanding, the stream will be closed and any outstanding operations will complete with the error beast::error::timeout.
According to the above, when a timeout occurs while async_read
is outstanding the stream will be closed. But that doesn't seem to be the case. As you point out I can call expires_after
again (or expires_never
) which will keep the stream alive. At which point does it actually close the stream? I can't get that from the docs.
The timer applies collectively to any asynchronous reads or writes initiated after the expiration is set, until the expiration is set again.
What does that mean exactly? Does that mean that I should be able to call async_read
(or any other read or write) and expect another beast::error::timeout?
In the example above the behavior is different when using a strand and not using one. I did not expect a access violation when initiating a read or write after a timeout but instead just a timeout error as is the case if I don't use the strand.
According to the above, when a timeout occurs while
async_read
is outstanding the stream will be closed. But that doesn't seem to be the case. As you point out I can callexpires_after
again (orexpires_never
) which will keep the stream alive. At which point does it actually close the stream? I can't get that from the docs.
Sorry, I mistakenly assumed you were checking ec
for error::operation_aborted
instead of error::timeout
. When a beast::tcp_stream timer expires, it closes the underlying stream, preventing further operations. If you want to perform another operation after a timeout, consider using the asio::cancel_after interface.
The timer applies collectively to any asynchronous reads or writes initiated after the expiration is set, until the expiration is set again.
What does that mean exactly? Does that mean that I should be able to call
async_read
(or any other read or write) and expect another beast::error::timeout?
When you set an expiration time, it applies to all async operations initiated after that, not just one. To extend the timeout, you need to set a new expiration time before each operation.
In the example above the behavior is different when using a strand and not using one. I did not expect a access violation when initiating a read or write after a timeout but instead just a timeout error as is the case if I don't use the strand.
It seems you're encountering this issue: #2925. It's already fixed and will be included in the next Boost release (Boost 1.87).