#include "TMessage.h" #include "TBenchmark.h" #include "TSocket.h" #include "TH2.h" #include "TTree.h" #include "TMemFile.h" #include "TRandom.h" #include "TError.h" #include "TFileMerger.h" #include "TServerSocket.h" #include "TPad.h" #include "TCanvas.h" #include "TMonitor.h" #include "TFileCacheWrite.h" #include "TSystem.h" #include "THashTable.h" #include "TMath.h" #include "TTimeStamp.h" const int kIncremental = 0; const int kReplaceImmediately = 1; const int kReplaceWait = 2; #include "TKey.h" static Bool_t R__NeedInitialMerge(TDirectory *dir) { if (dir==0) return kFALSE; TIter nextkey(dir->GetListOfKeys()); TKey *key; while( (key = (TKey*)nextkey()) ) { TClass *cl = TClass::GetClass(key->GetClassName()); if (cl->InheritsFrom(TDirectory::Class())) { TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName()); if (!subdir) { subdir = (TDirectory *)key->ReadObj(); } if (R__NeedInitialMerge(subdir)) { return kTRUE; } } else { if (0 != cl->GetResetAfterMerge()) { return kTRUE; } } } return kFALSE; } static void R__DeleteObject(TDirectory *dir, Bool_t withReset) { if (dir==0) return; TIter nextkey(dir->GetListOfKeys()); TKey *key; while( (key = (TKey*)nextkey()) ) { TClass *cl = TClass::GetClass(key->GetClassName()); if (cl->InheritsFrom(TDirectory::Class())) { TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName()); if (!subdir) { subdir = (TDirectory *)key->ReadObj(); } R__DeleteObject(subdir,withReset); } else { Bool_t todelete = kFALSE; if (withReset) { todelete = (0 != cl->GetResetAfterMerge()); } else { todelete = (0 == cl->GetResetAfterMerge()); } if (todelete) { key->Delete(); dir->GetListOfKeys()->Remove(key); delete key; } } } } static void R__MigrateKey(TDirectory *destination, TDirectory *source) { if (destination==0 || source==0) return; TIter nextkey(source->GetListOfKeys()); TKey *key; while( (key = (TKey*)nextkey()) ) { TClass *cl = TClass::GetClass(key->GetClassName()); if (cl->InheritsFrom(TDirectory::Class())) { TDirectory *source_subdir = (TDirectory *)source->GetList()->FindObject(key->GetName()); if (!source_subdir) { source_subdir = (TDirectory *)key->ReadObj(); } TDirectory *destination_subdir = destination->GetDirectory(key->GetName()); if (!destination_subdir) { destination_subdir = destination->mkdir(key->GetName()); } R__MigrateKey(destination,source); } else { TKey *oldkey = destination->GetKey(key->GetName()); if (oldkey) { oldkey->Delete(); delete oldkey; } TKey *newkey = new TKey(destination,*key,0 /* pidoffset */); // a priori the file are from the same client .. destination->GetFile()->SumBuffer(newkey->GetObjlen()); newkey->WriteFile(0); if (destination->GetFile()->TestBit(TFile::kWriteError)) { return; } } } destination->SaveSelf(); } struct ClientInfo { TFile *fFile; // This object does *not* own the file, it will be own by the owner of the ClientInfo. TString fLocalName; UInt_t fContactsCount; TTimeStamp fLastContact; Double_t fTimeSincePrevContact; ClientInfo() : fFile(0), fLocalName(), fContactsCount(0), fTimeSincePrevContact(0) {} ClientInfo(const char *filename, UInt_t clientId) : fFile(0), fContactsCount(0), fTimeSincePrevContact(0) { fLocalName.Form("%s-%d-%d",filename,clientId,gSystem->GetPid()); } void Set(TFile *file) { // Register the new file as coming from this client. if (file != fFile) { // We need to keep any of the keys from the previous file that // are not in the new file. if (fFile) { R__MigrateKey(fFile,file); // delete the previous memory file (if any) delete file; } else { fFile = file; } } TTimeStamp now; fTimeSincePrevContact = now.AsDouble() - fLastContact.AsDouble(); fLastContact = now; ++fContactsCount; } }; struct ParallelFileMerger : public TObject { typedef std::vector ClientColl_t; TString fFilename; TBits fClientsContact; // UInt_t fNClientsContact; // ClientColl_t fClients; TTimeStamp fLastMerge; TFileMerger fMerger; ParallelFileMerger(const char *filename, Bool_t writeCache = kFALSE) : fFilename(filename), fNClientsContact(0), fMerger(kFALSE,kTRUE) { // Default constructor. fMerger.SetPrintLevel(0); fMerger.OutputFile(filename,"RECREATE"); if (writeCache) new TFileCacheWrite(fMerger.GetOutputFile(),32*1024*1024); } ~ParallelFileMerger() { // Destructor. for(unsigned int f = 0 ; f < fClients.size(); ++f) { fprintf(stderr,"Client %d reported %u times\n",f,fClients[f].fContactsCount); } for( ClientColl_t::iterator iter = fClients.begin(); iter != fClients.end(); ++iter) { delete iter->fFile; } } ULong_t Hash() const { // Return hash value for this object. return fFilename.Hash(); } const char *GetName() const { // Return the name of the object which is the name of the output file. return fFilename; } Bool_t InitialMerge(TFile *input) { // Initial merge of the input to copy the resetable object (TTree) into the output // and remove them from the input file. fMerger.AddFile(input); Bool_t result = fMerger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kResetable); R__DeleteObject(input,kTRUE); return result; } Bool_t Merge() { // Merge the current inputs into the output file. R__DeleteObject(fMerger.GetOutputFile(),kFALSE); // Remove object that can *not* be incrementally merge and will *not* be reset by the client code. for(unsigned int f = 0 ; f < fClients.size(); ++f) { fMerger.AddFile(fClients[f].fFile); } Bool_t result = fMerger.PartialMerge(TFileMerger::kAllIncremental); // Remove any 'resetable' object (like TTree) from the input file so that they will not // be re-merged. Keep only the object that always need to be re-merged (Histograms). for(unsigned int f = 0 ; f < fClients.size(); ++f) { if (fClients[f].fFile) { R__DeleteObject(fClients[f].fFile,kTRUE); } else { // We back up the file (probably due to memory constraint) TFile *file = TFile::Open(fClients[f].fLocalName,"UPDATE"); R__DeleteObject(file,kTRUE); // Remove object that can be incrementally merge and will be reset by the client code. file->Write(); delete file; } } fLastMerge = TTimeStamp(); fNClientsContact = 0; fClientsContact.Clear(); return result; } Bool_t NeedFinalMerge() { // Return true, if there is any data that has not been merged. return fClientsContact.CountBits() > 0; } Bool_t NeedMerge(Float_t clientThreshold) { // Return true, if enough client have reported if (fClients.size()==0) { return kFALSE; } // Calculate average and rms of the time between the last 2 contacts. Double_t sum = 0; Double_t sum2 = 0; for(unsigned int c = 0 ; c < fClients.size(); ++c) { sum += fClients[c].fTimeSincePrevContact; sum2 += fClients[c].fTimeSincePrevContact*fClients[c].fTimeSincePrevContact; } Double_t avg = sum / fClients.size(); Double_t sigma = sum2 ? TMath::Sqrt( sum2 / fClients.size() - avg*avg) : 0; Double_t target = avg + 2*sigma; TTimeStamp now; if ( (now.AsDouble() - fLastMerge.AsDouble()) > target) { // Float_t cut = clientThreshold * fClients.size(); // if (!(fClientsContact.CountBits() > cut )) { // for(unsigned int c = 0 ; c < fClients.size(); ++c) { // fprintf(stderr,"%d:%f ",c,fClients[c].fTimeSincePrevContact); // } // fprintf(stderr,"merge:%f avg:%f target:%f\n",(now.AsDouble() - fLastMerge.AsDouble()),avg,target); // } return kTRUE; } Float_t cut = clientThreshold * fClients.size(); return fClientsContact.CountBits() > cut || fNClientsContact > 2*cut; } void RegisterClient(UInt_t clientId, TFile *file) { // Register that a client has sent a file. ++fNClientsContact; fClientsContact.SetBitNumber(clientId); if (fClients.size() < clientId+1) { fClients.push_back( ClientInfo(fFilename,clientId) ); } fClients[clientId].Set(file); } ClassDef(ParallelFileMerger,0); }; void parallelMergeServer(bool cache = false) { // This script shows how to make a simple iterative server that // can accept connections while handling currently open connections. // Compare this script to hserv.C that blocks on accept. // In this script a server socket is created and added to a monitor. // A monitor object is used to monitor connection requests on // the server socket. After accepting the connection // the new socket is added to the monitor and immediately ready // for use. Once two connections are accepted the server socket // is removed from the monitor and closed. The monitor continues // monitoring the sockets. // // To run this demo do the following: // - Open three windows // - Start ROOT in all three windows // - Execute in the first window: .x hserv2.C // - Execute in the second and third windows: .x hclient.C //Author: Fons Rademakers // Open a server socket looking for connections on a named service or // on a specified port. //TServerSocket *ss = new TServerSocket("rootserv", kTRUE); TServerSocket *ss = new TServerSocket(1095, kTRUE, 100); if (!ss->IsValid()) { return; } TMonitor *mon = new TMonitor; mon->Add(ss); UInt_t clientCount = 0; UInt_t clientIndex = 0; THashTable mergers; enum StatusKind { kStartConnection = 0, kProtocol = 1, kProtocolVersion = 1 }; printf("fastMergeServerHist ready to accept connections\n"); while (1) { TMessage *mess; TSocket *s; // NOTE: this needs to be update to handle the case where the client // dies. s = mon->Select(); if (s->IsA() == TServerSocket::Class()) { if (clientCount > 100) { printf("only accept 100 clients connections\n"); mon->Remove(ss); ss->Close(); } else { TSocket *client = ((TServerSocket *)s)->Accept(); client->Send(clientIndex, kStartConnection); client->Send(kProtocolVersion, kProtocol); ++clientCount; ++clientIndex; mon->Add(client); printf("Accept %d connections\n",clientCount); } continue; } s->Recv(mess); if (mess==0) { Error("fastMergeServer","The client did not send a message\n"); } else if (mess->What() == kMESS_STRING) { char str[64]; mess->ReadString(str, 64); printf("Client %d: %s\n", clientCount, str); mon->Remove(s); printf("Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->GetBytesRecv(), s->GetBytesSent()); s->Close(); --clientCount; if (mon->GetActive() == 0 || clientCount == 0) { printf("No more active clients... stopping\n"); break; } } else if (mess->What() == kMESS_ANY) { Long64_t length; TString filename; Int_t clientId; mess->ReadInt(clientId); mess->ReadTString(filename); mess->ReadLong64(length); // '*mess >> length;' is broken in CINT for Long64_t. // Info("fastMergeServerHist","Received input from client %d for %s",clientId,filename.Data()); TMemFile *transient = new TMemFile(filename,mess->Buffer() + mess->Length(),length,"UPDATE"); // UPDATE because we need to remove the TTree after merging them. mess->SetBufferOffset(mess->Length()+length); const Float_t clientThreshold = 0.75; // control how often the histogram are merged. Here as soon as half the clients have reported. ParallelFileMerger *info = (ParallelFileMerger*)mergers.FindObject(filename); if (!info) { info = new ParallelFileMerger(filename,cache); mergers.Add(info); } if (R__NeedInitialMerge(transient)) { info->InitialMerge(transient); } info->RegisterClient(clientId,transient); if (info->NeedMerge(clientThreshold)) { // Enough clients reported. Info("fastMergeServerHist","Merging input from %ld clients (%d)",info->fClients.size(),clientId); info->Merge(); } transient = 0; } else if (mess->What() == kMESS_OBJECT) { printf("got object of class: %s\n", mess->GetClass()->GetName()); } else { printf("*** Unexpected message ***\n"); } delete mess; } TIter next(&mergers); ParallelFileMerger *info; while ( (info = (ParallelFileMerger*)next()) ) { if (info->NeedFinalMerge()) { info->Merge(); } } mergers.Delete(); delete mon; delete ss; }