Documentation on how to connect transportbus of 2 go applications
vogtp opened this issue · 17 comments
Is your feature request related to a problem? Please describe.
Hi
In the documentation it is mentioned that it is quite easy to connect 2 instances of transport-bus.
Unfortunately, I was not able to figure out how to do it...
I tried with MarkChannelAsGalactic and StartFabricEndpoint as well as with Plank.
Otherwise I am quite happy with transport-bus and how it works...
Have fun
Patrick
Describe the solution you'd like
Some code explaining how to do it...
Describe alternatives you've considered
Writing it myself.
Additional context
No response
Hi Patrick,
Thank you so much for your interest. It completely slipped off my radar to document how to establish a connection between two Plank-based applications. I already wrote some example code that demonstrates how to talk to different brokers, but it lacks detailed instructions on how to actually do that. Let me get back to it and do my best at writing a full end-to-end guide so you can follow along. I will get back to you soon!
Thanks,
Josh
Hi Josh
Thanks for answering so soon.
Your examples are nice and they got me quite far (my POC runs in one binary and it works nicely).
I managed to produce a message loop in one binary, that was replicated in a 2nd binary, so I am probably not missing much (I even tried to find the solution in the TypeScript documentation).
Could you point me to (a repository with) some working code? (preferably Go or Java)
I would be happy to help with the documentation and provide a PR...
Thanks and have a nice weekend
Patrick
I would love contributions from you, so let me write some proof of concept of what you want to achieve and will share a branch with you. I have a bit of a packed day today for my primary responsibilities so I may be able to work on it toward the end of day or later time permitting, but I will get to you very soon!
In the meantime, you could check out the examples at https://github.com/vmware/transport-go/tree/main/plank/cmd/broker_sample/plank if you have not checked it out. These examples showcase how to talk to another Plank instance either by WebSocket or TCP. If you opt to use TCP, make sure to update the value for key fabric_config.use_tcp in the config.json file to true.
It is getting late here in Europe (and it is the weekend)...
Plank was my first use for go.work, added some log lines and stepped it at some point, after which I switched from a topic approach (seemed logical for a bridge) to a pub/sub approach (would work in my case).
config.json?
Most of my tests were using WS, not TCP (mostly because I felt that WS would leave more options open -- not to speak of less trouble with the security/firewall team).
Does plank need server side code or just correct config?
Is plank needed or does transport support direct coupling?
I think I now have better context. I see you may have used Plank without a configuration file. The config.json file I mentioned is a single source of truth for configuring and tweaking the behaviors of Plank such as WebSocket endpoints, TLS, etc.
If you followed the Hello World guide for Plank exactly as-is, you already have an HTTP server (with a self-signed cert) running with WebSocket endpoint at URI /ws.
I may have gaps in understanding your setup and actual commands you used to start Plank, but I would love to hear in detail what you are trying to achieve and help you achieve it, so if you can tell me how you tried the example code and what behaviors you got, that would be greatly helpful for me. In the meantime, here's what you can do because as you said it's already late for you and you may not want me to eat up into your precious weekend :)
Once you have started the sample by following the guide above (up to the point of running ./build/plank start-server --config-file config.json), open up another terminal, cd into the plank folder of the repo, then type the following:
WS_USE_TLS=1 LISTEN_METHOD=plank_ws go run cmd/broker_sample/app.go
Example
╭─kjosh@kjosh-a01 ~/Documents/transport-go/plank ‹main*›
╰─$ WS_USE_TLS=1 LISTEN_METHOD=plank_ws go run cmd/broker_sample/app.go
INFO[0000] waiting for messages fileName=over_ws.go goroutine=1 package=plank
INFO[0002] &{<nil> 0 ping-pong-service map[payload:hello-response timestamp:1.65187612e+09] false 0 <nil> map[Content-Type:application/json]}
// the environment variables provided tell the client app to connect to the server running at `:30080/ws`,
// where Plank is running. Also, since Plank is running with a self-signed cert, we make sure to use the same cert
// in the client app when connecting to the broker (this part is hard-coded into over_ws.go for the purpose of the demo).
If everything went successfully, you'll see a message payload that originated from the Plank server that you started earlier. This essentially demonstrates how you can talk to other Plank or any STOMP servers (with the details in the over_ws.go file). Please let me know if you would need further help. Again, I'd like to know your exact use case and help you achieve it. Have a great weekend!
This did not leave my mind at peace...
Everything works as you described.
But if I start 2 instances of
WS_USE_TLS=1 LISTEN_METHOD=plank_ws go run cmd/broker_sample/app.go
they only get the response of their own request.
I hacked together some sample code (I know it is not running):
`
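package main

import (
	"log"
	"sync"

	"github.com/vmware/transport-go/bus"
	"github.com/vmware/transport-go/model"
)

var (
	channel = "events"
	wg      sync.WaitGroup
)

func main() {
	wg.Add(2)
	bus.GetBus().GetChannelManager().CreateChannel(channel)
	// NOTE: the handlers subscribe in their own goroutines, so they may not
	// be listening yet when the broadcast below is sent.
	go handler("1")
	go handler("2")
	bus.GetBus().SendBroadcastMessage(channel, "hello")
	wg.Wait()
}

// handler subscribes to the channel and logs each message it receives.
func handler(name string) {
	h, err := bus.GetBus().ListenStream(channel)
	if err != nil {
		log.Fatalf("unable to listen: %v", err)
	}
	h.Handle(
		func(msg *model.Message) {
			log.Printf("[%s] Got Message: %v", name, msg)
			wg.Done()
		},
		func(err error) {
			log.Printf("error received on channel: %v", err)
		})
}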
`
What I would like to do is move handler("1") and handler("2") into separate binaries.
But this can wait 'till Monday. Have a nice weekend.
Thanks for providing a code sample! The "app.go" app is merely a client, so it does not have any capability to respond to incoming requests. Starting it twice therefore just emulates a single server serving two clients, which makes it a poor example for what you are trying to achieve: two individual services talking to each other.
I think this would be best explained if I drew up a diagram to explain some concepts and how all things fit together (e.g. where Plank comes into play that connects two channels each from different processes). So stay tuned!
In the meantime, what you need is two Plank apps (let's call them Plank A and Plank B), each running its own Transport bus instance. Then this piece of code bridges Plank A's local channel to Plank B's channel representing the service you're trying to reach.
In actuality, you would likely write a service like this and manage the connection state for the other service as a long-running goroutine. (Obviously it'll have to be much better than over_ws.go.)
Here's my quick take at visualizing what I explained above (excuse my handwriting 🤣)
During the week I'll try to create a fully end-to-end simple sample that demonstrates this flow.
Thanks for your help. Right now I do not see my mistake (unless it was only the missing subscribe), but I asked my boss to get authorisation to put some of my code on github to illustrate what I want to do...
I am not sure how far I'll get today so let me give you some background meanwhile:
Most of my events will just be received and processed by a (potentially unknown) number of consumers; some of those will just use them locally, others will generate new events.
(I did not have any problems reading your handwriting, wait 'till you see mine ;)
There will (potentially) be 3 types of connections:
- Between processes on the same host.
- Between processes on hosts in the same network.
- Between processes separated by a firewall.
Right now I am doing this:
`
package main

import (
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/vmware/transport-go/bridge"
	"github.com/vmware/transport-go/bus"
	"github.com/vmware/transport-go/model"
	"github.com/vmware/transport-go/stompserver"
)

func main() {
	bus.EnableLogging(true)
	channel := "event"
	e := bus.GetBus()
	e.GetChannelManager().CreateChannel(channel)
	config := &bridge.BrokerConnectorConfig{
		Username:   "guest",
		Password:   "guest",
		ServerAddr: "localhost:4444",
		WebSocketConfig: &bridge.WebSocketConfig{
			WSPath: "/ws",
			UseTLS: false,
		},
		UseWS:        true,
		HeartBeatOut: 30 * time.Second,
		STOMPHeader:  map[string]string{},
		HttpHeader: http.Header{
			"Sec-Websocket-Protocol": {"v12.stomp"},
		},
	}
	log.Print("Getting broker")
	// Try to connect to an existing broker first.
	broker, err := e.ConnectBroker(config)
	if err != nil {
		// No broker reachable: start a fabric endpoint in this process.
		log.Printf("unable to connect to fabric: %v", err)
		log.Print("Trying to start a fabric endpoint")
		stompListener, err := stompserver.NewWebSocketConnectionListener(":4444", "/ws", nil)
		if err != nil {
			log.Fatalf("Cannot start stomp listener: %v", err)
		}
		started := make(chan bool)
		go func() {
			started <- true
			err := e.StartFabricEndpoint(stompListener, bus.EndpointConfig{
				TopicPrefix:           "/topic",
				UserQueuePrefix:       "/queue",
				AppRequestPrefix:      "/pub",
				AppRequestQueuePrefix: "/pub/queue",
			})
			if err != nil {
				log.Fatalf("Endpoint error: %v", err)
			}
		}()
		time.Sleep(100 * time.Millisecond)
		<-started
		// Connect to the endpoint we just started.
		broker, err = e.ConnectBroker(config)
		if err != nil {
			log.Fatalf("connect to own broker: %v", err)
		}
	}
	sub, err := broker.Subscribe("/topic/" + channel)
	if err != nil {
		log.Printf("cannot subscribe sub: %v", err)
	}
	log.Printf("subscribed to dest: %v", sub.GetDestination())
	// Bridge the local channel to the broker destination.
	err = e.GetChannelManager().MarkChannelAsGalactic(channel, "/topic/"+channel, broker)
	if err != nil {
		log.Printf("Could not mark channel %q as galactic: %v", channel, err)
	}
	handler, err := e.ListenStream(channel)
	if err != nil {
		log.Printf("Cannot register handler for chan %s: %v", channel, err)
	}
	var wg sync.WaitGroup
	wg.Add(5)
	handler.Handle(
		func(m *model.Message) {
			log.Printf("got message: %+v", m)
			wg.Done()
		},
		func(err error) {
			log.Printf("evt chan %s reported error: %v", channel, err)
			wg.Done()
		},
	)
	e.SendBroadcastMessage(channel, "message2")
	wg.Wait()
}
`
Which produces a message loop...
Thanks for your response, Patrick. I see that you have used the packages mostly in the correct way! But as you may have noticed while writing that code, it is very verbose and requires extensive boilerplate that could easily become unwieldy. Such boilerplate and plumbing work is handled by a "service", which comes with Plank.
I took this evening to write an example service that demonstrates cross-process communication in my WIP PR - #52. You can check out this PR's branch and inspect the service code and run each server app server_one and server_two as shown below:
In one terminal,
cd plank/cmd/cross_plank_communication/server_one
go run server_one.go --config-file config.json
In another,
cd plank/cmd/cross_plank_communication/server_two
go run server_two.go --config-file config.json
Make sure to start server_one first, otherwise you'll see during server_two startup an error message that it could not start the service. After both servers are up, after five seconds, you'll see some debug messages printed out to the terminal for server_two.
The example code demonstrates all the logic you wrote above, but does so in a service-like fashion where the actual business code is decoupled from the plumbing work (which Plank does for you). You still have to maintain the WebSocket connection as part of your business code (which is why the broker connection logic is part of the service code), and I left some comments throughout the code that may help you think about how best to handle the WS lifecycle.
Feel free to check it out, compare it with the use cases you need to achieve and let me know if you have further questions, happy to help!
Thanks a lot for your work! This helps a lot!
This implements a 1:1 communication, right?
Mostly I need a 1:n communication, I do not see how to implement it right now, but I am working on it.
I completely agree with you about the boilerplate code and would like to take it even further. What do you think about global channels getting transparently replicated among the connected buses?
A global channel (in contrast to the galactic channel) would not need a user implemented plank service.
In my earlier code example it would look like:
err = e.GetChannelManager().MarkChannelAsGalactic(channel)
It would transparently connect to (or start) a broker that connects the global channels of different bus instances.
Of course there should be an optional way to configure it (e.g. port etc.).
I managed to get permission to opensource my bus code:
https://github.com/vogtp/go-crowd-bus/
The examples show pretty much how my code uses transport bus.
My goal is to run one instance of the example without flags, plus any number of instances with the -l flag, each of which should also print the messages.
But if I run a 2nd instance, they crash:
`
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xc0000005 code=0x0 addr=0x0 pc=0x56b2fa]
goroutine 77 [running]:
github.com/vmware/transport-go/bridge.(*BridgeClient).SendFrame(0xc00005c240, 0x0?)
C:/Users/vogtp/go/src/github.com/vogtp/transport-go/bridge/bridge_client.go:188 +0x1fa
created by github.com/vmware/transport-go/bridge.(*BridgeClient).Send
C:/Users/vogtp/go/src/github.com/vogtp/transport-go/bridge/bridge_client.go:173 +0x41f
exit status 2
`
My code triggering this crash is here:
https://github.com/vogtp/go-crowd-bus/blob/9fc3bb5bf6c88e35564ad519887c11493f49f86d/message_handler.go#L85
It does not matter if I use /topic or /pub/queue/ and /queue.
Hi Josh
Thanks a lot for your help and time.
I found a different solution...
Thanks and enjoy the weekend,
Patrick
Hi Patrick,
Sorry I was not able to assist you in time; my main responsibilities kept me from getting back to you. I appreciate you spending time with me! I'm sure the panic you got has to do with one of the bus channels or another construct being nil. For the sake of my own investigation, and to potentially let you know the cause, I will review your code and why the panic happens, if that's OK with you. Thanks again, and have a happy weekend as well!
Josh
Hi Josh
I really appreciate the time you spent with me! It helped me a lot and I learned new things about message buses.
I fully understand that your main responsibilities have priority and I feel honored that you spent spare time writing an example for me. I enjoyed the time, but I had to take a step back and reconsider earlier choices.
All the best
Patrick