Due Thu Oct 8, 11:59pm
100 points total
Everyone likes to complete tasks quickly, and in this assignment we are asking you to do just that! You will implement a C++ library that executes tasks provided by an application as efficiently as possible on a multi-core CPU.
In the first part of the assignment, you will implement a version of the task execution library that supports bulk (data-parallel) launch of many instances of the same task. This functionality is similar to the ISPC task launch behavior you used to parallelize code across cores in Assignment 1.
In the second part of the assignment, you will extend your task runtime system to execute more complex task graphs, where the execution of tasks may depend on the results produced by other tasks. These dependencies constrain which tasks can be safely run in parallel by your task scheduling system. Scheduling execution of data-parallel task graphs on a parallel machine is a feature of many popular parallel runtime systems ranging from the popular Thread Building Blocks library, to Apache Spark, to modern deep learning frameworks such as PyTorch and TensorFlow.
This assignment will require you to:
- Manage task execution using a thread pool
- Orchestrate worker thread execution using synchronization primitives such as mutexes and condition variables
- Implement a task scheduler that reflects dependencies defined by a task graph
- Understand workload characteristics to make efficient task scheduling decisions
You may have already created thread pools and task execution libraries in classes such as CS110. However, the current assignment is a unique opportunity to better understand these systems. You will implement multiple task execution libraries, some without thread pools and some with different types of thread pools. From the implementations, you will better understand the design choices in creating a parallel system.
The assignment starter code has been tested to build and run on both Linux and OS X. You may wish to start this assignment by running on your own machine. The 4-core (8 hyper-thread) machines in the Myth cluster will be used for performance testing.
The assignment starter code is available on Github. Please clone the Assignment 2 starter code using:
git clone https://github.com/stanford-cs149/asst2.git
In Assignment 1, you used ISPC's task launch primitive to launch N instances of an ISPC task (launch[N] myISPCFunction()
). In the first part of this assignment, you will implement similar functionality in your task execution library.
To get started, get acquainted with the definition of ITaskSystem
in itasksys.h
. This abstract class defines the interface to your task execution system. The interface features a method run()
, which has the following signature:
virtual void run(IRunnable* runnable, int num_total_tasks) = 0;
run()
executes num_total_tasks
instances of the specified task. Since this single function call logically executes many tasks, we refer to each call to run()
as a bulk task launch.
The starter code in tasksys.cpp
contains a correct, but serial, implementation of TaskSystemSerial::run()
which serves as an example of how the task system uses the IRunnable
interface to execute a bulk task launch. (The definition of IRunnable
is in itasksys.h
) Notice how in each call to IRunnable::runTask()
the task system provides the task a current task identifier (an integer between 0 and num_total_tasks
), as well as the total number of tasks in the bulk task launch. The task's implementation will use these parameters to determine what work the task should do.
One important detail of run()
is that it must execute tasks synchronously with respect to the calling thread. In other words, when the call to run() returns, the application is guaranteed that the task system has completed execution of all tasks in the bulk task launch. The serial implementation of run()
provided in the starter code executes all tasks on the calling thread and thus meets this requirement.
The starter code contains a suite of test applications (see tests/tests.h
) that use your task system. For example, to run the test called mandelbrot_chunked
, which computes an image of a Mandelbrot fractal using a bulk launch of tasks that each process a continuous chunk of the image, type:
./runtasks -n 8 mandelbrot_chunked
The program will run the specified test and report the total execution time. It will also print error messages if your implementation does not yield the correct result.
The different tests have different performance characteristics -- some do little work per task, others perform significant amounts of processing. Some tests create large numbers of tasks per launch, others very few. Sometimes the tasks in a launch all have similar compute cost. In others, the cost of tasks in a single bulk launch is variable. We encourage you to inspect the code in tests/tests.h
to understand the behavior of the tests in more detail.
The -n
command-line option specifies the maximum number of threads the task system implementation can use. In the example above, we chose -n 8
because the CPU in the myth machines features eight execution contents. The full list of tests available to run is available via command line help (-h
command line option).
The -i
command-line options specifies the number of times to run the tests during performance measurement. To get an accurate measure of performance, ./runtasks
runs the test multiple times and records the minimum runtime of several runs; In general, the default value is sufficient---Larger values might yield more accurate measurements, at the cost of greater test runtime.
In addition, we also provide you the test harness that we will use for grading performance:
>>> python ../tests/run_test_harness.py
The harness has the following command line arguments,
>>> python run_test_harness.py -h
usage: run_test_harness.py [-h] [-n NUM_THREADS]
[-t TEST_NAMES [TEST_NAMES ...]] [-a]
Run task system performance tests
optional arguments:
-h, --help show this help message and exit
-n NUM_THREADS, --num_threads NUM_THREADS
Max number of threads that the task system can use. (8
by default)
-t TEST_NAMES [TEST_NAMES ...], --test_names TEST_NAMES [TEST_NAMES ...]
List of tests to run
-a, --run_async Run async tests
It produces a detailed performance report that looks like this:
>>> python ../tests/run_test_harness.py -t super_light super_super_light
================================================================================
Running task system grading harness... (2 total tests)
- Detected CPU with 8 execution contexts
- Task system configured to use at most 4 threads
================================================================================
================================================================================
Executing test: super_super_light...
Results for: super_super_light
STUDENT REFERENCE PERF?
[Serial] 7.492 10.636 0.70 (OK)
[Parallel + Always Spawn] 7.49 10.407 0.72 (OK)
[Parallel + Thread Pool + Spin] 7.487 11.625 0.64 (OK)
[Parallel + Thread Pool + Sleep] 7.487 9.412 0.80 (OK)
================================================================================
Executing test: super_light...
Results for: super_light
STUDENT REFERENCE PERF?
[Serial] 154.719 158.792 0.97 (OK)
[Parallel + Always Spawn] 154.711 36.381 4.25 (NOT OK)
[Parallel + Thread Pool + Spin] 154.727 29.691 5.21 (NOT OK)
[Parallel + Thread Pool + Sleep] 154.712 25.668 6.03 (NOT OK)
================================================================================
Overall performance results
[Serial] : All passed Perf
[Parallel + Always Spawn] : Perf did not pass all tests
[Parallel + Thread Pool + Spin] : Perf did not pass all tests
[Parallel + Thread Pool + Sleep] : Perf did not pass all tests
In the above output PERF
is the ratio of your implementation's runtime to the reference solution's runtime. So values less than one indicate that your task system implementation is faster than the reference implementation.
Mac users: While we provided binaries runtasks_ref_osx
for both part a and part b, we will be testing your code using the linux binaries. Therefore, we recommend you check your implementation in the myth machines before submitting.
Your job is to implement a task execution engine that efficiently uses your multi-core CPU. You will be graded on both the correctness of your implementation (it must run all the tasks correctly) as well as on its performance. This should be a fun coding challenge, but it is a non-trivial piece of work. To help you stay on the right track, to complete Part A of the assignment, we will have you implement multiple versions of the task system, slowly increasing in complexity and performance of your implementation. Your three implementations will be in the classes defined in tasksys.cpp/.h
.
TaskSystemParallelSpawn
TaskSystemParallelThreadPoolSpinning
TaskSystemParallelThreadPoolSleeping
Implement your part A implementation in the part_a/
sub-directory to compare to the correct reference implementation (part_a/runtasks_ref_*
).
Pro tip: Notice how the instructions below take the approach of "try the simplest improvement first". Each step increases the complexity of the task execution system's implementation, but on each step along the way you should have a working (fully correct) task runtime system.
In this step please implement the class TaskSystemParallelSpawn
.
The starter code provides you a working serial implementation of the task system in TaskSystemSerial
. In this step of the assignment you will extend the starter code to execute a bulk task launch in parallel.
-
You will need to create additional threads of control to perform the work of a bulk task launch. Notice that
TaskSystem
's constructor is provided a parameternum_threads
which is the maximum number of worker threads your implementation may use to run tasks. -
In the spirit of "do the simplest thing first", we recommend that you spawn worker threads at the beginning of
run()
and join these threads from the main thread beforerun()
returns. This will be a correct implementation, but it will incur significant overhead from frequent thread creation. -
How will you assign tasks to your worker threads? Should you consider static or dynamic assignment of tasks to threads?
-
Are there shared variables (internal state of your task execution system) that you need to protect from simultaneous access from multiple threads? You may wish to review our C++ synchronization tutorial for more information on the synchronization primitives in the C++ standard library.
In this step please implement the class TaskSystemParallelThreadPoolSpinning
.
Your implementation in step 1 will incur overhead due to creating threads in each call to run()
. This overhead is particularly noticeable when tasks are cheap to compute. At this point, we recommend you move to a "thread pool" implementation where your task execution system creates all worker threads up front (e.g., during TaskSystem
construction, or upon the first call to run()
).
-
As a starting implementation we recommend that you design your worker threads to continuously loop, always checking if there is more work to them to perform. (A thread entering a while loop until a condition is true is typically referred to as "spinning".) How might a worker thread determine there is work to do?
-
It is now non-trivial to ensure that
run()
implements the required synchronous behavior. How do you need to change the implementation ofrun()
to determine that all tasks in the bulk task launch have completed?
In this step please implement the class TaskSystemParallelThreadPoolSleeping
.
One of the drawbacks of the step 2 implementation is that threads utilize a CPU core's execution resources as they "spin" waiting for something to do. For example, worker threads might loop waiting for new tasks to arrive. As another example, the main thread might loop waiting for the worker threads to complete all tasks so it can return from a call to run()
. This can hurt performance since CPU resources are used to run these threads even though the threads are not doing useful work.
In this part of the assignment, we want you to improve the efficiency of your task system by putting threads to sleep until the condition they are waiting for is met.
-
Your implementation may choose to use condition variables to implement this behavior. Condition variables are a synchronization primitive that enables threads to sleep (and occupy no CPU processing resources) while they are waiting for a condition to exist. Other threads "signal" waiting threads to wake up to see if the condition they were waiting for has been met. For example, your worker threads could be put to sleep if there is no work to be done (so they don't take CPU resources away from threads trying to do useful work). As another example, your main application thread that calls
run()
might want to sleep while it waits for all the tasks in a bulk task launch to be completed by the worker threads. (Otherwise a spinning main thread would take CPU resources away from the worker threads!) Please see our C++ synchronization tutorial for more information on condition variables in C++. -
Your implementation in this part of the assignment may have tricky race conditions to think about. You'll need to consider many possible interleavings of thread behavior.
-
You might want to consider writing additional test cases to exercise your system. The assignment starter code includes the workloads that the grading script will use to grade the performance of your code, but we will also test the correctness of your implementation using a wider set of workloads that we are not providing in the starter code!
In part B of the assignment you will extend your part A task system implementation to support the asynchronous launch of tasks that may have dependencies on previous tasks. These inter-task dependencies create scheduling constraints that your task execution library must respect.
The ITaskSystem
interface has an additional method:
virtual TaskID runAsyncWithDeps(IRunnable* runnable, int num_total_tasks,
const std::vector<TaskID>& deps) = 0;
runAsyncWithDeps()
is similar to run()
in that it also is used to perform a bulk launch of num_total_tasks
tasks. However, it differs from run()
in a number of ways...
First, tasks created using runAsyncWithDeps()
are executed by the task system asynchronously with the calling thread. This means that runAsyncWithDeps()
, should return to the called immediately, even if the tasks have not completed execution. The method returns a unique identifier associated with this bulk task launch.
The calling thread can determine when the bulk task launch has actually completed by calling sync()
.
virtual void sync() = 0;
sync()
returns to the caller only when the tasks associated with all prior bulk task launches have completed. For example, consider the following code:
// assume taskA and taskB are valid instances of IRunnable...
std::vector<TaskID> noDeps; // empty vector
ITaskSystem *t = new TaskSystem(num_threads);
// bulk launch of 4 tasks
TaskID launchA = t->runAsyncWithDeps(taskA, 4, noDeps);
// bulk launch of 8 tasks
TaskID launchB = t->runAsyncWithDeps(taskB, 8, noDeps);
// at this point tasks associated with launchA and launchB
// may still be running
t->sync();
// at this point all 12 tasks associated with launchA and launchB
// are guaranteed to have terminated
As described in the comments above, the calling thread is not guaranteed tasks from previous calls to runAsyncWithDeps()
have completed until the thread calls sync()
. To be precise, runAsyncWithDeps()
tells your task system to perform a new bulk task launch, but your implementation has the flexibility to execute these tasks at any time prior to the next call to sync()
. Note that this specification means there is no guarantee that your implementation perform tasks from launchA prior to starting tasks from launchB!
The second key detail of runAsyncWithDeps()
is its third argument: a vector of TaskID identifiers that must refer to previous bulk task launches using runAsyncWithDeps()
. This vector specifies what prior tasks the tasks in the current bulk task launch depend on. Therefore, your task runtime cannot begin execution of any task in the current bulk task launch until all tasks from the launches given in the dependency vector are complete! For example, consider the following example:
std::vector<TaskID> noDeps; // empty vector
std::vector<TaskID> depOnA;
std::vector<TaskID> depOnBC;
ITaskSystem *t = new TaskSystem(num_threads);
TaskID launchA = t->runAsyncWithDeps(taskA, 128, noDeps);
depOnA.push_back(launchA);
TaskID launchB = t->runAsyncWithDeps(taskB, 2, depOnA);
TaskID launchC = t->runAsyncWithDeps(taskC, 6, depOnA);
depOnBC.push_back(launchB);
depOnBC.push_back(launchC);
TaskID launchD = t->runAsyncWithDeps(taskD, 32, depOnBC);
t->sync();
The code above features four bulk task launches (taskA: 128 tasks, taskB: 2 tasks, taskC: 6 tasks, taskD: 32 tasks). Notice that the launch of taskB and of taskC depend on taskA. The bulk launch of taskD (launchD
) depends on the results of both launchB
and launchC
. Therefore, while your task runtime is allowed to process tasks associated with launchB
and launchC
in any order (including in parallel), all tasks from these launches must begin executing after the completion of tasks from launchA
, and they must complete before your runtime can begin executing any task from launchD
.
We can illustrate these dependencies visually as a task graph. A task graph is a directed acyclic graph (DAG), where nodes in the graph correspond to bulk task launches, and an edge from node X to node Y indicates a dependency of Y on the output of X. The task graph for the code above is:
Notice that if you were running the example above on a Myth machine with eight execution contexts, the ability to schedule the tasks from launchB
and launchC
in parallel might be quite useful, since neither bulk task launch on its own is sufficient to use all the execution resources of the machine.
You must extend your task system implementation that uses a thread pool (and sleeps) from part A to correctly implement TaskSystemParallelThreadPoolSleeping::runAsyncWithDeps()
and TaskSystemParallelThreadPoolSleeping::sync()
. You do not need to implement the other TaskSystem
classes in Part B. As with Part A, we offer you the following tips to get started:
-
It may be helpful to think about the behavior of
runAsyncWithDeps()
as pushing a record corresponding to the bulk task launch, or perhaps records corresponding to each of the tasks in the bulk task launch onto a "work queue". Once the record to work to do is in the queue,runAsyncWithDeps()
can return to the caller. -
The trick in this part of the assignment is performing the appropriate bookkeeping to track dependencies. What must be done when all the tasks in a bulk task launch complete? (This is the point when new tasks may become available to run.)
-
It can be helpful to have two data structures in your implementation: (1) a structure representing tasks that have been added to the system via a call to
runAsyncWithDeps()
, but are not yet ready to execute because they depend on tasks that are still running (these tasks are "waiting" for others to finish) and (2) a "ready queue" of tasks that are not waiting on any prior tasks to finish and can safely be run as soon as a worker thread is available to process them. -
You need not worry about integer wrap around when generating unique task launch ids. We will not hit your task system with over 2^31 bulk task launches.
-
You can assume all programs will either call only
run()
or onlyrunAsyncWithDeps()
; that is, you do not need to handle the case where arun()
call needs to wait for all proceeding calls torunAsyncWithDeps()
to finish. Note that this assumption means you can implementrun()
using appropriate calls torunAsyncWithDeps()
andsync()
.
Implement your part B implementation in the part_b/
sub-directory to compare to the correct reference implementation (part_b/runtasks_ref_*
).
Points in this assignment will be assigned as follows,
Part A (50 points)
- 5 points each for correctness and performance of
TaskSystemParallelSpawn::run()
- 10 points each for correctness and performance of
TaskSystemParallelThreadPool*::run()
Part B (40 points)
- 30 points for correctness of
TaskSystemParallelThreadPoolSleeping::runAsyncWithDeps()
,TaskSystemParallelThreadPoolSleeping::run()
, andTaskSystemParallelThreadPoolSleeping::sync()
- 10 points for performance of
TaskSystemParallelThreadPoolSleeping::runAsyncWithDeps()
,TaskSystemParallelThreadPoolSleeping::run()
, andTaskSystemParallelThreadPoolSleeping::sync()
Writeup (10 points)
For each test, full performance points will be awarded for implementations within 20% of the provided reference implementation. Performance points are only awarded for implementations that return correct answers. As stated before, we will also test the correctness of your implementation using a wider set of workloads that we are not providing in the starter code.
Please submit your work using Gradescope. Your submission should include both your task system code, and a writeup describing your implementation. We are expecting the following five files in the handin:
- part_a/task_sys.cpp
- part_a/task_sys.h
- part_b/task_sys.cpp
- part_b/task_sys.h
- writeup.pdf
We ask you to submit source files part_a/tasksys.cpp|.h
and part_b/tasksys.cpp|.h
. You can create a directory with sub-directories part_a
and part_b
, drop the relevant files in, zip the directory, and upload it. Please submit the zip to assignment Programming Assignment 2 (Code Submission) on Gradescope.
Before submitting the source files, make sure that all code is compilable and runnable! We should be able to drop these files into a clean starter code tree, type make
, and then execute your program without manual intervention.
Our grading scripts will run the checker code provided to you in the starter code to determine performance points. We will also run your code on other applications that are not provided in the starter code to further test its correctness!
Please submit a brief writeup to the assignment Programming Assignment 2 (Writeup Submission) on Gradescope, addressing the following:
- Describe your task system implementation (1 page is fine). In additional to a genera description of how it works, please make sure you address the following questions:
- How did you decide to manage threads? (e.g., did you implement a thread pool?)
- How does your system assign tasks to worker threads? Did you use static or dynamic assignment?
- How did you track dependencies in Part B to ensure correct execution of task graphs?
- In Part A, you may have noticed that simpler task system implementations (e.g., a completely serial implementation, or the spawn threads every launch implementation), perform as well as or sometimes better than the more advanced implementations. Please explain why this is the case, citing certain tests as examples. For example, in what situations did the sequential task system implementation perform best? Why? In what situations did the spawn-every-launch implementation perform as well as the more advanced parallel implementations that use a thread pool? When does it not?