/// Constructor.
///
/// Forwards `filename`, `option`, `ftitle` and `compress` unchanged to the
/// TMemFile base class and starts with no socket (fSocket = 0), no server
/// index (fServerIdx = -1), server version 0, no sent-class bookkeeping
/// (fClassSent = 0) and a message buffer of kind kMESS_OBJECT.
/// The visible body opens no connection; it only derives the server location
/// from the URL options.
///
/// \param filename  passed through to TMemFile.
/// \param option    passed through to TMemFile.
/// \param ftitle    passed through to TMemFile.
/// \param compress  passed through to TMemFile.
35 TParallelMergingFile::TParallelMergingFile(
const char *filename, Option_t *option ,
36 const char *ftitle , Int_t compress ) :
37 TMemFile(filename,option,ftitle,compress),fSocket(0),fServerIdx(-1),fServerVersion(0),fClassSent(0),fMessage(kMESS_OBJECT)
// Keep the tail of the URL option string starting at "pmerge=".
// NOTE(review): strstr yields a null pointer when the option is absent;
// presumably TString accepts that and becomes empty - confirm.
39 TString serverurl = strstr(fUrl.GetOptions(),
"pmerge=");
// Only when a "pmerge=" option was found.
40 if (serverurl.Length()) {
// Rewrite "pmerge=<rest>" into "pmerge://<rest>" so it can be parsed by TUrl.
41 serverurl.ReplaceAll(
"pmerge=",
"pmerge://");
// Stored for later use; Close() and UploadAndReset() read host and port from it.
42 fServerLocation = TUrl(serverurl);
/// Destructor.
// NOTE(review): the destructor body is not part of this excerpt. fSocket and
// fClassSent are assigned with `new` in UploadAndReset(); confirm they are
// released here or in Close().
49 TParallelMergingFile::~TParallelMergingFile()
/// Close the file and notify the merging server.
///
/// Calls TMemFile::Close with the same `option`, then sends the string
/// "Finished" over fSocket. A Send() result of 0 is reported with a Warning
/// naming the server host and port.
///
/// \param option  passed through to TMemFile::Close.
59 void TParallelMergingFile::Close(Option_t *option)
// Base-class close comes first, before the server is told we are done.
61 TMemFile::Close(option);
// NOTE(review): fSocket starts as 0 in the constructor and is only assigned in
// UploadAndReset(). The statement between the base-class close and this Send
// is missing from this listing - confirm a null check guards the dereference.
// A return value of 0 from Send() is treated as failure here.
63 if (0==fSocket->Send(
"Finished")) {
64 Warning(
"Close",
"Failed to send the finishing message to the server %s:%d",fServerLocation.GetHost(),fServerLocation.GetPort());
/// Upload the current file content to the merging server.
///
/// Visible steps, in order:
///  1. Connect a TSocket to the host/port of fServerLocation and read two
///     (value, kind) pairs from the server: the client index (fServerIdx) and
///     the server version (fServerVersion).
///  2. Build fMessage from the server index, the file name and GetEND(), then
///     send it.
///  3. Record in fClassSent every class index that is set in fClassIndex.
///
/// \return Bool_t. NOTE(review): the return statements, the cleanup on the
///         error paths and the declarations of `kind` and `error` are not part
///         of this excerpt.
76 Bool_t TParallelMergingFile::UploadAndReset()
// Server coordinates come from the "pmerge" URL parsed in the constructor.
80 const char *host = fServerLocation.GetHost();
81 Int_t port = fServerLocation.GetPort();
// Missing or empty host name; the fallback applied in this branch is not
// visible in this excerpt.
82 if (host == 0 || host[0] ==
'\0') {
// NOTE(review): raw `new`; the matching delete is not visible in this excerpt.
88 fSocket =
new TSocket(host,port);
// Connection could not be established.
89 if (!fSocket->IsValid()) {
90 Error(
"UploadAndReset",
"Could not contact the server %s:%d\n",host,port);
// First server message: fills fServerIdx and the message kind.
98 Int_t n = fSocket->Recv(fServerIdx, kind);
// NOTE(review): with `&&` the error fires only when Recv failed AND the kind
// is not 0, so a failed Recv with kind == 0 (or a successful Recv with an
// unexpected kind) is accepted - confirm `||` was not intended.
100 if (n < 0 && kind != 0 )
102 Error(
"UploadAndReset",
"Unexpected server message: kind=%d idx=%d\n",kind,fServerIdx);
// Second server message: fills fServerVersion; kind 1 is the expected value.
107 n = fSocket->Recv(fServerVersion, kind);
// NOTE(review): same `&&` vs `||` question as for the first message.
108 if (n < 0 && kind != 1 )
110 Fatal(
"UploadAndReset",
"Unexpected server message: kind=%d status=%d\n",kind,fServerVersion);
112 Info(
"UploadAndReset",
"Connected to fastMergeServer version %d with index %d\n",fServerVersion,fServerIdx);
114 TMessage::EnableSchemaEvolutionForAll(kTRUE);
// Message header: kind kMESS_ANY, then server index, file name and GetEND().
117 fMessage.Reset(kMESS_ANY);
118 fMessage.WriteInt(fServerIdx);
119 fMessage.WriteTString(GetName());
120 fMessage.WriteLong64(GetEND());
// A Send() result <= 0 is treated as an upload failure.
125 if ((error = fSocket->Send(fMessage)) <= 0) {
126 Error(
"UploadAndReset",
"Upload to the merging server failed with %d\n",error);
// Bookkeeping of which class indices have been sent to the server.
133 Int_t isize = fClassIndex->GetSize();
// NOTE(review): the condition guarding this allocation is not visible;
// presumably it runs only while fClassSent is still 0 - confirm, otherwise the
// previous array would be leaked.
135 fClassSent =
new TArrayC(isize);
// Grow the bookkeeping array when fClassIndex has become larger.
137 if (isize > fClassSent->GetSize()) {
138 fClassSent->Set(isize);
// Flag every index that is currently set in fClassIndex as sent.
141 for(Int_t c = 0; c < isize; ++c) {
142 if (fClassIndex->fArray[c]) {
143 fClassSent->fArray[c] = 1;
/// Write the file content.
///
/// The first (name) argument is unnamed and not used; 0 is passed to
/// TMemFile::Write in its place, together with `opt` and `bufsiz`.
///
/// \param opt     passed through to TMemFile::Write.
/// \param bufsiz  passed through to TMemFile::Write.
/// \return Int_t. NOTE(review): the statements following the base-class call,
///         including the return, are not part of this excerpt.
165 Int_t TParallelMergingFile::Write(
const char *, Int_t opt, Int_t bufsiz)
// Value returned by the base-class write.
167 Int_t nbytes = TMemFile::Write(0,opt,bufsiz);
/// Const overload of Write.
///
/// Reports an Error because a const file object is not expected to be saved,
/// then casts away constness and forwards `n`, `opt` and `bufsize` to the
/// non-const overload.
///
/// \param n        forwarded to the non-const Write.
/// \param opt      forwarded to the non-const Write.
/// \param bufsize  forwarded to the non-const Write.
/// \return the value returned by the non-const Write.
177 Int_t TParallelMergingFile::Write(
const char *n, Int_t opt, Int_t bufsize)
const
// Diagnose the misuse but carry on, as the message says.
179 Error(
"Write const",
"A const TFile object should not be saved. We try to proceed anyway.");
// Delegate to the non-const overload on the same object.
180 return const_cast<TParallelMergingFile*
>(
this)->Write(n, opt, bufsize);
/// Write the streamer information, leaving out classes already sent.
///
/// Returns without doing anything when the file is not writable, when there is
/// no fClassIndex, or when fClassIndex->fArray[0] is 0. Otherwise every
/// fClassIndex entry whose counterpart in fClassSent is set is cleared, and
/// TMemFile::WriteStreamerInfo() is then called.
188 void TParallelMergingFile::WriteStreamerInfo()
190 if (!fWritable)
return;
191 if (!fClassIndex)
return;
// Slot 0 presumably flags whether there is anything to write - TODO confirm.
193 if (fClassIndex->fArray[0] == 0)
return;
197 Int_t isize = fClassIndex->GetSize();
// NOTE(review): fClassSent starts as 0 in the constructor and is only
// allocated in UploadAndReset(); no null guard is visible before this
// dereference - confirm one exists in the lines missing from this listing.
198 Int_t ssize = fClassSent->GetSize();
// Iterate only over indices valid in both arrays.
199 for(Int_t c = 0; c < isize && c < ssize; ++c) {
// Clear the entry for a class that was already sent.
200 if (fClassSent->fArray[c]) {
201 fClassIndex->fArray[c] = 0;
// The base class then works from the filtered index.
206 TMemFile::WriteStreamerInfo();