EliasFarhan/NekoEngine

Code review du JobSystem [Oleg].

Closed this issue · 1 comments

Je pense avoir fini le JobSystem. Si vous avez du feedback, c'est apprécié:

jobsystem.h

#pragma once
#include <functional>
#include <thread>
#include <condition_variable>
#include <queue>
#include <atomic>

class JobSystem
{
    enum Status : uint8_t
    { // Q: Why declaring this as an enum CLASS makes status_ & Status::RUNNING an invalid operation?
        RUNNING = 1u
    };

public:
    JobSystem();
    ~JobSystem();
    void KickJob(const std::function<void()>& func);
    void Work();

private:
    const static std::uint8_t OCCUPIED_THREADS = 3; // Define number of threads used by engine.
    std::uint8_t numberOfWorkers;
    std::queue<std::function<void()>> tasks_; // Managed via mutex. // TODO: replace with custom queue when those are implemented.
    std::vector<std::thread> workers_; // TODO: replace with fixed vector when those are implemented.
    std::mutex mutex_;
    std::condition_variable cv_;
    std::atomic<std::uint8_t> status_ = 1u;
    std::atomic<std::uint8_t> initializedWorkers_ = 0;
};

jobsystem.cpp

#include <engine/jobsystem.h>

JobSystem::JobSystem()
{
    numberOfWorkers = std::thread::hardware_concurrency() - OCCUPIED_THREADS;
    //TODO@Oleg: Add neko assert to check number of worker threads is valid!
    workers_.reserve(numberOfWorkers);

    const size_t len = numberOfWorkers;
    for (size_t i = 0; i < len; ++i)
    {
        workers_[i] = std::thread([this] { Work(); }); // Kick the thread => sys call
    }
}

JobSystem::~JobSystem()
{
    // Spin-lock waiting for all threads to become ready for shutdown.
    while (initializedWorkers_ != numberOfWorkers || !tasks_.empty()){} // WARNING: Not locking mutex for task_ access here!

    status_ = 0u; // Atomic assign.
    cv_.notify_all(); // Wake all workers.
    const size_t len = numberOfWorkers;
    for (size_t i = 0; i < len; ++i)
    {
        workers_[i].join(); // Join all workers.
    }
}

void JobSystem::KickJob(const std::function<void()>& func)
{
    {// CRITICAL
        std::unique_lock<std::mutex> lock(mutex_);
        tasks_.push(func);
    }// !CRITICAL
    cv_.notify_one();
}

void JobSystem::Work()
{
    initializedWorkers_++; // Atomic increment.

    NEXT_TASK:
    while (status_ & Status::RUNNING) // Atomic check.
    {
        std::function<void()> task;
        {// CRITICAL
            std::unique_lock<std::mutex> lock(mutex_);
            if (!tasks_.empty())
            {
                task = tasks_.front();
                tasks_.pop();
            }
            else
            {
                goto SLEEP; // Exits CRITICAL.
            }
        }// !CRITICAL
        task();
        goto NEXT_TASK;

        SLEEP:
        if (status_ & Status::RUNNING) // Atomic check.
        {
            std::unique_lock<std::mutex> lock(mutex_); // CRITICAL
            cv_.wait(lock); // !CRITICAL
        }
    }
}

test_jobsystem.cpp

#include <gtest/gtest.h>
#include <engine/jobsystem.h>
#include <easy/profiler.h>

void IncrementMem(unsigned int& mem)
{
    const unsigned int TASK_WORK_TIME = 1;

#ifdef USING_EASY_PROFILER
    EASY_BLOCK("JOBSYSTEM_DO_NOTHING");
#endif
    std::this_thread::sleep_for(std::chrono::seconds(TASK_WORK_TIME));
    mem++;
}

TEST(Engine, TestJobSystem)
{
    const unsigned int TASKS_COUNT = 8;
    unsigned int tasksDone = 0;

    {// JobSystem
#ifdef USING_EASY_PROFILER
        EASY_PROFILER_ENABLE;
        EASY_BLOCK("JOBSYSTEM_MAIN_THREAD"){
#endif
            JobSystem system;
            for (size_t i = 0; i < TASKS_COUNT; ++i)
            {
                system.KickJob(std::function<void()>{[&tasksDone] { IncrementMem(tasksDone); }});
            }

#ifdef USING_EASY_PROFILER
        } EASY_END_BLOCK;
#endif
    }// !JobSystem

#ifdef USING_EASY_PROFILER
    profiler::dumpBlocksToFile("JobSystem.prof");
#endif

    // JobSystem must make main thread wait until all tasks are done before self-destructing.
    EXPECT_TRUE(TASKS_COUNT == tasksDone);
}

I corrected the test jobsystem with the latest commit and switch jobsystem with resize instead of reserve before creating the threads.