Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TPacketizerAdaptive.h
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Jan Iwaszkiewicz 11/12/06
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2006, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TPacketizerAdaptive
13 #define ROOT_TPacketizerAdaptive
14 
15 //////////////////////////////////////////////////////////////////////////
16 // //
17 // TPacketizerAdaptive //
18 // //
19 // This packetizer is based on TPacketizer but uses different //
20 // load-balancing algorithms and data structures. //
21 // Two main improvements in the load-balancing strategy: //
22 // - The first is a change in the order in which files are assigned //
23 // to the computing nodes, so that network transfers are spread //
24 // evenly over the query time. Transfer of remote files //
25 // often became a bottleneck at the end of a query. //
26 // - The second is the use of a time-based packet size. We //
27 // measure the processing rate of all the nodes and calculate the //
28 // packet size so that a packet takes a certain amount of time. In //
29 // this way the packetizer prevents the situation where the query //
30 // can't finish because of one slow node. //
31 // //
32 // The data structures TFileStat, TFileNode and TSlaveStat are //
33 // extended and modified, and the TFileNode::Compare method is changed. //
34 // //
35 //////////////////////////////////////////////////////////////////////////
36 
37 #include "TVirtualPacketizer.h"
38 
39 
40 class TMessage; // used as a pointer parameter of GetNextPacket()
41 class TTree;
42 class TMap;
43 class TNtupleD;
44 class TProofStats;
45 class TRandom;
46 class TSortedList; // pointee type of the fFilesToProcess member
47 
48 class TPacketizerAdaptive : public TVirtualPacketizer {
49 // Packetizer derived from TVirtualPacketizer; generates work packets for parallel processing.
50 public: // nested helper types; public because of Sun CC bug
51  class TFileNode;
52  class TFileStat;
53  class TSlaveStat;
54 
55 private:
56  TList *fFileNodes; // nodes with files
57  TList *fUnAllocated; // nodes with unallocated files
58  TList *fActive; // nodes with unfinished files
59  Int_t fMaxPerfIdx; // maximum of our slaves' performance index
60  TList *fPartitions; // list of partitions on nodes
61 
62  TSortedList *fFilesToProcess; // Global list of files (TFileStat) to be processed
63 
64  Bool_t fCachePacketSync; // control synchronization of cache and packet sizes
65  Double_t fMaxEntriesRatio; // max file entries to avg allowed ratio for cache-to-packet sync
66 
67  Float_t fFractionOfRemoteFiles; // fraction of TDSetElements that are on non-workers
68  Long64_t fNEventsOnRemLoc; // number of events in currently
69  // unallocated files on non-worker locations
70  Float_t fBaseLocalPreference; // indicates how much more likely the nodes will be
71  // to open their local files (1 means indifferent)
72  Bool_t fForceLocal; // if 1 - eliminate the remote processing
73 
74  Long_t fMaxSlaveCnt; // maximum number of workers per filenode (Long_t to avoid
75  // warnings from backward compatibility support)
76  Int_t fPacketAsAFraction; // used to calculate the packet size
77  // fPacketSize = fTotalEntries / (fPacketAsAFraction * nslaves)
78  // fPacketAsAFraction can be interpreted as follows:
79  // assuming all slaves have equal processing rate, packet size
80  // is (#events processed by 1 slave) / fPacketAsAFraction.
81  // It can be set with PROOF_PacketAsAFraction in input list.
82  Int_t fStrategy; // 0 = the classic strategy, 1 (default) = the adaptive strategy
83  Int_t fTryReassign; // Controls attempts to reassign packets (0 == no reassignment)
84 
85  TPacketizerAdaptive(); // private default constructor
86  TPacketizerAdaptive(const TPacketizerAdaptive&); // no implementation: generates an error on accidental usage
87  void InitStats(); // initialise the stats
88  void operator=(const TPacketizerAdaptive&); // no implementation: generates an error on accidental usage
89 // NOTE(review): the helpers below presumably operate on fUnAllocated / fActive - confirm in the implementation file
90  TFileNode *NextNode();
91  void RemoveUnAllocNode(TFileNode *);
92 
93  TFileNode *NextActiveNode();
94  void RemoveActiveNode(TFileNode *);
95 
96  TFileStat *GetNextUnAlloc(TFileNode *node = 0, const char *nodeHostName = 0); // both arguments are optional (default 0)
97  TFileStat *GetNextActive();
98  void RemoveActive(TFileStat *file);
99 
100  void Reset();
101  void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent = -1, Bool_t byfile = kFALSE);
102  Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles);
103  void SplitPerHost(TList *elements, TList **listOfMissingFiles);
104 
105 public:
106  TPacketizerAdaptive(TDSet *dset, TList *slaves, Long64_t first, Long64_t num,
107  TList *input, TProofProgressStatus *st);
108  virtual ~TPacketizerAdaptive();
109 
110  Int_t AddProcessed(TSlave *sl, TProofProgressStatus *st,
111  Double_t latency, TList **listOfMissingFiles = 0); // listOfMissingFiles is optional (default 0)
112  Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls); // ent, bytes and calls are passed by non-const reference (presumably outputs - confirm)
113  Float_t GetCurrentRate(Bool_t &all);
114  Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent);
115  TDSetElement *GetNextPacket(TSlave *sl, TMessage *r);
116  void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles);
117 
118  Int_t GetActiveWorkers();
119 
120  ClassDef(TPacketizerAdaptive,0) //Generate work packets for parallel processing
121 };
122 
123 #endif