47 void fastMergeServer(
bool cache =
false) {
51 TServerSocket *ss =
new TServerSocket(9090, kTRUE);
56 TMonitor *mon =
new TMonitor;
60 UInt_t clientCount = 0;
61 TMemFile *
transient = 0;
63 TFileMerger merger(kFALSE,kFALSE);
64 merger.SetPrintLevel(0);
72 if (cache)
new TFileCacheWrite(merger.GetOutputFile(),32*1024*1024);
79 if (s->IsA() == TServerSocket::Class()) {
80 if (clientCount > 100) {
81 printf(
"only accept 100 clients connections\n");
85 TSocket *client = ((TServerSocket *)s)->Accept();
86 client->Send(clientCount, kStartConnection);
87 client->Send(kProtocolVersion, kProtocol);
90 printf(
"Accept %d connections\n",clientCount);
98 Error(
"fastMergeServer",
"The client did not send a message\n");
99 }
else if (mess->What() == kMESS_STRING) {
101 mess->ReadString(str, 64);
102 printf(
"Client %d: %s\n", clientCount, str);
104 printf(
"Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->GetBytesRecv(),
108 if (mon->GetActive() == 0 || clientCount == 0) {
109 printf(
"No more active clients... stopping\n");
112 }
else if (mess->What() == kMESS_ANY) {
117 mess->ReadInt(clientId);
118 mess->ReadTString(filename);
119 mess->ReadLong64(length);
121 Info(
"fastMergeServer",
"Receive input from client %d for %s",clientId,filename.Data());
124 transient =
new TMemFile(filename,mess->Buffer() + mess->Length(),length);
125 mess->SetBufferOffset(mess->Length()+length);
126 merger.OutputFile(filename,
"UPDATE");
127 merger.AddAdoptFile(
transient);
129 merger.PartialMerge(TFileMerger::kAllIncremental);
131 }
else if (mess->What() == kMESS_OBJECT) {
132 printf(
"got object of class: %s\n", mess->GetClass()->GetName());
134 printf(
"*** Unexpected message ***\n");