UDP client doesn't work anymore
Closed this issue · 10 comments
Hello,
I've updated my riemann-java-client to version 0.2.10 so that I can take advantage of the "attributes" fix (796c3ac). Everything was working fine with 0.2.9, but since 0.2.10, events sent in udp are not received by riemann. If I switch to tcp, everything works fine.
I think 1309404 (and the associate be48daa) might have broken udp mode since it seems that it now requires an ack for both tcp and udp.
- public void send() {
- client.sendEvents(build());
+ public IPromise send() {
+ return client.aSendEventsWithAck(build());
}
Indeed I may have missed something though.
I use the following code to send an event:
client = RiemannClient.udp(riemannIP, riemannPort);
client.connect();
client.event().
service(service).
state("running").
description(desc).
attributes(data).
metric(time).
tags(tags).
ttl(30).
send();
Of course, adding deref(1, java.util.concurrent.TimeUnit.SECONDS)
doesn't change anything more than firing an UnsupportedOperationException since UDP transport doesn't support receiving messages
.
In addition, I've examined the content of the object return by our client.send();
operation (a Promise), where we find that property ref has the value java.lang.UnsupportedOperationException: UDP transport doesn't support receiving messages
.
It seems we really need to distinguish between tcp and udp here.
Good catch. We should use aMaybeSendEvents instead.
This change seems to fix the problem.
vvision/riemann-java-client@151953d
But in this case, aSendEventsWithAck is maybe not a good name any more; since it's only true with tcp.
I think you're right; how do you feel about adding an aSendEvents function to EventDSL which doesn't imply an ack?
I'm willing to give it a try.
This should work works:
public void aSendEvents() {
client.sendEvents(build());
}
But, if I understand clearly, a obviously stands for asynchronous.
So here, we would use the aSendEvents function to call a synchronous one sendEvents.
Should we make the distinction between synchronous and asynchronous, having something like:
public void sendEvents() {
client.sendEvents(build());
}
public IPromise aSendEvents() {
return client.aSendEvents(build());
}
In AbstractRiemannClient, aSendEvents function would be:
public IPromise<Boolean> aSendEvents(final List<Event> events) {
final IPromise<Msg> p = aSendMaybeRecvMessage(
Msg.newBuilder()
.addAllEvents(events)
.build());
return new IPromise<Boolean>() {
public void deliver(Object value) { }
public Boolean deref() throws IOException {
validate(p.deref());
return true;
}
public Boolean deref(long time, TimeUnit unit) throws IOException {
validate(p.deref(time, unit));
return true;
}
public Boolean deref(long time, TimeUnit unit, Boolean timeoutValue) throws IOException {
if (null != validate(p.deref(time, unit, null))) {
return true;
} else {
return false;
}
}
};
}
public IPromise<Boolean> aSendEvents(final Event... events) {
return aSendEvents(Arrays.asList(events));
}
This is working too. I haven't fully understand Promise mechanism yet, so I wonder if it could affect udp transport in one way or another.
Pushing changes to fork: vvision/riemann-java-client@ac75924.
Are there plans to have UDP working in riemann-java-client? I'm using 0.4.5
What is your issue ? I just tried using 0.4.5 :
RiemannClient c = RiemannClient.udp("localhost", 5555);
c.connect();
c.event().
service("udp").
state("ok").
metric(10).
tags("foo").
time(1).
send();
And it works.
@mcorbin
Good to know that works for someone.
I'm sending messages to the same server with tcp and udp. Messages sent though tcp client are logged by riemann. upd messages are missed.
I see in logs that both tcp and udp servers are started.
Logs
riemann-server_1 | INFO [2017-05-05 12:30:36,280] main - riemann.bin - PID 1 riemann-server_1 | INFO [2017-05-05 12:30:36,592] clojure-agent-send-off-pool-3 - riemann.transport.tcp - TCP server 0.0.0.0 5555 online riemann-server_1 | INFO [2017-05-05 12:30:36,620] clojure-agent-send-off-pool-1 - riemann.transport.udp - UDP server 0.0.0.0 5555 16384 -1 online riemann-server_1 | INFO [2017-05-05 12:30:36,624] main - riemann.core - Hyperspace core online riemann-server_1 | #riemann.codec.Event{:host "host", :service "serviceName", :state "state", :description "tcp1", :metric nil, :tags ["LOGIN"], :time 1493987490942, :ttl nil, :attr1 "val1", :attr2 "val2"} riemann-server_1 | #riemann.codec.Event{:host "host", :service "serviceName", :state "state", :description "tcp2", :metric nil, :tags ["LOGIN"], :time 1493987587819, :ttl nil, :attr1 "val1", :attr2 "val2"} riemann-server_1 | #riemann.codec.Event{:host "host", :service "serviceName", :state "state", :description "tcp3", :metric nil, :tags ["LOGIN"], :time 1493987669253, :ttl nil, :attr1 "val1", :attr2 "val2"}
Code for tcp:
`
RiemannClient riemannClient = RiemannClient.tcp("192.168.1.55", 5555);
try {
riemannClient.connect();
final EventDSL riemannEvent = riemannClient.event();
riemannEvent.description("tcp3");
riemannEvent.service("serviceName");
riemannEvent.host("host");
riemannEvent.state("state");
riemannEvent.time(System.currentTimeMillis());
riemannEvent.tag(EventType.LOGIN.name());
Map<String, String> attributes = new HashMap<>();
attributes.put("attr1", "val1");
attributes.put("attr2", "val2");
riemannEvent.attributes(attributes);
final IPromise<Proto.Msg> msgIPromise = riemannClient.sendEvent(riemannEvent.build());
final Proto.Msg deref = msgIPromise.deref(3, TimeUnit.SECONDS);
log.info(deref.toString());
} catch (IOException e) {
log.error("Error during send", e);
} finally {
riemannClient.close();
}
Code for udp:
RiemannClient riemannClient = RiemannClient.udp("192.168.1.55", 5555);
try {
riemannClient.connect();
final EventDSL riemannEvent = riemannClient.event();
riemannEvent.description("udp3");
riemannEvent.service("serviceName");
riemannEvent.host("host");
riemannEvent.state("state");
riemannEvent.time(System.currentTimeMillis());
riemannEvent.tag(EventType.LOGIN.name());
Map<String, String> attributes = new HashMap<>();
attributes.put("attr1", "val1");
attributes.put("attr2", "val2");
riemannEvent.attributes(attributes);
riemannEvent.send();
} catch (IOException e) {
log.error("Error during send", e);
} finally {
riemannClient.close();
}
}
`
Knowing this works for you will try to find the issue on my side. Thanks!
Your description for udp is tcp3:
riemannEvent.description("tcp3");
Maybe it's the event you see in the log ?
My problem was with not opening 5555 as an udp port in docker. Everything works fine now.