Any suggestions on how to deal with thread-safety of connection?
Closed this issue · 6 comments
Hi!
When using C++17, I need to make sure I only access the connection
object from the same thread.
If I want to execute operations on the connection and handle the results later, I have to post it to the io_context
, something like this:
#include <boost/asio.hpp>
#include <boost/redis.hpp>
#include <boost/redis/connection.hpp>
#include <boost/redis/src.hpp>
#include <iostream>
#include <semaphore>
void SetKey(std::string_view key, std::string_view value,
boost::redis::connection& connection, std::counting_semaphore<1>& semaphore,
boost::asio::io_context& io_context) {
auto request = std::make_shared<boost::redis::request>();
request->push("SET", key, value);
auto response = std::make_shared<boost::redis::response<std::string>>();
io_context.post([&connection, request, response, &semaphore]() {
connection.async_exec(
*request, *response,
[request, response, &semaphore]
(const boost::system::error_code& error, std::size_t) {
if (error) {
std::cout << "Failed to send SET command to Redis server: " << error.to_string() << std::endl;
} else {
std::cout << "Response: " << std::get<0>(*response).value() << std::endl;
}
semaphore.release();
});
});
}
int main() {
boost::asio::io_context io_context{1};
std::thread thread;
// Set up.
boost::redis::connection connection(io_context);
boost::redis::config config;
connection.async_run(config, {boost::redis::logger::level::warning},
[](const boost::system::error_code& error) {
if (error) {
std::cout << "Redis server setup error: " << error.to_string() << std::endl;
}
});
thread = std::thread([&io_context] {
std::cout << "RedisGlobalCache running" << std::endl;
io_context.run();
std::cout << "RedisGlobalCache finished" << std::endl;
});
std::counting_semaphore<1> semaphore(0);
SetKey("key1", "value1", connection, semaphore, io_context);
// Somewhat later I will check the result...
semaphore.acquire();
io_context.post([&connection] { connection.cancel(); });
thread.join();
return 0;
}
Is there a better way to do this? Does that mean I also can't have more than 1 thread in my io_context
?
Thanks in advance!
I think you have two options here:
1.-Go fully single-threaded (like Node.js). In this case, I'd advise you to not use std::thread
and have main
run the context:
#include <boost/asio.hpp>
#include <boost/redis.hpp>
#include <boost/redis/connection.hpp>
#include <boost/redis/src.hpp>
#include <iostream>
#include <semaphore>
void SetKey(std::string_view key, std::string_view value,
boost::redis::connection& connection) {
auto request = std::make_shared<boost::redis::request>();
request->push("SET", key, value);
auto response = std::make_shared<boost::redis::response<std::string>>();
connection.async_exec(
*request,
*response,
[request, response, &connection] (const boost::system::error_code& error, std::size_t) {
if (error) {
std::cout << "Failed to send SET command to Redis server: " << error.to_string() << std::endl;
} else {
std::cout << "Response: " << std::get<0>(*response).value() << std::endl;
}
}
connection.cancel();
);
}
int main() {
boost::asio::io_context io_context{1};
std::thread thread;
// Set up.
boost::redis::connection connection(io_context);
boost::redis::config config;
connection.async_run(config, {boost::redis::logger::level::warning},
[](const boost::system::error_code& error) {
if (error) {
std::cout << "Redis server setup error: " << error.to_string() << std::endl;
}
});
SetKey("key1", "value1", connection, semaphore);
io_context.run();
return 0;
}
This is using "work counting" in Boost.Asio. io_context::run won't return until all outstanding I/O operations have finished. In your example, this means it won't return until you've called cancel
on your connection.
- Go multi-threaded. I'd advise to use
asio::thread_pool
instead ofio_context
However, this requires extra care. The way to go is using strands. I'd advise to first build it single-threaded, then switch to multi-threaded only after efficiency measuring.
Note: io_context.post(f)
is deprecated. The new way of writing it is asio::post(asio::bind_executor(io_context.get_executor(), f))
Thanks @anarthal !
-
Thanks for the suggestion. The problem is that I do want the Redis operation to be async, as the way this integrates into my project will be something like: "User Request" -> "Executes Redis" -> "Reply later with result". And I can't really block the execution with the
run
(that is why I have it in another thread), and would like to reuse the same connection object. -
I see, it does seem that strands would be the best way to solve the problem if having more than one thread!
Thanks for pointing out about the deprecated function!
Hm, is the rest of your program using Asio? Or is it just the part using Redis? In the latter case, I'd suggest to use something like the following:
#include <boost/asio.hpp>
#include <boost/redis.hpp>
#include <cstddef>
#include <future>
#include <iostream>
namespace asio = boost::asio;
std::future<std::size_t> SetKey(
std::string_view key,
std::string_view value,
boost::redis::connection& connection,
boost::asio::io_context& io_context
)
{
// clang-format off
auto request = std::make_shared<boost::redis::request>();
request->push("SET", key, value);
auto response = std::make_shared<boost::redis::response<std::string>>();
return boost::asio::dispatch(boost::asio::bind_executor(
io_context.get_executor(),
boost::asio::deferred([&connection, request, response]() {
return connection.async_exec(
*request,
*response,
asio::consign(asio::deferred, request, response)
);
})
))(boost::asio::use_future);
// clang-format on
}
int main()
{
boost::asio::io_context io_context{1};
std::thread thread;
// Set up.
boost::redis::connection connection(io_context);
boost::redis::config config;
connection.async_run(
config,
{boost::redis::logger::level::warning},
[](const boost::system::error_code& error) {
if (error)
{
std::cout << "Redis server setup error: " << error.to_string() << std::endl;
}
}
);
thread = std::thread([&io_context] {
std::cout << "RedisGlobalCache running" << std::endl;
io_context.run();
std::cout << "RedisGlobalCache finished" << std::endl;
});
SetKey("key1", "value1", connection, io_context).get();
asio::post(asio::bind_executor(io_context.get_executor(), [&connection] { connection.cancel(); }));
thread.join();
return 0;
}
Explanation:
- You can think that your
std::thread
"owns" the io_context and any object within it. All Redis communication is going to happen using this dedicated thread. Communication with your main thread is going to usestd::future
, which is thread-safe. SetKey
returns astd::future
, which replaces your semaphore. It can communicate values and exceptions back to your code, which you will find useful when implementing the "get" part of the cache. We usestd::future<std::size_t>
becauseasync_execute
has handler signaturevoid(error_code, std::size_t)
.- Posting and dispatching to the
io_context
from any thread is safe. We need to do it because initiating anasync_execute
in a thread that is not running the context is a race condition. asio::consign
is a special completion token that is used to keep things alive until an async op finishes. This is telling "please keep request and response alive untilasync_execute
completes" (a requirement of Boost.Redis).asio::deferred
is used here to create an async chain. Read this as the.then()
function in Javascript: first dispatch to the context, then execute the operation.asio::use_future
is a special completion token that is used for this kind of patterns. It runs an async operation, and when it finishes, it sets the value of a future instead of calling a callback. Note that, if the operation fails, an exception is thrown (there are ways to prevent this, let me know if you're interested).
Note that async_run
should technically be wrapped in an asio::dispatch
call, too.
Thanks for the thorough response! It makes sense now. Two quick follow up questions:
- Can the post be simplified to:
asio::post(io_context, [&connection] { connection.cancel(); });
? - Unfortunately the code I am working with does not support exceptions, if you could show a simple example that uses the error codes instead that would be awesome!
Please feel free to close this issue after your response too, I hope this was the right forum to ask these clarifying questions! Thank you
- It will boil down to the same thing at the end of the day, but there are some technical differences that you must be aware of.
asio::post(io_context, [&connection] { connection.cancel(); });
first posts to the io_context, then dispatches to the callback's associated executor. This isasio::system_executor
, for which dispatch is just an inline, immediate completion, so you will observe no difference. However, in other scenarios you might observe differences, so I'd advise to stick to thebind_executor
syntax. It's longer, but it won't cause you any nasty surprise. - The difference between
post
anddispatch
is thatdispatch
may call the function you pass inline, whilepost
forces always a reschedule. You usually do dispatch when you just want to run a function in a context, and post when you want to force the reschedule to prevent recursion from overflowing your stack. In your case, both will do the same, because you're calling them from outside of the thread running the I/O context. In general, dispatch may be faster than post. dispatch probably captures better what you're trying to do than post. - You probably want to use
asio::as_tuple
. For instance:
std::future<std::tuple<error_code, std::size_t>> SetKey(
std::string_view key,
std::string_view value,
boost::redis::connection& connection,
boost::asio::io_context& io_context
)
{
// clang-format off
auto request = std::make_shared<boost::redis::request>();
request->push("SET", key, value);
auto response = std::make_shared<boost::redis::response<std::string>>();
return boost::asio::dispatch(boost::asio::bind_executor(
io_context.get_executor(),
boost::asio::deferred([&connection, request, response]() {
return connection.async_exec(
*request,
*response,
asio::consign(asio::as_tuple(asio::deferred), request, response)
);
})
))(boost::asio::use_future);
// clang-format on
}
BTW I'm not the maintainer of Boost.Redis so I can't close it :)
I think this forum is correct, we also have the #boost-asio channel in the cpplang slack for Asio questions.
Thank you!