suprematic/otplike

protocols for swapping out core.async and core.match?

Opened this issue · 22 comments

I was wondering if you did (or are planning to) abstract these 2 core dependencies out? Then if a better implementation comes along, or one with different tradeoffs, it would be easy to mix&match them while preserving the otplike functionality.

As an example, matching could be swapped for meander, and core.async might find a competitor once Loom comes out.

From a quick glance, it seems the libraries are used directly.

Hi!

We use otplike extensively in our projects. But development happens in our free time (not at work), because the current state has done the job for almost two years and doesn't need much improvement from the business perspective. Since there are only two "active" contributors, we don't try to abstract for every possible case. The features picked for development/improvement are mostly defined by our current requirements at work.

core.match has never been an issue for us. On the contrary, core.async is and will possibly be replaced by something else. So, abstracting that part is probably a good idea.

If you want to help with that, I'll be happy to discuss the options :)

PS: thanks for the question, it's always pleasant to know that someone else is interested in the project.

Hi @sskorokhodov, thank you for your quick response.

I mentioned meander because I simply like it more; core.match is a fine choice. If receive! and the matching were decoupled, one could use anything, even plain Clojure. Is there any reason why this wouldn't work? The timeouts, for example?

Could you briefly describe your issues with core.async and what alternatives are you thinking about?

I'm building an application at work that basically runs ETL jobs. Some of them are live, so the code has to run infinitely and reliably. Right now I'm using manifold for async and streams (queues). It works but I have a feeling the code would be simpler and more reliable if I could use some of erlang's abstractions. I started reading Elixir in Action to get a better taste of the whole thing.

This looks like a great library, very nice job! I'm happy it's helping you solve your real world problems.

I also mentioned loom, would that fit the bill? Unfortunately I have no idea when one can expect that to land in an OpenJDK release.

As for helping - I might look around in the codebase to get a feel of the current state. Unfortunately my spare time is limited

I don't have access to my laptop at the moment, so I'll answer briefly now and continue with the details tomorrow.

About abstracting core.match - I think it can be done if a matcher returned a sign of a match (e.g., a boolean) and an async value (kind of a channel).

I’ll write about the downsides of using core.async and about loom tomorrow. But the library I think of as a replacement is cloroutine.

I'm not sure I understand the details about core.match - AFAIK it doesn't provide what you just laid out as requirements. What does asynchronicity have to do with matching? I'm probably too unaware of the details of otplike.

I knew go blocks have their limitations and heard they return nil when an exception is thrown, which was enough to steer me away from it. I didn't know there are bugs this basic in it. I guess one cannot trust the code rewriter. Hopefully cloroutine does a better job. Maybe once continuations come out one won't have to rely on these meta-compilers.
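For what it's worth, the nil-on-exception behavior mentioned above is easy to reproduce at a REPL (a minimal sketch; the stack trace is only reported asynchronously on the go pool's thread):

```clojure
(require '[clojure.core.async :as async])

;; The exception never reaches the taker: the go-block's result channel
;; just closes, so the blocking take returns nil, while the stack trace
;; goes to the pool thread's uncaught-exception handler.
(async/<!! (async/go (throw (ex-info "boom" {}))))
;; => nil
```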

Hi again,

I have some time to explain the matching in a receive block.

My previous answer wasn't correct. If you take a look at how selective-receive! works, you'll find that, in general, it does the following:

  1. Builds the code matching messages against the provided expressions. As a result, the matching returns the index of the expression and doesn't execute any code yet.
  2. Then it traverses the process' queue looking for a matching message.
  3. When a match happens, selective-receive! resets the process' queue to its new state. Only after that does it execute the code of the matching block.

Also, the expression code can contain another parking "receive" block which:

  • must be inside a go-block
  • must finish its execution before the process continues (surrounding code must wait for the go-block to finish)

That means that if the matcher is abstracted for swapping, it has to do something like this:

  1. Receive a value to match.
  2. Return a function to execute after the queue is reset to the new state.
  3. The function either always returns an async value (returned by an async-block), or a tagged value. The tag tells whether the value is the result of an async execution or a regular value.
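As a rough illustration of that contract (the protocol and helper names below are hypothetical, not part of otplike), it might be sketched like this:

```clojure
;; Sketch of the matcher contract described in the three steps above.
;; Everything here is hypothetical, for illustration only.
(defprotocol Matcher
  (match-message [this message]
    "Returns nil when the message doesn't match. Otherwise returns a
     thunk to be executed after the process queue is reset. The thunk
     returns either [:value v] (a regular value) or [:async ch]
     (an async value, e.g. a channel to wait on)."))

;; The simplest possible matcher: match a single literal message.
(defn literal-matcher [pattern result]
  (reify Matcher
    (match-message [_ message]
      (when (= pattern message)
        (fn [] [:value result])))))

;; Usage: (match-message (literal-matcher :msg2 :ok) :msg2)
;; yields a thunk; calling it returns [:value :ok].
```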

Now, let's turn to core.async.

Besides bugs that haven't been fixed for years, there are some conceptual and practical issues holding otplike back.

core.async uses a single thread pool for all the go-blocks it runs

The JVM is not capable (yet?) of "true" process-oriented programming. If you have an intensive computation or use non-async IO, the execution thread is occupied until the operation finishes. For such code, you need to control the number of parallel executions; otherwise you risk making your system unresponsive when most of the threads in the pool are busy crunching numbers. The obvious solution is to start those tasks on a separate thread pool, but core.async doesn't allow this.
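For completeness: core.async does ship `clojure.core.async/thread`, which runs a body on a separate, unbounded cached pool and hands back a channel; what it lacks is a way to run go-blocks themselves on a custom, bounded pool. A common mitigation is to offload the heavy work and park on the result:

```clojure
(require '[clojure.core.async :as async])

(defn crunch-numbers [n]
  ;; stand-in for a CPU-heavy (or blocking-IO) computation
  (reduce + (range n)))

(async/go
  ;; async/thread runs the computation on its own pool and returns a
  ;; channel; the go-block parks on that channel instead of occupying
  ;; one of the few go-pool threads for the whole computation.
  (let [result (async/<! (async/thread (crunch-numbers 1000000)))]
    (println "sum:" result)))
```

This keeps the go pool responsive, though it still gives no per-category bound on parallelism, which is the control being asked for here.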

Any parking must be "visible" inside a go-block

This makes receiving in nested functions quite inconvenient. You have to either pass the channel (returned by the go-block) all the way back to the parking code, or define macros instead of functions.

core.async is much more than just go-blocks

otplike uses channels only to trigger execution when a receive happens (and in some other similar cases internally). It doesn't need taps, mixes, filters, different kinds of buffers, etc.


The idea is to replace go-blocks with "receive blocks" implemented with cloroutine. It allows having multiple custom executors with appropriate behavior.

I'm not sure that we should make an abstraction for this part. At least before a proof of concept is ready. Anyway, executors are a layer of abstraction on their own if properly implemented.

Hi @sskorokhodov ,

thank you for taking your time to discuss this, it's fun! We are moving to chatting though, if you feel like it we can move the discussion to the clojure zulip chat. If you wish to keep the discussion here, that's fine too.

core.match

I looked at select-message, where most of the stuff you're describing seems to be happening. I'm still not entirely convinced match needs to do that much. Maybe I'll try to rewrite a macro or two with meander if I get around to it and see if some tests pass :)

A question - couldn't you simplify the macro by wrapping the code to be called after a match in a delay, or just (fn [] ~match-body), and calling it afterwards? That way you could get rid of the double matching and indexing (if it works) and inline the prologue into the loop without needing an if on timeouts.

Also, isn't (swap! message-q# #(into new-mq# %)) kind of costly?

I didn't understand the parking receive that you were explaining, sorry, probably too dumb for now.

core.async

Why is core.async using a fixed thread pool? Wouldn't an unbounded thread pool solve the issue? The JVM is already preemptive so with enough threads around you shouldn't be in trouble (unless you hit the OS thread limits).

From cursory reading I guess the rest of your problems would be solved by switching to cloroutine, although I'm not sure if you're using the core.async channels in some other ways that cloroutine doesn't provide.

I prefer to continue the discussion here to keep the knowledge in one place. However, we need to stick to the theme of the issue, of course. If some other issues or questions arise during the discussions, it's better to create new issues for them.

For example, the question about the performance of swapping a process message queue can be discussed in the new issue if you don't mind.

core.match

Maybe it is easier to try it yourself to get a feel for the problem with receiving a message. Let's model a nested "selective receive" situation.

Function p is a process function. It takes a message queue q and waits for a message. When a message arrives, it is matched against the pattern (:msg2), and if it matches, a nested receive happens.

The actual code may look like this:

(proc-defn p []
  (selective-receive!
    :msg2 (receive! :msg1 (println "Yay! Got the first message!"))))

Here is a model of how it works right now (in practice the message queue and the channel are separated):

(defn p [q]
  ;; msgs is a coll of the messages received before the match is found.
  (go-loop [msgs []]

    ;; Take a message from the queue.
    ;; Now the queue is in inconsistent state, because we might already
    ;; have received messages.
    (let [m (async/<! q)

          ;; check if we have a match
          result (match m
                   :msg2 0
                   :else :miss)]
      (case result

        ;; no match - save the received message into msgs and continue
        :miss (recur (conj msgs m))

        ;; match - fix the queue state and execute the match body
        (let [;; create a new message queue
              new-q (async/chan 10)]

          ;; put the messages received so far at the beginning of the queue
          ;; (pass false so onto-chan leaves new-q open for the pipe below)
          (async/onto-chan new-q msgs false)

          ;; put the rest of the messages in the channel at the end of the queue
          (async/pipe q new-q)

          ;; execute the match body
          (let [q new-q]
            (case result
              0 (match (async/<! q)
                  :msg1 (println "Yay! Got the first message!")))))))))


(let [;; create the message queue
      q (async/chan 10)]

  ;; put some messages into the queue
  (async/>!! q :msg1)
  (async/>!! q :msg2)
  (async/>!! q :msg3)

  ;; spawn the process
  (p q))

A macro capable of producing code like this relies on the structure of the match macro.

To hide the actual matching algorithm, we have the following options:

  • make another "receive" macro and choose which one to use at compile time by checking some variable
  • make an abstraction "matcher" which would return a function to be executed after the process message queue is consistent again

A special thing about the second option is that the code inside the returned function is outside the scope of the upper-level go-block.

If you try the following code, you'll find that it doesn't work because the code inside the function is not in the scope of the go-block.

(defn p [q]
  (go
    (let [f (match (async/<! q)
              :msg1 (fn [] (println (async/<! q))))]
      (f))))

(let [q (async/chan 10)]
  (async/>!! q :msg1)
  (async/>!! q :msg2)
  (p q))

That's why a parking receive (<!) cannot be used there without wrapping it in another go-block. To fix it, you need to start a go-block inside the function, like this:

(defn p [q]
  (go
    (let [f (match (async/<! q)
              :msg1 (fn [] (go (println (async/<! q)))))]
      (f))))

(let [q (async/chan 10)]
  (async/>!! q :msg1)
  (async/>!! q :msg2)
  (p q))

core.async

Why is core.async using a fixed thread pool?

Because that is one of the main advantages of go-blocks / "green threads." In something bigger than a "hello world" project, you need to control resource usage: threads, memory, disk, inodes of the FS, cores, connections, etc. A limited number of threads for some category of tasks your application does gives you control over parallelism and CPU cores usage.

Instead of using an unlimited number of threads, it's better to start go-blocks / light-weight processes. You also need to control their number, but the limit is not that low. In Erlang, for example, you can have millions of processes in the system at the same time. They are easier to manipulate and bind together.

The problem with core.async is that it has only one thread pool for all the go-blocks, but sometimes you want a dedicated set of threads for some category of tasks (like sending logs to the network). The number of these threads does not correspond to the number of processes you start; it depends more on the number of cores you have than on the free memory.
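For reference, the size of that single pool can only be tuned globally, through a JVM system property (it defaults to 8); there is no way to dedicate a pool to one category of tasks:

```clojure
;; The go-block pool size is process-wide. It can be set with
;;   java -Dclojure.core.async.pool-size=16 ...
;; or programmatically, but only before core.async is first loaded:
(System/setProperty "clojure.core.async.pool-size" "16")
(require '[clojure.core.async :as async])
```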

Thank you, these discussions are really helpful for me to understand the inner workings of the library.

core.match

OK, I see the issues now; part of it is how go blocks work and part of it is looping over the messages. The hopefully last piece I'm missing: I don't understand why a non-matching message gets put back in the message queue (and therefore a recur happens). If the function doesn't match, it won't ever match unless the function gets swapped out at runtime. If many "bad" messages arrive, they will fill up the queue. Is this something Erlang does too and you are modeling it the same way? I'm sure there's a good reason for this, I'm just not aware of it yet :)

Based on the current implementation, a minimal interface for a matcher would be 2 macros; let's name them numbered-match and match-number. numbered-match would swap out the execution bodies for indices and provide a final catch-all, whereas match-number would, given an index, return the one match branch needed for that index.
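To pin the idea down, here is one possible shape for those two macros, sketched on top of core.match (names and details are hypothetical):

```clojure
(require '[clojure.core.match])

;; `numbered-match` replaces each clause body with its index and adds a
;; final catch-all, so matching reports *which* clause won without
;; executing any body:
(defmacro numbered-match [expr & clauses]
  (let [patterns (take-nth 2 clauses)]
    `(clojure.core.match/match ~expr
       ~@(interleave patterns (range))
       :else ::miss)))

;; `match-number` executes only the body of the clause chosen earlier:
(defmacro match-number [n & clauses]
  (let [bodies (take-nth 2 (rest clauses))]
    `(case ~n
       ~@(interleave (range) bodies)
       nil)))

;; Usage:
;; (numbered-match :msg2  :msg1 :a  :msg2 :b)  ;=> 1
;; (match-number 1        :msg1 :a  :msg2 :b)  ;=> :b
```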

core.async

Is this again modeled after Erlang? Does it allow you to create a new thread pool? I'm still not sure how that would solve your CPU-intensive-tasks problem unless you can limit the thread pool's resources.

why a non-matching message gets put back in the message queue?

That is for composability reasons. Imagine you have a process worker doing some work when a message [client request] arrives; it sends the response back to client, also via a message. To do its job, worker may also need to send a request to some other process worker-2 and wait for the response. Only after that does it send the response back to client.

The problem here is that worker can receive a request at any moment, even while it waits for the response from worker-2. As long as the code making the request is defined by worker, it's not a big deal to handle all the messages and save them in another queue until the response from worker-2 arrives. Although doing this every time you hit this situation is no piece of cake, I think.

A good practice is to have an API defined by the process' module (worker and worker-2). This API hides the burden of sending and receiving messages. In this case, the API code cannot know which messages (other than the expected ones) to handle, or how. Here "selective" receive becomes very useful: it receives one message and leaves the others intact, so the calling process doesn't even notice that messages were sent and received.

When you control the message queue, use simple receive (which doesn't exist in Erlang) with a _ "catch-all" pattern at the end if required. When you don't, use "selective" receive.
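A minimal sketch of such an API function (assuming otplike-style `!`, `self`, and `selective-receive!`; the guard-pattern detail is my assumption, not taken from the library):

```clojure
;; Hypothetical request/response wrapper that hides the messaging
;; protocol behind a plain function call. Each request carries a unique
;; ref; only the reply carrying that ref is received, so unrelated
;; messages in the caller's queue are left untouched.
(defn call [worker-pid payload]
  (let [ref (gensym "req")]
    (! worker-pid [:request (self) ref payload])
    (selective-receive!
      [:reply (r :guard #(= % ref)) result] result)))
```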

core.async

Is this again modeled after erlang?

It's an implementation of CSP.

It allows you to create a new thread pool?

It doesn't. That is one of the reasons to use cloroutine with custom executors. And that should solve the problem with CPU intensive tasks.

Based on the current implementation a minimal interface of a matcher would be 2 macros, let's name them numbered-match and match-number. numbered-match would need to swap out the execution bodies for indices and provide a final catch-all, whereas match-number would given an index return the one match branch that is needed for the given index.

I need to think about it a bit more. I'll try to come up with something these weekends.

Ah, so the use case is nested receives, i.e. a nested receive might wait for completely different messages than the outer one, so you need to be able to pick the right one out of the queue. I missed the point in the proc-defn example you posted that there's a nested receive!; I was too focused on what role core.match fulfills.

I also now understand you're not trying to model everything from Erlang/OTP; I was under the impression you were porting things 1:1, or at least as close as possible.

I need to think about it a bit more. I'll try to come up with something these weekends.

I don't know how many users you have, but this needn't be a high-priority issue. After our discussion, I'd guess switching to cloroutine should be the higher priority.

Thanks, these posts were particularly enlightening :)

Apologies for bumping an old thread,

Now that we have virtual threads in JDK 19 and a few Clojure libraries to work with them, e.g. Promesa (which also brings its own implementation of channels), is there anything on the roadmap to change core.async to something else?

Promesa's go makes it easier to use its put! and take! functions, as the go macro determines whether the execution is to be done on virtual threads (if available) or platform threads.
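To illustrate what virtual threads change here, a "process" can simply block on its mailbox, with no go-rewriting or parking machinery; a sketch using only Java interop (`Thread/startVirtualThread` is preview in JDK 19 and stable from JDK 21):

```clojure
(import '[java.util.concurrent LinkedBlockingQueue])

;; Each "process" is a virtual thread blocking on its own mailbox.
;; Blocking is fine here: virtual threads are cheap, so the blocked
;; take doesn't pin a scarce platform thread.
(def mailbox (LinkedBlockingQueue.))

(Thread/startVirtualThread
  (fn []
    (loop []
      (let [msg (.take ^LinkedBlockingQueue mailbox)]
        (println "received:" msg)
        (recur)))))

(.put mailbox :hello)
```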

I can help with the transition if someone can point me to where to start.
Is 0.7.0 branch the latest one?

aav commented

@fr33m0nk yes, we have been thinking about such a possibility. Any help is more than welcome. I will take a close look and come up with a plan of sorts.

0.7.0 is the latest. It has several new features - applications and logger. We are using it internally for one of our systems, and it seems to be quite stable.

aav commented

One of the questions is whether we want to keep core.async support and make it replaceable with some other implementation (i.e. promesa), or drop core.async completely. So far I lean toward the second option, but I'd like to discuss it with colleagues.

One of the questions is whether we want to keep core.async support and make it replaceable with some other implementation (i.e. promesa), or drop core.async completely. So far I lean toward the second option, but I'd like to discuss it with colleagues.

Hi @aav , Is there any update on this?


Hi @aav I'm looking for someone to ask a question to about some behavior I can't work out.

I see that development has not happened in a long time, and we are using otplike for one of our core functionalities. I'm at the point now where I'm overriding library functions and adding debugging to track down a very weird behavior that I think someone like you could resolve in a lot less time.

Hope you don't mind me DM'ing you about this as well. Thank you; I'm using the latest version published on Maven.

aav commented

@JMacLulich sure, please feel free to DM me with your problem.

btw, there is some ongoing development in the 0.7.0 branch. We have applications (in a more or less Erlang sense), a logging facility, and configuration implemented. We are using this version in one of our projects, but I'm not sure it's mature enough to be officially "released".