# Race-condition with fan-in pattern
Every time you have N>1 senders for 1 receiver you can have a race condition:

- sender `s1` sent a message `m1` at `0:00:00`
- sender `s2` sent a message `m2` at `0:00:01`
- message `m2` received by receiver `r1` before `m1`
- data-race
The reason for that is the way the runtime interprets the program: for each connection there is a separate goroutine. Goroutines are concurrent, which means that if we have 2 goroutines, they compete for resources.

So, 2 goroutines each await a signal from their sender (those are different) to deliver a message to the receiver (which is the same). Even though the first sender sent first, there's no guarantee the Go scheduler will activate the corresponding connection goroutine first. Sometimes it does, sometimes it doesn't.
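A minimal Go sketch of the problem (names are illustrative, not the actual runtime code): two connection goroutines forward to the same receiver channel, and the delivery order depends entirely on the scheduler:

```go
package main

import "fmt"

func main() {
	r1 := make(chan string) // the shared receiver port

	// one "connection" goroutine per sender
	go func() { r1 <- "m1" }() // s1's connection
	go func() { r1 <- "m2" }() // s2's connection

	// delivery order depends on the scheduler, not on send order:
	fmt.Println(<-r1, <-r1) // sometimes "m1 m2", sometimes "m2 m1"
}
```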
Here's the example:

```neva
import { lists }

const lst list<bool> = [true, false]

component Main(start) (stop) {
    nodes { lists.For{Printer} }
    :start -> ($lst -> for -> :stop)
}

component Printer(data bool) (sig any) {
    nodes { If, Println }
    :data -> if
    if:then -> ('true' -> println)
    if:else -> ('false' -> println)
    println -> :sig
}
```
It's clear that `true` must be printed first. Yet that's not always the case.

This behaviour is considered a bug caused by an incorrect implementation. When there is a language spec, it will describe the correct behaviour and each implementor will follow it.

We need to serialize message passing for connections that follow the fan-in pattern.
## Shared Queue As a Serialization Mechanism
Basically, any solution we can come up with will be serialization for connections with shared receivers. To do that, we need to get rid of concurrency between them.

The simplest way to do this:

- Get rid of goroutine-per-connection
- Implement a queue and share it across all senders
- When a sender sends to the queue, there is a (single) goroutine that reads from it and does the broadcasting:
```go
func connect() {
    for msg := range q {
        broadcast(msg)
    }
}
```
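To make the wiring concrete, here's a minimal self-contained sketch; `Msg`, `send`, and the `receivers` slice are illustrative assumptions, not the actual runtime API:

```go
package main

type Msg struct{ Data any }

var (
    q         = make(chan Msg) // the single shared queue
    receivers []chan Msg       // every receiver port in the program
)

// every sender funnels through the same queue; this is the serialization point
func send(m Msg) { q <- m }

// the one goroutine that drains the queue in arrival order
func connect() {
    for msg := range q {
        broadcast(msg)
    }
}

func broadcast(msg Msg) {
    for _, r := range receivers {
        r <- msg // delivery order now matches queue order
    }
}
```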
However, there are a couple of problems with this solution...

## Problems

- It's a bottleneck: every sender sends to the same queue. A slow receiver will slow down fast senders and other receivers.
- A much bigger problem (correctness, not performance): how do we handle intermediate connections?
## Performance Problem (Making the Bottleneck Wider)

There are basically 3 ways to improve the performance of this solution:

- Add a buffer to the queue
- Add buffers to receiver channels
- Use several queues instead of one

The first two are easy, but the last one is a bit tricky. The idea is to have a queue for each fan-in pattern. I.e., each time we have N senders sharing the same receiver, they need a shared queue; their messages need to be serialized.
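A hedged sketch of the third idea, one queue (and one draining goroutine) per fan-in group; the `fanIn` map and `receivers` lookup are assumptions made up for illustration:

```go
package main

type Msg struct{ Data any }

// fanIn is hypothetical: receiver port name -> the senders that share it.
// Each shared receiver gets its own buffered queue, so a slow receiver
// only slows down its own senders, not the whole program.
func startFanInQueues(
    fanIn map[string][]string,
    receivers map[string]chan Msg,
) map[string]chan Msg {
    queues := make(map[string]chan Msg, len(fanIn))
    for recv := range fanIn {
        q := make(chan Msg, 64) // buffered queue (idea 1) per fan-in group (idea 3)
        queues[recv] = q
        go func(r, q chan Msg) {
            for m := range q {
                r <- m // messages within this group stay serialized
            }
        }(receivers[recv], q)
    }
    return queues // senders look up their receiver's queue and send to it
}
```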
## Intermediate Connections Handling (Fixing Correctness)

As soon as we got rid of "goroutine per connection", we created a problem. Let's look at the `broadcast()` function:

```go
func broadcast(msg Msg) {
    for _, r := range receivers {
        r <- msg
    }
}
```
The problem is with the line `r <- msg`: who is going to receive it?

It's not a problem if there's a runtime function that reads from this port, but what if it's an _intermediate connection_? By "intermediate" connection I mean a connection that exists because there was a user-defined component.
Let's take a look at these 2 versions:

```neva
component Main(start) (stop) {
    :start -> printer -> :stop
}

component Printer(data) (sig) {
    nodes { Println }
    :data -> println -> :sig
}
```

vs

```neva
component Main(start) (stop) {
    nodes { Println }
    :start -> println -> :stop
}
```
They are functionally equal, but the first one is going to have more ports and connections (channels and goroutines), i.e. more message passing (more synchronization). Which is bad, BTW.

This could be solved by an optimization step (whether a source-code-level optimizer or an IR optimizer doesn't matter). However, we don't have an optimizer now, and it's not clear whether every program can be optimized to the point where it has no "intermediate connections".

So the thing is, after this rewrite the runtime will handle the latter version but not the former, because we no longer have a goroutine that could transfer our message from `printer:data` to `println:data`. Nobody reads from `printer/in:data[0]` anymore. We're going to block, and no buffer will help us.
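A tiny sketch of that blocking, with hypothetical names; the intermediate port is just a channel nobody reads from anymore:

```go
package main

func main() {
    // intermediate port with a buffer of 1 and no reader
    printerData := make(chan string, 1)

    printerData <- "m1" // fits in the buffer
    printerData <- "m2" // deadlock: the buffer is full and nobody reads
}
```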
## Solution

There are 2 ways to solve it:

- implement an optimizer that would get rid of all intermediate connections
- support intermediate connections at the level of the runtime

The answer is... we need both! And we need to start with the second one.

The reason is that optimization should be _optimization_. We should not (probably) think of slow programs as incorrect ones (even though I personally would love to).

Second, as said, it's not clear whether we can optimize every program this way. Can every program be 100% inlined into one giant `Main`? I'm not sure (even though I personally would love that!).

So we need to support this. How?
```go
func connect(prog Prog) {
    for item := range q {
        broadcast(item, prog)
    }
}

func broadcast(item Item, prog Prog) {
    finalReceivers := getFinalReceivers(item, prog)
    for _, finalReceiver := range finalReceivers {
        finalReceiver <- item.Msg
    }
}
```
I didn't describe `getFinalReceivers`, but you get the idea: we need to traverse the graph until we reach a leaf, which is a runtime function.

As an optimization, instead of traversing on every message, the runtime could remember the address of the leaf once it's found, so we don't have to do it again each time.
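A hedged sketch of what that could look like; the `Prog` shape, `Leafs`, and the cache are all assumptions made up for illustration:

```go
package main

type PortAddr string

// hypothetical program graph: sender port -> directly connected receiver ports,
// plus the set of leaf ports that runtime functions actually read from
type Prog struct {
    Net   map[PortAddr][]PortAddr
    Leafs map[PortAddr]chan any
}

// cache of already-resolved leaf channels, so the graph walk happens once per sender
var finalCache = map[PortAddr][]chan any{}

func getFinalReceivers(sender PortAddr, prog Prog) []chan any {
    if cached, ok := finalCache[sender]; ok {
        return cached
    }
    var result []chan any
    for _, recv := range prog.Net[sender] {
        if leaf, ok := prog.Leafs[recv]; ok {
            result = append(result, leaf) // runtime function: a real reader exists
        } else {
            // intermediate port: it's a sender at the next level, keep unwrapping
            result = append(result, getFinalReceivers(recv, prog)...)
        }
    }
    finalCache[sender] = result
    return result
}
```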
That... feels like the optimization we talked about before, right? I know it's weird. But I feel like the runtime should be able to work with this. We should not just push it all onto the compiler's shoulders. Why? Debugging.
## A Note on `-debug` (and Tracing)

I think we should log events each time we "unwrap" an intermediate step. Of course it's weird to log all 3 levels sequentially, but what can we do? Yep, it's going to be `sent, pending, received` even though nothing actually happened.
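A hedged sketch of that, with a made-up `EventListener` interface standing in for whatever the runtime actually uses:

```go
package main

type PortAddr string
type Msg struct{ Data any }

// hypothetical listener interface for -debug / tracing
type EventListener interface {
    OnEvent(kind string, port PortAddr, msg Msg)
}

// when a message "passes through" transit ports, no real channel operation
// happens, but we still emit the full sent/pending/received sequence per hop
// so the trace shows the whole picture
func traceTransit(l EventListener, transit []PortAddr, msg Msg) {
    for _, port := range transit {
        l.OnEvent("sent", port, msg)
        l.OnEvent("pending", port, msg)
        l.OnEvent("received", port, msg)
    }
}
```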
This is also related to #347 and #94.

The other option, of course, is to not log them and accept that you won't see the whole picture in the logs. That feels bad, especially for the message traces that are going to be implemented someday.
## Other Option

Another option would be to append the message back to the queue (to the tail) but with a different receiver. That could be either the final one (leaf/runtime function) or just the next level down. However, there's not much sense in that: we'd do >=1 extra `queue -> chan` send, and that's less performant.

The log/trace thing feels like it makes more sense if we send to the `q` chan, but why? I don't know.
## IRGen Generates Intermediate-Step-Less Programs

This is an improvement of #644 (comment):

- We want to do as little as possible at runtime
- We're ready to do as much as possible in the compiler
- We need to be able to easily debug our programs

How can we have all three? Ok, look:

Do not find leafs at runtime. Find them at compile time. Probably that would be `irgen`, but I'm not sure. An IR `Connection` is going to be something like:
```go
type Connection struct {
    Sender    PortAddr
    Receivers []ReceiverSide
}

type ReceiverSide struct {
    PortAddr PortAddr   // final
    Transit  []PortAddr // intermediate (just to log)
}
```
This way we don't have to traverse the graph at runtime (not even on the first send, to remember it for later), but we still have the metadata we need to make the event listener calls.
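For the `:start -> printer -> :stop` example above, the reduced connection could look something like this (the address format is illustrative, not the runtime's actual one):

```go
// :start -> printer -> :stop with Printer's internals unwrapped
conn := Connection{
    Sender: "main/in:start",
    Receivers: []ReceiverSide{
        {
            PortAddr: "printer/println:data",        // final: read by a runtime function
            Transit:  []PortAddr{"printer/in:data"}, // intermediate: only logged
        },
    },
}
```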
## Graph Reduction Algorithm In IR

This is the actual solution for #644 (comment).

The idea is to add an extra step to `irgen` that does graph reduction in such a way that all intermediate connections (and their ports) are removed from the graph.

It's not the most efficient version (it's probably possible to do it in one pass), but it's good enough:
```
func reduce(ports, net) {
    intermediatePorts = {}
    resultNet = []

    for conn in net {
        // in theory it's also possible to `if intermediatePorts.has(conn.sender) { continue }` here
        receivers = []
        for receiver in conn.receivers {
            if !isIntermediate(receiver) {
                receivers = [...receivers, receiver]
                continue
            }
            intermediatePorts.add(receiver)
            // one intermediate receiver could lead to multiple final receivers
            finalReceivers = getFinalReceivers(receiver)
            receivers = [...receivers, ...finalReceivers]
        }
        conn = { conn.sender, receivers }
        resultNet = [...resultNet, conn]
    }

    // it's possible to leave ports as is, but we can do better
    // by removing intermediate ports, since there are no connections to them anymore
    resultPorts = {}
    finalNet = []

    // resultNet only contains connections with final receivers,
    // but some of them have senders that are intermediate ports;
    // we need to remove those connections and their ports
    for conn in resultNet {
        // an intermediate port is always a receiver for the parent and a sender for the child
        if intermediatePorts.has(conn.sender) {
            continue
        }
        finalNet = [...finalNet, conn]
        // basically just add the ports for this connection
        resultPorts = { ...resultPorts, conn.sender }
        for receiver in conn.receivers {
            resultPorts = { ...resultPorts, receiver }
        }
    }

    return { resultPorts, finalNet }
}
```
```
func isIntermediate(port) {
    // here we check whether there's a receiver with the exact same port
    // as the given sender (connections should be a map for that).
    // (we should not depend on whether the receiver is a runtime function,
    // because Main has :stop, which is neither intermediate nor a runtime function,
    // and there might (not sure) be other cases like that)
}
```
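One possible shape for that check, assuming the net has been indexed by sender port (names are hypothetical):

```go
// netBySender is hypothetical: every connection keyed by its sender port
var netBySender map[PortAddr][]PortAddr

// a receiver port is intermediate if the same port also appears as a sender,
// i.e. it only forwards messages further down the graph; Main's :stop is a
// receiver that never sends, so it correctly stays non-intermediate
func isIntermediate(port PortAddr) bool {
    _, ok := netBySender[port]
    return ok
}
```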
This way we'll have a graph where, basically, a bunch of runtime functions do message passing via the queue (serialized pub-sub, basically) and we'll have no race conditions. You can see the highlighted ports on the picture; they will be removed.
And this is how this:

```neva
component Main(start) (stop) {
    :start -> printer -> :stop
}

component Printer(data) (sig) {
    nodes { Println }
    :data -> println -> :sig
}
```

becomes this:

```neva
component Main(start) (stop) {
    nodes { Println }
    :start -> println -> :stop
}
```
We don't need a goroutine to do the intermediate step anymore. By not having several goroutines sending to the same receiver, we avoid the race condition.
The fan-in and fan-out relations in the reduced graph can then be described by two maps, receivers to their senders and senders to their receivers:

```js
let fanInMap = {
    r1: ["s1", "s2"],
    r2: ["s1", "s3"],
};

let fanOutMap = {
    s1: ["r1", "r2"],
    s2: ["r1"],
    s3: ["r2"],
};
```