A collection of (mainly concurrency-related) data structures & utilities, created with game programming in mind.
Header only!
- Lock-free
- Uses an interface resembling that of an std::atomic type
- Uses internal versioning to make it resistant to ABA problems
- Comes with a shared_ptr type whose interface resembles std::shared_ptr
- Single header include atomic_shared_ptr.h, optionally atomic_shared_ptr.natvis for better debug viewing in Visual Studio
There are eight versions of compare_exchange_strong, four of which take a raw_ptr as the expected value. raw_ptr is a non-reference-counted representation of a shared_ptr/atomic_shared_ptr. The raw_ptr overloads may be used when shared ownership of the expected out value is not needed, since the shared_ptr variants incur extra reference-counting costs upon failure.
Utility wrapper class for 128-bit atomic operations:
```cpp
#include <cstdint>

#include <gdul/memory/atomic_128.h>

struct custom_struct
{
    std::uint64_t data[2];
};

int main()
{
    gdul::atomic_128<custom_struct> custom;

    const custom_struct in{ 5, 10 };
    custom.store(in);
    const custom_struct out(custom.load());

    custom_struct badExpected{ 5, 9 };
    custom_struct goodExpected{ 5, 10 };
    const custom_struct desired{ 15, 25 };

    // Fails: badExpected does not match the stored value
    const bool resultA(custom.compare_exchange_strong(badExpected, desired));
    // Succeeds
    const bool resultB(custom.compare_exchange_strong(goodExpected, desired));

    // Integer operations on sub-words of a 128-bit value
    gdul::atomic_128<gdul::u128> integer;
    const gdul::u128 addTo64(integer.fetch_add_u64(1, 0));
    const gdul::u128 exchange8(integer.exchange_u8(17, 4));
    const gdul::u128 subTo32(integer.fetch_sub_u32(15, 1));

    return 0;
}
```
Multi-producer, multi-consumer unbounded lock-free queue. FIFO order is respected within the context of a single producer.
Concurrency-safe lock-free ordered map based on a skip list design.
Concurrency-safe lock-free hash map based on open addressing with quadratic probing.
Concurrency-safe, lock-free priority queue based around a skip list design.
Abstraction to enable fields to use thread local storage. Internally, the objects are held in a list, with fast-access references held in thread local memory. Construction of an instance happens when a thread first accesses the internal thread local object (e.g. via get()). Destruction happens when the parent thread_local_member object is destroyed. Unsafe iteration may be done over the stored instances via begin()/end().
A simple delegate class
Supports (partial or full) binding of arguments in its constructor. The amount of local storage (used to avoid allocations) may be changed by altering the DelegateStorage variable.
There are also two helper functions, make_delegate and allocate_delegate, with support for signature evaluation along with partial or full binding of arguments.
A basic job/task setup with an additional feature twist
- Supports dependencies, so that job.run_this_before(otherJob) ensures job finishes before otherJob may run
- Features lightweight execution time & dependency execution time tracking, which may be used to reorder jobs dynamically
Basic usage:
```cpp
#include <iostream>
#include <thread>

#include <gdul/jobs/job.h>
#include <gdul/jobs/consumer.h>
#include <gdul/jobs/queue.h>

int main()
{
    using namespace gdul;

    jobs::fifo_queue fifoQueue;

    auto workUnit = [] {
        std::cout << "Hello, world" << std::endl;
        jobs::stop_consumer(std::this_thread::get_id());
    };

    shared_ptr<jobs::job> job = make_job(workUnit, fifoQueue);
    job->enable();

    jobs::enter_consumption_loop({ &fifoQueue });
}
```
Usage, demonstrating reordering of jobs based on expected completion time (& dependent completion times):
```cpp
#include <chrono>
#include <cstddef>
#include <iostream>
#include <thread>
#include <vector>

#include <gdul/jobs/job.h>
#include <gdul/jobs/consumer.h>
#include <gdul/jobs/queue.h>

int main()
{
    using namespace gdul;

    jobs::priority_queue<jobs::remaining_timebranch_comparator> priorityQueue;

    // Setup consumers
    std::vector<std::thread> consumerThreads;
    for (std::size_t i = 0; i < std::thread::hardware_concurrency(); ++i) {
        consumerThreads.emplace_back(std::thread(&jobs::enter_consumption_loop, std::initializer_list<jobs::queue*>{ &priorityQueue }));
    }

    auto sleepFor100Ms = [] {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    };
    auto sleepFor500Ms = [] {
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    };

    constexpr std::size_t testIterations = 15;

    for (std::size_t i = 0; i < testIterations; ++i) {
        shared_ptr<jobs::job> rootJob = jobs::make_job([] {}, priorityQueue);
        shared_ptr<jobs::job> endJob = jobs::make_job([] {}, priorityQueue);

        // Create a set of short jobs to saturate the consumer pipelines
        for (std::size_t shortJobIndex = 0; shortJobIndex < std::thread::hardware_concurrency(); ++shortJobIndex) {
            shared_ptr<jobs::job> shortJob = jobs::make_job(sleepFor100Ms, priorityQueue);
            shortJob->run_this_after(rootJob);
            shortJob->run_this_before(endJob);
            shortJob->enable();
        }

        // Create a long job that naturally should run after the short jobs
        shared_ptr<jobs::job> longJob = jobs::make_job(sleepFor500Ms, priorityQueue);
        longJob->run_this_after(rootJob);
        longJob->run_this_before(endJob);
        longJob->enable();

        timer time;

        endJob->enable();
        rootJob->enable();

        jobs::wait_until_finished(*endJob);

        // The time we naively would expect to receive here is somewhere above 600ms, as we created
        // 100ms jobs to saturate all consumers, with an additional 500ms job running after. However,
        // since we track the execution time & dependent time of the jobs and can order them
        // accordingly in the priority queue, this will be closer to 500ms
        std::cout << "Milliseconds passed: " << time.elapsed_milliseconds().count() << std::endl;
    }

    // Shutdown threads
    for (auto& thread : consumerThreads) {
        jobs::stop_consumer(thread.get_id());
        thread.detach();
    }
}
```