Two-Project Series: Build a Stock-Tracking CLI with Async Streams in Rust
In this liveProject, you’ll use some of the unique and lesser-known features of the Rust language to finish a prototype of a FinTech command line tool.
You’ll step into the shoes of a developer working for Future Finance Labs, an exciting startup that’s aiming to disrupt mortgages, and help out with building its market-tracking backend.
Your CTO has laid out the requirements for the minimum viable product, which must quickly process large amounts of stock pricing data and calculate key financial metrics in real time.
- The goal is to fetch all S&P 500 stock data from the Yahoo! Finance API.
- We are using the yahoo_finance_api crate for this purpose; it supports both blocking and asynchronous operation.
- The official S&P 500 web page
- Data is fetched from the date that a user provides as a CLI argument to the current moment.
- Users also provide symbols (tickers) that they want on the command line.
- The fetched data include OHLC data (open, high, low, close prices), the timestamp, and the volume, for each symbol.
- From the received data we extract the minimum, maximum, and closing prices for each requested symbol, along with the percent change and a simple moving average as a window function (over a 30-day period); a minimal sketch of these calculations follows this list.
- As can be seen, the calculations performed are not super-intensive.
- We are implementing MUCH MORE than is required of us in the project description!
- The goal is to experiment:
- with single-threaded synchronous (blocking) code,
- with multithreaded synchronous (blocking) code,
- with single-threaded asynchronous code,
- with multithreaded asynchronous code,
- with different implementations of Actors (the Actor model):
- with actix, as an Actor framework for Rust,
- with xactor, as another Actor framework for Rust,
- with our own asynchronous implementation of Actors,
- TODO: with our own synchronous implementation of Actors,
- with a single actor that is responsible for downloading, processing, and printing of the processed data to console,
- with two actors: the data-fetching one and the one that combines data processing and printing of results,
- with three actors: one for data fetching, one for data processing and printing to console, and one for writing results to a CSV file,
- with `async/await` combined with Actors,
- with the Publisher/Subscriber model for Actors,
- with single-threaded implementation,
- with multithreaded implementation (with various libraries),
- with various combinations of the above.
- Some of that was suggested by the project author, and some of it (a lot of it) was added on our own initiative.
- Not everything is contained in the final commit.
- The repository's commit history contains all those different implementations.
- The goal was also to create a web service that serves requests for fetching the data.
- We can send the requests manually from the command line. We are not implementing a web client.
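To make the extracted metrics concrete, here is a minimal, self-contained sketch of the kind of per-symbol calculations described above (minimum, maximum, percent change, and an n-day simple moving average over closing prices). The function names and signatures are illustrative, not the exact ones used in the repository:

```rust
/// Percent change from the first to the last closing price.
fn percent_change(closes: &[f64]) -> Option<f64> {
    let (first, last) = (closes.first()?, closes.last()?);
    Some((last - first) / first * 100.0)
}

/// Minimum and maximum closing price over the whole period.
fn min_max(closes: &[f64]) -> Option<(f64, f64)> {
    closes.iter().fold(None, |acc, &p| match acc {
        None => Some((p, p)),
        Some((min, max)) => Some((min.min(p), max.max(p))),
    })
}

/// Simple moving average over a sliding window of `n` closing prices.
fn sma(closes: &[f64], n: usize) -> Vec<f64> {
    if n == 0 || closes.len() < n {
        return Vec::new();
    }
    closes
        .windows(n)
        .map(|w| w.iter().sum::<f64>() / n as f64)
        .collect()
}
```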
Here follow implementation details and conclusions. We also include a comparison of the different implementations.
- We started with synchronous code, i.e., with a blocking implementation.
- The implementation was single-threaded.
- The yahoo_finance_api crate that we use supports the blocking feature.
- This implementation was slow.
- Writing to file was not implemented at this stage.
- Then we moved to a regular (sequential, single-threaded) `async` version.
- With all S&P 500 symbols, it took 84 seconds to complete.
- Then we upgraded the main loop to use explicit concurrency with the `async/await` paradigm. It runs multiple instances of the same `Future` concurrently. We are using the futures crate to help us achieve this: `futures::future::join_all(queries).await;` (see the sketch below). This uses the waiting time more efficiently. This is not multithreading. This can increase the program's I/O throughput dramatically, which may be needed to keep the strict schedule with an async stream (that ticks every `n` seconds, where `n = 30` by default), without having to manage threads or data structures to retrieve results. We fetch the S&P 500 data every `n` seconds.
- This way it takes the program less than 2 seconds to fetch all S&P 500 data, instead of 84 seconds as before, on the same computer and at the same time of day. That's a speed-up of around 50 times on the same computer.
- This further means that we can fetch data a lot more frequently than the default 30 seconds.
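A minimal sketch of that explicit concurrency, assuming an async `handle_symbol_data()` that fetches and processes a single symbol, and that `symbols`, `from`, and `to` are already in scope:

```rust
// Build one future per symbol; nothing runs yet.
let queries: Vec<_> = symbols
    .iter()
    .map(|symbol| handle_symbol_data(symbol, from, to))
    .collect();

// Drive all futures concurrently on the current task (no extra threads).
let _ = futures::future::join_all(queries).await;
```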
- Using the same explicit concurrency with the `async/await` paradigm, but with a configurable chunk size, gives more or less the same execution time. This is the case in which our function `handle_symbol_data()` still processes one symbol, as before.
- Namely, whether the chunk size is 1 or 128, all symbols are processed in around 2.5 seconds.
- The function `handle_symbol_data()` is the bottleneck in this case.
- If we modify `handle_symbol_data()` to take and process multiple symbols at a time, i.e., to work with chunks of symbols, the execution time changes depending on the chunk size (see the sketch below).
- For example, for a chunk size of 1, it remains ~2.5 s.
- For a chunk size of 3, it is around 1.3 s!
- For a chunk size of 5 or 6, it is around 1.2 s! A chunk size of 5 seems like a sweet spot for this application with this implementation.
- For a chunk size of 2 or 10, it is around 1.5 s.
- For a chunk size of 128, it rises to over 13 s!
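A sketch of the chunked variant, assuming `handle_symbol_data()` has been changed to accept a slice of symbols:

```rust
// One future per chunk of symbols instead of one future per symbol.
let queries: Vec<_> = symbols
    .chunks(chunk_size)
    .map(|chunk| handle_symbol_data(chunk, from, to))
    .collect();

futures::future::join_all(queries).await;
```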
- Thanks to the fact that the calculations performed on the data are not super-intensive, we can conclude that `async/await` can be fast enough, even though it's generally mostly meant for I/O.
- Namely, the calculations are light, and we are doing a lot of them frequently. We have around 500 symbols and several calculations per symbol. The data-fetching and data-processing functions are all asynchronous. We can conclude that scheduling a lot of lightweight async tasks on a loop can increase efficiency.
- Fetching data from a remote API is an I/O-bound task, so it all comes down to the relative ratio between fetching and processing of the data.
- It is probably the case that the I/O part dominates the CPU part in this application, and consequently the `async/await` paradigm is a good choice here.
- We are using `async` streams for improved efficiency (`async` streaming on a schedule); see the sketch below.
- Unit tests are also asynchronous.
- We couldn't measure execution time properly in the case of asynchronous code.
- Writing to file was not implemented at this stage.
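The repository drives this on a schedule with async streams; an equivalent sketch using a plain Tokio interval (not the exact code), with the same assumed `symbols`, `chunk_size`, `from`, and `handle_symbol_data()` as above:

```rust
use std::time::Duration;
use time::OffsetDateTime;

// Tick every 30 seconds; the first tick completes immediately.
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
    interval.tick().await;

    // `to` is "now" at the start of each iteration.
    let to = OffsetDateTime::now_utc();
    let queries: Vec<_> = symbols
        .chunks(chunk_size)
        .map(|chunk| handle_symbol_data(chunk, from, to))
        .collect();
    futures::future::join_all(queries).await;
}
```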
- Using the rayon crate for parallelization keeps execution time at around 2.5 seconds without chunking symbols.
- We still use `futures::future::join_all(queries).await;` to join all tasks.
- Using `rayon` with chunks yields the same results as the above implementation with chunks.
- For a chunk size of 5, the execution time is around 1.0 s! A chunk size of 5 again seems like a sweet spot for this application with this implementation.
- All CPU cores are really being employed (as can be seen in Task Manager).
- Using `rayon` is easy. It has parallel iterators that support chunking, so we can easily turn our sequential code into parallel code (see the sketch below).
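A minimal sketch of rayon's chunked parallel iterators, assuming a synchronous (or internally blocking) per-chunk function `process_chunk()` (the name is illustrative):

```rust
use rayon::prelude::*;

fn process_all(symbols: &[String], chunk_size: usize) {
    symbols
        .par_chunks(chunk_size)                  // parallel iterator over &[String] chunks
        .for_each(|chunk| process_chunk(chunk)); // runs on rayon's thread pool
}
```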
- We also implemented classical multithreading with `tokio::spawn()` (see the sketch below).
- This requires using the once_cell crate and its struct `once_cell::sync::OnceCell`. It is needed to initialize the variable `symbols` that holds the symbols a user provides on the command line.
- Starting with Rust 1.70.0, we can use the standard library's `std::sync::OnceLock` instead of `once_cell::sync::OnceCell`, with the same results. The functionality has been ported from the crate to the standard library.
- Note: This implementation doesn't employ `rayon`.
- Performance is the same as with explicit concurrency with `async/await` or with `rayon`. The sweet spot for the chunk size is again 5, and it yields an execution time of 1.2 s.
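A sketch of the `tokio::spawn()` variant: the symbols live in a static `OnceLock` (set once in `main()` after parsing the CLI) so that the spawned tasks, which must be `'static`, can borrow them. It assumes an `async fn handle_symbol_data(symbols: &'static [String])`; error handling is omitted:

```rust
use std::sync::OnceLock;

static SYMBOLS: OnceLock<Vec<String>> = OnceLock::new();

async fn run_all(chunk_size: usize) {
    // `get()` returns a &'static Vec<String>, so tasks can borrow chunks from it.
    let symbols = SYMBOLS.get().expect("SYMBOLS not initialized");

    let handles: Vec<_> = symbols
        .chunks(chunk_size)
        .map(|chunk| tokio::spawn(handle_symbol_data(chunk)))
        .collect();

    for handle in handles {
        let _ = handle.await;
    }
}
```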
- Wrapping `symbols` in `std::sync::Arc` and using `std::thread::scope` provides a working solution that is almost as fast as the other fast multithreading solutions (see the sketch below).
- We can conclude that `rayon` and Tokio provide equal performance in our case, and the standard-library multithreaded solution is more or less of the same speed in our case.
- A higher CPU utilization can be observed with a chunk size of 5 than with a chunk size of 128, for example, which is good, as it leads to higher efficiency.
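A sketch of the standard-library variant with scoped threads; `std::thread::scope` lets the threads borrow `symbols` directly (wrapping it in `Arc` is an alternative when an owned handle is preferred). `process_chunk()` is the same illustrative per-chunk function as above:

```rust
use std::thread;

fn run_scoped(symbols: &[String], chunk_size: usize) {
    thread::scope(|s| {
        for chunk in symbols.chunks(chunk_size) {
            // Each scoped thread borrows its chunk; all threads are joined
            // automatically when the scope ends.
            s.spawn(move || process_chunk(chunk));
        }
    });
}
```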
- All measurements were performed with the 505 S&P symbols provided.
- Comments in code also assume all 505 symbols.
- The official list of tickers changes from time to time, so our list can be updated accordingly (see the Notes on Data section below).
- If only 10 symbols are provided, instead of 505, then the fastest solution is with a chunk size of 1: around 250 ms.
- A chunk size of 5 is slower, around 600 ms.
- Chunk sizes of 10 or 128 are very slow, over 1 s!
- We are probably limited by the data-fetching time from the Yahoo! Finance API. That's probably our bottleneck.
- We need to fetch data for around 500 symbols, and the function `get_quote_history()` fetches data for one symbol (called a "ticker") at a time. It is asynchronous, but perhaps this is the best that we can do.
- Writing to file had not been implemented at this stage.
- When we added writing to file, using `rayon` with a chunk size of 5 yielded an execution time of 1.0 s!
- With writing to file, using Tokio with a chunk size of 5 yields an execution time of 0.9 s!
- TODO: With writing to file, `stdlib` threading with a chunk size of 5 yields an execution time of ??? s!
- TODO: With writing to file, using `crossbeam` with a chunk size of 5 yields an execution time of ...
- We used a barrier to write all results, from all threads, to the file at once.
- TODO: We also implemented writing individual chunks, by individual threads, to the file, by using a lock.
- TODO: Performance was the same as with the barrier for writing all results at once. ?! CHECK!!!
- TODO: Using `rayon` with a chunk size of 5 yields an execution time of 1 second!
- TODO: Using `crossbeam` with a chunk size of 5 yields an execution time of ...
- TODO: Using `stdlib` threading with a chunk size of 5 yields an execution time of ...
- Writing to file was implemented at this stage. It requires synchronization, as multiple threads write to the same file.
- We can write chunks, which requires using a lock, or we can write all results at once, which requires a barrier.
- Writing chunks makes more sense with large amounts of data, especially if they don't fit in memory.
- Writing all results at once probably makes more sense when the amount of data is not large and fits in memory, which is our case.
- We introduced Actors (the Actor model); a minimal actix skeleton is sketched after this block.
- This can be a fast solution for this problem, even with one type of actor that performs all three operations (fetch, process, write), but it depends a lot on the implementation.
- Having only one Actor doesn't make sense, but we started with one with the intention of improving from there.
- We initially kept the code asynchronous.
- It was on the order of synchronous and single-threaded `async` code, i.e., ~80-90 seconds, when actors were processing one symbol at a time (when a request message contained only one symbol to process). This applies in the case of the actix crate with a single `MultiActor` that does all three operations, and in the case of three actors when using the xactor crate.
- When we increased the chunk size to 128, the `MultiActor` performance with actix improved a lot, enough for it to fit in the 30-second window.
- Interestingly, reducing the chunk size back to 1 now, in this implementation, was able to put the complete execution in a 5-second slot, possibly in even less than 3 s.
- Making the chunk size 5 or 10 reduced the execution time to 1.5-2 s.
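A minimal actix skeleton in the spirit of the `MultiActor` described above; the message type and its fields are illustrative, and the real fetching/processing/printing is omitted:

```rust
use actix::prelude::*;

// A request message carrying one chunk of symbols to handle.
#[derive(Message)]
#[rtype(result = "()")]
struct QuoteRequest {
    symbols: Vec<String>,
}

// One actor type that fetches, processes, and prints the requested symbols.
struct MultiActor;

impl Actor for MultiActor {
    type Context = Context<Self>;
}

impl Handler<QuoteRequest> for MultiActor {
    type Result = ();

    fn handle(&mut self, msg: QuoteRequest, _ctx: &mut Context<Self>) {
        // Fetching/processing/printing would happen here (or be spawned).
        println!("handling {} symbols", msg.symbols.len());
    }
}

#[actix::main]
async fn main() {
    let addr = MultiActor.start();
    addr.send(QuoteRequest {
        symbols: vec!["AAPL".into(), "MSFT".into()],
    })
    .await
    .expect("MultiActor mailbox closed");
}
```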
- We then moved on to a Two-Actor asynchronous implementation.
- One actor was responsible for fetching data from the Yahoo! Finance API, and the other for processing (calculating) performance indicators and printing them to `stdout`.
- We only implemented actors that process one symbol at a time, i.e., not in chunks.
- We tried with a single instance of the `FetchActor` and multiple instances of `ProcessorWriterActor`, as well as with multiple instances of both types of Actor. The number of instances was equal to the number of symbols.
- In either case, `FetchActor` spawns `ProcessorWriterActor` and sends it messages with fetched data.
- The two cases have the same performance, which is around 2.5 s.
- We tried without and with `rayon`.
- Neither implementation is ordered, meaning the output is not in the same order as the input.
- The `rayon` implementation uses multiple instances of both types of Actor, and has the same performance as the non-`rayon` implementation, i.e., ~2.5 s.
- The Three-Actor implementation has the `ProcessorWriterActor` split into two actors, for a total of three actor types.
- The `FetchActor` is responsible for fetching data from the Yahoo! Finance API.
- The `ProcessorActor` calculates performance indicators and prints them to `stdout`.
- The `WriterActor` writes the performance indicators to a CSV file.
- As for performance, the execution time is around 2.5 s without chunks.
- Each actor works with a single symbol.
- There are as many `FetchActor`s and `ProcessorActor`s as there are symbols.
- If we introduce chunks, the performance improves.
- We worked with a chunk size of 5.
- There are as many `FetchActor`s and `ProcessorActor`s as there are chunks.
- The execution time was possibly below 2 seconds.
- If the `WriterActor` also works with chunks (of the same size, and with the same number of them), the execution time is again below 2 seconds, but possibly even shorter than in the previous case, i.e., around 1.5 s, making it a very fast solution.
- This includes flushing the buffer to the file with every chunk, which makes it possible to write all rows to the file and still get even better performance in terms of execution speed.
- We are using `std::io::BufWriter` to improve the write performance (a flush-per-chunk sketch follows this block). It wouldn't make sense to flush the buffer unless we worked with chunks of symbols, i.e., it wouldn't make sense to flush it for a single symbol, although that would make the solution correct because it would write all rows to the file. Still, to have both good performance and a correct solution, we should use chunks and flush the buffer for every chunk.
- Using `rayon` is at least equally fast, but probably not faster.
- This implementation writes to a file, unlike previous implementations, so it is expected that its performance is slightly worse because of that.
- With async code it was not possible to have the `WriterActor` write out all 505 rows, i.e., performance indicators for all symbols, to the file, if we only flushed when stopping the actor, i.e., in its `stopped` method.
- Namely, we stop the main loop, which is infinite, by interrupting the program with `CTRL+C`, so the `stopped` method doesn't get a chance to execute and flush the buffer.
- Increasing the `WriterActor`'s mailbox size didn't help.
- The course author's `xactor`-based solution also wasn't able to write all rows to the file, as they only flush the buffer in the actor's `stopped` method.
- By adding flushing of the buffer to the file in the `WriterActor`'s `handle` method, we are able to solve this issue.
- We do this in the case where the `WriterActor` also works with chunks.
- All 505 rows do get printed to `stdout` regardless of the `WriterActor`, as the output to console is handled by the `ProcessorActor`.
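A minimal sketch of the buffered, flush-per-chunk writing described above, as plain functions rather than the actual `WriterActor` (the CSV row formatting is simplified):

```rust
use std::fs::File;
use std::io::{BufWriter, Write};

struct CsvSink {
    writer: BufWriter<File>,
}

impl CsvSink {
    fn new(path: &str) -> std::io::Result<Self> {
        let mut writer = BufWriter::new(File::create(path)?);
        writeln!(writer, "period start,symbol,price,change %,min,max,30d avg")?;
        Ok(Self { writer })
    }

    /// Write one chunk of already-formatted CSV rows, then flush, so the rows
    /// survive even if the program is later interrupted with CTRL+C.
    fn write_chunk(&mut self, rows: &[String]) -> std::io::Result<()> {
        for row in rows {
            writeln!(self.writer, "{row}")?;
        }
        self.writer.flush()
    }
}
```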
- We experimented with the Publisher/Subscriber model with the Actix framework, to get a feel for it.
- The Publisher/Subscriber model is generally better suited to applications that have a number of different instances of the same actor which should all get a message, or where there are different types of subscriber actors which should all receive the same message, i.e., be notified of it.
- Our application is not like that, but we still wanted to try it out and play around with it a little.
- Namely, we don't want our actors to do double work, so we have only one instance of each.
- We could try to implement multiple instances of actors for fun, to see if all of them really get the messages.
- For some more fun, we could randomize sending messages to only one instance of each type of actor. They wouldn't be doing double work in that case, which would make the implementation correct.
- We weren't able to make it work.
- The issue is that some of our actors are both publishers and subscribers at the same time, and perhaps Actix simply doesn't support that, or at least not with async functions.
- Perhaps it can be solved, but I am not sure. Check out:
- https://github.com/actix/actix/blob/master/actix/examples/ring.rs - All Nodes in the example are of the same type, but this doesn't seem like a Publisher/Subscriber model. They create a node from another node to form a ring (a full circle) of nodes, but that's not P/S.
- The actix-broker crate.
- https://docs.rs/actix-broker/latest/actix_broker/
- https://github.com/actix/actix/blob/master/actix-broker/examples/basic.rs
- Couldn't get it to work because of `async` blocks and lifetimes (commit).
- Performance: N/A
- We are using actix as an Actor framework for Rust, and its actix-rt as a runtime.
- Note: actix-rt is a "Tokio-based single-threaded async runtime for the Actix ecosystem".
- We implemented our own asynchronous Actor model from scratch (a common channel-based pattern for such actors is sketched below).
- We have a variant with a universal (general) actor that can receive and handle multiple message types.
- We have a variant with specific actors that can only process and handle a single, specific message type.
- We use Tokio as the asynchronous runtime.
- Our implementation is project-specific (a custom one).
- It is not general. It is not meant as a general library, but it could be generalized. It's a good starting point for a general Actor Model library.
- Performance is excellent! It is at least as fast as our previous fastest solution, if not faster.
- TODO:
- Connect the actors to the outside world.
- Create a web service for this.
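The repository's own actor implementation is custom; the sketch below shows the common channel-based pattern that hand-rolled Tokio actors usually follow (an actor task owning a `tokio::sync::mpsc` receiver, plus a cloneable handle), not the exact code from this repo:

```rust
use tokio::sync::mpsc;

// Messages this actor understands.
enum Msg {
    Fetch { symbols: Vec<String> },
    Shutdown,
}

struct Actor {
    receiver: mpsc::Receiver<Msg>,
}

impl Actor {
    async fn run(mut self) {
        while let Some(msg) = self.receiver.recv().await {
            match msg {
                Msg::Fetch { symbols } => {
                    // Fetch/process/print the symbols here.
                    println!("handling {} symbols", symbols.len());
                }
                Msg::Shutdown => break,
            }
        }
    }
}

// A cheap, cloneable handle used to talk to the actor.
#[derive(Clone)]
struct ActorHandle {
    sender: mpsc::Sender<Msg>,
}

impl ActorHandle {
    fn new() -> Self {
        let (sender, receiver) = mpsc::channel(64);
        tokio::spawn(Actor { receiver }.run());
        Self { sender }
    }

    async fn send(&self, msg: Msg) {
        let _ = self.sender.send(msg).await;
    }
}
```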
Check out the files that are provided for additional explanation:
Most of those were provided by the course author, but were modified where it made sense.
- List of S&P 500 symbols (tickers) can be found at:
- https://en.wikipedia.org/wiki/List_of_S%26P_500_companies
- https://github.com/datasets/s-and-p-500-companies/blob/main/data/constituents.csv
- The lists are not necessarily up-to-date.
- The alphabetically-sorted list is provided in sp500_2024_aug.csv.
- There are 505 symbols in it.
- Keep in mind that some symbols come and go to/from the S&P 500 list.
- In case a symbol is not officially on the list, it will be ignored and consequently not shown in the output, be it in `stdout` or in the generated `output.csv`.
- The official list should be looked up from time to time, and the input CSV file in this repository that contains the list should be updated accordingly.
- The file's name intentionally contains the date when it was constructed.
- The output is not in the same order as input because of concurrent execution of futures.
Not all of them are present in every commit.
The git commit history contains descriptive comments.
- actix, as an Actor framework for Rust
- actix-rt, as Tokio-based single-threaded async runtime for the Actix ecosystem
- async-std, as an async library
- clap, for CLI arguments parsing
- futures, for an implementation of futures (required for explicit concurrency with the `async/await` paradigm)
- rayon, as a data-parallelism library for Rust
- time, as a date and time library (used by `yahoo_finance_api`)
- Tokio, as an asynchronous runtime - used both directly and as a dependency of some other crates
- xactor, as a Rust Actors framework based on async-std (it also supports Tokio as runtime instead of async-std, but we didn't use that)
- yahoo_finance_api, as an adapter for the Yahoo! Finance API to fetch histories of market data quotes
- The application requires the `from` and the `symbols` arguments (a sketch of the argument definitions follows this list).
- The `from` date and time argument should be provided in the RFC3339 format.
- The `symbols` argument should contain comma-separated S&P symbols (tickers), with no blanks between them.
- The `to` date and time is assumed to be the current time instant, at the moment of execution of each iteration of the loop (at each new interval).
- The examples below demonstrate how to run the app.
- The output date and time are also in the RFC3339 format.
- The program runs in a loop with a specified interval (in src/constants.rs).
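The arguments are parsed with clap (listed among the crates above); a minimal derive-style sketch of how the two required arguments could be declared (the exact definitions in the repository may differ):

```rust
use clap::Parser;

/// Stock-tracking CLI: fetch S&P 500 quotes from a given date onward.
#[derive(Parser, Debug)]
struct Args {
    /// Start date and time in RFC3339 format, e.g. 2023-07-03T12:00:09+00:00
    #[arg(long)]
    from: String,

    /// Comma-separated list of symbols (tickers), e.g. AAPL,MSFT,NVDA
    #[arg(long)]
    symbols: String,
}

fn main() {
    let args = Args::parse();
    let symbols: Vec<&str> = args.symbols.split(',').collect();
    println!("from = {}, {} symbols requested", args.from, symbols.len());
}
```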
$ cargo run -- --from 2023-07-03T12:00:09+00:00 --symbols AAPL,AMD,AMZN,GOOG,KO,LYFT,META,MSFT,NVDA,UBER
*** 2024-02-27 19:42:58.0795392 +00:00:00 ***
period start,symbol,price,change %,min,max,30d avg
2023-07-03T12:00:09Z,AMD,$177.65,53.38%,$93.67,$181.86,$172.12
2023-07-03T12:00:09Z,GOOG,$140.12,16.23%,$116.87,$154.84,$146.27
2023-07-03T12:00:09Z,LYFT,$16.63,64.00%,$9.17,$19.03,$13.90
2023-07-03T12:00:09Z,META,$485.17,69.63%,$283.25,$486.13,$435.32
2023-07-03T12:00:09Z,UBER,$78.10,81.25%,$40.62,$81.39,$70.34
2023-07-03T12:00:09Z,NVDA,$791.87,86.70%,$403.26,$791.87,$671.35
2023-07-03T12:00:09Z,AMZN,$173.62,33.33%,$119.57,$174.99,$164.80
2023-07-03T12:00:09Z,MSFT,$407.20,20.48%,$312.14,$420.55,$405.11
2023-07-03T12:00:09Z,AAPL,$183.13,-4.85%,$166.89,$198.11,$187.16
2023-07-03T12:00:09Z,KO,$60.17,-0.67%,$52.38,$63.05,$59.97
Took 278.264ms to complete.
Symbols (tickers) added in 2024:
$ cargo run -- --from 2024-01-01T12:00:09+00:00 --symbols KKR,CRWD,GDDY,VST,GEV,SOLV,SMCI,DECK
$ cargo run -- --from 2024-07-03T12:00:09+00:00 --symbols "$(cat sp500_2024_aug.csv)"
Or, equivalently:
$ export SYMBOLS="$(cat sp500_2024_aug.csv)" && cargo run -- --from 2024-07-03T12:00:09+00:00 --symbols $SYMBOLS
- This application fetches data from a remote API, so it is relatively I/O-bound.
- There are some light calculations taking place, but I believe that the application is more on the I/O-bound side.
- The `async/await` paradigm copes well with this kind of application.
- We also implemented the Actor Model. It uses message passing between actors.
- Working with chunks of data instead of with individual pieces of data improves performance.
- Not all chunk sizes perform the same, so we need to find a sweet spot - an optimal chunk size.
- It probably helps even more in a distributed setting (a distributed network of nodes) to send larger messages rather than smaller ones, in both directions, which means working with chunks instead of individual symbols.
- We couldn't measure execution time properly in the case of asynchronous code.
- This proved to be the fastest solution for this concrete problem.
TODO:
- Find a way to measure time correctly when working with async/await and/or with Actors.
- Add tracing or at least logging.
- Fully implement graceful stopping and shutdown after receiving the shutdown signal, `CTRL+C`.
- It's only partly implemented.
- We should broadcast the shutdown signal to all tasks, however they are implemented.
- In case of actors, which is considered our main implementation, we should broadcast the signal to all actors, or at least just send the signal to the single instance of the writer actor.
- Read symbols from a file instead of from the command line.
- Sort output by symbol.
- Rename "output.csv" to "<current_date_and_time_with_tz>.csv".
- Use RFC2822, or RFC3339, or ISO 8601 or any other format.
- Find ways to publish and subscribe to messages without explicit calls.
- actix might support this feature through the use of `Recipient`; also see here.
- We tried to do this, to no avail. Perhaps we are missing something, or perhaps Actix doesn't allow actors to be both a publisher and a subscriber at the same time, which is what we need.
- xactor does support the feature, but at the time of this writing, it hadn't been updated in about three years. Also, actix is far more popular.
- Still, an `xactor` implementation with three different Actors and with the publish/subscribe model can be found in this commit.