Code review of the JobSystem [Oleg].
Closed this issue · 1 comment
LoshkinOleg commented
I think I'm done with the JobSystem. If you have any feedback, it would be appreciated:
jobsystem.h
#pragma once

#include <cstdint>
#include <functional>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <atomic>

class JobSystem
{
    enum Status : uint8_t
    { // Q: Why does declaring this as an enum CLASS make status_ & Status::RUNNING an invalid operation?
        RUNNING = 1u
    };
public:
    JobSystem();
    ~JobSystem();

    void KickJob(const std::function<void()>& func);
    void Work();
private:
    const static std::uint8_t OCCUPIED_THREADS = 3; // Number of threads already used by the engine.

    std::uint8_t numberOfWorkers;
    std::queue<std::function<void()>> tasks_; // Managed via mutex. // TODO: replace with custom queue when those are implemented.
    std::vector<std::thread> workers_; // TODO: replace with fixed vector when those are implemented.
    std::mutex mutex_;
    std::condition_variable cv_;
    std::atomic<std::uint8_t> status_ = 1u;
    std::atomic<std::uint8_t> initializedWorkers_ = 0;
};
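Side note on the question in the enum above: as far as I understand, a scoped enum (enum class) has no implicit conversion to its underlying integral type, so the built-in operator& cannot match std::uint8_t against Status::RUNNING, whereas a plain enum converts implicitly. A minimal standalone sketch (hypothetical names, not part of the engine) illustrating the difference:

#include <cstdint>

enum PlainStatus : std::uint8_t { PLAIN_RUNNING = 1u };  // Unscoped: enumerators convert to int implicitly.
enum class ScopedStatus : std::uint8_t { RUNNING = 1u }; // Scoped: no implicit conversion to integers.

void Example()
{
    std::uint8_t status = 1u;
    bool plain = (status & PLAIN_RUNNING) != 0;                                     // OK: built-in operator& after implicit conversion.
    // bool scoped = (status & ScopedStatus::RUNNING) != 0;                         // Does not compile: no operator& for uint8_t and ScopedStatus.
    bool scoped = (status & static_cast<std::uint8_t>(ScopedStatus::RUNNING)) != 0; // OK: explicit cast to the underlying type.
    (void)plain; (void)scoped;
}

So with an enum class, the check in Work() would need a static_cast (or dedicated operator overloads) to keep compiling.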
jobsystem.cpp
#include <engine/jobsystem.h>

JobSystem::JobSystem()
{
    numberOfWorkers = std::thread::hardware_concurrency() - OCCUPIED_THREADS;
    // TODO@Oleg: Add neko assert to check number of worker threads is valid!
    workers_.reserve(numberOfWorkers);
    const size_t len = numberOfWorkers;
    for (size_t i = 0; i < len; ++i)
    {
        workers_[i] = std::thread([this] { Work(); }); // Kick the thread => sys call.
    }
}
JobSystem::~JobSystem()
{
    // Spin-lock waiting for all threads to become ready for shutdown.
    while (initializedWorkers_ != numberOfWorkers || !tasks_.empty()) {} // WARNING: Not locking mutex for tasks_ access here!

    status_ = 0u; // Atomic assign.
    cv_.notify_all(); // Wake all workers.
    const size_t len = numberOfWorkers;
    for (size_t i = 0; i < len; ++i)
    {
        workers_[i].join(); // Join all workers.
    }
}
void JobSystem::KickJob(const std::function<void()>& func)
{
    { // CRITICAL
        std::unique_lock<std::mutex> lock(mutex_);
        tasks_.push(func);
    } // !CRITICAL
    cv_.notify_one();
}
void JobSystem::Work()
{
    initializedWorkers_++; // Atomic increment.

NEXT_TASK:
    while (status_ & Status::RUNNING) // Atomic check.
    {
        std::function<void()> task;
        { // CRITICAL
            std::unique_lock<std::mutex> lock(mutex_);
            if (!tasks_.empty())
            {
                task = tasks_.front();
                tasks_.pop();
            }
            else
            {
                goto SLEEP; // Exits CRITICAL.
            }
        } // !CRITICAL
        task();
        goto NEXT_TASK;

    SLEEP:
        if (status_ & Status::RUNNING) // Atomic check.
        {
            std::unique_lock<std::mutex> lock(mutex_); // CRITICAL
            cv_.wait(lock); // !CRITICAL
        }
    }
}
test_jobsystem.cpp
#include <gtest/gtest.h>
#include <engine/jobsystem.h>
#include <easy/profiler.h>

void IncrementMem(unsigned int& mem)
{
    const unsigned int TASK_WORK_TIME = 1;
#ifdef USING_EASY_PROFILER
    EASY_BLOCK("JOBSYSTEM_DO_NOTHING");
#endif
    std::this_thread::sleep_for(std::chrono::seconds(TASK_WORK_TIME));
    mem++;
}
TEST(Engine, TestJobSystem)
{
    const unsigned int TASKS_COUNT = 8;
    unsigned int tasksDone = 0;
    { // JobSystem
#ifdef USING_EASY_PROFILER
        EASY_PROFILER_ENABLE;
        EASY_BLOCK("JOBSYSTEM_MAIN_THREAD"){
#endif
        JobSystem system;
        for (size_t i = 0; i < TASKS_COUNT; ++i)
        {
            system.KickJob(std::function<void()>{[&tasksDone] { IncrementMem(tasksDone); }});
        }
#ifdef USING_EASY_PROFILER
        } EASY_END_BLOCK;
#endif
    } // !JobSystem
#ifdef USING_EASY_PROFILER
    profiler::dumpBlocksToFile("JobSystem.prof");
#endif
    // The JobSystem must make the main thread wait until all tasks are done before self-destructing.
    EXPECT_TRUE(TASKS_COUNT == tasksDone);
}
EliasFarhan commented
I corrected the test jobsystem with the latest commit and switched the jobsystem to use resize instead of reserve before creating the threads.
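For reference, a minimal sketch of the resize-based constructor mentioned above (assuming the rest of the class stays as posted):

JobSystem::JobSystem()
{
    numberOfWorkers = std::thread::hardware_concurrency() - OCCUPIED_THREADS;
    workers_.resize(numberOfWorkers); // Creates numberOfWorkers default-constructed (non-joinable) std::thread objects.
    for (size_t i = 0; i < numberOfWorkers; ++i)
    {
        workers_[i] = std::thread([this] { Work(); }); // Move-assigns a real thread into each slot.
    }
}

Keeping reserve and using emplace_back would also work; the original version indexes into an empty vector, which is undefined behavior.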