23 #include <sys/socket.h>
// Constructor (only partially visible here).
// The member initialisers mark this object as the parent process (fIsParent = true),
// start with an empty fWorkerPids and a default-constructed fMon, and set fNWorkers to 0.
// NOTE(review): how the nWorkers argument ends up in fNWorkers is not shown in the
// visible lines - confirm against the full body.
51 TMPClient::TMPClient(
unsigned nWorkers) : fIsParent(true), fWorkerPids(), fMon(), fNWorkers(0)
// Query system information through gSystem; a return value of 0 selects the branch
// that follows (presumably the success case - TODO confirm).
58 if (gSystem->GetSysInfo(&si) == 0)
// Destructor (only partially visible here).
// Sends MPCode::kShutdownOrder through Broadcast(), then fetches the monitor's list of
// active sockets followed by its list of de-active sockets, reusing the same pointer `l`.
// NOTE(review): Broadcast() wraps the result of GetListOfActives() in a std::unique_ptr,
// which suggests the caller owns the returned list - confirm that both lists fetched
// here are released before `l` is overwritten or goes out of scope.
70 TMPClient::~TMPClient()
72 Broadcast(MPCode::kShutdownOrder);
73 TList *l = fMon.GetListOfActives();
76 l = fMon.GetListOfDeActives();
/// Function-pointer type used to call the symbol "Py_IsInitialized".
using Py_IsInitialized_type = int (*)(void);
/// Function-pointer type used to call the symbol "PyGILState_Ensure";
/// the returned state is carried around as an opaque `void*`.
using PyGILState_Ensure_type = void* (*)(void);
/// Function-pointer type used to call the symbol "PyGILState_Release";
/// takes back the opaque state obtained from the ensure call.
using PyGILState_Release_type = void (*)(void*);
/// Opaque state handed from the ensure call to the release call; initialised to null.
void* fPyGILState_STATE = nullptr;

/// Look up `name` at run time with `dlsym` and return it typed as `FPTYPE`.
///
/// \tparam FPTYPE function-pointer type the resolved address is converted to.
/// \param  name   NUL-terminated symbol name.
/// \return the resolved symbol as `FPTYPE`, or a null pointer when `dlsym`
///         does not find the symbol.
///
/// The handle passed to `dlsym` is a null pointer, exactly as before.
template<class FPTYPE>
FPTYPE GetSymT(const char* name) {
   // dlsym returns void*; converting it to a function pointer needs reinterpret_cast.
   return reinterpret_cast<FPTYPE>(dlsym(nullptr, name));
}
// Acquire step (the enclosing function header is not visible here; presumably the
// constructor of the helper that Fork() instantiates as ROOT::Internal::TGILRAII - TODO confirm).
// Resolve "Py_IsInitialized" at run time; if the symbol cannot be found, or it reports
// that Python is not initialised, there is nothing to do.
97 auto Py_IsInitialized = GetSymT<Py_IsInitialized_type>(
"Py_IsInitialized");
98 if (!Py_IsInitialized || !Py_IsInitialized())
return;
// Resolve "PyGILState_Ensure" the same way and, when it is available, call it and keep
// the returned state in fPyGILState_STATE for the matching release.
99 auto PyGILState_Ensure = GetSymT<PyGILState_Ensure_type>(
"PyGILState_Ensure");
100 if (PyGILState_Ensure) fPyGILState_STATE = PyGILState_Ensure();
// Release step (the enclosing function header is not visible here; presumably the
// matching destructor - TODO confirm).
// Resolve "PyGILState_Release" at run time and call it with the stored state, but only
// when a non-null state was stored and the symbol was found.
// NOTE(review): a stored state that compares equal to null is skipped. In CPython's
// PyGILState_STATE enum the first enumerator is PyGILState_LOCKED, so a successful
// ensure call may yield 0 - confirm whether the release must still happen in that case.
105 auto PyGILState_Release = GetSymT<PyGILState_Release_type>(
"PyGILState_Release");
106 if (fPyGILState_STATE && PyGILState_Release) PyGILState_Release(fPyGILState_STATE);
// Create the worker processes, one per iteration up to fNWorkers (only partially
// visible here: the fork call, several conditions and the closing braces are not shown).
// `server` is initialised with one end of a socket pair and the worker index at the end
// of the visible code. Returns bool; the returned value is not visible here.
128 bool TMPClient::Fork(TMPWorker &server)
130 std::string basePath =
"/tmp/ROOTMP-";
135 unsigned nWorker = 0;
136 for (; nWorker < fNWorkers; ++nWorker) {
// One AF_UNIX stream socket pair per worker: sockets[0] is wrapped in a TSocket below,
// sockets[1] is handed to server.Init().
138 int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sockets);
// Report the errno of the failed socketpair() inside the sentence, before the newline.
140 Error(
"TMPClient::Fork",
"[E][C] Could not create socketpair. Error n. %d. Now retrying.\n", errno);
// Scoped helper object; see its acquire/release steps for what it does with the Python GIL.
147 ROOT::Internal::TGILRAII tgilraai;
// Wrap sockets[0] in a TSocket whose name is the pid printed as a decimal string.
157 TSocket *s =
new TSocket(sockets[0], (std::to_string(pid)).c_str());
158 if (s && s->IsValid()) {
// Remember the pid so ReapWorkers() can wait for it.
160 fWorkerPids.push_back(pid);
162 Error(
"TMPClient::Fork",
"[E][C] Could not connect to worker with pid %d. Giving up.\n", pid);
// Presumably child-side setup from here on (the branch structure is not visible - TODO confirm).
// Take the first registered signal handler, if any, and remove it from gSystem.
177 TSeqCollection *signalHandlers = gSystem->GetListOfSignalHandlers();
178 TSignalHandler *sh =
nullptr;
179 if (signalHandlers && signalHandlers->GetSize() > 0)
180 sh = (TSignalHandler *)signalHandlers->First();
182 gSystem->RemoveSignalHandler(sh);
// Remove the file handler registered on file descriptor 0.
185 TSeqCollection *fileHandlers = gSystem->GetListOfFileHandlers();
187 for (
auto h : *fileHandlers) {
188 if (h && ((TFileHandler *)h)->GetFd() == 0) {
189 gSystem->RemoveFileHandler((TFileHandler *)h);
// Walk the monitor's active sockets, always taking the first one, until none remain.
// NOTE(review): GetListOfActives()/GetListOfDeActives() are called once per test and once
// per iteration; Broadcast() treats the returned list as caller-owned (std::unique_ptr),
// so confirm these temporaries are not leaked.
195 if (fMon.GetListOfActives()) {
196 while (fMon.GetListOfActives()->GetSize() > 0) {
197 TSocket *s = (TSocket *) fMon.GetListOfActives()->First();
// Same for the de-active sockets.
202 if (fMon.GetListOfDeActives()) {
203 while (fMon.GetListOfDeActives()->GetSize() > 0) {
204 TSocket *s = (TSocket *) fMon.GetListOfDeActives()->First();
// Switch the GUI factory and the virtual X interface to their batch instances.
212 if (gGuiFactory != gBatchGuiFactory)
214 gGuiFactory = gBatchGuiFactory;
216 if (gVirtualX != gGXBatch)
219 gVirtualX = gGXBatch;
// Hand the other end of the socket pair and this worker's index to the server object.
222 server.Init(sockets[1], nWorker);
// Send message `code` to the sockets in the monitor's active list (only partially visible here).
//   code      - message code passed to MPSend for each socket.
//   nMessages - number of messages to send; replaced by fNWorkers under a condition
//               that is not visible here (presumably when the caller passes 0 - TODO confirm).
// Returns unsigned; judging by `count`, presumably the number of messages sent - TODO confirm.
248 unsigned TMPClient::Broadcast(
unsigned code,
unsigned nMessages)
251 nMessages = fNWorkers;
// The list returned by GetListOfActives() is owned here and released automatically.
256 std::unique_ptr<TList> lp(fMon.GetListOfActives());
// `count` reaching nMessages is tested here; the statement it guards is not visible.
258 if (count == nMessages)
// When MPSend reports success, the socket is de-activated in the monitor.
260 if (MPSend((TSocket *)s, code)) {
261 fMon.DeActivate((TSocket *)s);
// Location tag uses `::` like every other Error() call in this file.
264 Error(
"TMPClient::Broadcast",
"[E] Could not send message to server\n");
281 void TMPClient::DeActivate(TSocket *s)
295 void TMPClient::Remove(TSocket *s)
// Wait for every worker process recorded in fWorkerPids (only partially visible here).
// Each pid is passed to a blocking waitpid() (options = 0); the exit status is
// discarded (nullptr) and the return value of waitpid() is not checked.
308 void TMPClient::ReapWorkers()
310 for (
auto &pid : fWorkerPids) {
311 waitpid(pid,
nullptr, 0);
// Handle a message whose code is dealt with by the client itself (only partially visible here).
//   msg - pair of message code (first) and buffer (second).
//   s   - socket the message came from; its use is not visible in these lines.
// Every branch reports through Error(), whatever the [I]/[E]/[W] tag in the text says.
329 void TMPClient::HandleMPCode(MPCodeBufPair &msg, TSocket *s)
331 unsigned code = msg.first;
// Read the payload of the message as a C string.
// NOTE(review): confirm that `str` is released after use; no release is visible here.
333 const char *str = ReadBuffer<const char*>(msg.second.get());
335 if (code == MPCode::kMessage) {
336 Error(
"TMPClient::HandleMPCode",
"[I][C] message received: %s\n", str);
337 }
else if (code == MPCode::kError) {
338 Error(
"TMPClient::HandleMPCode",
"[E][C] error message received: %s\n", str);
339 }
else if (code == MPCode::kShutdownNotice || code == MPCode::kFatalError) {
341 Error(
"TMPClient::HandleMPCode",
"[I][C] shutdown notice received from %s\n", str);
// `code` is unsigned, so it is printed with %u.
344 Error(
"TMPClient::HandleMPCode",
"[W][C] unknown code received. code=%u\n", code);