Sample tutorials for more functionality.
Opened this issue · 7 comments
It would be nice to have more tutorials and examples for functionality such as reactors and pollers.
Tutorials on actors and inter-thread communications would be very nice too.
Would having versions of the examples from the zmq guide (https://github.com/imatix/zguide/tree/master/examples) built with zmqpp solve that? I'm not sure I'm all that good at writing a tutorial.
That would be very helpful.
On May 19, 2016 11:40 AM, "Ben Gray" notifications@github.com wrote:
Would having versions of the examples from the zmq guide (
https://github.com/imatix/zguide/tree/master/examples) built with zmqpp
solve that? I'm not sure I'm all that good at writing a tutorial. —
You are receiving this because you authored the thread.
Reply to this email directly or view it on GitHub
#160 (comment)
+1
#include <signal.h>
#include <atomic>
#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <zmqpp/context.hpp>
#include <zmqpp/socket.hpp>
#include <zmqpp/message.hpp>
#include <zmqpp/reactor.hpp>
using namespace std;
// Shutdown flag: set by the SIGINT handler, polled by the worker loops.
// An atomic is used instead of `volatile bool` because the flag is written in
// a signal handler and read from other threads; `volatile` alone gives no
// inter-thread guarantee. NOTE(review): signal-safety of the store relies on
// std::atomic<bool> being lock-free, which holds on mainstream platforms.
static std::atomic<bool> interrupted{false};

// SIGINT handler: records that shutdown was requested and nothing else.
// Deliberately performs no stream I/O, because iostream operations are not
// async-signal-safe and must not be called from a signal handler.
void my_signal_handler(int){
    interrupted = true;
}
void producer_task(){
try{
zmqpp::context zmq_context;
zmqpp::socket socket {
zmq_context,
zmqpp::socket_type::push
};
socket.bind( "tcp://*:4099" );
int x = 0;
while( 1 ){
std::this_thread::sleep_for( std::chrono::seconds(1) );
cout << "Message: " << x << endl;
zmqpp::message request;
request << "Message: " + std::to_string(x);
request << x;
socket.send(request);
x++;
}
}catch( zmqpp::zmq_internal_exception& e ){
cerr << "Exception: " << e.what() << endl;
}catch( ... ){
cerr << "Unknown Exception: " << endl;
}
}
void consumer_task(){
try{
zmqpp::context zmq_context;
zmqpp::socket socket_1 {
zmq_context,
zmqpp::socket_type::pull
};
socket_1.connect( "tcp://127.0.0.1:4099" );
zmqpp::socket socket_2 {
zmq_context,
zmqpp::socket_type::pull
};
socket_2.connect( "tcp://127.0.0.1:4099" );
zmqpp::reactor reactor;
auto first_listener = [&socket_1](){
zmqpp::message response;
socket_1.receive(response);
cout << "first_listener: " << response.get(0) << endl;
};
auto second_listener = [&socket_2](){
zmqpp::message response;
socket_2.receive(response);
cout << "second_listener: " << response.get(0) << endl;
};
reactor.add( socket_1, first_listener );
reactor.add( socket_2, second_listener );
while( reactor.poll() && !interrupted ){
}
//interrupted
cout << "interrupted" << endl;
}catch( zmqpp::zmq_internal_exception& e ){
cerr << "Exception: " << e.what() << endl;
}catch( ... ){
cerr << "Unknown Exception: " << endl;
}
}
// Writes the one-line command-line synopsis to standard output and flushes it.
void usage(){
    static const char synopsis[] = "usage: test [producer|consumer]";
    std::cout << synopsis << '\n' << std::flush;
}
// Entry point. Installs the SIGINT handler, then runs the task named by the
// first command-line argument ("producer" or "consumer") on its own thread
// and waits for it to finish.
// Returns 0 after a task completes, or 1 (after printing the usage line) when
// the task argument is missing or not recognised.
int main( int argc, char** argv ){
    signal(SIGINT, my_signal_handler);

    if( argc < 2 ){
        usage();
        return 1;
    }

    // Only the first argument is meaningful; any further ones are ignored.
    const std::string task = argv[1];

    if( task == "producer" ){
        std::thread producer_thread( producer_task );
        producer_thread.join();
    }else if( task == "consumer" ){
        std::thread consumer_thread( consumer_task );
        consumer_thread.join();
    }else{
        usage();
        return 1;
    }
    return 0;
}
Alternative using the reactor's poller and one callback:
// Alternative consumer body: both PULL sockets share a single callback, which
// asks the reactor's poller which socket has input. Each received message is
// printed and acknowledged by publishing "OKAY 1" / "OKAY 2" on a PUB socket
// bound to tcp://127.0.0.1:4098.
zmqpp::context zmq_context;

zmqpp::socket socket_1{ zmq_context, zmqpp::socket_type::pull };
socket_1.connect( "tcp://127.0.0.1:4099" );
//socket_1.subscribe("");

zmqpp::socket socket_2{ zmq_context, zmqpp::socket_type::pull };
socket_2.connect( "tcp://127.0.0.1:4099" );
//socket_2.subscribe("");

zmqpp::socket pub_socket{ zmq_context, zmqpp::socket_type::pub };
pub_socket.bind( "tcp://127.0.0.1:4098" );

zmqpp::reactor reactor;
zmqpp::poller& poller = reactor.get_poller();

auto socket_listener = [&poller, &socket_1, &socket_2, &pub_socket](){
    if( poller.has_input(socket_1) ){
        zmqpp::message response;
        socket_1.receive(response);
        cout << "1: " << response.get(0) << endl;
        pub_socket.send("OKAY 1");
    }
    if( poller.has_input(socket_2) ){
        zmqpp::message response;
        socket_2.receive(response);
        cout << "2: " << response.get(0) << endl;
        pub_socket.send("OKAY 2");
    }
};
reactor.add( socket_1, socket_listener );
reactor.add( socket_2, socket_listener );

// reactor.poll() returns a bool, so the former `!= -1` test was always true
// and only the flag ever ended the loop. Poll with a bounded timeout so the
// flag is rechecked even when no messages arrive. A timed-out poll is not an
// error, so its return value is intentionally ignored.
constexpr long poll_timeout_ms = 250;
while( !interrupted ){
    reactor.poll( poll_timeout_ms );
}
+1