Run the following command from your [$GOPATH/src] path:
go get -u github.com/alex60217101990/nats_utils
Create the service:
natsServer := NewStreamingService(
SetContext(ctx),
SetConfigs(&Config{
Async: true,
ClusterID: &ClusterID,
MsgChannelBufferSize: &MsgChannelBufferSize,
MaxPubAcksInflight: &MaxPubAcksInflight,
Compress: true,
Logger: nil,
Options: &nats_streaming.Options{
Servers: servers,
Secure: false,
User: "some_user",
Password: "some_password",
},
}),
)
Connect to a single NATS node or a cluster:
natsServer.Connect()
Run the publisher loop:
go natsServer.RunPublisher()
Publish a message to a channel:
natsServer.PublisherPush("test_channel", []byte(fmt.Sprintf("test_message: %v", t)))
Usage with a handler function:
natsServer.RunSubscriber(SubscriberConfig{
Type: ClassicSubscribe,
StartType: DeliverAllAvailable,
SubMsgChan: subMsgChan,
ChannelName: &ChannelName,
MsgHandler: func(m *stan.Msg) {
//some action with current message...
},
MaxInflight: &MaxPubAcksInflight,
AckWaitSeconds: &AckWaitSeconds,
DurableName: &DurableName,
})
Usage with a ring-buffered channel:
subMsgChan := channels.NewRingChannel(channels.BufferCap(500))
defer subMsgChan.Close()
natsServer.RunSubscriber(SubscriberConfig{
Type: ClassicSubscribe,
StartType: DeliverAllAvailable,
SubMsgChan: subMsgChan,
ChannelName: &ChannelName,
MaxInflight: &MaxPubAcksInflight,
AckWaitSeconds: &AckWaitSeconds,
DurableName: &DurableName,
})
for {
select {
case m := <-subMsgChan.Out():
//some action with current message...
}
}