fusesource/mqtt-client

Client listener does not seem to function in multiple threads

Opened this issue · 0 comments

Hi,

Based on the client example I wrote a Listener class to listen to topics from my MQTT server. When I start the client it listens to a topic by starting it in a thread. If I start one thread then everything is fine. If I start 2 threads then something goes wrong and onConnected / onDisconnected starts flowing over me :-)

Everything in my listener class is private so I started to wonder if I am not allowed to use the org.fusesource.mqtt.client library in threads.

Has anyone tried this?

Thanks in advance,

  • Gummi

PS: Here is the source I have

////////////ListenerMain.java

public class ListenerMain {

    public static void main(String[] args) throws Exception {
        //Listener mqList1 = new Listener("tcp://mq.mydev.local:1883", "topic1/#", "c:/gj.txt", true);      
        //new Thread(mqList1).start( );                                     

        //Listener mqList2 = new Listener("tcp://mq.mydev.local:1883", "topic2/#", "c:/gj2.txt", true);     
        //new Thread(mqList2).start( );                             

        Listener mqList3 = new Listener("tcp://mq.mydev.local:1883", "topic3/#", "c:/gj3.txt", true);       
        new Thread(mqList3).start( );

    }

}



////////////// Listener.java

import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.logging.*;
import java.io.*;
import java.net.URISyntaxException;

public class Listener implements Runnable{  
    private static final long DEFAULT_SLEEP_BEFORE_RE_ATTEMPT_IN_SECONDS = 5000;
    private static final long DEFAULT_MAX_RE_ATTEMPT_DURATION_IN_SECONDS = 3600 * 3;

    private long listenerSleepBeforeReAttemptInSeconds; 
    private long listenerMaxReAttemptDurationInSeconds;     
    private MQTT mqtt;  

    private ArrayList<Topic> topics;
    private boolean listenerDebug;
    private String listenerHostURI;
    private String listenerTopic;
    private String listenerLogFile; 
    private long listenerLastSuccessfulSubscription;

    private Logger fLogger;
    private String NEW_LINE = System.getProperty("line.separator");

    public Listener(String listenerHostURI, String listenerTopic, String logFile, boolean debug) {
        this(listenerHostURI, listenerTopic, logFile, DEFAULT_SLEEP_BEFORE_RE_ATTEMPT_IN_SECONDS, DEFAULT_MAX_RE_ATTEMPT_DURATION_IN_SECONDS, debug);
    }

    public Listener(String listenerHostURI, String listenerTopic, String logFile, long listenerSleepBeforeReAttemptInSeconds, long listenerMaxReAttemptDurationInSeconds, boolean debug) {
        init(listenerHostURI, listenerTopic, logFile, listenerSleepBeforeReAttemptInSeconds, listenerMaxReAttemptDurationInSeconds, debug);
    }

    private void init(String listenerHostURI, String listenerTopic, String logFile, long listenerSleepBeforeReAttemptInSeconds, long listenerMaxReAttemptDurationInSeconds, boolean debug) {        
        this.listenerHostURI = listenerHostURI;
        this.listenerTopic = listenerTopic;
        this.listenerLogFile = logFile;
        this.listenerSleepBeforeReAttemptInSeconds = listenerSleepBeforeReAttemptInSeconds;
        this.listenerMaxReAttemptDurationInSeconds = listenerMaxReAttemptDurationInSeconds;
        this.listenerDebug = debug;
        initMQTT();
    }

    private void initMQTT() {
        mqtt = new MQTT();
        listenerLastSuccessfulSubscription = System.currentTimeMillis();

        try {
            fLogger = Logger.getLogger("eTactica.mqtt.listener");
            FileHandler handler = new FileHandler(listenerLogFile);
            fLogger.addHandler(handler);
        } catch (IOException e) {
            System.out.println("Logger - Failed");
        }                    

        try {
            mqtt.setHost(listenerHostURI);
        } catch (URISyntaxException e) {
            stderr("setHost failed: " + e);
            stderr(e);
        }       
        QoS qos = QoS.AT_MOST_ONCE;
        topics = new ArrayList<Topic>();
        topics.add(new Topic(listenerTopic, qos));              
    }

    private void stdout(String x) {
        if (listenerDebug) {
            fLogger.log(Level.INFO, x + NEW_LINE);
        }
    }

    private void stderr(String x) {
        if (listenerDebug) {
            fLogger.log(Level.SEVERE, x + NEW_LINE);
        }
    }

    private void stderr(Throwable e) {
        if (listenerDebug) {            
            StringWriter sw = new StringWriter();
            PrintWriter pw = new PrintWriter(sw);
            e.printStackTrace(pw);

            fLogger.log(Level.SEVERE, sw.toString() + NEW_LINE);
        }
    }

    private void subscriptionSuccessful() {
        listenerLastSuccessfulSubscription = System.currentTimeMillis();    
    }    

    private boolean tryToListen() {             
        return ((System.currentTimeMillis() - listenerLastSuccessfulSubscription) < listenerMaxReAttemptDurationInSeconds * 1000);
    }

    private void sleepBeforeReAttempt() throws InterruptedException {       
        stdout(String.format(("Listener stopped, re-attempt in %s seconds."), listenerSleepBeforeReAttemptInSeconds));
        Thread.sleep(listenerSleepBeforeReAttemptInSeconds);
    }

    private void listenerReAttemptsOver() {
        stdout(String.format(("Listener stopped since reattempts have failed for %s seconds."), listenerMaxReAttemptDurationInSeconds));        
    }

    private void listen() {
        final CallbackConnection connection = mqtt.callbackConnection();
        final CountDownLatch done = new CountDownLatch(1);



       /* Runtime.getRuntime().addShutdownHook(new Thread(){
            @Override
            public void run() {
                setName("MQTT client shutdown");
                stderr("Disconnecting the client.");

                connection.getDispatchQueue().execute(new Runnable() {
                    public void run() {
                        connection.disconnect(new Callback<Void>() {
                            public void onSuccess(Void value) {
                                stdout("Disconnecting onSuccess.");
                                done.countDown();
                            }
                            public void onFailure(Throwable value) {
                                stderr("Disconnecting onFailure: " + value);
                                stderr(value);
                                done.countDown();
                            }
                        });
                    }
                });
            }
        });
        */

        connection.listener(new org.fusesource.mqtt.client.Listener() {

            public void onConnected() {
                stdout("Listener onConnected");                
            }

            public void onDisconnected() {
                stdout("Listener onDisconnected");
            }

            public void onPublish(UTF8Buffer topic, Buffer body, Runnable ack) {
                stdout(topic + " --> " + body.toString());                      
                ack.run();
            }

            public void onFailure(Throwable value) {
                stdout("Listener onFailure: " + value);                             
                stderr(value);
                done.countDown();
            }
        });

        connection.resume();

        connection.connect(new Callback<Void>() {
            public void onFailure(Throwable value) {
                stderr("Connect onFailure...: " + value);                        
                stderr(value);
                done.countDown();                
            }

            public void onSuccess(Void value) {
                final Topic[] ta = topics.toArray(new Topic[topics.size()]);
                connection.subscribe(ta, new Callback<byte[]>() {
                    public void onSuccess(byte[] value) {
                        for (int i = 0; i < value.length; i++) {
                            stdout("Subscribed to Topic: " + ta[i].name() + " with QoS: " + QoS.values()[value[i]]);
                        }
                        subscriptionSuccessful();
                    }
                    public void onFailure(Throwable value) {
                        stderr("Subscribe failed: " + value);                        
                        stderr(value);
                        done.countDown();
                    }
                });
            }
        });

        try {
            done.await();
        } catch (Exception e) {
            stderr(e);
        }
    }

    @Override
    public void run() {
        while (tryToListen()) {
            initMQTT();
            listen();

            try {
                sleepBeforeReAttempt();
            } catch (InterruptedException e) {
                stderr("Sleep failed:" + e);
                stderr(e);
            }
        }

        listenerReAttemptsOver();       
    }

}