ToolDAQFramework
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
MyToolZMQMultiThread.cpp
Go to the documentation of this file.
1 #include "MyToolZMQMultiThread.h"
2 
4 
6 
7 
9 
10 
11 bool MyToolZMQMultiThread::Initialise(std::string configfile, DataModel &data){
12  // Sets up the tool: reads config, binds the two manager sockets, fills the poll items and starts the worker threads; always returns true.
13  if(configfile!="") m_variables.Initialise(configfile); // only load a config store when a file name was given
14  //m_variables.Print();
15 
16  m_data= &data; // keep a pointer to the DataModel passed in by the caller
17  m_log= m_data->Log; // reuse the DataModel's Log pointer
18 
19  if(!m_variables.Get("verbose",m_verbose)) m_verbose=1; // fall back to verbosity 1 when Get() returns false
20 
21  int threadcount=0;
22  if(!m_variables.Get("Threads",threadcount)) threadcount=4; // fall back to 4 threads when Get() returns false
23 
24  m_util=new Utilities(m_data->context); // released with delete in the Finalise body further down this listing
25 
26  ManagerSend=new zmq::socket_t(*m_data->context,ZMQ_PUSH); // PUSH socket on the DataModel's context
27  ManagerSend->bind("inproc://MyToolZMQMultiThreadSend"); // the Thread body connects a PULL socket to this endpoint
28  ManagerReceive=new zmq::socket_t(*m_data->context,ZMQ_PULL); // PULL socket on the same context
29  ManagerReceive->bind("inproc://MyToolZMQMultiThreadReceive"); // the Thread body connects a PUSH socket to this endpoint
30 
31  items[0].socket=*ManagerSend; // items[0]: poll ManagerSend for ZMQ_POLLOUT
32  items[0].fd=0;
33  items[0].events=ZMQ_POLLOUT;
34  items[0].revents=0;
35  items[1].socket=*ManagerReceive; // items[1]: poll ManagerReceive for ZMQ_POLLIN
36  items[1].fd=0;
37  items[1].events=ZMQ_POLLIN;
38  items[1].revents=0;
39 
40  for(int i=0;i<threadcount;i++){ // NOTE(review): listing line 41 is missing from this extract; presumably it declares tmparg - confirm against the real file
42  args.push_back(tmparg); // store the per-thread argument object in the args vector
43  std::stringstream tmp;
44  tmp<<"T"<<i; // thread name is "T" followed by the loop index
45  m_util->CreateThread(tmp.str(), &Thread, args.at(i)); // start a thread on Thread() with the entry just pushed
46  }
47 
48  m_freethreads=threadcount; // every created thread is counted as free at start
49 
50 
51 
52  return true;
53 }
54 
55 
57 
58  zmq::poll(&items[0], 2, 0);
59 
60  if ((items[1].revents & ZMQ_POLLIN)){
61 
62  zmq::message_t message;
63  ManagerReceive->recv(&message);
64  std::istringstream iss(static_cast<char*>(message.data()));
65  std::cout<<"reply = "<<iss.str()<<std::endl;
66  m_freethreads++;
67 
68  }
69 
70  if ((items[0].revents & ZMQ_POLLOUT)){
71 
72  if(m_freethreads>0){
73 
74  // Utilities::SendPointer(ManagerSend,pointer);
75  std::string greeting="HI";
76  zmq::message_t message(greeting.length()+1);
77  snprintf ((char *) message.data(), greeting.length()+1 , "%s" , greeting.c_str()) ;
78  ManagerSend->send(message);
79  m_freethreads--;
80  std::cout<<"sending Hi"<<std::endl;
81  }
82 
83  }
84 
85  std::cout<<"free threads="<<m_freethreads<<":"<<args.size()<<std::endl;
86  sleep(1);
87  return true;
88 }
89 
90 
92 
93  for(int i=0;i<args.size();i++) m_util->KillThread(args.at(i));
94 
95  args.clear();
96 
97  delete m_util;
98  m_util=0;
99 
100  delete ManagerSend;
101  ManagerSend=0;
102 
103  delete ManagerReceive;
104  ManagerReceive=0;
105 
106  return true;
107 }
108 
110 
112 
113  zmq::socket_t ThreadReceive(*args->context,ZMQ_PULL);
114  ThreadReceive.connect("inproc://MyToolZMQMultiThreadSend");
115  zmq::socket_t ThreadSend(*args->context,ZMQ_PUSH);
116  ThreadSend.connect("inproc://MyToolZMQMultiThreadReceive");
117 
118  zmq::pollitem_t initems[1] = {{ThreadReceive,0,ZMQ_POLLIN,0}};
119  zmq::pollitem_t outitems[1] = {{ThreadSend,0,ZMQ_POLLOUT,0}};
120 
121  zmq::poll(&initems[0], 1, 100);
122 
123  if ((initems[0].revents & ZMQ_POLLIN)){
124 
125  zmq::message_t message;
126  ThreadReceive.recv(&message);
127  std::istringstream iss(static_cast<char*>(message.data()));
128 
129  sleep(10);
130 
131  zmq::poll(&outitems[0], 1, 10000);
132  if ((outitems[0].revents & ZMQ_POLLOUT)){
133 
134  std::string greeting="hello";
135  zmq::message_t message(greeting.length()+1);
136  snprintf ((char *) message.data(), greeting.length()+1 , "%s" , greeting.c_str()) ;
137  ThreadSend.send(message);
138  }
139 
140  }
141 
142 }
bool Initialise(std::string configfile, DataModel &data)
Initialise function for setting up Tool resources.
bool Execute()
Execute function used to perform the Tool's purpose.
Logging * Log
Log class pointer for use in Tools, it can be used to send messages which can have multiple error lev...
Definition: DataModel.h:60
Thread_args * CreateThread(std::string ThreadName, void(*func)(std::string))
Definition: Utilities.cpp:130
int m_freethreads
Keeps track of free threads.
zmq::socket_t * ManagerSend
Socket to send information to threads.
Utilities * m_util
Pointer to utilities class to help with threading.
zmq::context_t * context
ZMQ context used for ZMQ socket creation.
Definition: Utilities.h:58
bool Finalise()
Finalise function used to clean up resources.
bool KillThread(Thread_args *&args)
Kill a thread associated with args.
Definition: Utilities.cpp:234
zmq::socket_t * ManagerReceive
Socket to receive information from threads.
static void Thread(Thread_args *arg)
Function to be run by the thread in a loop. Make sure not to block in it.
zmq::pollitem_t items[2]
This is used to both inform the poll and store its output. Allows for multitasking sockets...
std::vector< MyToolZMQMultiThread_args * > args
Vector of thread args (also holds pointers to the threads)
MyToolZMQMultiThread()
Simple constructor.