ToolDAQFramework
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
Utilities.cpp
Go to the documentation of this file.
1 #include <Utilities.h>
2 
3 Utilities::Utilities(zmq::context_t* zmqcontext){
4  context=zmqcontext;
5  Threads.clear();
6 }
7 
8 bool Utilities::AddService(std::string ServiceName, unsigned int port, bool StatusQuery){
9 
10  zmq::socket_t Ireceive (*context, ZMQ_PUSH);
11  Ireceive.connect("inproc://ServicePublish");
12 
13  boost::uuids::uuid m_UUID;
14  m_UUID = boost::uuids::random_generator()();
15 
16  std::stringstream test;
17  test<<"Add "<< ServiceName <<" "<<m_UUID<<" "<<port<<" "<<((int)StatusQuery) ;
18 
19  zmq::message_t send(test.str().length()+1);
20  snprintf ((char *) send.data(), test.str().length()+1 , "%s" ,test.str().c_str()) ;
21 
22  return Ireceive.send(send);
23 
24 
25 }
26 
27 
28 bool Utilities::RemoveService(std::string ServiceName){
29 
30  zmq::socket_t Ireceive (*context, ZMQ_PUSH);
31  Ireceive.connect("inproc://ServicePublish");
32 
33  std::stringstream test;
34  test<<"Delete "<< ServiceName << " ";
35  zmq::message_t send(test.str().length()+1);
36  snprintf ((char *) send.data(), test.str().length()+1 , "%s" ,test.str().c_str()) ;
37 
38  return Ireceive.send(send);
39 
40 }
41 
42 int Utilities::UpdateConnections(std::string ServiceName, zmq::socket_t* sock, std::map<std::string,Store*> &connections){
43 
44  boost::uuids::uuid m_UUID=boost::uuids::random_generator()();
45  long msg_id=0;
46 
47  zmq::socket_t Ireceive (*context, ZMQ_DEALER);
48  Ireceive.connect("inproc://ServiceDiscovery");
49 
50 
51  zmq::message_t send(4);
52  snprintf ((char *) send.data(), 4 , "%s" ,"All") ;
53 
54 
55  Ireceive.send(send);
56 
57  zmq::message_t receive;
58  Ireceive.recv(&receive);
59  std::istringstream iss(static_cast<char*>(receive.data()));
60 
61  int size;
62  iss>>size;
63 
64  for(int i=0;i<size;i++){
65 
66  Store *service = new Store;
67 
68  zmq::message_t servicem;
69  Ireceive.recv(&servicem);
70 
71  std::istringstream ss(static_cast<char*>(servicem.data()));
72  service->JsonParser(ss.str());
73 
74  std::string type;
75  std::string uuid;
76  std::string ip;
77  std::string port;
78  service->Get("msg_value",type);
79  service->Get("uuid",uuid);
80  service->Get("ip",ip);
81  service->Get("remote_port",port);
82  std::string tmp=ip + ":" + port;
83 
84  //if(type == ServiceName && connections.count(uuid)==0){
85  if(type == ServiceName && connections.count(tmp)==0){
86  connections[tmp]=service;
87  //std::string ip;
88  //std::string port;
89  //service->Get("ip",ip);
90  //service->Get("remote_port",port);
91  tmp="tcp://"+ tmp;
92  sock->connect(tmp.c_str());
93  }
94  else{
95  delete service;
96  service=0;
97  }
98 
99 
100  }
101 
102  return connections.size();
103  }
104 
105 Thread_args* Utilities::CreateThread(std::string ThreadName, void (*func)(Thread_args*), Thread_args* args){
106 
107  if(Threads.count(ThreadName)==0){
108 
109  if(args==0) args = new Thread_args();
110 
111  args->context=context;
112  args->ThreadName=ThreadName;
113  args->func=func;
114  args->running=true;
115 
116  pthread_create(&(args->thread), NULL, Utilities::Thread, args);
117 
118  args->sock=0;
119  Threads[ThreadName]=args;
120 
121 }
122 
123  else args=0;
124 
125  return args;
126 
127 }
128 
129 
130 Thread_args* Utilities::CreateThread(std::string ThreadName, void (*func)(std::string)){
131  Thread_args *args =0;
132 
133  if(Threads.count(ThreadName)==0){
134 
135  args = new Thread_args(context, ThreadName, func);
136  pthread_create(&(args->thread), NULL, Utilities::String_Thread, args);
137  args->sock=0;
138  args->running=true;
139  Threads[ThreadName]=args;
140  }
141 
142  return args;
143 }
144 
145 void *Utilities::String_Thread(void *arg){
146 
147 
148  Thread_args *args = static_cast<Thread_args *>(arg);
149 
150  zmq::socket_t IThread(*(args->context), ZMQ_PAIR);
152  std::stringstream tmp;
153  tmp<<"inproc://"<<args->ThreadName;
154  IThread.bind(tmp.str().c_str());
155 
156 
157  zmq::pollitem_t initems[] = {
158  {IThread, 0, ZMQ_POLLIN, 0}};
159 
160  args->running = true;
161 
162  while(!args->kill){
163  if(args->running){
164 
165  std::string command="";
166 
167  zmq::poll(&initems[0], 1, 0);
168 
169  if ((initems[0].revents & ZMQ_POLLIN)){
170 
171  zmq::message_t message;
172  IThread.recv(&message);
173  command=std::string(static_cast<char *>(message.data()));
174 
175  }
176 
177  args->func_with_string(command);
178  }
179 
180  else usleep(100);
181 
182  }
183 
184  pthread_exit(NULL);
185  }
186 
187 void *Utilities::Thread(void *arg){
188 
189  Thread_args *args = static_cast<Thread_args *>(arg);
190 
191  while (!args->kill){
192 
193  if(args->running) args->func(args );
194  else usleep(100);
195 
196  }
197 
198  pthread_exit(NULL);
199 
200 }
201 
202 bool Utilities::MessageThread(Thread_args* args, std::string Message, bool block){
203 
204  bool ret=false;
205 
206  if(args){
207 
208  if(!args->sock){
209 
210  args->sock = new zmq::socket_t(*(args->context), ZMQ_PAIR);
211  std::stringstream tmp;
212  tmp<<"inproc://"<<args->ThreadName;
213  args->sock->connect(tmp.str().c_str());
214 
215  }
216 
217  zmq::message_t msg(Message.length()+1);
218  snprintf((char *)msg.data(), Message.length()+1, "%s", Message.c_str());
219 
220  if(block) ret=args->sock->send(msg);
221  else ret=args->sock->send(msg, ZMQ_NOBLOCK);
222 
223  }
224 
225  return ret;
226 
227 }
228 
229 bool Utilities::MessageThread(std::string ThreadName, std::string Message, bool block){
230 
231  return MessageThread(Threads[ThreadName],Message,block);
232 }
233 
235 
236  bool ret=false;
237 
238  if(args){
239 
240  args->running=false;
241  args->kill=true;
242 
243  pthread_join(args->thread, NULL);
244  delete args;
245  args=0;
246 
247 
248  }
249 
250  return ret;
251 
252 }
253 
254 bool Utilities::KillThread(std::string ThreadName){
255 
256  return KillThread(Threads[ThreadName]);
257 
258 }
259 
260 
262 
264 bool util::FileExists(std::string pathname, std::string filename) {
265  std::string filepath = pathname + "/" + filename;
266  bool exists = access(filepath.c_str(), F_OK) != -1;
267  if(!exists) {
268  std::stringstream ss;
269  ss << "FATAL: " << filepath << " not found or inaccessible";
270  util::Log(ss.str(), util::FATAL);
271  return false;
272  }
273  return true;
274 }
276 void util::Log(const std::string & message, const int message_level) {
277  std::stringstream tmp;
278  tmp << "[" << message_level << "] " << message;
279  std::cout << tmp.str() << std::endl;
280 }
282 void util::Log(std::stringstream & message, const int message_level) {
283  Log(message.str(), message_level);
284  message.str("");
285 }
bool MessageThread(Thread_args *args, std::string Message, bool block=true)
Send simple string to String thread.
Definition: Utilities.cpp:202
bool running
Bool flag to tell the thread to run (if not set, the thread goes into a wait cycle).
Definition: Utilities.h:64
bool AddService(std::string ServiceName, unsigned int port, bool StatusQuery=false)
Broadcasts an available service (only in remote mode)
Definition: Utilities.cpp:8
bool kill
Bool flag used to kill the thread.
Definition: Utilities.h:65
std::string ThreadName
Name of thread (defined at creation).
Definition: Utilities.h:59
zmq::socket_t * sock
ZMQ socket pointer is assigned in string thread, but can be used otherwise.
Definition: Utilities.h:63
static void * String_Thread(void *arg)
Simple string thread.
Definition: Utilities.cpp:145
static void * Thread(void *arg)
Thread with args.
Definition: Utilities.cpp:187
void(* func_with_string)(std::string)
function pointer to string thread
Definition: Utilities.h:60
void(* func)(Thread_args *)
function pointer to thread with args
Definition: Utilities.h:61
Thread_args * CreateThread(std::string ThreadName, void(*func)(std::string))
Definition: Utilities.cpp:130
Utilities(zmq::context_t *zmqcontext)
Simple constructor.
Definition: Utilities.cpp:3
std::map< std::string, Thread_args * > Threads
Map of threads managed by the utilities class.
Definition: Utilities.h:148
pthread_t thread
Simple constructor underlying thread that the interface is built on top of.
Definition: Utilities.h:62
zmq::context_t * context
ZMQ context used for ZMQ socket creation.
Definition: Utilities.h:58
void Log(const std::string &message, const int message_level)
Format messages in the same way as for tools.
Definition: Utilities.cpp:276
bool KillThread(Thread_args *&args)
Kill a thread associated with args.
Definition: Utilities.cpp:234
bool RemoveService(std::string ServiceName)
Removes service broadcasts for a service.
Definition: Utilities.cpp:28
bool FileExists(std::string pathname, std::string filename)
Check if a file exists.
Definition: Utilities.cpp:264
zmq::context_t * context
ZMQ context pointer.
Definition: Utilities.h:145
int UpdateConnections(std::string ServiceName, zmq::socket_t *sock, std::map< std::string, Store * > &connections)
Dynamically connects a socket to services broadcast with a specific name.
Definition: Utilities.cpp:42