
Runtime Algorithm

emil14 opened this issue

TLDR: we need an algorithm that is race-free and deadlock-free. The idea is to create N queues, one for each fan-in occurrence. Each sender that is involved in at least one fan-in must send data to those queues. The requirements are:

  • There must be no concurrent goroutines serving connections involved in fan-in (the source of race conditions in the existing design)
  • There must be no single queue shared by more than one node of the same downstream (the source of the deadlock in #670)

Below I describe the implementation.

Compiler Part (Irgen)

  • maintain an internal fanIn map "1 receiver -> all its senders"
  • add a raceFree bool flag to each connection, set to false
  • for each connection, check whether it's race-free and set the flag to true if it is
  • race-free means no receiver in the connection is involved in a fan-in pattern
  • a fan-in pattern means 1 receiver and >=2 senders

Now we know which senders must not send to receivers directly (to avoid race conditions) and must instead send to a shared queue.
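
A minimal sketch of this pass, assuming a simplified IR where a connection has one sender and many receivers (the Connection shape and the markRaceFree helper are hypothetical; only fanIn and raceFree come from this proposal):

type PortAddr struct{ Node, Port string }

type Connection struct {
	Sender    PortAddr
	Receivers []PortAddr
	raceFree  bool // false by default
}

// markRaceFree builds the "1 receiver -> all its senders" map and
// flags connections where no receiver is part of a fan-in.
func markRaceFree(conns []*Connection) {
	fanIn := map[PortAddr][]PortAddr{}
	for _, c := range conns {
		for _, r := range c.Receivers {
			fanIn[r] = append(fanIn[r], c.Sender)
		}
	}
	for _, c := range conns {
		c.raceFree = true
		for _, r := range c.Receivers {
			if len(fanIn[r]) >= 2 { // fan-in: 1 receiver, >=2 senders
				c.raceFree = false
				break
			}
		}
	}
}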

Runtime Part

  • when a message is sent, we check whether the sender is in a race-free connection or not
  • if it is, we get its receivers and send the message to each of them
  • to send the message we use queue-based round-robin
  • if it's not, we send the message to its shared queue
  • a queue is always shared between at least two ports
  • there is a separate queue for each fan-in occurrence
  • if a port sends to a queue, it sends only to that queue
  • if a port sends to receivers, it sends only to those receivers

With this algorithm we have a guarantee that if two senders send messages to the same receiver, the receiver will receive them in exactly the order they were sent. This is thanks to the shared queue, which works as a serializing broker.
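
A rough sketch of this send path, assuming channels as ports (the Connection fields and the broker goroutine here are illustrative, not the actual runtime API):

type Msg any

type Connection struct {
	raceFree  bool
	receivers []chan Msg // direct receiver channels
	queue     chan Msg   // shared fan-in queue (used when !raceFree)
}

// send follows the rules above: race-free senders broadcast directly,
// everyone else goes through the shared queue that serializes sends.
func send(msg Msg, conn *Connection) {
	if conn.raceFree {
		for _, r := range conn.receivers { // broadcast to each receiver in turn
			r <- msg
		}
		return
	}
	conn.queue <- msg // the queue is this sender's only destination
}

// One broker per fan-in occurrence drains the queue sequentially,
// so messages reach receivers in the order they entered the queue.
func broker(queue <-chan Msg, receivers []chan Msg) {
	for msg := range queue {
		for _, r := range receivers {
			r <- msg // next message waits until all receivers got this one
		}
	}
}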

Buffer size of the queue

The shared queue could have a buffer size that depends on both the sender count and the receiver count.

The sender count matters because the more senders we have, the higher the chance of blocking: with more senders the queue might become full faster.

The receiver count matters because messages are processed sequentially, which means the second message waits for the first to be received by all receivers. This is a requirement because we must preserve ordering. It also means that the latency of a broadcast equals the combined latency of all its receivers.
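
For illustration only, one possible (entirely assumed) heuristic:

// more senders fill the queue faster, more receivers drain it slower,
// so scale the buffer with both; the exact formula is an open question
queue := make(chan Msg, sendersCount*receiversCount)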

Buffer size of a receiver channel (the "slow receiver" problem)

The latency of a receiver affects not only senders (a fast sender must wait for slow receivers) but also other receivers: a fast receiver must wait for its slow "neighbors" before it can get the next message. This is a consequence of the broadcasting algorithm (queue-based round-robin). The problem must be solved without spawning more broadcast goroutines, because that way we might reintroduce race conditions.

A possible solution, though not a perfect one (blocking is still possible), is to add buffers to receiver port channels. The buffer size can depend on the number of senders, because there is nothing else to depend on. In real life the buffer size should depend on the throughput ratio of sender and receiver, but we don't know that (not without making the language more complex). This might well be enough in combination with #671.
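
A sketch of that idea (the sizing is an assumption; as said above, sender count is the only signal we have):

// buffer each receiver inport by the number of senders targeting it;
// this defers blocking on slow "neighbors" but cannot eliminate it
inport := make(chan Msg, sendersCount)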

The Need For This Design (the problem with the simple single-queue variant)

UPDATE: it turns out the design in #670 is incorrect and we have to move to something more complex (perhaps to something like this).

Because all senders share the same queue, a deadlock might appear: sender S1 sends to receiver R1, but R1 is busy trying to send its own output data to the queue, which is impossible because the queue is busy trying to serve S1.
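
A minimal self-contained sketch that reproduces this deadlock (names are hypothetical; the broker goroutine plays the role of the single shared queue):

package main

type Msg struct{ Data int }

func main() {
	queue := make(chan Msg) // the single queue shared by all senders
	toR1 := make(chan Msg)  // delivery channel from the queue to R1

	go func() { // broker: serializes everything sent to the queue
		for m := range queue {
			toR1 <- m // blocks until R1 receives
		}
	}()

	go func() { // S1: keeps sending into the shared queue
		for i := 0; ; i++ {
			queue <- Msg{Data: i}
		}
	}()

	for m := range toR1 { // R1: every input produces an output
		// blocks: the queue is busy serving S1, then the broker
		// blocks sending to us while we block sending to the queue
		queue <- Msg{Data: m.Data * 2}
	}
}

Run it and the Go runtime soon reports "all goroutines are asleep - deadlock!".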

I was thinking about simpler approaches, like this one: let each sender have its own channel, and let there be one single goroutine with a giant select over all senders (via code generation or reflection). It turns out this design is not race-free: imagine S1 and S2 are both blocked because the select goroutine is busy (processing a previous message while its buffer is full).
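
The root cause is that when several cases are ready at once, Go's select picks one of them pseudo-randomly, so the order in which S1 and S2 originally tried to send is lost (s1, s2 and process are placeholders):

select {
case m := <-s1: // both senders are already blocked here, so both
	process(m) // cases are ready and select chooses at random --
case m := <-s2: // the arrival order of S1 and S2 is not preserved
	process(m)
}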

I would like to note that increasing a buffer might defer the problem and reduce its probability, but no buffer (except an infinite one) will truly solve it.

Problem with this design

If we have these two conditions at the same time:

  1. Fan-in
  2. Loop

Then we might have a deadlock again

[Screenshot from 2024-06-20: a fan-in combined with a loop]

  • N1 sends to N2
  • N2 starts doing its job
  • N1 sends the next message to N2
  • N2 is busy and cannot receive the new message
  • N2 finishes the job and tries to write its output data
  • deadlock

N2 blocks while sending because nobody can receive: N2 itself is the one who should receive, but it's busy trying to send.
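
The same scenario as a minimal sketch, with N2 modeled as the loop body and an unbuffered channel as its fan-in inport:

in := make(chan int) // N2's inport: fan-in of N1 and N2's own output

go func() { // N1
	for i := 0; ; i++ {
		in <- i
	}
}()

for msg := range in { // N2
	// N2 must send its output back into its own inport before it can
	// receive again, but N2 is also the only receiver -- once N1 is
	// blocked sending too, nobody can ever receive: deadlock
	in <- msg * 2
}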


A possible solution could be to have a "proxy" channel to the outport in this case. This, however, means that we might get a "race" (out-of-order delivery) again.

Very Raw Idea

Let's assume that output ports are Go channels (as in the original design), but we don't have separate goroutines for fan-out connections. This means there are no situations where goroutines write to the same receiver and compete.

Instead, for each receiver, there is a separate goroutine with an infinite loop and a select statement.

In the select statement, we read from all sender-output ports, and whoever writes first gets processed first.

A race condition can occur, however, if two or more senders manage to write before the receiver can read.


Example

  1. s1 writes, we read it and attempt to write to r1, but get blocked because r1 is busy.
  2. We don't proceed to the next loop iteration because we are waiting for r1 to become available.
  3. In the meantime, two events occur: first s2 attempts to write (and gets blocked), then s3 faces the same fate.
  4. Meanwhile, r1 becomes available and receives our message, allowing us to proceed to the next iteration.

This is where the race condition occurs - in the next iteration, the select statement randomly chooses between s2 and s3.


Solution

To solve this, we need to defeat randomness by somehow understanding the order in which the senders became ready. We need a way to prioritize.

Since the problem arises when there are N >= 2 messages, we can read these N messages, sort them, and send them in the correct order.

Of course, we need the queueItem to have some sort of sequence number, which should be incremented by the sender when it sends a message.


// q is created once, outside the loop; it keeps pending messages
// sorted by their sequence number (msg.Seq, incremented by senders)
q := SortableQueue{}
for {
	// the send case is disabled via a nil channel while q is empty
	var out chan Msg
	var next Msg
	if q.Len() > 0 {
		next = q.Peek() // lowest sequence number first
		out = receiver
	}
	select {
	case msg := <-s1:
		q.Push(msg)
	case msg := <-s2:
		q.Push(msg)
	case out <- next:
		q.Pop()
	}
}

// SortableQueue keeps messages ordered by Seq; Msg is assumed to
// carry a Seq field set by the sender (uses the stdlib "sort" package)
type SortableQueue struct{ items []Msg }

func (q *SortableQueue) Len() int  { return len(q.items) }
func (q *SortableQueue) Peek() Msg { return q.items[0] }
func (q *SortableQueue) Pop()      { q.items = q.items[1:] }
func (q *SortableQueue) Push(m Msg) {
	q.items = append(q.items, m)
	sort.Slice(q.items, func(i, j int) bool { return q.items[i].Seq < q.items[j].Seq })
}

After the merge of #670 we are going to close this. We still have problems with out-of-order delivery, but we'd better describe them separately.