38 ClassImp(TPacketizerMulti);
43 TPacketizerMulti::TPacketizerMulti(TDSet *dset, TList *wrks,
44 Long64_t first, Long64_t num,
45 TList *input, TProofProgressStatus *st)
46 : TVirtualPacketizer(input, st)
48 PDB(kPacketizer,1) Info("TPacketizerMulti",
49 "enter (first %lld, num %lld)", first, num);
56 if (!dset || !wrks || !input || !st) {
57 Error(
"TPacketizerMulti",
"invalid inputs: dset:%p wrks:%p input:%p st:%p",
58 dset, wrks, input, st);
62 fPacketizers =
new TList;
65 TNamed *progTimerFlag =
new TNamed(
"PROOF_StartProgressTimer",
"no");
66 input->Add(progTimerFlag);
69 TVirtualPacketizer *packetizer = 0;
71 if (!(dset->TestBit(TDSet::kMultiDSet))) {
72 if ((packetizer = CreatePacketizer(dset, wrks, first, num, input, st))) {
73 fPacketizers->Add(packetizer);
74 fTotalEntries = packetizer->GetTotalEntries();
76 Error(
"TPacketizerMulti",
"problems initializing packetizer for single dataset");
77 input->Remove(progTimerFlag);
83 TIter nxds(dset->GetListOfElements());
85 while ((ds = (TDSet *)nxds())) {
86 if ((packetizer = CreatePacketizer(ds, wrks, first, num, input, st))) {
87 fPacketizers->Add(packetizer);
88 fTotalEntries += packetizer->GetTotalEntries();
90 Error(
"TPacketizerMulti",
"problems initializing packetizer for dataset '%s'", ds->GetName());
95 input->Remove(progTimerFlag);
99 if (fPacketizers->GetSize() <= 0) {
100 Error(
"TPacketizerMulti",
"no valid packetizer could be initialized - aborting");
101 SafeDelete(fPacketizers);
104 Info(
"TPacketizerMulti",
"%d packetizer(s) have been successfully initialized (%lld events in total)",
105 fPacketizers->GetSize(), fTotalEntries);
107 TIter nxp(fPacketizers);
108 while ((packetizer = (TVirtualPacketizer *) nxp()))
109 packetizer->SetTotalEntries(fTotalEntries);
113 fPacketizersIter =
new TIter(fPacketizers);
116 if (!(fCurrent = (TVirtualPacketizer *) fPacketizersIter->Next())) {
118 Error(
"TPacketizerMulti",
"could not point to the first valid packetizer");
119 fPacketizers->SetOwner(kTRUE);
120 SafeDelete(fPacketizers);
121 SafeDelete(fPacketizersIter);
126 fAssignedPack =
new TMap;
131 PDB(kPacketizer,1) Info("TPacketizerMulti", "done");
137 TPacketizerMulti::~TPacketizerMulti()
140 fPacketizers->SetOwner(kTRUE);
141 SafeDelete(fPacketizers);
143 SafeDelete(fPacketizers);
146 fAssignedPack->SetOwner(kFALSE);
147 SafeDelete(fAssignedPack);
149 SafeDelete(fPacketizersIter);
157 TDSetElement *TPacketizerMulti::GetNextPacket(TSlave *wrk, TMessage *r)
159 TDSetElement *elem = 0;
162 if (!fValid)
return elem;
165 TVirtualPacketizer *lastPacketizer =
dynamic_cast<TVirtualPacketizer *
>(fAssignedPack->GetValue(wrk));
166 if (lastPacketizer && lastPacketizer != fCurrent) {
168 Info("GetNextPacket", "%s: asking old packetizer %p ... ", wrk->GetOrdinal(), lastPacketizer);
169 if ((elem = lastPacketizer->GetNextPacket(wrk, r))) return elem;
172 TVirtualSlaveStat *oldstat =
dynamic_cast<TVirtualSlaveStat *
>(lastPacketizer->GetSlaveStats()->GetValue(wrk));
173 TVirtualSlaveStat *curstat =
dynamic_cast<TVirtualSlaveStat *
>(fCurrent->GetSlaveStats()->GetValue(wrk));
174 if (oldstat && curstat)
175 *(curstat->GetProgressStatus()) += *(oldstat->GetProgressStatus());
187 Info("GetNextPacket", "%s: asking current packetizer %p ... ", wrk->GetOrdinal(), fCurrent);
188 if (!(elem = fCurrent->GetNextPacket(wrk, r))) {
190 TMap *oldStats = (lastPacketizer && lastPacketizer == fCurrent) ? lastPacketizer->GetSlaveStats() : 0;
192 fCurrent = (TVirtualPacketizer *) fPacketizersIter->Next();
196 TVirtualSlaveStat *oldstat =
dynamic_cast<TVirtualSlaveStat *
>(oldStats->GetValue(wrk));
197 TVirtualSlaveStat *curstat =
dynamic_cast<TVirtualSlaveStat *
>(fCurrent->GetSlaveStats()->GetValue(wrk));
198 if (oldstat && curstat)
199 *(curstat->GetProgressStatus()) += *(oldstat->GetProgressStatus());
202 Info("GetNextPacket", "%s: asking new packetizer %p ... ", wrk->GetOrdinal(), fCurrent);
203 elem = fCurrent->GetNextPacket(wrk, r);
208 TPair *pair =
dynamic_cast<TPair *
>(fAssignedPack->FindObject(wrk));
210 pair->SetValue(fCurrent);
212 fAssignedPack->Add(wrk, fCurrent);
215 Info("GetNextPacket", "assigned packetizer %p to %s (check: %p)",
216 fCurrent, wrk->GetOrdinal(), fAssignedPack->GetValue(wrk));
220 if (fProgressStatus->GetEntries() >= fTotalEntries) {
221 if (fProgressStatus->GetEntries() > fTotalEntries)
222 Error(
"GetNextPacket",
"Processed too many entries!");
224 SafeDelete(fProgress);
235 TVirtualPacketizer *TPacketizerMulti::CreatePacketizer(TDSet *dset, TList *wrks,
236 Long64_t first, Long64_t num,
237 TList *input, TProofProgressStatus *st)
239 TVirtualPacketizer *packetizer = 0;
242 if (!dset || !wrks || !input || !st) {
243 Error(
"CreatePacketizer",
"invalid inputs: dset:%p wrks:%p input:%p st:%p",
244 dset, wrks, input, st);
249 if (dset->TestBit(TDSet::kEmpty)) {
250 Error(
"CreatePacketizer",
"dataset is empty: protocol error?");
254 TString packetizername;
255 TList *listOfMissingFiles = 0;
264 if (!(listOfMissingFiles = (TList *) input->FindObject(
"MissingFiles"))) {
266 listOfMissingFiles =
new TList;
268 input->Add(listOfMissingFiles);
270 dset->Lookup(kTRUE, &listOfMissingFiles);
272 if (!(dset->GetListOfElements()) ||
273 !(dset->GetListOfElements()->GetSize())) {
274 Error(
"CreatePacketizer",
"no files from the data set were found - skipping");
278 if (TProof::GetParameter(input,
"PROOF_Packetizer", packetizername) != 0) {
280 packetizername =
"TPacketizer";
282 Info(
"CreatePacketizer",
"using alternate packetizer: %s", packetizername.Data());
286 cl = TClass::GetClass(packetizername);
288 Error(
"CreatePacketizer",
"class '%s' not found", packetizername.Data());
293 callEnv.InitWithPrototype(cl, cl->GetName(),
"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
294 if (!callEnv.IsValid()) {
295 Error(
"CreatePacketizer",
"cannot find correct constructor for '%s'", cl->GetName());
298 callEnv.ResetParam();
299 callEnv.SetParam((Long_t) dset);
300 callEnv.SetParam((Long_t) wrks);
301 callEnv.SetParam((Long64_t) first);
302 callEnv.SetParam((Long64_t) num);
303 callEnv.SetParam((Long_t) input);
304 callEnv.SetParam((Long_t) st);
307 dset->SetBit(TDSet::kValidityChecked);
308 dset->ResetBit(TDSet::kSomeInvalid);
312 callEnv.Execute(ret);
313 if ((packetizer = (TVirtualPacketizer *)ret) == 0) {
314 Error(
"CreatePacketizer",
"cannot construct '%s'", cl->GetName());
318 if (!packetizer->IsValid()) {
319 Error(
"CreatePacketizer",
320 "instantiated packetizer object '%s' is invalid", cl->GetName());
321 SafeDelete(packetizer);
325 TDSetElement *elem = 0;
326 if (dset->TestBit(TDSet::kSomeInvalid)) {
327 TIter nxe(dset->GetListOfElements());
328 while ((elem = (TDSetElement *)nxe())) {
329 if (!elem->GetValid()) {
330 listOfMissingFiles->Add(elem->GetFileInfo(dset->GetType()));
331 dset->Remove(elem, kFALSE);
335 dset->ResetBit(TDSet::kSomeInvalid);