/*****************************************************************************
* Project: RooFit *
* Package: RooFitCore *
* @(#)root/roofitcore:$Id$
* Authors: *
* WV, Wouter Verkerke, UC Santa Barbara, verkerke@slac.stanford.edu *
* DK, David Kirkby, UC Irvine, dkirkby@uci.edu *
* *
* Copyright (c) 2000-2005, Regents of the University of California *
* and Stanford University. All rights reserved. *
* *
* Redistribution and use in source and binary forms, *
* with or without modification, are permitted according to the terms *
* listed in LICENSE (http://roofit.sourceforge.net/license.txt) *
*****************************************************************************/
//////////////////////////////////////////////////////////////////////////////
//
// BEGIN_HTML
// RooRealMPFE is the multi-processor front-end for parallel calculation
// of RooAbsReal objects. Each RooRealMPFE forks a process that calculates
// the value of the proxied RooAbsReal object. The (re)calculation of
// the proxied object is started asynchronously with the calculate() method.
// A subsequent call to getVal() will return the calculated value when available.
// If the calculation is still in progress when getVal() is called, it blocks
// the calling process until the calculation is done. The forked calculation process
// is terminated when the front-end object is deleted.
// Simple use demonstration:
//
//
// RooAbsReal* slowFunc ;
//
// Double_t val = slowFunc->getVal() // Evaluate slowFunc in current process
//
// RooRealMPFE mpfe("mpfe","frontend to slowFunc",*slowFunc) ;
// mpfe.calculate() ; // Start calculation of slow-func in remote process
// // .. do other stuff here ..
// Double_t val = mpfe.getVal() // Wait for remote calculation to finish and retrieve value
//
//
// END_HTML
//
#include "Riostream.h"
#include "RooFit.h"
#ifndef _WIN32
#include "BidirMMapPipe.h"
#endif
#include <cstdlib>
#include <sstream>
#include "RooRealMPFE.h"
#include "RooArgSet.h"
#include "RooAbsCategory.h"
#include "RooRealVar.h"
#include "RooCategory.h"
#include "RooMPSentinel.h"
#include "RooMsgService.h"
#include "RooNLLVar.h"
#include "RooTrace.h"
#include "TSystem.h"
// Definition of the class-level sentinel object. Every RooRealMPFE instance
// registers itself with it on construction and removes itself on destruction.
RooMPSentinel RooRealMPFE::_sentinel ;
using namespace std;
using namespace RooFit;
// ROOT ClassImp macro for this class (the lone ';' below terminates it)
ClassImp(RooRealMPFE)
;
//_____________________________________________________________________________
RooRealMPFE::RooRealMPFE(const char *name, const char *title, RooAbsReal& arg, Bool_t calcInline) :
  RooAbsReal(name,title),
  _state(Initialize),
  _arg("arg","arg",this,arg),
  _vars("vars","vars",this),
  _calcInProgress(kFALSE),
  _verboseClient(kFALSE),
  _verboseServer(kFALSE),
  _inlineMode(calcInline),
  _remoteEvalErrorLoggingState(RooAbsReal::PrintErrors),
  _pipe(0),
  _updateMaster(0),
  _retrieveDispatched(kFALSE),
  _evalCarry(0.)
{
  // Create a front-end for the function 'arg'. By default the value of 'arg'
  // is computed asynchronously in a separate process; when calcInline is true
  // it is computed synchronously in the current process instead.
  // The object starts in the Initialize state: nothing is set up for remote
  // evaluation until initialize() runs.

#ifdef _WIN32
  // No remote evaluation on Windows: always evaluate inline
  _inlineMode = kTRUE;
#endif

  // Collect the parameters of 'arg' (this also sets _forceCalc) ...
  initVars() ;

  // ... and register this instance with the class-level sentinel
  _sentinel.add(*this) ;
}
//_____________________________________________________________________________
RooRealMPFE::RooRealMPFE(const RooRealMPFE& other, const char* name) :
  RooAbsReal(other, name),
  _state(Initialize),
  _arg("arg",this,other._arg),
  _vars("vars",this,other._vars),
  _calcInProgress(kFALSE),
  _verboseClient(other._verboseClient),
  _verboseServer(other._verboseServer),
  _inlineMode(other._inlineMode),
  _forceCalc(other._forceCalc),
  _remoteEvalErrorLoggingState(other._remoteEvalErrorLoggingState),
  _pipe(0),
  _updateMaster(0),
  _retrieveDispatched(kFALSE),
  _evalCarry(other._evalCarry)
{
  // Copy constructor. Configuration flags (verbosity, inline mode, remote
  // error logging state, carry) are taken from 'other', but the copy starts
  // in the Initialize state with no pipe of its own, so that upon evaluation
  // this instance will create its own server process.

  // Rebuild the parameter lists for the copied proxy
  initVars() ;

  // Register the new instance with the class-level sentinel
  _sentinel.add(*this) ;
}
//_____________________________________________________________________________
RooRealMPFE::~RooRealMPFE()
{
  // Destructor. If this object is currently the client of a server process,
  // shut that process down via standby(), then unregister from the sentinel.

  const Bool_t actingAsClient = (_state==Client) ;
  if (actingAsClient) {
    standby();
  }

  _sentinel.remove(*this);
}
//_____________________________________________________________________________
void RooRealMPFE::initVars()
{
// Initialize list of variables of front-end argument 'arg'
// Empty current lists
_vars.removeAll() ;
_saveVars.removeAll() ;
// Retrieve non-constant parameters
RooArgSet* vars = _arg.arg().getParameters(RooArgSet()) ;
//RooArgSet* ncVars = (RooArgSet*) vars->selectByAttrib("Constant",kFALSE) ;
RooArgList varList(*vars) ;
// Save in lists
_vars.add(varList) ;
_saveVars.addClone(varList) ;
_valueChanged.resize(_vars.getSize()) ;
_constChanged.resize(_vars.getSize()) ;
// Force next calculation
_forceCalc = kTRUE ;
delete vars ;
//delete ncVars ;
}
Double_t RooRealMPFE::getCarry() const
{
  // Return the carry term that accompanies the function value.
  //
  // When not in inline mode this is _evalCarry, the value that evaluate()
  // reads from the pipe together with the function value.
  // In inline mode the argument is asked directly: if it is a
  // RooAbsTestStatistic its getCarry() is returned, otherwise 0.
  //
  // NOTE(review): serverLoop() also calls getCarry() when answering a
  // Retrieve request. _inlineMode is false in that case, so the value sent
  // back is this object's own _evalCarry rather than the argument's carry --
  // confirm that this is intended.

  if (!_inlineMode) {
    return _evalCarry ;
  }

  RooAbsTestStatistic* testStat = dynamic_cast<RooAbsTestStatistic*>(_arg.absArg()) ;
  return testStat ? testStat->getCarry() : 0. ;
}
//_____________________________________________________________________________
void RooRealMPFE::initialize()
{
  // Leave the Initialize state: either switch to Inline mode, or create the
  // server process and the message-passing pipe that connects it to the
  // current (client) process.

  // Inline mode needs no remote process at all
  if (_inlineMode) {
    _state = Inline ;
    return ;
  }

#ifndef _WIN32
  // Start from an empty eval error log before forking, to avoid confusion
  // about which process logged what
  clearEvalErrorLog() ;

  // Fork server process and set up IPC
  _pipe = new BidirMMapPipe();

  if (!_pipe->isChild()) {
    // Client side: the fork succeeded, remember that we are now a client
    if (_verboseClient) {
      ccoutD(Minimization) << "RooRealMPFE::initialize(" << GetName()
                           << ") successfully forked server process "
                           << _pipe->pidOtherEnd() << endl ;
    }
    _state = Client ;
    _calcInProgress = kFALSE ;
    return ;
  }

  // Server side: serve requests until told to terminate
  RooTrace::callgrind_zero() ;
  _state = Server ;
  serverLoop();

  // End of service: clean up and leave the process without returning
  if (_verboseServer) {
    ccoutD(Minimization) << "RooRealMPFE::initialize(" << GetName()
                         << ") server process terminating" << endl ;
  }
  delete _arg.absArg();
  delete _pipe;
  _exit(0) ;
#endif // _WIN32
}
//_____________________________________________________________________________
void RooRealMPFE::serverLoop()
{
  // Server loop of the remote process. Reads one message code at a time from
  // the pipe and acts on it; returns when a Terminate message has been
  // received (and acknowledged) or when the pipe is no longer usable.
  //
  // Handled messages:
  //   SendReal          - set value and constant flag of parameter [idx]
  //   SendCat           - set index of category parameter [idx]
  //   Calculate         - evaluate the argument and cache the result in _value
  //   CalculateNoOffset - same, with setHideOffset(kFALSE) during the evaluation
  //   Retrieve          - send ReturnValue, _value, carry, error count and,
  //                       if any, the logged evaluation errors back to the client
  //   ConstOpt          - forward constOptimizeTestStatistic() to the argument
  //   Verbose           - set server-side verbosity
  //   ApplyNLLW2        - forward to doApplyNLLW2()
  //   EnableOffset      - forward enableOffsetting() to the argument
  //   LogEvalError      - set the evaluation error logging mode
  // Unknown codes are reported (when verbose) and otherwise ignored.

#ifndef _WIN32
  int msg ;

  Int_t idx, index, numErrors ;
  Double_t value ;
  Bool_t isConst ;

  clearEvalErrorLog() ;

  while(*_pipe && !_pipe->eof()) {
    *_pipe >> msg;
    if (Terminate == msg) {
      if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
                               << ") IPC fromClient> Terminate" << endl;
      // send terminate acknowledged to client
      *_pipe << msg << BidirMMapPipe::flush;
      break;
    }

    switch (msg) {
    case SendReal:
      {
        *_pipe >> idx >> value >> isConst;
        if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
                                 << ") IPC fromClient> SendReal [" << idx << "]=" << value << endl ;
        RooRealVar* rvar = (RooRealVar*)_vars.at(idx) ;
        rvar->setVal(value) ;
        // Only touch the constant flag when it actually differs
        if (rvar->isConstant() != isConst) {
          rvar->setConstant(isConst) ;
        }
      }
      break ;

    case SendCat:
      {
        *_pipe >> idx >> index;
        if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
                                 << ") IPC fromClient> SendCat [" << idx << "]=" << index << endl ;
        ((RooCategory*)_vars.at(idx))->setIndex(index) ;
      }
      break ;

    case Calculate:
      if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
                               << ") IPC fromClient> Calculate" << endl ;
      _value = _arg ;
      break ;

    case CalculateNoOffset:
      if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
                               << ") IPC fromClient> CalculateNoOffset" << endl ;
      // Evaluate with offset hiding switched off, then switch it back on
      RooAbsReal::setHideOffset(kFALSE) ;
      _value = _arg ;
      RooAbsReal::setHideOffset(kTRUE) ;
      break ;

    case Retrieve:
      {
        if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
                                 << ") IPC fromClient> Retrieve" << endl ;
        msg = ReturnValue;
        numErrors = numEvalErrors();
        *_pipe << msg << _value << getCarry() << numErrors;

        if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
                                 << ") IPC toClient> ReturnValue " << _value << " NumError " << numErrors << endl ;

        if (numErrors) {
          // Loop over errors
          std::string objidstr;
          {
            ostringstream oss2;
            // Format string with object identity as this cannot be evaluated on the other side
            oss2 << "PID" << gSystem->GetPid() << "/";
            printStream(oss2,kName|kClassName|kArgs,kInline);
            objidstr = oss2.str();
          }
          // Walk over all error items, advancing the item iterator together
          // with the item counter so that each item's messages are sent once.
          // (Assumes evalErrorIter() yields the first of numEvalErrorItems()
          // consecutive items -- confirm against RooAbsReal.)
          std::map<const RooAbsArg*,std::pair<std::string,std::list<EvalError> > >::const_iterator iter = evalErrorIter();
          const RooAbsArg* ptr = 0;
          for (int i = 0; i < numEvalErrorItems(); ++i, ++iter) {
            std::list<EvalError>::const_iterator iter2 = iter->second.second.begin();
            for (; iter->second.second.end() != iter2; ++iter2) {
              ptr = iter->first;
              *_pipe << ptr << iter2->_msg << iter2->_srvval << objidstr;
              if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
                                       << ") IPC toClient> sending error log Arg " << iter->first << " Msg " << iter2->_msg << endl ;
            }
          }
          // let other end know that we're done with the list of errors
          ptr = 0;
          *_pipe << ptr;
          // Clear error list on local side
          clearEvalErrorLog();
        }
        *_pipe << BidirMMapPipe::flush;
      }
      break;

    case ConstOpt:
      {
        Bool_t doTrack ;
        int code;
        *_pipe >> code >> doTrack;
        if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
                                 << ") IPC fromClient> ConstOpt " << code << " doTrack = " << (doTrack?"T":"F") << endl ;
        ((RooAbsReal&)_arg.arg()).constOptimizeTestStatistic(static_cast<RooAbsArg::ConstOpCode>(code),doTrack) ;
        break ;
      }

    case Verbose:
      {
        Bool_t flag ;
        *_pipe >> flag;
        if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
                                 << ") IPC fromClient> Verbose " << (flag?1:0) << endl ;
        _verboseServer = flag ;
      }
      break ;

    case ApplyNLLW2:
      {
        Bool_t flag ;
        *_pipe >> flag;
        if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
                                 << ") IPC fromClient> ApplyNLLW2 " << (flag?1:0) << endl ;
        // Do application of weight-squared here
        doApplyNLLW2(flag) ;
      }
      break ;

    case EnableOffset:
      {
        Bool_t flag ;
        *_pipe >> flag;
        if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
                                 << ") IPC fromClient> EnableOffset " << (flag?1:0) << endl ;
        // Enable likelihood offsetting here
        ((RooAbsReal&)_arg.arg()).enableOffsetting(flag) ;
      }
      break ;

    case LogEvalError:
      {
        // The mode travels over the pipe as a plain int
        int iflag2;
        *_pipe >> iflag2;
        RooAbsReal::ErrorLoggingMode flag2 = static_cast<RooAbsReal::ErrorLoggingMode>(iflag2);
        RooAbsReal::setEvalErrorLoggingMode(flag2) ;
        if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
                                 << ") IPC fromClient> LogEvalError flag = " << flag2 << endl ;
      }
      break ;

    default:
      if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
                               << ") IPC fromClient> Unknown message (code = " << msg << ")" << endl ;
      break ;
    }
  }
#endif // _WIN32
}
//_____________________________________________________________________________
void RooRealMPFE::calculate() const
{
// Client-side function that instructs server process to start
// asynchronuous (re)calculation of function value. This function
// returns immediately. The calculated value can be retrieved
// using getVal()
// Start asynchronous calculation of arg value
if (_state==Initialize) {
// cout << "RooRealMPFE::calculate(" << GetName() << ") initializing" << endl ;
const_cast(this)->initialize() ;
}
// Inline mode -- Calculate value now
if (_state==Inline) {
// cout << "RooRealMPFE::calculate(" << GetName() << ") performing Inline calculation NOW" << endl ;
_value = _arg ;
clearValueDirty() ;
}
#ifndef _WIN32
// Compare current value of variables with saved values and send changes to server
if (_state==Client) {
// cout << "RooRealMPFE::calculate(" << GetName() << ") state is Client trigger remote calculation" << endl ;
Int_t i(0) ;
RooFIter viter = _vars.fwdIterator() ;
RooFIter siter = _saveVars.fwdIterator() ;
//for (i=0 ; i<_vars.getSize() ; i++) {
RooAbsArg *var, *saveVar ;
while((var = viter.next())) {
saveVar = siter.next() ;
//Bool_t valChanged = !(*var==*saveVar) ;
Bool_t valChanged,constChanged ;
if (!_updateMaster) {
valChanged = !var->isIdentical(*saveVar,kTRUE) ;
constChanged = (var->isConstant() != saveVar->isConstant()) ;
_valueChanged[i] = valChanged ;
_constChanged[i] = constChanged ;
} else {
valChanged = _updateMaster->_valueChanged[i] ;
constChanged = _updateMaster->_constChanged[i] ;
}
if ( valChanged || constChanged || _forceCalc) {
//cout << "RooRealMPFE::calculate(" << GetName() << " variable " << var->GetName() << " changed " << endl ;
if (_verboseClient) cout << "RooRealMPFE::calculate(" << GetName()
<< ") variable " << _vars.at(i)->GetName() << " changed" << endl ;
if (constChanged) {
((RooRealVar*)saveVar)->setConstant(var->isConstant()) ;
}
saveVar->copyCache(var) ;
// send message to server
if (dynamic_cast(var)) {
int msg = SendReal ;
Double_t val = ((RooAbsReal*)var)->getVal() ;
Bool_t isC = var->isConstant() ;
*_pipe << msg << i << val << isC;
if (_verboseServer) cout << "RooRealMPFE::calculate(" << GetName()
<< ") IPC toServer> SendReal [" << i << "]=" << val << (isC?" (Constant)":"") << endl ;
} else if (dynamic_cast(var)) {
int msg = SendCat ;
UInt_t idx = ((RooAbsCategory*)var)->getIndex() ;
*_pipe << msg << i << idx;
if (_verboseServer) cout << "RooRealMPFE::calculate(" << GetName()
<< ") IPC toServer> SendCat [" << i << "]=" << idx << endl ;
}
}
i++ ;
}
int msg = hideOffset() ? Calculate : CalculateNoOffset;
*_pipe << msg;
if (_verboseServer) cout << "RooRealMPFE::calculate(" << GetName()
<< ") IPC toServer> Calculate " << endl ;
// Clear dirty state and mark that calculation request was dispatched
clearValueDirty() ;
_calcInProgress = kTRUE ;
_forceCalc = kFALSE ;
msg = Retrieve ;
*_pipe << msg << BidirMMapPipe::flush;
if (_verboseServer) cout << "RooRealMPFE::evaluate(" << GetName()
<< ") IPC toServer> Retrieve " << endl ;
_retrieveDispatched = kTRUE ;
} else if (_state!=Inline) {
cout << "RooRealMPFE::calculate(" << GetName()
<< ") ERROR not in Client or Inline mode" << endl ;
}
#endif // _WIN32
}
//_____________________________________________________________________________
Double_t RooRealMPFE::getValV(const RooArgSet* /*nset*/) const
{
  // Return the function value, obtaining it first if necessary:
  //  - cache dirty: no calculation has been started yet, so start one with
  //    calculate() and collect the result with evaluate()
  //  - cache clean but a calculation is in progress: collect its result
  //    with evaluate()
  //  - otherwise the cached value is already up to date
  // The normalization set argument is not used.

  const Bool_t mustStart = isValueDirty() ;
  if (mustStart) {
    calculate() ;
  }

  // After a fresh calculate(), or with a previously dispatched calculation
  // still pending, the result has to be fetched
  if (mustStart || _calcInProgress) {
    _value = evaluate() ;
  }

  return _value ;
}
//_____________________________________________________________________________
Double_t RooRealMPFE::evaluate() const
{
  // Obtain the function value.
  //
  // In Inline state the argument is evaluated directly.
  // In Client state the reply to a Retrieve request is read from the pipe
  // (the request is sent here unless calculate() already dispatched it).
  // The reply carries the value, the carry term (stored in _evalCarry) and
  // the number of evaluation errors logged on the server; those errors are
  // read as well and fed into the local eval error queue.
  // Returns 0 in any other state, and also when the server answers with an
  // unexpected message.

  Double_t return_value = 0;
  if (_state==Inline) {
    return_value = _arg ;
  } else if (_state==Client) {
#ifndef _WIN32
    bool needflush = false;
    // Defined start values for everything that is read from the pipe below.
    // -1 is not expected to match any message code (the codes are declared
    // in the class header -- confirm).
    int msg = -1;
    Double_t value = 0.;

    // If current error logging state is not the same as remote state
    // update the remote state
    if (evalErrorLoggingMode() != _remoteEvalErrorLoggingState) {
      msg = LogEvalError ;
      RooAbsReal::ErrorLoggingMode flag = evalErrorLoggingMode() ;
      // Sent as a plain int, which is what the server reads for this message
      *_pipe << msg << static_cast<int>(flag);
      needflush = true;
      _remoteEvalErrorLoggingState = evalErrorLoggingMode() ;
    }

    // Ask for the value unless calculate() has already done so
    if (!_retrieveDispatched) {
      msg = Retrieve ;
      *_pipe << msg;
      needflush = true;
      if (_verboseServer) cout << "RooRealMPFE::evaluate(" << GetName()
                               << ") IPC toServer> Retrieve " << endl ;
    }
    if (needflush) *_pipe << BidirMMapPipe::flush;
    _retrieveDispatched = kFALSE ;

    // Read the fixed part of the reply
    Int_t numError = 0;
    *_pipe >> msg >> value >> _evalCarry >> numError;

    if (msg!=ReturnValue) {
      cout << "RooRealMPFE::evaluate(" << GetName()
           << ") ERROR: unexpected message from server process: " << msg << endl ;
      return 0 ;
    }
    if (_verboseServer) cout << "RooRealMPFE::evaluate(" << GetName()
                             << ") IPC fromServer> ReturnValue " << value << endl ;
    if (_verboseServer) cout << "RooRealMPFE::evaluate(" << GetName()
                             << ") IPC fromServer> NumErrors " << numError << endl ;

    if (numError) {
      // Retrieve remote errors and feed into local error queue.
      // The three string buffers are reused for every record and released
      // with std::free() once the list is complete.
      char *msgbuf1 = 0, *msgbuf2 = 0, *msgbuf3 = 0;
      RooAbsArg *ptr = 0;
      while (true) {
        // A null pointer marks the end of the error list
        *_pipe >> ptr;
        if (!ptr) break;
        *_pipe >> msgbuf1 >> msgbuf2 >> msgbuf3;
        if (_verboseServer) cout << "RooRealMPFE::evaluate(" << GetName()
                                 << ") IPC fromServer> retrieving error log Arg " << ptr << " Msg " << msgbuf1 << endl ;

        // msgbuf1 = message, msgbuf2 = server value string, msgbuf3 = object identity string
        logEvalError(reinterpret_cast<RooAbsReal*>(ptr),msgbuf3,msgbuf1,msgbuf2) ;
      }
      std::free(msgbuf1);
      std::free(msgbuf2);
      std::free(msgbuf3);
    }

    // Mark end of calculation in progress
    _calcInProgress = kFALSE ;
    return_value = value ;
#endif // _WIN32
  }
  return return_value;
}
//_____________________________________________________________________________
void RooRealMPFE::standby()
{
  // Shut down the remote server process and put the front-end back into
  // the Initialize state, so that a later calculate() sets up a new server
  // process. Does nothing unless the object is currently in Client state.

#ifndef _WIN32
  if (_state!=Client) return ;

  if (!_pipe->good()) {
    // Nothing can be sent any more; just report it when verbose
    if (_verboseServer) {
      std::cerr << "In " << __func__ << "(" << __FILE__ ", " <<
        __LINE__ << "): Pipe has already shut down, not sending "
        "Terminate to server." << std::endl;
    }
  } else {
    // Ask the server to terminate
    if (_verboseServer) cout << "RooRealMPFE::standby(" << GetName()
                             << ") IPC toServer> Terminate " << endl;
    int request = Terminate;
    *_pipe << request << BidirMMapPipe::flush;

    // The server acknowledges by echoing the Terminate code
    int reply = 0;
    *_pipe >> reply;

    // The pipe is only closed here when the acknowledgement was correct
    const bool acknowledged = (Terminate == reply);
    if (!acknowledged || 0 != _pipe->close()) {
      std::cerr << "In " << __func__ << "(" << __FILE__ ", " << __LINE__ <<
        "): Server shutdown failed." << std::endl;
    }
  }

  // Dispose of the pipe object
  delete _pipe;
  _pipe = 0;

  // Revert to initialize state
  _state = Initialize;
#endif // _WIN32
}
//_____________________________________________________________________________
void RooRealMPFE::constOptimizeTestStatistic(ConstOpCode opcode, Bool_t doAlsoTracking)
{
  // Intercept a request to optimize constant terms in the test statistic.
  // In Client state the request is written to the pipe for the server-side
  // object and the local parameter lists are rebuilt; in Inline state the
  // call is passed straight to the argument.

#ifndef _WIN32
  if (_state==Client) {
    // The opcode is transmitted as a plain int
    const int request = ConstOpt ;
    const int opAsInt = opcode ;
    *_pipe << request << opAsInt << doAlsoTracking;

    if (_verboseServer) cout << "RooRealMPFE::constOptimize(" << GetName()
                             << ") IPC toServer> ConstOpt " << opcode << endl ;

    // Rebuild the parameter lists (this also forces the next calculation)
    initVars() ;
  }
#endif // _WIN32

  if (_state==Inline) {
    RooAbsReal& target = (RooAbsReal&)_arg.arg() ;
    target.constOptimizeTestStatistic(opcode,doAlsoTracking) ;
  }
}
//_____________________________________________________________________________
void RooRealMPFE::setVerbose(Bool_t clientFlag, Bool_t serverFlag)
{
  // Switch verbose messaging about the inter-process communication on or
  // off, separately for the client side (clientFlag) and the server side
  // (serverFlag). In Client state the server flag is also sent to the
  // server process.

#ifndef _WIN32
  if (_state==Client) {
    const int request = Verbose ;
    *_pipe << request << serverFlag;

    // Note: this trace is still governed by the previous server flag
    if (_verboseServer) cout << "RooRealMPFE::setVerbose(" << GetName()
                             << ") IPC toServer> Verbose " << (serverFlag?1:0) << endl ;
  }
#endif // _WIN32

  // Store both flags locally
  _verboseClient = clientFlag ;
  _verboseServer = serverFlag ;
}
//_____________________________________________________________________________
void RooRealMPFE::applyNLLWeightSquared(Bool_t flag)
{
  // Switch the weight-squared option of the argument on or off.
  // In Client state the request is first written to the pipe for the server
  // process; in every state the setting is also applied to the local
  // argument through doApplyNLLW2().

#ifndef _WIN32
  if (_state==Client) {
    const int request = ApplyNLLW2 ;
    *_pipe << request << flag;

    if (_verboseServer) cout << "RooRealMPFE::applyNLLWeightSquared(" << GetName()
                             << ") IPC toServer> ApplyNLLW2 " << (flag?1:0) << endl ;
  }
#endif // _WIN32

  // Local application
  doApplyNLLW2(flag) ;
}
//_____________________________________________________________________________
void RooRealMPFE::doApplyNLLW2(Bool_t flag)
{
  // Apply the weight-squared setting to the argument of this front-end.
  // Only has an effect when the argument is a RooNLLVar; for any other
  // type of argument the call is silently ignored.

  RooNLLVar* nll = dynamic_cast<RooNLLVar*>(_arg.absArg()) ;
  if (nll) {
    nll->applyWeightSquared(flag) ;
  }
}
//_____________________________________________________________________________
void RooRealMPFE::enableOffsetting(Bool_t flag)
{
  // Switch offsetting of the argument on or off.
  // In Client state the request is first written to the pipe for the server
  // process; in every state the setting is also applied to the local
  // argument.

#ifndef _WIN32
  if (_state==Client) {
    const int request = EnableOffset ;
    *_pipe << request << flag;

    if (_verboseServer) cout << "RooRealMPFE::enableOffsetting(" << GetName()
                             << ") IPC toServer> EnableOffset " << (flag?1:0) << endl ;
  }
#endif // _WIN32

  // Local application
  RooAbsReal& target = (RooAbsReal&)_arg.arg() ;
  target.enableOffsetting(flag) ;
}