66 ClassImp(THttpWSHandler);
71 THttpWSHandler::THttpWSHandler(
const char *name,
const char *title, Bool_t syncmode) : TNamed(name, title), fSyncMode(syncmode)
79 THttpWSHandler::~THttpWSHandler()
83 std::vector<std::shared_ptr<THttpWSEngine>> clr;
86 std::lock_guard<std::mutex> grd(fMutex);
87 std::swap(clr, fEngines);
90 for (
auto &eng : clr) {
91 eng->fDisabled =
true;
92 if (eng->fHasSendThrd) {
93 eng->fHasSendThrd =
false;
95 eng->fCond.notify_all();
96 eng->fSendThrd.join();
98 eng->ClearHandle(kTRUE);
103 Int_t THttpWSHandler::GetNumWS()
105 std::lock_guard<std::mutex> grd(fMutex);
106 return fEngines.size();
113 UInt_t THttpWSHandler::GetWS(Int_t num)
115 std::lock_guard<std::mutex> grd(fMutex);
116 auto iter = fEngines.begin() + num;
117 return (*iter)->GetId();
124 std::shared_ptr<THttpWSEngine> THttpWSHandler::FindEngine(UInt_t wsid, Bool_t book_send)
129 std::lock_guard<std::mutex> grd(fMutex);
131 for (
auto &eng : fEngines)
132 if (eng->GetId() == wsid) {
140 Error(
"FindEngine",
"Try to book next send operation before previous completed");
143 eng->fMTSend = kTRUE;
154 void THttpWSHandler::RemoveEngine(std::shared_ptr<THttpWSEngine> &engine, Bool_t terminate)
159 std::lock_guard<std::mutex> grd(fMutex);
161 for (
auto iter = fEngines.begin(); iter != fEngines.end(); iter++)
162 if (*iter == engine) {
164 Error(
"RemoveEngine",
"Trying to remove WS engine during send operation");
166 engine->fDisabled =
true;
167 fEngines.erase(iter);
172 engine->ClearHandle(terminate);
174 if (engine->fHasSendThrd) {
175 engine->fHasSendThrd =
false;
176 if (engine->fWaiting)
177 engine->fCond.notify_all();
178 engine->fSendThrd.join();
190 Bool_t THttpWSHandler::HandleWS(std::shared_ptr<THttpCallArg> &arg)
196 return ProcessWS(arg.get());
199 if (arg->IsMethod(
"WS_CONNECT"))
200 return ProcessWS(arg.get());
202 auto engine = FindEngine(arg->GetWSId());
204 if (arg->IsMethod(
"WS_READY")) {
207 Error(
"HandleWS",
"WS engine with similar id exists %u", arg->GetWSId());
208 RemoveEngine(engine, kTRUE);
211 engine = arg->TakeWSEngine();
213 std::lock_guard<std::mutex> grd(fMutex);
214 fEngines.emplace_back(engine);
217 if (!ProcessWS(arg.get())) {
219 RemoveEngine(engine, kTRUE);
226 if (arg->IsMethod(
"WS_CLOSE")) {
229 RemoveEngine(engine);
231 return ProcessWS(arg.get());
234 if (engine && engine->PreProcess(arg)) {
239 Bool_t res = ProcessWS(arg.get());
242 engine->PostProcess(arg);
250 void THttpWSHandler::CloseWS(UInt_t wsid)
252 auto engine = FindEngine(wsid);
254 RemoveEngine(engine, kTRUE);
262 Int_t THttpWSHandler::RunSendingThrd(std::shared_ptr<THttpWSEngine> engine)
264 if (IsSyncMode() || !engine->SupportSendThrd()) {
266 if (engine->CanSendDirectly())
267 return PerformSend(engine);
271 if (!IsSyncMode())
return 1;
276 Int_t sendcnt = fSendCnt, loopcnt(0);
278 while (!IsDisabled() && !engine->fDisabled) {
279 gSystem->ProcessEvents();
281 if (sendcnt != fSendCnt)
283 if (loopcnt++ > 1000) {
285 std::this_thread::sleep_for(std::chrono::milliseconds(1));
293 std::thread thrd([
this, engine] {
294 while (!IsDisabled() && !engine->fDisabled) {
296 if (IsDisabled() || engine->fDisabled)
break;
297 std::unique_lock<std::mutex> lk(engine->fMutex);
298 if (engine->fKind == THttpWSEngine::kNone) {
299 engine->fWaiting =
true;
300 engine->fCond.wait(lk);
301 engine->fWaiting =
false;
306 engine->fSendThrd.swap(thrd);
308 engine->fHasSendThrd =
true;
317 Int_t THttpWSHandler::PerformSend(std::shared_ptr<THttpWSEngine> engine)
320 std::lock_guard<std::mutex> grd(engine->fMutex);
323 if (engine->fKind == THttpWSEngine::kNone)
326 if (engine->fSending)
328 engine->fSending =
true;
331 if (IsDisabled() || engine->fDisabled)
334 switch (engine->fKind) {
335 case THttpWSEngine::kData:
336 engine->Send(engine->fData.data(), engine->fData.length());
338 case THttpWSEngine::kHeader:
339 engine->SendHeader(engine->fHdr.c_str(), engine->fData.data(), engine->fData.length());
341 case THttpWSEngine::kText:
342 engine->SendCharStar(engine->fData.c_str());
348 engine->fData.clear();
349 engine->fHdr.clear();
352 std::lock_guard<std::mutex> grd(engine->fMutex);
353 engine->fSending =
false;
354 engine->fKind = THttpWSEngine::kNone;
357 return CompleteSend(engine);
364 Int_t THttpWSHandler::CompleteSend(std::shared_ptr<THttpWSEngine> &engine)
367 engine->fMTSend =
false;
368 CompleteWSSend(engine->GetId());
379 Int_t THttpWSHandler::SendWS(UInt_t wsid,
const void *buf,
int len)
381 auto engine = FindEngine(wsid, kTRUE);
382 if (!engine)
return -1;
384 if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
385 engine->Send(buf, len);
386 return CompleteSend(engine);
393 std::lock_guard<std::mutex> grd(engine->fMutex);
395 if (engine->fKind != THttpWSEngine::kNone) {
396 Error(
"SendWS",
"Data kind is not empty - something screwed up");
400 notify = engine->fWaiting;
402 engine->fKind = THttpWSEngine::kData;
404 engine->fData.resize(len);
405 std::copy((
const char *)buf, (
const char *)buf + len, engine->fData.begin());
408 if (engine->fHasSendThrd) {
409 if (notify) engine->fCond.notify_all();
413 return RunSendingThrd(engine);
423 Int_t THttpWSHandler::SendHeaderWS(UInt_t wsid,
const char *hdr,
const void *buf,
int len)
425 auto engine = FindEngine(wsid, kTRUE);
426 if (!engine)
return -1;
428 if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
429 engine->SendHeader(hdr, buf, len);
430 return CompleteSend(engine);
437 std::lock_guard<std::mutex> grd(engine->fMutex);
439 if (engine->fKind != THttpWSEngine::kNone) {
440 Error(
"SendWS",
"Data kind is not empty - something screwed up");
444 notify = engine->fWaiting;
446 engine->fKind = THttpWSEngine::kHeader;
449 engine->fData.resize(len);
450 std::copy((
const char *)buf, (
const char *)buf + len, engine->fData.begin());
453 if (engine->fHasSendThrd) {
454 if (notify) engine->fCond.notify_all();
458 return RunSendingThrd(engine);
467 Int_t THttpWSHandler::SendCharStarWS(UInt_t wsid,
const char *str)
469 auto engine = FindEngine(wsid, kTRUE);
470 if (!engine)
return -1;
472 if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
473 engine->SendCharStar(str);
474 return CompleteSend(engine);
481 std::lock_guard<std::mutex> grd(engine->fMutex);
483 if (engine->fKind != THttpWSEngine::kNone) {
484 Error(
"SendWS",
"Data kind is not empty - something screwed up");
488 notify = engine->fWaiting;
490 engine->fKind = THttpWSEngine::kText;
494 if (engine->fHasSendThrd) {
495 if (notify) engine->fCond.notify_all();
499 return RunSendingThrd(engine);