Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
THttpWSHandler.cxx
Go to the documentation of this file.
1 // $Id$
2 // Author: Sergey Linev 20/10/2017
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2013, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #include "THttpWSHandler.h"
13 
14 #include "THttpWSEngine.h"
15 #include "THttpCallArg.h"
16 #include "TSystem.h"
17 
18 #include <thread>
19 #include <chrono>
20 
21 /////////////////////////////////////////////////////////////////////////
22 ///
23 /// THttpWSHandler
24 ///
25 /// Class for user-side handling of websocket with THttpServer
26 /// 1. Create a class derived from THttpWSHandler and implement
27 /// the ProcessWS() method, where all websocket requests are handled.
28 /// 2. Register instance of derived class to running THttpServer
29 ///
30 /// TUserWSHandler *handler = new TUserWSHandler("name1","title");
31 /// THttpServer *server = new THttpServer("http:8090");
32 /// server->Register("/subfolder", handler)
33 ///
34 /// 3. Now server can accept web socket connection from outside.
35 /// For instance, from JavaScript one can connect to it with code:
36 ///
37 /// var ws = new WebSocket("ws://hostname:8090/subfolder/name1/root.websocket")
38 ///
39 /// 4. In the ProcessWS(THttpCallArg *arg) method the following code should be implemented:
40 ///
41 /// if (arg->IsMethod("WS_CONNECT")) {
42 /// return true; // to accept incoming request
43 /// }
44 ///
45 /// if (arg->IsMethod("WS_READY")) {
46 /// fWSId = arg->GetWSId(); // fWSId should be member of the user class
47 /// return true; // connection established
48 /// }
49 ///
50 /// if (arg->IsMethod("WS_CLOSE")) {
51 /// fWSId = 0;
52 /// return true; // confirm close of socket
53 /// }
54 ///
55 /// if (arg->IsMethod("WS_DATA")) {
56 /// // received data stored as POST data
57 /// std::string str((const char *)arg->GetPostData(), arg->GetPostDataLength());
58 /// std::cout << "got string " << str << std::endl;
59 /// // immediately send data back using websocket id
60 /// SendCharStarWS(fWSId, "our reply");
61 /// return true;
62 /// }
63 ///
64 ///////////////////////////////////////////////////////////////////////////
65 
66 ClassImp(THttpWSHandler);
67 
68 ////////////////////////////////////////////////////////////////////////////////
69 /// normal constructor
70 
71 THttpWSHandler::THttpWSHandler(const char *name, const char *title, Bool_t syncmode) : TNamed(name, title), fSyncMode(syncmode)
72 {
73 }
74 
75 ////////////////////////////////////////////////////////////////////////////////
76 /// destructor
77 /// Make sure that all sending threads are stopped correctly
78 
79 THttpWSHandler::~THttpWSHandler()
80 {
81  SetDisabled();
82 
83  std::vector<std::shared_ptr<THttpWSEngine>> clr;
84 
85  {
86  std::lock_guard<std::mutex> grd(fMutex);
87  std::swap(clr, fEngines);
88  }
89 
90  for (auto &eng : clr) {
91  eng->fDisabled = true;
92  if (eng->fHasSendThrd) {
93  eng->fHasSendThrd = false;
94  if (eng->fWaiting)
95  eng->fCond.notify_all();
96  eng->fSendThrd.join();
97  }
98  eng->ClearHandle(kTRUE); // terminate connection before starting destructor
99  }
100 }
101 
102 /// Returns current number of websocket connections
103 Int_t THttpWSHandler::GetNumWS()
104 {
105  std::lock_guard<std::mutex> grd(fMutex);
106  return fEngines.size();
107 }
108 
109 ////////////////////////////////////////////////////////////////////////////////
110 /// Return websocket id with given sequential number
111 /// Number of websockets returned with GetNumWS() method
112 
113 UInt_t THttpWSHandler::GetWS(Int_t num)
114 {
115  std::lock_guard<std::mutex> grd(fMutex);
116  auto iter = fEngines.begin() + num;
117  return (*iter)->GetId();
118 }
119 
120 ////////////////////////////////////////////////////////////////////////////////
121 /// Find websocket connection handle with given id
122 /// If book_send parameter specified, have to book send operation under the mutex
123 
124 std::shared_ptr<THttpWSEngine> THttpWSHandler::FindEngine(UInt_t wsid, Bool_t book_send)
125 {
126  if (IsDisabled())
127  return nullptr;
128 
129  std::lock_guard<std::mutex> grd(fMutex);
130 
131  for (auto &eng : fEngines)
132  if (eng->GetId() == wsid) {
133 
134  // not allow to work with disabled engine
135  if (eng->fDisabled)
136  return nullptr;
137 
138  if (book_send) {
139  if (eng->fMTSend) {
140  Error("FindEngine", "Try to book next send operation before previous completed");
141  return nullptr;
142  }
143  eng->fMTSend = kTRUE;
144  }
145  return eng;
146  }
147 
148  return nullptr;
149 }
150 
151 ////////////////////////////////////////////////////////////////////////////////
152 /// Remove and destroy WS connection
153 
154 void THttpWSHandler::RemoveEngine(std::shared_ptr<THttpWSEngine> &engine, Bool_t terminate)
155 {
156  if (!engine) return;
157 
158  {
159  std::lock_guard<std::mutex> grd(fMutex);
160 
161  for (auto iter = fEngines.begin(); iter != fEngines.end(); iter++)
162  if (*iter == engine) {
163  if (engine->fMTSend)
164  Error("RemoveEngine", "Trying to remove WS engine during send operation");
165 
166  engine->fDisabled = true;
167  fEngines.erase(iter);
168  break;
169  }
170  }
171 
172  engine->ClearHandle(terminate);
173 
174  if (engine->fHasSendThrd) {
175  engine->fHasSendThrd = false;
176  if (engine->fWaiting)
177  engine->fCond.notify_all();
178  engine->fSendThrd.join();
179  }
180 }
181 
182 ////////////////////////////////////////////////////////////////////////////////
183 /// Process request to websocket
184 /// Different kind of requests coded into THttpCallArg::Method
185 /// "WS_CONNECT" - connection request
186 /// "WS_READY" - connection ready
187 /// "WS_CLOSE" - connection closed
188 /// All other are normal data, which are delivered to users
189 
190 Bool_t THttpWSHandler::HandleWS(std::shared_ptr<THttpCallArg> &arg)
191 {
192  if (IsDisabled())
193  return kFALSE;
194 
195  if (!arg->GetWSId())
196  return ProcessWS(arg.get());
197 
198  // normally here one accept or reject connection requests
199  if (arg->IsMethod("WS_CONNECT"))
200  return ProcessWS(arg.get());
201 
202  auto engine = FindEngine(arg->GetWSId());
203 
204  if (arg->IsMethod("WS_READY")) {
205 
206  if (engine) {
207  Error("HandleWS", "WS engine with similar id exists %u", arg->GetWSId());
208  RemoveEngine(engine, kTRUE);
209  }
210 
211  engine = arg->TakeWSEngine();
212  {
213  std::lock_guard<std::mutex> grd(fMutex);
214  fEngines.emplace_back(engine);
215  }
216 
217  if (!ProcessWS(arg.get())) {
218  // if connection refused, remove engine again
219  RemoveEngine(engine, kTRUE);
220  return kFALSE;
221  }
222 
223  return kTRUE;
224  }
225 
226  if (arg->IsMethod("WS_CLOSE")) {
227  // connection is closed, one can remove handle
228 
229  RemoveEngine(engine);
230 
231  return ProcessWS(arg.get());
232  }
233 
234  if (engine && engine->PreProcess(arg)) {
235  PerformSend(engine);
236  return kTRUE;
237  }
238 
239  Bool_t res = ProcessWS(arg.get());
240 
241  if (engine)
242  engine->PostProcess(arg);
243 
244  return res;
245 }
246 
247 ////////////////////////////////////////////////////////////////////////////////
248 /// Close connection with given websocket id
249 
250 void THttpWSHandler::CloseWS(UInt_t wsid)
251 {
252  auto engine = FindEngine(wsid);
253 
254  RemoveEngine(engine, kTRUE);
255 }
256 
257 ////////////////////////////////////////////////////////////////////////////////
258 /// Send data stored in the buffer
259 /// Returns 0 - when operation was executed immediately
260 /// 1 - when send operation will be performed in different thread
261 
262 Int_t THttpWSHandler::RunSendingThrd(std::shared_ptr<THttpWSEngine> engine)
263 {
264  if (IsSyncMode() || !engine->SupportSendThrd()) {
265  // this is case of longpoll engine, no extra thread is required for it
266  if (engine->CanSendDirectly())
267  return PerformSend(engine);
268 
269  // handling will be performed in following http request handler
270 
271  if (!IsSyncMode()) return 1;
272 
273  // now we should wait until next polling requests is processed
274  // or when connection is closed or handler is shutdown
275 
276  Int_t sendcnt = fSendCnt, loopcnt(0);
277 
278  while (!IsDisabled() && !engine->fDisabled) {
279  gSystem->ProcessEvents();
280  // if send counter changed - current send operation is completed
281  if (sendcnt != fSendCnt)
282  return 0;
283  if (loopcnt++ > 1000) {
284  loopcnt = 0;
285  std::this_thread::sleep_for(std::chrono::milliseconds(1));
286  }
287  }
288 
289  return -1;
290  }
291 
292  // probably this thread can continuously run
293  std::thread thrd([this, engine] {
294  while (!IsDisabled() && !engine->fDisabled) {
295  PerformSend(engine);
296  if (IsDisabled() || engine->fDisabled) break;
297  std::unique_lock<std::mutex> lk(engine->fMutex);
298  if (engine->fKind == THttpWSEngine::kNone) {
299  engine->fWaiting = true;
300  engine->fCond.wait(lk);
301  engine->fWaiting = false;
302  }
303  }
304  });
305 
306  engine->fSendThrd.swap(thrd);
307 
308  engine->fHasSendThrd = true;
309 
310  return 1;
311 }
312 
313 
314 ////////////////////////////////////////////////////////////////////////////////
315 /// Perform send operation, stored in buffer
316 
317 Int_t THttpWSHandler::PerformSend(std::shared_ptr<THttpWSEngine> engine)
318 {
319  {
320  std::lock_guard<std::mutex> grd(engine->fMutex);
321 
322  // no need to do something - operation was processed already by somebody else
323  if (engine->fKind == THttpWSEngine::kNone)
324  return 0;
325 
326  if (engine->fSending)
327  return 1;
328  engine->fSending = true;
329  }
330 
331  if (IsDisabled() || engine->fDisabled)
332  return 0;
333 
334  switch (engine->fKind) {
335  case THttpWSEngine::kData:
336  engine->Send(engine->fData.data(), engine->fData.length());
337  break;
338  case THttpWSEngine::kHeader:
339  engine->SendHeader(engine->fHdr.c_str(), engine->fData.data(), engine->fData.length());
340  break;
341  case THttpWSEngine::kText:
342  engine->SendCharStar(engine->fData.c_str());
343  break;
344  default:
345  break;
346  }
347 
348  engine->fData.clear();
349  engine->fHdr.clear();
350 
351  {
352  std::lock_guard<std::mutex> grd(engine->fMutex);
353  engine->fSending = false;
354  engine->fKind = THttpWSEngine::kNone;
355  }
356 
357  return CompleteSend(engine);
358 }
359 
360 
361 ////////////////////////////////////////////////////////////////////////////////
362 /// Complete current send operation
363 
364 Int_t THttpWSHandler::CompleteSend(std::shared_ptr<THttpWSEngine> &engine)
365 {
366  fSendCnt++;
367  engine->fMTSend = false; // probably we do not need to lock mutex to reset flag
368  CompleteWSSend(engine->GetId());
369  return 0; // indicates that operation is completed
370 }
371 
372 
373 ////////////////////////////////////////////////////////////////////////////////
374 /// Send binary data via given websocket id
375 /// Returns -1 - in case of error
376 /// 0 - when operation was executed immediately
377 /// 1 - when send operation will be performed in different thread
378 
379 Int_t THttpWSHandler::SendWS(UInt_t wsid, const void *buf, int len)
380 {
381  auto engine = FindEngine(wsid, kTRUE);
382  if (!engine) return -1;
383 
384  if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
385  engine->Send(buf, len);
386  return CompleteSend(engine);
387  }
388 
389  bool notify = false;
390 
391  // now we indicate that there is data and any thread can access it
392  {
393  std::lock_guard<std::mutex> grd(engine->fMutex);
394 
395  if (engine->fKind != THttpWSEngine::kNone) {
396  Error("SendWS", "Data kind is not empty - something screwed up");
397  return -1;
398  }
399 
400  notify = engine->fWaiting;
401 
402  engine->fKind = THttpWSEngine::kData;
403 
404  engine->fData.resize(len);
405  std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());
406  }
407 
408  if (engine->fHasSendThrd) {
409  if (notify) engine->fCond.notify_all();
410  return 1;
411  }
412 
413  return RunSendingThrd(engine);
414 }
415 
416 
417 ////////////////////////////////////////////////////////////////////////////////
418 /// Send binary data with text header via given websocket id
419 /// Returns -1 - in case of error,
420 /// 0 - when operation was executed immediately,
421 /// 1 - when send operation will be performed in different thread,
422 
423 Int_t THttpWSHandler::SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf, int len)
424 {
425  auto engine = FindEngine(wsid, kTRUE);
426  if (!engine) return -1;
427 
428  if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
429  engine->SendHeader(hdr, buf, len);
430  return CompleteSend(engine);
431  }
432 
433  bool notify = false;
434 
435  // now we indicate that there is data and any thread can access it
436  {
437  std::lock_guard<std::mutex> grd(engine->fMutex);
438 
439  if (engine->fKind != THttpWSEngine::kNone) {
440  Error("SendWS", "Data kind is not empty - something screwed up");
441  return -1;
442  }
443 
444  notify = engine->fWaiting;
445 
446  engine->fKind = THttpWSEngine::kHeader;
447 
448  engine->fHdr = hdr;
449  engine->fData.resize(len);
450  std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());
451  }
452 
453  if (engine->fHasSendThrd) {
454  if (notify) engine->fCond.notify_all();
455  return 1;
456  }
457 
458  return RunSendingThrd(engine);
459 }
460 
461 ////////////////////////////////////////////////////////////////////////////////
462 /// Send string via given websocket id
463 /// Returns -1 - in case of error,
464 /// 0 - when operation was executed immediately,
465 /// 1 - when send operation will be performed in different thread,
466 
467 Int_t THttpWSHandler::SendCharStarWS(UInt_t wsid, const char *str)
468 {
469  auto engine = FindEngine(wsid, kTRUE);
470  if (!engine) return -1;
471 
472  if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
473  engine->SendCharStar(str);
474  return CompleteSend(engine);
475  }
476 
477  bool notify = false;
478 
479  // now we indicate that there is data and any thread can access it
480  {
481  std::lock_guard<std::mutex> grd(engine->fMutex);
482 
483  if (engine->fKind != THttpWSEngine::kNone) {
484  Error("SendWS", "Data kind is not empty - something screwed up");
485  return -1;
486  }
487 
488  notify = engine->fWaiting;
489 
490  engine->fKind = THttpWSEngine::kText;
491  engine->fData = str;
492  }
493 
494  if (engine->fHasSendThrd) {
495  if (notify) engine->fCond.notify_all();
496  return 1;
497  }
498 
499  return RunSendingThrd(engine);
500 }