LoadBalance with substreams
jpaulm opened this issue
This issue was first raised on the Google group (https://groups.google.com/forum/#!topic/flow-based-programming/NaYix1I81QM). I am reproducing the discussion here.
@jpaulm (Aug. 1, 2015)
I recently realized there was a problem when substreams were being sent through a LoadBalance process: the components of a substream (open bracket, data IPs, and close bracket) could theoretically be sent to different output port array elements, resulting in the substream being disassembled.
I have therefore modified LoadBalance to route all components of a substream to the same output port array element. This code is now on GitHub. I will be doing some testing, but I would appreciate it if anyone using substreams and LoadBalance, preferably with large data volumes, could also check it out.
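The routing rule described above can be sketched in Java. This is a minimal illustration, not the JavaFBP source: the class `SubstreamLoadBalancer`, its `Ip`/`Kind` types, and the use of "(" and ")" tokens for brackets are all hypothetical, and "least busy" is approximated by the number of IPs already sent to each element. The key point is that a new output element is chosen only when no substream is open.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: not JavaFBP's actual LoadBalance source.
final class SubstreamLoadBalancer {
    enum Kind { OPEN, DATA, CLOSE }

    static final class Ip {
        final Kind kind;
        final String value;

        Ip(Kind kind, String value) {
            this.kind = kind;
            this.value = value;
        }

        /** "(" and ")" stand for open and close brackets; anything else is a data IP. */
        static Ip parse(String token) {
            switch (token) {
                case "(": return new Ip(Kind.OPEN, token);
                case ")": return new Ip(Kind.CLOSE, token);
                default:  return new Ip(Kind.DATA, token);
            }
        }

        @Override
        public String toString() {
            return value;
        }
    }

    private final List<List<Ip>> outputs = new ArrayList<>();
    private int depth = 0;   // bracket nesting level of the substream in progress
    private int current = 0; // output element the current substream is pinned to

    SubstreamLoadBalancer(int elements) {
        for (int i = 0; i < elements; i++) outputs.add(new ArrayList<>());
    }

    // Stand-in for "least busy": nothing drains these lists, so this picks the
    // element that has been sent the fewest IPs so far.
    private int leastLoaded() {
        int best = 0;
        for (int i = 1; i < outputs.size(); i++) {
            if (outputs.get(i).size() < outputs.get(best).size()) best = i;
        }
        return best;
    }

    void send(Ip ip) {
        if (depth == 0) current = leastLoaded(); // re-balance only between substreams
        outputs.get(current).add(ip);
        if (ip.kind == Kind.OPEN) depth++;
        else if (ip.kind == Kind.CLOSE) depth--;
    }

    void sendAll(String... tokens) {
        for (String t : tokens) send(Ip.parse(t));
    }

    List<Ip> output(int element) {
        return outputs.get(element);
    }

    public static void main(String[] args) {
        SubstreamLoadBalancer lb = new SubstreamLoadBalancer(2);
        lb.sendAll("(", "a", "b", ")", "(", "c", ")");
        System.out.println(lb.output(0) + " " + lb.output(1));
    }
}
```

Here the first substream stays whole on element 0 and the second goes whole to element 1, because the balancing decision is only taken at bracket depth 0.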
@jpaulm (Aug. 13, 2015)
BTW, before I go on, it should be stressed that this is in the context of "classical" FBP, as implemented by JavaFBP, C#FBP, CppFBP (and perhaps JSFBP). The mod to LoadBalance has only actually been implemented in JavaFBP so far, and in view of the problem I am going to describe, it seems I only did half the job anyway! I have no idea whether NoFlo and similar FBP-like systems have an analogous problem.
Let us say that we have used the (modified) LoadBalance component to route whole substreams to the various output port elements of LoadBalance; the split streams will most likely have to be recombined at some point. We can't use something like a round-robin merge to bring them back into one stream, as that would introduce the possibility of deadlocks. So we need some way of bringing them into a single input port, but the default "first come, first served" merge into one port would mix up the substreams. I am therefore proposing a new API call that allows a component to wait on any element of an input array port and return the number of the element at which a data IP is waiting. This call suspends until an IP arrives at one of the array port elements.
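To make the proposed call concrete, here is a hedged Java sketch of an array port whose elements share one lock and condition, so that a receiver can suspend until any element has an IP and then learn which one. `ArrayInputPort` and `waitForAnyElement` are hypothetical names, not the JavaFBP API, and connection capacities (senders blocking on a full connection) are left out for brevity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of an input array port whose elements share one lock, so a
// receiver can suspend until *any* element has an IP. Capacities are omitted.
final class ArrayInputPort<T> {
    private final List<ArrayDeque<T>> elements = new ArrayList<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition dataArrived = lock.newCondition();
    private int start = 0; // rotate the scan so low-numbered elements are not always favoured

    ArrayInputPort(int size) {
        for (int i = 0; i < size; i++) elements.add(new ArrayDeque<>());
    }

    /** Called by an upstream sender; wakes any receiver waiting on the port. */
    void send(int element, T ip) {
        lock.lock();
        try {
            elements.get(element).addLast(ip);
            dataArrived.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Suspends until some element holds an IP, then returns that element's number. */
    int waitForAnyElement() throws InterruptedException {
        lock.lock();
        try {
            while (true) {
                int n = elements.size();
                for (int k = 0; k < n; k++) {
                    int i = (start + k) % n;
                    if (!elements.get(i).isEmpty()) {
                        start = (i + 1) % n;
                        return i;
                    }
                }
                dataArrived.await(); // releases the lock while suspended
            }
        } finally {
            lock.unlock();
        }
    }

    /** Takes the oldest IP from one element, or returns null if it is empty. */
    T poll(int element) {
        lock.lock();
        try {
            return elements.get(element).pollFirst();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ArrayInputPort<String> port = new ArrayInputPort<>(3);
        Thread sender = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                return;
            }
            port.send(1, "late IP");
        });
        sender.start();
        int i = port.waitForAnyElement(); // suspends until the sender fires
        System.out.println("element " + i + ": " + port.poll(i));
        sender.join();
    }
}
```

Sharing a single lock across all elements is what lets one `await` cover the whole array; per-element locks would force the receiver to pick an element before it knows which one has data.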
From Ged Byrne via the Google group (Aug. 13, 2015)
In the Splitter and Aggregator integration patterns this problem is usually addressed through the use of a Correlation Identifier.
Splitter: http://www.enterpriseintegrationpatterns.com/Sequencer.html
Aggregator: http://www.enterpriseintegrationpatterns.com/Aggregator.html
Correlation Identifier: http://www.enterpriseintegrationpatterns.com/CorrelationIdentifier.html
To quote from the book: "In some cases it is useful to equip child messages with sequence numbers to improve message traceability and simplify the task of an Aggregator (268). Also, it is a good idea to equip each message with a reference to the original (combined) message so that processing results from the individual messages can be correlated back to the original message. This reference acts as a Correlation Identifier." https://books.google.co.uk/books?id=qqB7nrrna_sC&lpg=PP1&pg=PA262#v=onepage&q&f=false
With this approach there would be the option of adding a correlation identifier to the IP. This identifier would allow the aggregator/merge to reassemble the messages based on the identifiers, rather than having to bind them to individual queues.
A simple scheme would be enough to solve the immediate problem, and developers are free to build more sophisticated schemes if they want.
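A correlation-identifier aggregator of the kind Ged describes might look roughly like this. It is a sketch under assumptions: each IP is assumed to carry the id of its original message, a sequence number and the group size (the `TaggedIp` layout and `CorrelatingAggregator` name are hypothetical). Note that incomplete groups must be held in memory until their last member arrives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a correlation-identifier aggregator; the IP layout is assumed.
final class CorrelatingAggregator {
    /** A data IP carrying the id of its original message, its position and the group size. */
    static final class TaggedIp {
        final String correlationId;
        final int sequence;
        final int groupSize;
        final String payload;

        TaggedIp(String correlationId, int sequence, int groupSize, String payload) {
            this.correlationId = correlationId;
            this.sequence = sequence;
            this.groupSize = groupSize;
            this.payload = payload;
        }
    }

    // Incomplete groups wait here until every member has arrived.
    private final Map<String, TreeMap<Integer, String>> pending = new HashMap<>();

    /** Returns the reassembled group if this IP completes it, otherwise null. */
    List<String> accept(TaggedIp ip) {
        TreeMap<Integer, String> group =
                pending.computeIfAbsent(ip.correlationId, id -> new TreeMap<>());
        group.put(ip.sequence, ip.payload);
        if (group.size() < ip.groupSize) return null;
        pending.remove(ip.correlationId);
        return new ArrayList<>(group.values()); // TreeMap yields members in sequence order
    }

    int pendingGroups() {
        return pending.size();
    }

    public static void main(String[] args) {
        CorrelatingAggregator agg = new CorrelatingAggregator();
        System.out.println(agg.accept(new TaggedIp("B", 1, 2, "b1")));
        System.out.println(agg.accept(new TaggedIp("A", 0, 1, "a0")));
        System.out.println(agg.accept(new TaggedIp("B", 0, 2, "b0")));
    }
}
```

Group "B" arrives out of order and is only emitted once both members are present; meanwhile it occupies the `pending` map, which is the holding cost raised in the reply to Ged.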
From Alfredo Sistema (Aug. 13, 2015)
I think I've proposed this kind of functionality in the past without much success, but what I currently have is a function that returns the list of indexes of an array port's elements that contain packets, so that you can iterate over those and read the packets. Receives would work exactly the same way.
The correlation identifier is a good idea; one could achieve the same with substreams, but that adds substream-handling overhead. I experimented with a "tag" attribute in packets for situations like this, so that packets can be sorted by tag after merging streams; even if packets get scrambled, the stream can be reassembled. A problem with this approach is that if a process adds packets to a substream and those packets don't have the correct tag, they will be lost. A function to "splice" a tagged packet into place would be needed.
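Alfredo's tag idea can be illustrated with a stable sort. In this hypothetical sketch, each `Packet` carries an integer tag naming its substream (the class names are illustrative, not from any FBP implementation); because `List.sort` is documented as stable, packets that share a tag keep their relative order after regrouping.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: packets carry an integer tag naming the substream they belong to.
final class TagRegrouper {
    static final class Packet {
        final int tag;
        final String data;

        Packet(int tag, String data) {
            this.tag = tag;
            this.data = data;
        }

        @Override
        public String toString() {
            return tag + ":" + data;
        }
    }

    /** Regroups a merged stream by tag; List.sort is stable, so order within a tag is kept. */
    static List<Packet> regroup(List<Packet> merged) {
        List<Packet> copy = new ArrayList<>(merged);
        copy.sort(Comparator.comparingInt((Packet p) -> p.tag));
        return copy;
    }

    public static void main(String[] args) {
        List<Packet> merged = List.of(
                new Packet(1, "a"), new Packet(0, "x"), new Packet(1, "b"), new Packet(0, "y"));
        System.out.println(regroup(merged));
    }
}
```

A sort like this needs the whole merged stream (or at least a bounded window of it) in hand before it can emit anything.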
Ged, my gut reaction is that the approach you describe would tend to be quite slow, and might also be deadlock-prone. Also, incomplete groups have to be held somewhere until they are fully assembled (I think I have seen the term "marshalling" used for this). I was looking for a "flow-based" solution where everything keeps flowing. Note that the cited Aggregator page says, "Once a complete set of messages has been received...", and this may take quite a long time...
We did do something like this in our JavaFBP Brokerage app, but this was because transactions had to be held until they could be connected with information coming from other sites. Once this happened, the whole assembly could then travel on...
Alfredo,
My solution sounds quite a bit like yours - here is my test setup:
It seems to be working quite nicely - as far as I can see, each substream is kept intact, but because of the asynchronism, once the connection capacities are greater than 1, individual substreams can change places!
SlowPass is like Passthru, but each incoming IP is delayed by a random amount of time, from 0 to 126 milliseconds.
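A SlowPass-like process is easy to sketch with blocking queues standing in for FBP connections. This is not the JavaFBP component: the queue wiring and the `END` marker are assumptions. `ThreadLocalRandom.current().nextInt(127)` returns 0 to 126 inclusive, matching the delay range above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of a SlowPass-like process; queues stand in for FBP connections.
final class SlowPass implements Runnable {
    static final Object END = new Object(); // assumed end-of-stream marker

    private final BlockingQueue<Object> in;
    private final BlockingQueue<Object> out;

    SlowPass(BlockingQueue<Object> in, BlockingQueue<Object> out) {
        this.in = in;
        this.out = out;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Object ip = in.take();
                if (ip == END) {
                    out.put(END);
                    return;
                }
                Thread.sleep(ThreadLocalRandom.current().nextInt(127)); // 0..126 ms inclusive
                out.put(ip);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Object> in = new LinkedBlockingQueue<>();
        BlockingQueue<Object> out = new LinkedBlockingQueue<>();
        Thread t = new Thread(new SlowPass(in, out));
        t.start();
        for (String s : new String[] {"(", "1", "2", ")"}) in.put(s);
        in.put(END);
        t.join();
        for (Object ip : out) if (ip != END) System.out.print(ip + " ");
        System.out.println();
    }
}
```

A single SlowPass preserves order; whole substreams change places only because several of these run in parallel behind LoadBalance and finish at different times.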
I plan to write a component to check that the IPs within a given substream are in the right order - then we can run this network with larger amounts of data.
To make SubstreamSensitiveMerge suspend properly, I had to create a new API call, findInputPortElementWithData(). This is described in the Release description of v3.0.2, and will also have to be added to http://www.jpaulmorrison.com/fbp/jsyntax.htm . It sounds similar to what you describe. To see how to use it, look at https://github.com/jpaulm/javafbp/blob/master/src/main/java/com/jpmorrsn/fbp/components/SubstreamSensitiveMerge.java .
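The rule SubstreamSensitiveMerge follows can be shown with a single-threaded simulation: pick any element that has data, and once an open bracket is read, keep reading from that same element until the matching close bracket. This is a hypothetical sketch (`SubstreamMergeSim` is not a JavaFBP class, and it does not call findInputPortElementWithData); it works on pre-filled deques instead of suspending, and marks the point where a real merge would wait.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-threaded simulation of the merge rule; not the JavaFBP component.
final class SubstreamMergeSim {
    /** Merges pre-filled input elements, never interleaving IPs from two open substreams. */
    static List<String> merge(List<Deque<String>> inputs) {
        List<String> out = new ArrayList<>();
        int n = inputs.size();
        int locked = -1; // element pinned while a substream is open, else -1
        int depth = 0;
        int next = 0;    // rotating scan start, standing in for "whichever element has data"
        while (true) {
            int element = locked;
            if (element < 0) {
                for (int k = 0; k < n && element < 0; k++) {
                    int i = (next + k) % n;
                    if (!inputs.get(i).isEmpty()) element = i;
                }
                if (element < 0) return out; // every element is drained
                next = (element + 1) % n;
            }
            String ip = inputs.get(element).pollFirst();
            if (ip == null) {
                // A real merge would suspend here, ignoring the other elements; this is
                // where the divergent-convergent deadlock risk comes from.
                throw new IllegalStateException("substream on element " + element + " never closed");
            }
            out.add(ip);
            if (ip.equals("(")) depth++;
            else if (ip.equals(")")) depth--;
            locked = depth > 0 ? element : -1;
        }
    }

    public static void main(String[] args) {
        Deque<String> e0 = new ArrayDeque<>(List.of("(", "a", "b", ")", "(", "c", ")"));
        Deque<String> e1 = new ArrayDeque<>(List.of("(", "x", ")"));
        System.out.println(String.join(" ", merge(List.of(e0, e1))));
    }
}
```

Each substream comes out intact, but the order of whole substreams depends on which element is picked when no substream is open.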
New component CheckSequenceWithinSubstreams has been added. It detected an empty substream created by GenSS, which has now been fixed!
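A checker in the spirit of CheckSequenceWithinSubstreams might walk the stream and report out-of-order values and empty substreams. The sketch below is hypothetical, not the JavaFBP component: it assumes flat (non-nested) substreams whose data IPs are numbers that should strictly increase, with "(" and ")" as brackets.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch in the spirit of CheckSequenceWithinSubstreams; assumes flat
// (non-nested) substreams whose data IPs are numbers that should strictly increase.
final class SequenceChecker {
    /** Returns one message per problem found; an empty list means the stream looks sound. */
    static List<String> check(List<String> stream) {
        List<String> problems = new ArrayList<>();
        int depth = 0, count = 0, substream = 0;
        long last = Long.MIN_VALUE;
        for (String ip : stream) {
            if (ip.equals("(")) {
                depth++;
                substream++;
                count = 0;
                last = Long.MIN_VALUE;
            } else if (ip.equals(")")) {
                if (depth == 0) {
                    problems.add("unmatched close bracket");
                } else {
                    if (count == 0) problems.add("substream " + substream + " is empty");
                    depth--;
                }
            } else {
                long value = Long.parseLong(ip);
                if (value <= last) problems.add("substream " + substream + ": " + value + " follows " + last);
                last = value;
                count++;
            }
        }
        if (depth != 0) problems.add("substream " + substream + " never closed");
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(check(List.of("(", "1", "2", ")", "(", ")", "(", "5", "3", ")")));
    }
}
```

The example stream contains both kinds of fault: substream 2 is empty, and substream 3 has its data IPs out of order.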
In an earlier post on this topic, I pointed at https://github.com/jpaulm/javafbp/blob/master/src/main/java/com/jpmorrsn/fbp/examples/networks/TestLoadBalanceWithSubstreams.java .
You will see that there is a LoadBalance process (recently modified so it doesn't split up substreams) feeding some processes, which in turn feed into a SubstreamSensitiveMerge process. This of course is the classical divergent-convergent topology, which is one of the danger signs for deadlocks.
I believe that this example will run safely if the longest substream fits within the connections on the shortest path between the LoadBalance and the SubstreamSensitiveMerge, but if this rule is violated, you may get deadlocks. I have replaced the Passthru processes with SlowPass components (Passthru with a random delay), and this rule appears to hold.
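The rule of thumb above can be written as a simple check. Both readings in it are assumptions: "shortest path" is taken to mean the path whose connections can hold the fewest IPs in total, and substream length is counted in IPs including both brackets. It encodes the heuristic only; it does not prove that a network is deadlock-free.

```java
// Hypothetical sketch of the rule of thumb. Each row of pathCapacities lists the
// connection capacities along one path from LoadBalance to SubstreamSensitiveMerge.
// "Shortest path" is read as the path whose connections hold the fewest IPs, and
// substream length is counted in IPs including both brackets; both are assumptions.
final class CapacityRule {
    static boolean longestSubstreamFits(int longestSubstream, int[][] pathCapacities) {
        int smallest = Integer.MAX_VALUE;
        for (int[] path : pathCapacities) {
            int total = 0;
            for (int capacity : path) total += capacity;
            smallest = Math.min(smallest, total);
        }
        return longestSubstream <= smallest;
    }

    public static void main(String[] args) {
        int[][] paths = {{5, 5}, {3, 4}}; // two parallel branches; the smaller holds 7 IPs
        System.out.println(longestSubstreamFits(7, paths));
        System.out.println(longestSubstreamFits(8, paths));
    }
}
```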
I am therefore wondering if it would be better to simply make a blanket statement telling people they can use either of these two components, but not to use both in the same area of the network.
I have done some tests, and the above rule seems to hold, but I am wondering if any interested Google Group members could think about this, and give me some recommendations. There may be a role for both of these components, but perhaps not in the same network!
This is getting strange! I discovered a few small bugs in SubstreamSensitiveMerge and the (new) API call it uses - findInputPortElementWithData. These apply to JavaFBP, C#FBP, and probably JSFBP as well. These problems have now been fixed for the first two. Running various tests with these, using different connection capacities, now very seldom crashes with a deadlock!
At this time I can't prove that this network will never give a deadlock, which makes me rather uncomfortable! This network is, after all, a divergent-convergent topology, which is known to cause problems, but it may be that the findInputPortElementWithData function somehow compensates for this.
The above-mentioned API call has shown up rather late in FBP's evolution, in response to the problem caused by combining substreams with LoadBalance, and I am really not sure if I would build a production system using it. It should be noted that it should very seldom be necessary to recombine streams that have been split by LoadBalance - much better to leave them to go their separate ways in the network!
I would very much appreciate some Group member(s) of a mathematical bent doing some analysis on this... Remember that substream size can also be manipulated by modifying GenSS in both of these environments. GenSS can be found in folder examples\components in JavaFBP, and in FBPVerbs in C#FBP.
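To vary substream sizes in such tests, a generator in the spirit of GenSS could take a maximum-size parameter. The sketch below is hypothetical (not the JavaFBP or C#FBP GenSS code): it numbers data IPs consecutively, draws each substream size from 1 to `maxSize` with a seeded `Random` so that runs are reproducible, and never emits an empty substream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical GenSS-like generator; "(" and ")" stand for open and close brackets.
final class SubstreamGenerator {
    /** Emits `total` numbered data IPs split into substreams of 1..maxSize IPs each. */
    static List<String> generate(int total, int maxSize, long seed) {
        if (maxSize < 1) throw new IllegalArgumentException("maxSize must be >= 1");
        Random random = new Random(seed);
        List<String> out = new ArrayList<>();
        int next = 0;
        while (next < total) {
            // nextInt(maxSize) + 1 is never 0, so no empty substreams are produced
            int size = Math.min(random.nextInt(maxSize) + 1, total - next);
            out.add("(");
            for (int i = 0; i < size; i++) out.add(Integer.toString(next++));
            out.add(")");
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" ", generate(10, 3, 1L)));
    }
}
```

Raising `maxSize` relative to the connection capacities is one way to probe the rule of thumb about the longest substream.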
Thanks in advance, and best regards,