// @(#)root/proofd:$Id$ // Author: Gerardo Ganis 12/12/2005 /************************************************************************* * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. * * All rights reserved. * * * * For the licensing terms see $ROOTSYS/LICENSE. * * For the list of contributors see $ROOTSYS/README/CREDITS. * *************************************************************************/ ////////////////////////////////////////////////////////////////////////// // // // XrdProofdProtocol // // // // Authors: G. Ganis, CERN, 2005 // // // // XrdProtocol implementation to coordinate 'proofserv' applications. // // // ////////////////////////////////////////////////////////////////////////// #include "XrdProofdPlatform.h" #include "XpdSysError.h" #include "XpdSysLogger.h" #include "XrdSys/XrdSysPriv.hh" #include "XrdOuc/XrdOucStream.hh" #include "XrdVersion.hh" #include "Xrd/XrdBuffer.hh" #include "Xrd/XrdScheduler.hh" #include "XrdProofdClient.h" #include "XrdProofdClientMgr.h" #include "XrdProofdConfig.h" #include "XrdProofdManager.h" #include "XrdProofdNetMgr.h" #include "XrdProofdPriorityMgr.h" #include "XrdProofdProofServMgr.h" #include "XrdProofdProtocol.h" #include "XrdProofdResponse.h" #include "XrdProofdProofServ.h" #include "XrdProofSched.h" #include "XrdROOT.h" #include "rpdconn.h" // Tracing utils #include "XrdProofdTrace.h" XrdOucTrace *XrdProofdTrace = 0; // Loggers: we need two to avoid deadlocks static XrdSysLogger gMainLogger; // // Static area: general protocol managing section int XrdProofdProtocol::fgCount = 0; XpdObjectQ XrdProofdProtocol::fgProtStack("ProtStack", "xproofd protocol anchor"); XrdSysRecMutex XrdProofdProtocol::fgBMutex; // Buffer management mutex XrdBuffManager *XrdProofdProtocol::fgBPool = 0; int XrdProofdProtocol::fgMaxBuffsz= 0; XrdSysError XrdProofdProtocol::fgEDest(0, "xpd"); XrdSysLogger *XrdProofdProtocol::fgLogger = 0; // // Static area: protocol configuration section bool XrdProofdProtocol::fgConfigDone 
= 0;
//
int XrdProofdProtocol::fgReadWait = 0;
// Cluster manager
XrdProofdManager *XrdProofdProtocol::fgMgr = 0;

// Effective uid
int XrdProofdProtocol::fgEUidAtStartup = -1;

// Local definitions
#define MAX_ARGS 128

// Macros used to set conditional options.
// XPDCOND(n,ns) is true when both values are -1, or when n is positive and
// not smaller than ns. Note that the arguments are not parenthesized in the
// expansion, so they should be simple expressions.
#ifndef XPDCOND
#define XPDCOND(n,ns) ((n == -1 && ns == -1) || (n > 0 && n >= ns))
#endif
// XPDSETSTRING: when XPDCOND(n,ns) holds, free 'c', replace it with a copy of
// the content of 's' and record n into ns.
#ifndef XPDSETSTRING
#define XPDSETSTRING(n,ns,c,s) \
 { if (XPDCOND(n,ns)) { \
     SafeFree(c); c = strdup(s.c_str()); ns = n; }}
#endif
// XPDADOPTSTRING: as XPDSETSTRING, but 'c' is replaced only if the copied
// string is non-empty; otherwise the temporary copy is freed.
#ifndef XPDADOPTSTRING
#define XPDADOPTSTRING(n,ns,c,s) \
  { char *t = 0; \
    XPDSETSTRING(n, ns, t, s); \
    if (t && strlen(t)) { \
       SafeFree(c); c = t; \
  } else \
       SafeFree(t); }
#endif
// XPDSETINT: when XPDCOND(n,ns) holds, parse 's' as a base-10 integer into 'i'
// and record n into ns.
#ifndef XPDSETINT
#define XPDSETINT(n,ns,i,s) \
 { if (XPDCOND(n,ns)) { \
     i = strtol(s.c_str(),0,10); ns = n; }}
#endif

// Four 32-bit fields.
// NOTE(review): presumably the layout of the handshake reply sent in Match() - confirm.
typedef struct {
   kXR_int32 ptyp;  // must be always 0 !
   kXR_int32 rlen;
   kXR_int32 pval;
   kXR_int32 styp;
} hs_response_t;

// Scope guard used while processing a request: when it is destroyed it calls
// ResetCtrlC() on the protocol instance, unless the instance pointer is null
// or the request type it was created with is kXP_ctrlc.
typedef struct ResetCtrlcGuard {
   XrdProofdProtocol *xpd;   // Protocol instance to act on (may be null)
   int                type;  // Request type given at construction
   ResetCtrlcGuard(XrdProofdProtocol *p, int t) : xpd(p), type(t) { }
   ~ResetCtrlcGuard() { if (xpd && type != kXP_ctrlc) xpd->ResetCtrlC(); }
} ResetCtrlcGuard_t;

//
// Derivation of XrdProofdConfig to read the port from the config file
class XrdProofdProtCfg : public XrdProofdConfig {
public:
   int  fPort; // The port on which we listen
   XrdProofdProtCfg(const char *cfg, XrdSysError *edest = 0);
   int  DoDirective(XrdProofdDirective *, char *, XrdOucStream *, bool);
   void RegisterDirectives();
};

//__________________________________________________________________________
XrdProofdProtCfg::XrdProofdProtCfg(const char *cfg, XrdSysError *edest)
                 : XrdProofdConfig(cfg, edest)
{
   // Constructor: the port starts as undefined (-1) and the directives
   // handled by this class are registered right away

   fPort = -1;
   RegisterDirectives();
}

//__________________________________________________________________________
void XrdProofdProtCfg::RegisterDirectives()
{
   // Register directives for configuration: both "port" and "xrd.protocol"
   // are routed to this object through DoDirectiveClass

   Register("port", new XrdProofdDirective("port", this, &DoDirectiveClass));
   Register("xrd.protocol", new XrdProofdDirective("xrd.protocol",
            this, &DoDirectiveClass));
}

//______________________________________________________________________________
int XrdProofdProtCfg::DoDirective(XrdProofdDirective *d,
                                  char *val, XrdOucStream *cfg, bool)
{
   // Parse directives.
   // For "port" the value is taken from 'val'; for "xrd.protocol" the next
   // word is read from 'cfg' and any "xproofd:" substring is removed from it.
   // A non-empty value is converted with strtol (base 10) into fPort; a
   // negative fPort is replaced by XPD_DEF_PORT.
   // Returns 0 on success, -1 if 'd' is null or names any other directive.

   if (!d) return -1;

   XrdOucString port(val);
   if (d->fName == "xrd.protocol") {
      // NOTE(review): 'cfg' is dereferenced here without a null check
      port = cfg->GetWord();
      port.replace("xproofd:", "");
   } else if (d->fName != "port") {
      return -1;
   }
   if (port.length() > 0) {
      fPort = strtol(port.c_str(), 0, 10);
   }
   fPort = (fPort < 0) ? XPD_DEF_PORT : fPort;
   return 0;
}

extern "C" {
//_________________________________________________________________________________
XrdProtocol *XrdgetProtocol(const char *, char *parms, XrdProtocol_Config *pi)
{
   // This protocol is meant to live in a shared library. The interface below is
   // used by the server to obtain a copy of the protocol object that can be used
   // to decide whether or not a link is talking a particular protocol.
   // Returns a new XrdProofdProtocol if XrdProofdProtocol::Configure succeeds,
   // a null pointer otherwise.

   // Return the protocol object to be used if static init succeeds
   if (XrdProofdProtocol::Configure(parms, pi)) {
      return (XrdProtocol *) new XrdProofdProtocol(pi);
   }
   return (XrdProtocol *)0;
}

//_________________________________________________________________________________
int XrdgetProtocolPort(const char * /*pname*/, char * /*parms*/, XrdProtocol_Config *pi)
{
   // This function is called early on to determine the port we need to use.
   // The default is ostensibly 1093 but can be overridden, which we allow.
   // Order of precedence when 'pi' is given: a positive port found in the
   // config file, then a positive pi->Port, then XPD_DEF_PORT.

   // Default XPD_DEF_PORT (1093)
   int port = XPD_DEF_PORT;

   if (pi) {
      XrdProofdProtCfg pcfg(pi->ConfigFN, pi->eDest);
      // Init some relevant quantities for tracing
      // (Configure() deletes and re-creates this object later on)
      XrdProofdTrace = new XrdOucTrace(pi->eDest);
      pcfg.Config(0);

      if (pcfg.fPort > 0) {
         port = pcfg.fPort;
      } else {
         port = (pi && pi->Port > 0) ? pi->Port : XPD_DEF_PORT;
      }
   }
   return port;
}}

//__________________________________________________________________________________
XrdProofdProtocol::XrdProofdProtocol(XrdProtocol_Config *pi)
   : XrdProtocol("xproofd protocol handler"), fProtLink(this)
{
   // Protocol constructor.
   // The pointers touched by Reset() are cleared first, room for 10 response
   // objects is reserved and the descriptor used for standard error is taken
   // from the error destination in 'pi' when available, from stderr otherwise.

   fLink = 0;
   fArgp = 0;
   fPClient = 0;
   fSecClient = 0;
   fAuthProt = 0;
   fResponses.reserve(10);

   fStdErrFD = (pi && pi->eDest) ? pi->eDest->baseFD() : fileno(stderr);

   // Instantiate a Proofd protocol object
   Reset();
}

//______________________________________________________________________________
XrdProofdResponse *XrdProofdProtocol::Response(kXR_unt16 sid)
{
   // Get response instance corresponding to stream ID 'sid'.
   // Stream IDs are 1-based: the instance is stored at index sid-1.
   // A null pointer is returned when no instance is found.
   XPDLOC(ALL, "Protocol::Response")

   // NOTE(review): the next statement looks truncated in this copy of the file
   // (text between a '<' and a '>' seems to be missing); confirm against the
   // original source before building.
   TRACE(HDBG, "sid: "< 0)
      if (sid <= fResponses.size())
         return fResponses[sid-1];

   return (XrdProofdResponse *)0;
}

//______________________________________________________________________________
XrdProofdResponse *XrdProofdProtocol::GetNewResponse(kXR_unt16 sid)
{
   // Create new response instance for stream ID 'sid'
   XPDLOC(ALL, "Protocol::GetNewResponse")

   XrdOucString msg;
   XPDFORM(msg, "sid: %d", sid);
   if (sid > 0) {
      if (sid > fResponses.size()) {
         if (sid > fResponses.capacity()) {
            // Grow the capacity: at least double it, or sid+1 if that is larger
            int newsz = (sid < 2 * fResponses.capacity()) ?
2 * fResponses.capacity() : sid+1 ; fResponses.reserve(newsz); if (TRACING(DBG)) { msg += " new capacity: "; msg += (int) fResponses.capacity(); } } int nnew = sid - fResponses.size(); while (nnew--) fResponses.push_back(new XrdProofdResponse()); if (TRACING(DBG)) { msg += "; new size: "; msg += (int) fResponses.size(); } } } else { TRACE(XERR,"wrong sid: "<Peek(hsbuff,sizeof(hsdata),fgReadWait)) != sizeof(hsdata)) { if (dlen <= 0) lp->setEtext("Match: handshake not received"); if (dlen == 12) { // Check if it is a request to open a file via 'rootd' hsdata.first = ntohl(hsdata.first); if (hsdata.first == 8) { if (strlen(fgMgr->RootdExe()) > 0) { if (fgMgr->IsRootdAllowed((const char *)lp->Host())) { TRACE(ALL, "matched rootd protocol on link: executing "<RootdExe()); XrdOucString em; if (StartRootd(lp, em) != 0) { emsg = "rootd: failed to start daemon: "; emsg += em; } } else { XPDFORM(emsg, "rootd-file serving not authorized for host '%s'", lp->Host()); } } else { emsg = "rootd-file serving not enabled"; } } if (emsg.length() > 0) { lp->setEtext(emsg.c_str()); } else { lp->setEtext("link transfered"); } return (XrdProtocol *)0; } TRACE(XERR, "peeked incomplete or empty information! 
(dlen: "<Send((char *)&hsresp, sizeof(hsresp))) { lp->setEtext("Match: handshake failed"); TRACE(XERR, "handshake failed"); return (XrdProtocol *)0; } // We can now read all 20 bytes and discard them (no need to wait for it) int len = sizeof(hsdata); if (lp->Recv(hsbuff, len) != len) { lp->setEtext("Match: reread failed"); TRACE(XERR, "reread failed"); return (XrdProtocol *)0; } // Get a protocol object off the stack (if none, allocate a new one) if (!(xp = fgProtStack.Pop())) xp = new XrdProofdProtocol(); // Bind the protocol to the link and return the protocol xp->fLink = lp; snprintf(xp->fSecEntity.prot, XrdSecPROTOIDSIZE, "host"); xp->fSecEntity.host = strdup((char *)lp->Host()); // Dummy data used by 'proofd' kXR_int32 dum[2]; if (xp->GetData("dummy",(char *)&dum[0],sizeof(dum)) != 0) { xp->Recycle(0,0,0); return (XrdProtocol *)0; } // We are done return (XrdProtocol *)xp; } //_____________________________________________________________________________ int XrdProofdProtocol::StartRootd(XrdLink *lp, XrdOucString &emsg) { // Transfer the connection to a rootd daemon to serve a file access request // Return 0 on success, -1 on failure XPDLOC(ALL, "Protocol::StartRootd") const char *prog = fgMgr->RootdExe(); const char **progArg = fgMgr->RootdArgs(); if (fgMgr->RootdFork()) { // Start rootd using fork() pid_t pid; if ((pid = fgMgr->Sched()->Fork(lp->Name()))) { if (pid < 0) { emsg = "rootd fork failed"; return -1; } return 0; } // In the child ... // Restablish standard error for the program we will exec dup2(fStdErrFD, STDERR_FILENO); close(fStdErrFD); // Force stdin/out to point to the socket FD (this will also bypass the // close on exec setting for the socket) dup2(lp->FDnum(), STDIN_FILENO); dup2(lp->FDnum(), STDOUT_FILENO); // Do the exec execv((const char *)prog, (char * const *)progArg); TRACE(XERR, "rootd: Oops! 
Exec(" <ROOTMgr()->DefaultVersion(); if (!roo) { emsg = "ROOT version undefined!"; return -1; } // The path to the executable XrdOucString pexe; XPDFORM(pexe, "%s/proofexecv", roo->BinDir()); if (access(pexe.c_str(), X_OK) != 0) { XPDFORM(emsg, "path '%s' does not exist or is not executable (errno: %d)", pexe.c_str(), (int)errno); return -1; } // Start the proofexecv XrdOucString cmd, exp; XPDFORM(cmd, "export ROOTBINDIR=\"%s\"; %s 20 0 %s %s", roo->BinDir(), pexe.c_str(), fgMgr->RootdUnixSrv()->path(), prog); int n = 1; while (progArg[n] != 0) { cmd += " "; cmd += progArg[n]; n++; } cmd += " &"; TRACE(HDBG, cmd); if (system(cmd.c_str()) == -1) { XPDFORM(emsg, "failure from 'system' (errno: %d)", (int)errno); return -1; } // Accept a connection from the second server int err = 0; rpdunix *uconn = fgMgr->RootdUnixSrv()->accept(-1, &err); if (!uconn || !uconn->isvalid(0)) { XPDFORM(emsg, "failure accepting callback (errno: %d)", -err); if (uconn) delete uconn; return -1; } TRACE(HDBG, "proofexecv connected!"); int rcc = 0; // Transfer the open descriptor to be used in rootd int fd = dup(lp->FDnum()); if (fd < 0 || (rcc = uconn->senddesc(fd)) != 0) { XPDFORM(emsg, "failure sending descriptor '%d' (original: %d); (errno: %d)", fd, lp->FDnum(), -rcc); if (uconn) delete uconn; return -1; } // Close the connection to the parent delete uconn; } // Done return 0; } //_____________________________________________________________________________ int XrdProofdProtocol::Stats(char *buff, int blen, int) { // Return statistics info about the protocol. // Not really implemented yet: this is a reduced XrdXrootd version. 
static char statfmt[] = "%ld"; // If caller wants only size, give it to him if (!buff) return sizeof(statfmt)+16; // We have only one statistic -- number of successful matches return snprintf(buff, blen, statfmt, fgCount); } //______________________________________________________________________________ void XrdProofdProtocol::Reset() { // Reset static and local vars // Init local vars fLink = 0; fPid = -1; fArgp = 0; fStatus = 0; fClntCapVer = 0; fConnType = kXPD_ClientMaster; fSuperUser = 0; fPClient = 0; fUserIn = ""; fGroupIn = ""; fCID = -1; fTraceID = ""; fAdminPath = ""; if (fAuthProt) { fAuthProt->Delete(); fAuthProt = 0; } memset(&fSecEntity, 0, sizeof(fSecEntity)); // Cleanup existing XrdProofdResponse objects std::vector::iterator ii = fResponses.begin(); // One per each logical connection while (ii != fResponses.end()) { (*ii)->Reset(); ii++; } } //______________________________________________________________________________ int XrdProofdProtocol::Configure(char *, XrdProtocol_Config *pi) { // Protocol configuration tool // Function: Establish configuration at load time. // Output: 1 upon success or 0 otherwise. XPDLOC(ALL, "Protocol::Configure") XrdOucString mp; // Only once if (fgConfigDone) return 1; fgConfigDone = 1; // Copy out the special info we want to use at top level fgLogger = pi->eDest->logger(); fgEDest.logger(fgLogger); if (XrdProofdTrace) delete XrdProofdTrace; // It could have been initialized in XrdgetProtocolPort XrdProofdTrace = new XrdOucTrace(&fgEDest); fgBPool = pi->BPool; fgReadWait = pi->readWait; // Pre-initialize some i/o values fgMaxBuffsz = fgBPool->MaxSize(); // Schedule protocol object cleanup; the maximum number of objects // and the max age are taken from XrdXrootdProtocol: this may need // some optimization in the future. #if 1 fgProtStack.Set(pi->Sched, XrdProofdTrace, TRACE_MEM); fgProtStack.Set((pi->ConnMax/3 ? 
pi->ConnMax/3 : 30), 60*60); #else fgProtStack.Set(pi->Sched, 3600); #endif // Default tracing options: always trace logins and errors for all // domains; if the '-d' option was specified on the command line then // trace also REQ and FORM. // NB: these are superseeded by settings in the config file (xpd.trace) XrdProofdTrace->What = TRACE_DOMAINS; TRACESET(XERR, 1); TRACESET(LOGIN, 1); TRACESET(RSP, 0); if (pi->DebugON) XrdProofdTrace->What |= (TRACE_REQ | TRACE_FORK); // Work as root to avoid contineous changes of the effective user // (users are logged in their box after forking) fgEUidAtStartup = geteuid(); if (!getuid()) XrdSysPriv::ChangePerm((uid_t)0, (gid_t)0); // Process the config file for directives meaningful to us // Create and Configure the manager fgMgr = new XrdProofdManager(pi, &fgEDest); if (fgMgr->Config(0)) return 0; mp = "global manager created"; TRACE(ALL, mp); // Issue herald indicating we configured successfully TRACE(ALL, "xproofd protocol version "<Set(fRequest.header.streamid); response->Set(fLink); TRACEP(this, REQ, "sid: " << sid << ", req id: " << fRequest.header.requestid << " (" << XrdProofdAux::ProofRequestTypes(fRequest.header.requestid)<< ")" << ", dlen: " <= 0 or we won't // be able to know how much data to read. if (fRequest.header.dlen < 0) { response->Send(kXR_ArgInvalid, "Process: Invalid request data length"); return fLink->setEtext("Process: protocol data length error"); } // Read any argument data at this point, except when the request is to forward // a buffer: the argument may have to be segmented and we're not prepared to do // that here. 
if (fRequest.header.requestid != kXP_sendmsg && fRequest.header.dlen) {
      // One byte more than the data length is requested, so that the argument
      // can be null-terminated after it has been read
      if ((fArgp = GetBuff(fRequest.header.dlen+1, fArgp)) == 0) {
         response->Send(kXR_ArgTooLong, "fRequest.argument is too long");
         return rc;
      }
      if ((rc = GetData("arg", fArgp->buff, fRequest.header.dlen)))
         return rc;
      fArgp->buff[fRequest.header.dlen] = '\0';
   }

   // Continue with request processing at the resume point
   return Process2();
}

//______________________________________________________________________________
int XrdProofdProtocol::Process2()
{
   // Local processing method: here the request is dispatched to the appropriate
   // method.
   // If the user is logged in, the admin path is touched and the requests
   // kXP_ctrlc, kXP_touch, kXP_interrupt, kXP_ping, kXP_sendmsg and kXP_urgent
   // are served here; every other request (and any request received before
   // login) is passed to the manager via fgMgr->Process(this).
   // Returns the code of the handler, 0 if the client instance is missing, or
   // -1 if after the handling the link is null or its descriptor is <= 0.
   XPDLOC(ALL, "Protocol::Process2")

   int rc = 0;
   XPD_SETRESP(this, "Process2");

   TRACEP(this, REQ, "req id: " << fRequest.header.requestid << " (" <<
                XrdProofdAux::ProofRequestTypes(fRequest.header.requestid) << ")");

   // On any exit from this method ResetCtrlC() is called on this instance,
   // unless the request being served is kXP_ctrlc itself
   ResetCtrlcGuard_t ctrlcguard(this, fRequest.header.requestid);

   // If the user is logged in check if the wanted action is to be done by us
   if (fStatus && (fStatus & XPD_LOGGEDIN)) {
      // Record time of the last action
      TouchAdminPath();
      // We must have a client instance if here
      if (!fPClient) {
         TRACEP(this, XERR, "client undefined!!! ");
         response->Send(kXR_InvalidRequest,"client undefined!!! ");
         return 0;
      }
      // Set to true when the request is not one of those served locally
      bool formgr = 0;
      switch(fRequest.header.requestid) {
         case kXP_ctrlc:
            rc = CtrlC();
            break;
         case kXP_touch:
            // Reset the asked-to-touch flag, if it was never set
            fPClient->Touch(1);
            break;
         case kXP_interrupt:
            rc = Interrupt();
            break;
         case kXP_ping:
            rc = Ping();
            break;
         case kXP_sendmsg:
            rc = SendMsg();
            break;
         case kXP_urgent:
            rc = Urgent();
            break;
         default:
            formgr = 1;
      }
      if (!formgr) {
         // Check the link
         if (!fLink || (fLink->FDnum() <= 0)) {
            TRACE(XERR, "link is undefined! ");
            return -1;
         }
         return rc;
      }
   }

   // The request is for the manager
   rc = fgMgr->Process(this);
   // Check the link
   if (!fLink || (fLink->FDnum() <= 0)) {
      TRACE(XERR, "link is undefined! ");
      return -1;
   }
   return rc;
}

//______________________________________________________________________
void XrdProofdProtocol::Recycle(XrdLink *, int, const char *)
{
   // Recycle call. Release the instance and give it back to the stack.
   // The three arguments are not used.
   XPDLOC(ALL, "Protocol::Recycle")

   // Names used in the disconnect message; the table is indexed with
   // fConnType+1
   const char *srvtype[6] = {"ANY", "MasterWorker", "MasterMaster",
                             "ClientMaster", "Internal", "Admin"};
   XrdOucString buf;

   // Document the disconnect
   if (fPClient)
      XPDFORM(buf, "user %s disconnected; type: %s", fPClient->User(),
                   srvtype[fConnType+1]);
   else
      XPDFORM(buf, "user disconnected; type: %s", srvtype[fConnType+1]);
   TRACEP(this, LOGIN, buf);

   // If we have a buffer, release it
   if (fArgp) {
      fgBPool->Release(fArgp);
      fArgp = 0;
   }

   // Locate the client instance
   XrdProofdClient *pmgr = fPClient;

   if (pmgr) {
      if (!Internal()) {

         // NOTE(review): the three TRACE statements in this branch look
         // truncated in this copy of the file (text between a '<' and a '>'
         // seems to be missing); they are kept as found - confirm against the
         // original source before building.
         TRACE(REQ,"External disconnection of protocol associated with pid "<ResetClientSlot(fCID);
         if(fgMgr && fgMgr->SessionMgr()) {
            XrdSysMutexHelper mhp(fgMgr->SessionMgr()->Mutex());

            fgMgr->SessionMgr()->DisconnectFromProofServ(fPid);
            if((fConnType == 0) && fgMgr->SessionMgr()->Alive(this)) {
               TRACE(REQ, "Non-destroyed proofserv processes attached to this protocol ("<SessionMgr()->SetReconnectTime(true);
            }
            fgMgr->SessionMgr()->CheckActiveSessions(0);
         } else {
            TRACE(XERR, "No XrdProofdMgr ("<SessionMgr() : (void *) -1)<<")")
         }

      } else {

         // Internal connection: we need to remove this instance from the list
         // of proxy servers and to notify the attached clients.
// Tell the session manager that this session has gone if (fgMgr && fgMgr->SessionMgr()) { XrdSysMutexHelper mhp(fgMgr->SessionMgr()->Mutex()); TRACE(HDBG, "fAdminPath: "<SessionMgr()->DeleteFromSessions(buf.c_str()); // Move the entry to the terminated sessions area fgMgr->SessionMgr()->MvSession(buf.c_str()); } else { TRACE(XERR,"No XrdProofdMgr ("<SessionMgr()<<")") } } } // Set fields to starting point (debugging mostly) Reset(); // Push ourselves on the stack fgProtStack.Push(&fProtLink); #if 0 if(fgProtStack.Push(&fProtLink) != 0) { XrdProofdProtocol *xp = fProtLink.objectItem(); fProtLink.setItem(0); delete xp; } #endif } //______________________________________________________________________________ XrdBuffer *XrdProofdProtocol::GetBuff(int quantum, XrdBuffer *argp) { // Allocate a buffer to handle quantum bytes; if argp points to an existing // buffer, its size is checked and re-allocated if needed XPDLOC(ALL, "Protocol::GetBuff") TRACE(HDBG, "len: "<= argp->bsize / 2 && quantum <= argp->bsize) return argp; } // Release the buffer if too small XrdSysMutexHelper mh(fgBMutex); if (argp) fgBPool->Release(argp); // Obtain a new one if ((argp = fgBPool->Obtain(quantum)) == 0) { TRACE(XERR, "could not get requested buffer (size: "<buff)<<", bsize:"<bsize); } // Done return argp; } //______________________________________________________________________________ void XrdProofdProtocol::ReleaseBuff(XrdBuffer *argp) { // Release a buffer previously allocated via GetBuff XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); } //______________________________________________________________________________ int XrdProofdProtocol::GetData(const char *dtype, char *buff, int blen) { // Get data from the open link XPDLOC(ALL, "Protocol::GetData") int rlen; // Read the data but reschedule the link if we have not received all of the // data within the timeout interval. TRACEP(this, HDBG, "dtype: "<<(dtype ? 
dtype : " - ")<<", blen: "<Recv(buff, blen, fgReadWait); if (rlen < 0) { if (rlen != -ENOMSG && rlen != -ECONNRESET) { XrdOucString emsg = "link read error: errno: "; emsg += -rlen; TRACEP(this, XERR, emsg.c_str()); return (fLink ? fLink->setEtext(emsg.c_str()) : -1); } else { TRACEP(this, HDBG, "connection closed by peer (errno: "<<-rlen<<")"); return -1; } } if (rlen < blen) { TRACEP(this, DBG, dtype << " timeout; read " < fgMaxBuffsz ? fgMaxBuffsz : len); // Get a buffer XrdBuffer *argp = XrdProofdProtocol::GetBuff(quantum); if (!argp) return -1; // Now send over all of the data as unsolicited messages XrdOucString msg; while (len > 0) { XrdProofdResponse *response = (sid > -1) ? xps->Response() : 0; if ((rc = GetData("data", argp->buff, quantum))) { { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); } return -1; } if (buf && !(*buf) && savebuf) *buf = new XrdSrvBuffer(argp->buff, quantum, 1); // Send if (sid > -1) { if (TRACING(HDBG)) XPDFORM(msg, "EXT: server ID: %d, sending: %d bytes", sid, quantum); if (!response || response->Send(kXR_attn, kXPD_msgsid, sid, argp->buff, quantum) != 0) { { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); } XPDFORM(msg, "EXT: server ID: %d, problems sending: %d bytes to server", sid, quantum); TRACEP(this, XERR, msg); return -1; } } else { // Get ID of the client int cid = ntohl(fRequest.sendrcv.cid); if (TRACING(HDBG)) XPDFORM(msg, "INT: client ID: %d, sending: %d bytes", cid, quantum); if (xps->SendData(cid, argp->buff, quantum) != 0) { { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); } XPDFORM(msg, "INT: client ID: %d, problems sending: %d bytes to client", cid, quantum); TRACEP(this, XERR, msg); return -1; } } TRACEP(this, HDBG, msg); // Next segment len -= quantum; if (len < quantum) quantum = len; } // Release the buffer { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); } // Done return 0; } //______________________________________________________________________________ int 
XrdProofdProtocol::SendDataN(XrdProofdProofServ *xps, XrdSrvBuffer **buf, bool savebuf) { // Send data over the open client links of session 'xps'. // Used when all the connected clients are eligible to receive the message. // Segmentation is done here, if required. XPDLOC(ALL, "Protocol::SendDataN") int rc = 0; TRACEP(this, HDBG, "length: "< fgMaxBuffsz ? fgMaxBuffsz : len); // Get a buffer XrdBuffer *argp = XrdProofdProtocol::GetBuff(quantum); if (!argp) return -1; // Now send over all of the data as unsolicited messages while (len > 0) { if ((rc = GetData("data", argp->buff, quantum))) { XrdProofdProtocol::ReleaseBuff(argp); return -1; } if (buf && !(*buf) && savebuf) *buf = new XrdSrvBuffer(argp->buff, quantum, 1); // Send to connected clients if (xps->SendDataN(argp->buff, quantum) != 0) { XrdProofdProtocol::ReleaseBuff(argp); return -1; } // Next segment len -= quantum; if (len < quantum) quantum = len; } // Release the buffer XrdProofdProtocol::ReleaseBuff(argp); // Done return 0; } //_____________________________________________________________________________ int XrdProofdProtocol::SendMsg() { // Handle a request to forward a message to another process XPDLOC(ALL, "Protocol::SendMsg") static const char *crecv[5] = {"master proofserv", "top master", "client", "undefined", "any"}; int rc = 0; XPD_SETRESP(this, "SendMsg"); // Unmarshall the data int psid = ntohl(fRequest.sendrcv.sid); int opt = ntohl(fRequest.sendrcv.opt); XrdOucString msg; // Find server session XrdProofdProofServ *xps = 0; if (!fPClient || !(xps = fPClient->GetServer(psid))) { XPDFORM(msg, "%s: session ID not found: %d", (Internal() ? 
"INT" : "EXT"), psid); TRACEP(this, XERR, msg.c_str()); response->Send(kXR_InvalidRequest, msg.c_str()); return 0; } // Message length int len = fRequest.header.dlen; if (!Internal()) { if (TRACING(HDBG)) { // Notify XPDFORM(msg, "EXT: sending %d bytes to proofserv (psid: %d, xps: %p, status: %d," " cid: %d)", len, psid, xps, xps->Status(), fCID); TRACEP(this, HDBG, msg.c_str()); } // Send to proofsrv our client ID if (fCID == -1) { TRACEP(this, REQ, "EXT: error getting clientSID"); response->Send(kXP_ServerError,"EXT: getting clientSID"); return 0; } if (SendData(xps, fCID)) { TRACEP(this, REQ, "EXT: error sending message to proofserv"); response->Send(kXP_reconnecting,"EXT: sending message to proofserv"); return 0; } // Notify to user response->Send(); } else { if (TRACING(HDBG)) { // Notify XPDFORM(msg, "INT: sending %d bytes to client/master (psid: %d, xps: %p, status: %d)", len, psid, xps, xps->Status()); TRACEP(this, HDBG, msg.c_str()); } bool saveStartMsg = 0; XrdSrvBuffer *savedBuf = 0; // Additional info about the message if (opt & kXPD_setidle) { TRACEP(this, DBG, "INT: setting proofserv in 'idle' state"); xps->SetStatus(kXPD_idle); PostSession(-1, fPClient->UI().fUser.c_str(), fPClient->UI().fGroup.c_str(), xps); } else if (opt & kXPD_querynum) { TRACEP(this, DBG, "INT: got message with query number"); } else if (opt & kXPD_startprocess) { TRACEP(this, DBG, "INT: setting proofserv in 'running' state"); xps->SetStatus(kXPD_running); PostSession(1, fPClient->UI().fUser.c_str(), fPClient->UI().fGroup.c_str(), xps); // Save start processing message for later clients xps->DeleteStartMsg(); saveStartMsg = 1; } else if (opt & kXPD_logmsg) { // We broadcast log messages only not idle to catch the // result from processing if (xps->Status() == kXPD_running) { TRACEP(this, DBG, "INT: broadcasting log message"); opt |= kXPD_fb_prog; } } bool fbprog = (opt & kXPD_fb_prog); if (!fbprog) { // // The message is strictly for the client requiring it if (SendData(xps, -1, 
&savedBuf, saveStartMsg) != 0) { response->Send(kXP_reconnecting, "SendMsg: INT: session is reconnecting: retry later"); return 0; } } else { // Send to all connected clients if (SendDataN(xps, &savedBuf, saveStartMsg) != 0) { response->Send(kXP_reconnecting, "SendMsg: INT: session is reconnecting: retry later"); return 0; } } // Save start processing messages, if required if (saveStartMsg) xps->SetStartMsg(savedBuf); if (TRACING(DBG)) { int ii = xps->SrvType(); if (ii > 3) ii = 3; if (ii < 0) ii = 4; XPDFORM(msg, "INT: message sent to %s (%d bytes)", crecv[ii], len); TRACEP(this, DBG, msg); } // Notify to proofsrv response->Send(); } // Over return 0; } //______________________________________________________________________________ int XrdProofdProtocol::Urgent() { // Handle generic request of a urgent message to be forwarded to the server XPDLOC(ALL, "Protocol::Urgent") unsigned int rc = 0; XPD_SETRESP(this, "Urgent"); // Unmarshall the data int psid = ntohl(fRequest.proof.sid); int type = ntohl(fRequest.proof.int1); int int1 = ntohl(fRequest.proof.int2); int int2 = ntohl(fRequest.proof.int3); TRACEP(this, REQ, "psid: "<GetServer(psid))) { TRACEP(this, XERR, "session ID not found: "<Send(kXR_InvalidRequest,"Urgent: session ID not found"); return 0; } TRACEP(this, DBG, "xps: "<Status()); // Check ID matching if (!xps->Match(psid)) { response->Send(kXP_InvalidRequest,"Urgent: IDs do not match - do nothing"); return 0; } // Check the link to the session if (!xps->Response()) { response->Send(kXP_InvalidRequest,"Urgent: session response object undefined - do nothing"); return 0; } // Prepare buffer int len = 3 *sizeof(kXR_int32); char *buf = new char[len]; // Type kXR_int32 itmp = static_cast(htonl(type)); memcpy(buf, &itmp, sizeof(kXR_int32)); // First info container itmp = static_cast(htonl(int1)); memcpy(buf + sizeof(kXR_int32), &itmp, sizeof(kXR_int32)); // Second info container itmp = static_cast(htonl(int2)); memcpy(buf + 2 * sizeof(kXR_int32), &itmp, 
sizeof(kXR_int32)); // Send over if (xps->Response()->Send(kXR_attn, kXPD_urgent, buf, len) != 0) { response->Send(kXP_ServerError, "Urgent: could not propagate request to proofsrv"); return 0; } // Notify to user response->Send(); TRACEP(this, DBG, "request propagated to proofsrv"); // Over return 0; } //___________________________________________________________________________ int XrdProofdProtocol::Interrupt() { // Handle an interrupt request XPDLOC(ALL, "Protocol::Interrupt") int rc = 0; XPD_SETRESP(this, "Interrupt"); // Unmarshall the data int psid = ntohl(fRequest.interrupt.sid); int type = ntohl(fRequest.interrupt.type); TRACEP(this, REQ, "psid: "<GetServer(psid))) { TRACEP(this, XERR, "session ID not found: "<Send(kXR_InvalidRequest,"Interrupt: session ID not found"); return 0; } if (xps) { // Check ID matching if (!xps->Match(psid)) { response->Send(kXP_InvalidRequest,"Interrupt: IDs do not match - do nothing"); return 0; } XrdOucString msg; XPDFORM(msg, "xps: %p, link ID: %s, proofsrv PID: %d", xps, xps->Response()->TraceID(), xps->SrvPID()); TRACEP(this, DBG, msg.c_str()); // Propagate the type as unsolicited if (xps->Response()->Send(kXR_attn, kXPD_interrupt, type) != 0) { response->Send(kXP_ServerError, "Interrupt: could not propagate interrupt code to proofsrv"); return 0; } // Notify to user response->Send(); TRACEP(this, DBG, "interrupt propagated to proofsrv"); } // Over return 0; } //___________________________________________________________________________ int XrdProofdProtocol::Ping() { // Handle a ping request. 
// For internal connections, ping is done asynchronously to avoid locking // problems; the session checker verifies that the admin file has been touched // recently enough; touching is done in Process2, so we have nothing to do here XPDLOC(ALL, "Protocol::Ping") int rc = 0; if (Internal()) { if (TRACING(HDBG)) { XPD_SETRESP(this, "Ping"); TRACEP(this, HDBG, "INT: nothing to do "); } return 0; } XPD_SETRESP(this, "Ping"); // Unmarshall the data int psid = ntohl(fRequest.sendrcv.sid); int asyncopt = ntohl(fRequest.sendrcv.opt); TRACEP(this, REQ, "psid: "< -1 && !(xps = fPClient->GetServer(psid)))) { TRACEP(this, XERR, "session ID not found: "<Send(kXR_InvalidRequest,"session ID not found"); return 0; } // For manager connections we are done kXR_int32 pingres = (psid > -1) ? 0 : 1; if (psid > -1 && xps && xps->IsValid()) { TRACEP(this, DBG, "EXT: psid: "<SessionMgr()->CheckFrequency(); // If asynchronous return the timeout for an answer if (asyncopt == 1) { TRACEP(this, DBG, "EXT: async: notifying timeout to client: "<Send(kXR_ok, checkfq); } // Admin path XrdOucString path(xps->AdminPath()); if (path.length() <= 0) { TRACEP(this, XERR, "EXT: admin path is empty! - protocol error"); if (asyncopt == 0) response->Send(kXP_ServerError, "EXT: admin path is empty! - protocol error"); return 0; } path += ".status"; // Current time int now = time(0); // Stat the admin file struct stat st0; if (stat(path.c_str(), &st0) != 0) { TRACEP(this, XERR, "EXT: cannot stat admin path: "<Send(kXP_ServerError, "EXT: cannot stat admin path"); return 0; } // Take the pid int pid = xps->SrvPID(); // If the session is alive ... 
if (XrdProofdAux::VerifyProcessByID(pid) != 0) { // If it as not touched during the last ~checkfq secs we ask for a refresh if ((now - st0.st_mtime) > checkfq - 5) { // Send the request (asking for further propagation) if (xps->VerifyProofServ(1) != 0) { TRACEP(this, XERR, "EXT: could not send verify request to proofsrv"); if (asyncopt == 0) response->Send(kXP_ServerError, "EXT: could not verify reuqest to proofsrv"); return 0; } // Wait for the action for checkfq secs, checking every 1 sec struct stat st1; int ns = checkfq; while (ns--) { if (stat(path.c_str(), &st1) == 0) { if (st1.st_mtime > st0.st_mtime) { pingres = 1; break; } } // Wait 1 sec TRACEP(this, DBG, "EXT: waiting "<Send(kXR_ok, pingres); } else { // Prepare buffer for asynchronous notification int len = sizeof(kXR_int32); char *buf = new char[len]; // Option kXR_int32 ifw = (kXR_int32)0; ifw = static_cast(htonl(ifw)); memcpy(buf, &ifw, sizeof(kXR_int32)); response->Send(kXR_attn, kXPD_ping, buf, len); } return 0; } else if (psid > -1) { // This is a failure for connections to sessions TRACEP(this, XERR, "session ID not found: "<Send(kXR_ok, pingres); // Done return 0; } //___________________________________________________________________________ void XrdProofdProtocol::PostSession(int on, const char *u, const char *g, XrdProofdProofServ *xps) { // Post change of session status XPDLOC(ALL, "Protocol::PostSession") // Tell the priority manager if (fgMgr && fgMgr->PriorityMgr()) { int pid = (xps) ? 
xps->SrvPID() : -1; if (pid < 0) { TRACE(XERR, "undefined session or process id"); return; } XrdOucString buf; XPDFORM(buf, "%d %s %s %d", on, u, g, pid); if (fgMgr->PriorityMgr()->Pipe()->Post(XrdProofdPriorityMgr::kChangeStatus, buf.c_str()) != 0) { TRACE(XERR, "problem posting the prority manager pipe"); } } // Tell the scheduler if (fgMgr && fgMgr->ProofSched()) { if (on == -1 && xps && xps->SrvType() == kXPD_TopMaster) { TRACE(DBG, "posting the scheduler pipe"); if (fgMgr->ProofSched()->Pipe()->Post(XrdProofSched::kReschedule, 0) != 0) { TRACE(XERR, "problem posting the scheduler pipe"); } } } // Tell the session manager if (fgMgr && fgMgr->SessionMgr()) { if (fgMgr->SessionMgr()->Pipe()->Post(XrdProofdProofServMgr::kChgSessionSt, 0) != 0) { TRACE(XERR, "problem posting the session manager pipe"); } } // Done return; } //___________________________________________________________________________ void XrdProofdProtocol::TouchAdminPath() { // Recording time of the last request on this instance XPDLOC(ALL, "Protocol::TouchAdminPath") XPD_SETRESPV(this, "TouchAdminPath"); TRACEP(this, HDBG, fAdminPath); if (fAdminPath.length() > 0) { int rc = 0; if ((rc = XrdProofdAux::Touch(fAdminPath.c_str())) != 0) { // In the case the file was not found and the connetion is internal // try also the terminated sessions, as the file could have been moved // in the meanwhile XrdOucString apath = fAdminPath; if (rc == -ENOENT && Internal()) { apath.replace("/activesessions/", "/terminatedsessions/"); apath.replace(".status", ""); rc = XrdProofdAux::Touch(apath.c_str()); } if (rc != 0 && rc != -ENOENT) { const char *type = Internal() ? "internal" : "external"; TRACEP(this, XERR, type<<": problems touching "<SrvType() != kXPD_Worker) { if (fgMgr->NetMgr()) { fgMgr->NetMgr()->BroadcastCtrlC(Client()->User()); } } } // Over return 0; }