flo_scene is a crate that provides a way to build large pieces of software by combining smaller 'sub-programs' that communicate via messages. This simplifies dependencies over more traditional object-oriented techniques, as sub-programs do not need to directly depend on each other. There are also benefits in terms of testing, code re-use and configurability.
A scene contains sub-programs. Each sub-program receives a single input stream of a single type of message, and can generate any number of output streams. Typically an output stream specifies a message type only, with the target being determined by an external connection definition, but it is also possible for a sub-program to request a direct connection to another program.
Scenes are created using Scene::default()
or Scene::empty()
. The empty scene contains no
subprograms by default but the default scene contains some default ones, in particular a
control program that can be used to start other programs or define connections between programs.
use flo_scene::*;
let scene = Scene::default();
Sub-programs read from a single input stream of messages and can write to any number of output
streams. Each output stream can be connected to the input of another program, and these connections
can be specified independently of the programs themselves. Messages need to implement the
SceneMessage
trait, and subprograms can be added to a scene using the add_subprogram()
function.
// Simple logger
pub enum LogMessage {
Info(String),
Warning(String),
Error(String)
}
impl SceneMessage for LogMessage { }
let log_program = SubProgramId::called("Logger");
scene.add_subprogram(log_program,
|mut log_messages: InputStream<LogMessage>, _context| async move {
while let Some(log_message) = log_messages.next().await {
match log_message {
LogMessage::Info(msg) => { println!("INFO: {:?}", msg); }
LogMessage::Warning(msg) => { println!("WARNING: {:?}", msg); }
LogMessage::Error(msg) => { println!("ERROR: {:?}", msg); }
}
}
},
10)
Connections can be defined by the connect_programs()
function. Note how this means that something
that generates log messages does not need to know their destination and that it's possible to change
how something is logging at run-time if needed.
// Connect any program that writes log messages to our log program
// '()' means 'any source' here, it's possible to define connections on a per-subprogram basis if needed.
scene.connect_programs((), log_program, StreamId::with_message_type::<LogMessage>()).unwrap();
Subprograms have a context that can be used to retrieve output streams, or send single messages, so after this connection is set up, anything can send log messages.
let test_program = SubProgramId::new();
scene.add_subprogram(test_program,
|_: InputStream<()>, context| async move {
// '()' means send to any target
let mut logger = context.send::<LogMessage>(()).unwrap();
// Will send to the logger program
logger.send(LogMessage::Warning("Hello".to_string())).await.unwrap();
},
0);
Once set up, the scene needs to be run, in the async context of your choice:
executor::block_on(async {
scene.run_scene().await;
});
A single scene can be run in multiple threads if needed and subprograms are naturally able to run asynchronously as they communicate with messages rather than by direct data access.
The SceneMessage
trait can be used to specify the default initialisation for a program: in particular,
a default target can be provided for the message type, along with an initialisation routine that can
be used to set up the message in a scene the first time it is seen. This becomes increasingly useful as
the number of subprograms increase, as it provides a way to combine the setup of a message type with
the implementation rather than having to have something that sets up a huge number of connections.
impl SceneMessage for MyMessage {
fn default_target() -> StreamTarget {
StreamTarget::Program(SubProgramId::called("MyMessageHandler"))
}
}
When the scene is created with Scene::default()
, a control program is present that allows
subprograms to start other subprograms or create connections:
/* ... */
context.send_message(SceneControl::start_program(new_program_id, |input, context| /* ... */, 10)).await.unwrap();
context.send_message(SceneControl::connect(some_program, some_other_program, StreamId::for_message_type::<MyMessage>())).await.unwrap();
There are also programs to send to stdout or stderr as well as receive from stdin.
The empty scene does not get any of the default programs, so it can be configured however is necessary, and it's also possible to create a scene with a particular set of default programs set up.
Filters make it possible to connect two subprograms that take different message types by transforming them. They need to be registered, then they can be used as a stream source or target:
// Note that the stream ID identifies the *source* stream: there's only one input stream for any program
let mine_to_yours_filter = FilterHandle::for_filter(|my_messages: InputStream<MyMessage>| my_messages.map(|msg| YourMessage::from(msg)));
scene.connect(my_program, StreamTarget::Filtered(mine_to_yours_filter, your_program), StreamId::with_message_type::<MyMessage>()).unwrap();
A program can 'upgrade' its input stream to annotate the messages with their source if it needs this information:
scene.add_subprogram(log_program,
|log_messages: InputStream<LogMessage>, _context| async move {
let mut log_messages = log_messages.messages_with_sources();
while let Some((source, log_message)) = log_messages.next().await {
match log_message {
LogMessage::Info(msg) => { println!("INFO: [{:?}] {:?}", source, msg); }
LogMessage::Warning(msg) => { println!("WARNING: [{:?}] {:?}", source, msg); }
LogMessage::Error(msg) => { println!("ERROR: [{:?}] {:?}", source, msg); }
}
}
},
10)
It is possible to request a stream directly to a particular program:
let mut logger = context.send::<LogMessage>(SubProgramId::called("MoreSpecificLogger"));
But it's also possible to redirect these with a connection request:
// The scene is the ultimate arbiter of who can talk to who, so if we don't want our program talking to the MoreSpecificLogger after all we can change that
// Take care as this can get confusing!
scene.connect(exception_program, standard_logger_program, StreamId::with_message_type::<LogMessage>().for_target(SubProgramId::called("MoreSpecificLogger")));
A very useful thing that can be done with the connect()
call is to specify a default filter for an
incoming connection. This allows a program that supports a richer version of an existing protocol to
also support the original one, or to specify how it receives events of different types. It's similar
to inheritance in object-oriented languages but considerably more flexible (filters can perform
any action that's possible to perform to a stream of data, there's no rigid hierarchy)
// '()' is shorthand for StreamSource::Any
// Any incoming connections of type `OldMessage` gets filtered to `NewMessage` for `new_message_program`
scene.connect_programs((), StreamTarget::Filtered(old_to_new_message_filter, new_message_program), StreamId::with_message_type::<OldMessage>()).unwrap();
The send_immediate()
request can be used on an output sink to send a message without needing
to use an await
. This combines well with thread stealing if the target program can process
the message without awaiting. A downside of send_immediate
is that it can override the
backpressure that subprograms normally generate when they have too many messages waiting, so
it needs to be used with some caution.
This is a fairly specialist use-case, but this could be used with a logging framework to enable
logging to a subprogram. scene_context()
can be used to acquire the context of the subprogram
running on the current thread, which can be very useful when combined with immediate messages.
'Thread stealing' is an option that can be turned on for an input stream by calling
input_stream.allow_thread_stealing(true)
. With this option on, the target program will
immediately be polled when a message is sent rather than waiting for the current subprogram
to yield. This is useful for actions like logging where information could be lost if the messages
were buffered, or where a message is intended to be used in immediate mode.
The initialise()
function in the SceneMessage
trait is called the first time a message type is
encountered in a scene, and one way this can be used is to ensure that a default handler program is
running when a message is sent for the first time. For example, this will set up a message handler
in any scene that users that message type:
impl SceneMessage for AutoStartMessage {
fn initialise(scene: &Scene) {
// Create a subprogram as the default handler for a message the first time that it's encountered in a scene
scene.add_subprogram(SubProgramId::called("AutoStart"), autostart_message_handler, 0);
scene.connect_programs((), SubProgramId::called("AutoStart"), StreamId::with_message_type::<AutoStartMessage>()).unwrap();
}
}
This is a way to move the burden of setting up subprograms away from the code that creates a scene and into the message type itself.
The TestBuilder
type can be used test a scene by sending and receiving messages. This takes advantage
of the simple program model of a scene to provide a simple 'send this, expect this result' test style.
// Check that the timer calls us back
let test_program = SubProgramId::new();
TestBuilder::new()
.send_message(TimerRequest::CallAfter(test_program, 1, Duration::from_millis(10)))
.expect_message(|_: TimeOut| { Ok(()) })
.run_in_scene(&scene, test_program);
Scenes are inherently testable: automatic initialisation can save on setup time, there's no need to
intercept internal method calls as all messages are available and can be redirected with the
connect_programs()
call, and the StreamTarget::None
target can be used to dump messages that
are not relevant to the test.
The Subscribe
message is often accepted via a filter and indicates a target where events of a
particular type should be sent. For instance, a program can monitor the overall state of the
scene by sending a Subscribe<SceneUpdate>
message, which is handled by the control program
by default.
The companion Query
message is used to send a request for a single QueryResponse
message.
Where subscriptions send messages indefinitely, queries send just one QueryResponse
, which
is itself a stream of data indicating the result of the query. For instance, to receive a
response indicating the current state of the scene, the Query<SceneUpdate>
message can be
sent. A single response will be received indicating which subprograms are running and how
they are connected.
Queries can be combined with Commands as a way to make a query against the state of something without needing to set up a filter to convert the QueryResponse (or making it possible to send unsolicited QueryResponses to confuse the subprogram)
Sub-programs run for an indefinite period of time and can accept messages from any source. Commands represent re-usable requests that process their input and return a single output stream. They can be used to perform background processing, group a sequence of messages into a single unit or retrieve the results of a query to another component. For instance, a command that sends a couple of messages can be created as follows:
let example_command = FnCommand::<(), ()>::new(|_no_input, context| async move {
context.send_message(Message1::SampleMessage).await.ok();
context.send_message(Message2::ExampleMessage).await.ok();
});
This can be spawned by a subprogram to perform this sequence of actions:
context.spawn_command(example_command.clone(), stream::empty()).unwrap();
The command is passed an input stream, and captures a single 'standard' output stream to return to the subprogram that spawned it:
let number_to_string = FnCommand::<usize, String>::new(|numbers, context| async move {
let mut numbers = numbers;
let mut strings = context.send(()).unwrap();
while let Some(number) = numbers.next().await {
strings.send(number.to_string()).await.ok();
}
});
let strings = context.spawn_command(number_to_string.clone(), stream::iter(vec![1, 2, 3, 4])).unwrap();
// strings is a stream ["1", "2", "3", "4"]
The input to a command can be a query using the spawn_query
function, and this is useful in combination
with the ReadCommand
which is a very basic command that sends its input straight to its output:
// Retrieve the current state of the scene as a stream of SceneUpdates
let current_scene_state = context.spawn_query(ReadCommand::default(), Query::<SceneUpdate>::with_no_target(), SceneTarget::Any).unwrap();
You can create and run more than one Scene
at once if needed, and run scenes underneath each
other. This provides a further way to structure a program, for example by providing scenes for
individual documents or users.
The core idea behind flo_scene is that there are at least three levels of abstraction in large programs and these have different needs, and the last of these ('system') is not well-served by the design of most programming languages:
- Algorithms
- Functions
- Systems
Algorithms are basic code: they have full access to a lot of state, but they generally are bad at re-using a part of themselves in different contexts.
Functions provide a way to solve the issue with algorithms. They allow an individual algorithm to be used in many different contexts, and can provide a structural element to break an algorithm down into simpler parts. They are limited to a single output given multiple inputs, and they tend to get more nested as a piece of software grows. For large enough software, this nesting becomes a problem: 'higher-level' functions can encapsulate a huge portion of the software's functionality and have huge networks on interdependencies.
Subprograms are flo_scene's approach to providing a system-level construct. They take a single input and can generate multiple outputs and have a flat dependency structure instead of a nested one. Dependencies are defined externally rather than internally and can even be changed at run-time. This relieves the problem of ever-growing complexity by removing the need for direct dependencies.
Object-oriented languages were supposed to provide a structure to relieve this nesting problem too, but their modern variants gave up messaging for method calls, viewing the type system as more important than the structural one. Method calls are just functions, so these languages typically do not provide the extra level of structure needed to relieve the nesting problem, unlike earlier examples such as Simula and SmallTalk.
The key concept here is Dependency Inversion
. The problem with building large software by nesting
smaller pieces of software ever deeper is that the upper levels incorporate the entire functionality
of the lower levels and hence become enormously complicated, hard to understand and hard to change.
Dependency inversion is an often poorly understood and implemented way to break this cycle by
reversing the nesting order (eg, 'the logger has a reference to everything' instead of 'everything
has a reference to the logger'). flo_scene is perhaps a step beyond this by breaking the direct
dependencies between components entirely and defining them externally.
The idea of objects that communicate with messages probably originates with the Simula language, with SmallTalk and Objective-C being other languages that notably adopt this approach.
Erlang is a functional language with a process model concept where processes communicate using mailboxes. An Erlang process is quite similar to a Simula object. flo_scene's model is also quite similar with a key differentiator being the ability to connect sub-programs that have no knowledge of each other.
Microservices are another quite similar concept, though it's far more heavyweight with the processes being entire web services by themselves. A flo_scene subprogram could be an entirely separate service as easily as it could be a co-routine in the same process, but no direct support is provided in the core crate for this.
All of these systems can be seen as different ways to implement the actor model, though most actor frameworks that exist focus on concurrency issues and making messages work like functions to present a familiar programming model (with its familiar nesting dependencies) over providing a structural element.