#include "netcommunication.h"
#include <mpi.h>
#include <cassert>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <vector>

// NOTE(review): NetCommunication owns raw arrays (slaveProgram, slaveArguments,
// tids, status, hostTids, dataIDs) and pvmConst, but no copy constructor or copy
// assignment is defined in this file. Copying an instance would double-delete
// them; consider declaring both as deleted/private in netcommunication.h.

/*
 * Builds the communication object.
 *   funcNameArgs[0]  - name of the program to spawn on the slaves
 *   funcNameArgs[1..] - arguments passed to that program
 *   nh               - number of slaves to spawn; if <= 0 the size of
 *                      MPI_COMM_WORLD is used instead (see startPVM)
 * Exits the program if funcNameArgs is empty.
 */
NetCommunication::NetCommunication(const CharPtrVector& funcNameArgs, int nh) {
  // pvmConst contains information about which tags and data encoding to use
  pvmConst = new PVMConstants();
  nHostInn = nh;
  int i;
  if (funcNameArgs.Size() <= 0) {
    cerr << "Must have name of the function to start on slaves\n";
    exit(EXIT_FAILURE);
  }

  slaveProgram = new char[strlen(funcNameArgs[0]) + 1];
  strcpy(slaveProgram, funcNameArgs[0]);
  numarg = funcNameArgs.Size() - 1;
  // argv-style array for MPI_Comm_spawn: NULL terminated
  slaveArguments = new char*[numarg + 1];
  for (i = 0; i < numarg; i++) {
    slaveArguments[i] = new char[strlen(funcNameArgs[i + 1]) + 1];
    strcpy(slaveArguments[i], funcNameArgs[i + 1]);
  }
  slaveArguments[numarg] = NULL;

  numVar = -1;
  mytid = -1;
  nhost = 0;
  narch = 0;
  numProcesses = 0;
  numGoodProcesses = 0;
  NETSTARTED = 0;
  // All owned arrays start out NULL so the destructor can always delete them.
  tids = NULL;
  status = NULL;
  hostTids = NULL;
  dataIDs = NULL;
  // No slaves have been spawned yet; stop/check functions test for this.
  intercomm = MPI_COMM_NULL;
  ERROR = -1;
  SUCCESS = 1;
  likelihoodHJ = 0.0;
  likelihoodSA = 0.0;
  likelihoodBFGS = 0.0;
  convergedSA = 0;
  convergedHJ = 0;
  convergedBFGS = 0;
  maxNumHosts = 500;
}

/*
 * Shuts down MPI (if this object started it) and releases all owned memory.
 */
NetCommunication::~NetCommunication() {
  // Stop first, while every member stopNetCommunication may use is still valid.
  if (NETSTARTED == 1)
    stopNetCommunication();
  delete[] tids;
  delete[] status;
  delete[] hostTids;
  delete[] dataIDs;
  delete[] slaveProgram;
  for (int i = 0; i < numarg; i++)
    delete[] slaveArguments[i];
  delete[] slaveArguments;
  delete pvmConst;
}

// ********************************************************
// Functions for starting and stopping netcommunication
// ********************************************************

/*
 * Initialises MPI (if needed), records this process's rank in mytid and the
 * number of slaves to spawn in nhost, and allocates the per-slave bookkeeping
 * arrays. Returns SUCCESS, or ERROR if MPI cannot be (re)started.
 */
int NetCommunication::startPVM() {
  int initialized = 0;
  int finalized = 0;
  MPI_Initialized(&initialized);
  MPI_Finalized(&finalized);
  if (finalized) {
    // MPI cannot be restarted once MPI_Finalize has been called.
    printErrorMsg("Error in netcommunication - MPI has already been finalized");
    return ERROR;
  }
  if (!initialized) {
    // Command-line arguments are not forwarded; MPI-2 and later allow NULL here.
    if (MPI_Init(NULL, NULL) != MPI_SUCCESS) {
      printErrorMsg("Error in netcommunication - MPI_Init failed");
      return ERROR;
    }
  }

  if (mytid < 0) {
    MPI_Comm_rank(MPI_COMM_WORLD, &mytid);
    if (mytid < 0) {
      printErrorMsg("Error in netcommunication - MPI not started");
      return ERROR;
    }
    // Normally 1 (only the master is launched); overridden by nHostInn if given.
    MPI_Comm_size(MPI_COMM_WORLD, &nhost);
    if (nHostInn > 0)
      nhost = nHostInn;

    // The arrays are indexed by slave number, so they must hold at least nhost entries.
    int arraySize = (nhost > maxNumHosts ? nhost : maxNumHosts);
    delete[] tids;
    delete[] status;
    delete[] hostTids;
    delete[] dataIDs;
    tids = new int[arraySize]();
    status = new int[arraySize]();
    hostTids = new int[arraySize]();
    dataIDs = new int[arraySize]();
  }
  return SUCCESS;
}

/*
 * Starts MPI, spawns nhost copies of slaveProgram and sends each of them its
 * initial message. Returns SUCCESS (also if already started) or ERROR.
 * Requires setNumInSendVar to have been called with a positive value.
 */
int NetCommunication::startNetCommunication() {
  if (NETSTARTED == 1 && mytid >= 0) {
    // have already enrolled in MPI and spawned program on slaves
    cerr << "Warning in netcommunication - already enrolled in MPI and running "
      << slaveProgram << " on slaves\n";
    return SUCCESS;
  }
  if (numVar <= 0) {
    cerr << "Error in netcommunication - number of variables must be positive\n";
    return ERROR;
  }
  if (startPVM() != SUCCESS) {
    // startPVM has already reported the reason
    stopNetCommunication();
    return ERROR;
  }

  // PVM used pvm_catchout to collect the slaves' output. With MPI the
  // comparable behaviour is obtained through a launcher (mpirun) option that
  // writes each process's stdout/stderr to its own file.
  std::vector<int> errcodes(nhost);  // nhost >= 1 after a successful startPVM
  MPI_Comm_spawn(slaveProgram, slaveArguments, nhost, MPI_INFO_NULL, 0,
    MPI_COMM_WORLD, &intercomm, &errcodes[0]);

  // The counters describe the slaves of this spawn only.
  numProcesses = 0;
  numGoodProcesses = 0;
  for (int i = 0; i < nhost; i++) {
    if (errcodes[i] != MPI_SUCCESS) {
      cerr << "Error in netcommunication - unable to spawn process\n";
      return ERROR;
    }
    numProcesses++;
    numGoodProcesses++;
    status[i] = 1;
    // slave i is addressed by its rank i in the remote group of intercomm
    tids[i] = i;
  }

  // Have started slaveProgram slaveArguments on all nhost hosts,
  // now send initial info to all slave processes
  int OK = startProcesses();
  if (OK == SUCCESS) {
    NETSTARTED = 1;
    return SUCCESS;
  }
  if (OK == ERROR)
    return ERROR;
  printErrorMsg("Error in netcommunication - unrecognised return value");
  stopNetCommunication();
  return ERROR;
}

/*
 * Sends the stop message to every spawned slave and finalises MPI.
 * Safe to call more than once and before MPI has been started.
 */
void NetCommunication::stopNetCommunication() {
  int initialized = 0;
  int finalized = 0;
  MPI_Initialized(&initialized);
  MPI_Finalized(&finalized);
  if (initialized && !finalized) {
    int stopparam = -1;
    if (intercomm != MPI_COMM_NULL) {
      for (int i = 0; i < numProcesses; i++)
        MPI_Send(&stopparam, 1, MPI_INT, i, pvmConst->getStopTag(), intercomm);
    }
    MPI_Finalize();
  }
  intercomm = MPI_COMM_NULL;
  mytid = -1;
  NETSTARTED = 0;
}

/*
 * Sends the initial message (number of variables and slave id) to every
 * spawned slave. Returns SUCCESS or ERROR.
 */
int NetCommunication::startProcesses() {
  for (int i = 0; i < nhost; i++) {
    int cansend = sendInitialMessage(i);
    if (cansend == 1) {
      status[i] = 1;
    } else if (cansend == -1) {
      // Error occurred in sending initial message to process with id = i
      printErrorMsg("Error in netcommunication - unable to send message");
      return ERROR;
    } else if (cansend == 0) {
      printErrorMsg("Error in netcommunication - unable to send message");
      status[i] = -1;
      return ERROR;
    } else {
      printErrorMsg("Error in netcommunication - unrecognised return value");
      stopNetCommunication();
      return ERROR;
    }
  }
  return SUCCESS;
}

/*
 * Sends numVar and the slave's own id to slave id (tag getStartTag()).
 * Returns SUCCESS, 0 if the id is invalid or the slave is down, or ERROR.
 * Must only be called by the master (a process without an MPI parent).
 */
int NetCommunication::sendInitialMessage(int id) {
  if (id < 0 || id >= nhost) {
    printErrorMsg("Error in netcommunication - invalid slave ID");
    return 0;
  }

  // check if process with identity = id is up and running
  int OK = checkProcess(id);
  if (OK == 1) {
    MPI_Comm parentcomm;
    MPI_Comm_get_parent(&parentcomm);
    if (parentcomm != MPI_COMM_NULL) {
      printErrorMsg("Error in netcommunication - slave calling master send");
      stopNetCommunication();
      return ERROR;
    }
    MPI_Send(&numVar, 1, MPI_INT, id, pvmConst->getStartTag(), intercomm);
    MPI_Send(&id, 1, MPI_INT, id, pvmConst->getStartTag(), intercomm);
    return SUCCESS;
  } else if (OK == -1) {
    printErrorMsg("Error in netcommunication - unable to check status");
    stopNetCommunication();
    return ERROR;
  } else if (OK == 0) {
    printErrorMsg("Error in netcommunication - unable to send initial message");
    return OK;
  } else {
    printErrorMsg("Error in netcommunication - unrecognised return value");
    stopNetCommunication();
    return ERROR;
  }
}

/*
 * Non-blocking check for a task-died message from slave id.
 * Returns SUCCESS if none is waiting, 0 if one was received (the slave is
 * then marked as down), or ERROR if the message names an unexpected slave.
 * NOTE(review): the original author noted that nothing ever sends with
 * getTaskDiedTag(), so the probe is expected to find nothing - confirm
 * against the slave code.
 */
int NetCommunication::checkProcess(int id) {
  assert(id >= 0);
  assert(id < numProcesses);

  int flag = 0;
  MPI_Status probeStatus;
  // Returns immediately; flag is true only if such a message is already queued.
  MPI_Iprobe(id, pvmConst->getTaskDiedTag(), intercomm, &flag, &probeStatus);
  if (!flag)
    return SUCCESS;

  // message has arrived from tids[id] that has halted
  int recvTid;
  MPI_Status recvStatus;
  MPI_Recv(&recvTid, 1, MPI_INT, probeStatus.MPI_SOURCE,
    pvmConst->getTaskDiedTag(), intercomm, &recvStatus);
  if (recvTid != tids[id])
    return ERROR;
  status[id] = -1;
  numGoodProcesses--;
  return 0;
}

/*
 * Consumes every queued task-died message and marks the corresponding
 * slaves as down. Does nothing if no slaves have been spawned.
 */
void NetCommunication::checkProcesses() {
  if (intercomm == MPI_COMM_NULL)
    return;

  // Probe before receiving so no receive request is ever left pending
  // (a pending MPI_Irecv would later write into a dead stack variable).
  int flag = 0;
  MPI_Status probeStatus;
  MPI_Iprobe(MPI_ANY_SOURCE, pvmConst->getTaskDiedTag(), intercomm, &flag, &probeStatus);
  while (flag) {
    // got message that task is down, receive it
    int tidDown;
    MPI_Status recvStatus;
    MPI_Recv(&tidDown, 1, MPI_INT, probeStatus.MPI_SOURCE,
      pvmConst->getTaskDiedTag(), intercomm, &recvStatus);

    int i = 0;
    while ((i < numProcesses) && (tids[i] != tidDown))
      i++;
    assert(i < numProcesses);
    if (i < numProcesses) {
      status[i] = -1;
      numGoodProcesses--;
    }
    MPI_Iprobe(MPI_ANY_SOURCE, pvmConst->getTaskDiedTag(), intercomm, &flag, &probeStatus);
  }
}

/*
 * Updates the slave status and copies it into procTids
 * (1 = running, -1 = down). procTids must hold numProcesses entries.
 */
void NetCommunication::getHealthOfProcesses(int* procTids) {
  checkProcesses();
  for (int i = 0; i < numProcesses; i++)
    procTids[i] = status[i];
}

// ********************************************************
// Functions for sending and receiving messages
// ********************************************************

/*
 * Sends the names of the first numVar parameters to every slave, one
 * null-terminated string per message (tag getMasterSendStringTag()).
 * Replaces the old pvm_mcast; expected to be called once at start-up.
 * Returns SUCCESS or ERROR (MPI is then stopped).
 */
int NetCommunication::sendData(const ParameterVector& sendP) {
  if (NETSTARTED != 1) {
    printErrorMsg("Error in netcommunication - unable to send data");
    return ERROR;
  }
  assert(sendP.Size() >= numVar);

  int info = MPI_SUCCESS;
  for (int i = 0; i < numVar && info == MPI_SUCCESS; i++) {
    const char* name = sendP[i].getName();
    int length = static_cast<int>(strlen(name)) + 1;  // include the terminator
    // writable copy, since older MPI bindings take a non-const send buffer
    std::vector<char> buffer(name, name + length);
    for (int j = 0; j < nhost && info == MPI_SUCCESS; j++)
      info = MPI_Send(&buffer[0], length, MPI_BYTE, j,
        pvmConst->getMasterSendStringTag(), intercomm);
  }

  if (info != MPI_SUCCESS) {
    printErrorMsg("Error in netcommunication - unable to send data");
    stopNetCommunication();
    return ERROR;
  }
  return SUCCESS;
}

/*
 * Sends the names of the first numVar parameters to slave processID only.
 * Returns SUCCESS or ERROR (MPI is stopped if a send fails).
 */
int NetCommunication::sendData(const ParameterVector& sendP, int processID) {
  if (NETSTARTED != 1) {
    printErrorMsg("Error in netcommunication - unable to send data");
    return ERROR;
  }
  assert(processID >= 0);
  assert(processID < numProcesses);
  assert(sendP.Size() >= numVar);

  int info = MPI_SUCCESS;
  for (int i = 0; i < numVar && info == MPI_SUCCESS; i++) {
    const char* name = sendP[i].getName();
    int length = static_cast<int>(strlen(name)) + 1;  // include the terminator
    std::vector<char> buffer(name, name + length);
    info = MPI_Send(&buffer[0], length, MPI_BYTE, tids[processID],
      pvmConst->getMasterSendStringTag(), intercomm);
  }

  if (info != MPI_SUCCESS) {
    printErrorMsg("Error in netcommunication - unable to send data");
    stopNetCommunication();
    return ERROR;
  }
  return SUCCESS;
}

/*
 * Sends the first numVar values of sendP as one MPI_DOUBLE message to every
 * slave (tag getMasterSendBoundTag()). Replaces the old pvm_mcast.
 */
int NetCommunication::sendBoundData(const DoubleVector& sendP) {
  if (NETSTARTED != 1) {
    printErrorMsg("Error in netcommunication - unable to send data");
    return ERROR;
  }
  assert(sendP.Size() >= numVar);

  std::vector<double> bounds(numVar);  // numVar > 0 once started
  for (int i = 0; i < numVar; i++)
    bounds[i] = sendP[i];
  for (int j = 0; j < nhost; j++)
    MPI_Send(&bounds[0], numVar, MPI_DOUBLE, j, pvmConst->getMasterSendBoundTag(), intercomm);
  return SUCCESS;
}

/*
 * Sends the first numVar values of sendP as one MPI_DOUBLE message to
 * slave processID only.
 */
int NetCommunication::sendBoundData(const DoubleVector& sendP, int processID) {
  if (NETSTARTED != 1) {
    printErrorMsg("Error in netcommunication - unable to send data");
    return ERROR;
  }
  assert(processID >= 0);
  assert(processID < numProcesses);
  assert(sendP.Size() >= numVar);

  std::vector<double> bounds(numVar);
  for (int i = 0; i < numVar; i++)
    bounds[i] = sendP[i];
  MPI_Send(&bounds[0], numVar, MPI_DOUBLE, tids[processID],
    pvmConst->getMasterSendBoundTag(), intercomm);
  return SUCCESS;
}

/*
 * Sends tag, x_id and the numVar values of x to slave processID, as three
 * messages with tag getMasterSendVarTag().
 * Returns SUCCESS, 0 if the slave is down, or ERROR.
 */
int NetCommunication::sendData(NetDataVariables* sendP, int processID) {
  assert(processID >= 0);
  assert(processID < numProcesses);
  if (NETSTARTED != 1) {
    printErrorMsg("Error in netcommunication - unable to send data");
    return ERROR;
  }

  // check is process with id = processID is up and running
  int cansend = checkProcess(processID);
  if (cansend == -1) {
    printErrorMsg("Error in netcommunication - invalid process ID");
    stopNetCommunication();
    return ERROR;
  } else if (cansend == 0) {
    // process with id = processID is not up and running
    return cansend;
  } else if (cansend == 1) {
    MPI_Send(&sendP->tag, 1, MPI_INT, tids[processID], pvmConst->getMasterSendVarTag(), intercomm);
    MPI_Send(&sendP->x_id, 1, MPI_INT, tids[processID], pvmConst->getMasterSendVarTag(), intercomm);
    MPI_Send(sendP->x, numVar, MPI_DOUBLE, tids[processID], pvmConst->getMasterSendVarTag(), intercomm);
    return SUCCESS;
  } else {
    printErrorMsg("Error in netcommunication - unable to send data");
    stopNetCommunication();
    return ERROR;
  }
}

/*
 * Blocks until a result arrives from any slave and stores its tag, result,
 * who and x_id in rp. The later three fields are received from the same
 * slave that sent the tag. Returns SUCCESS or ERROR if not started.
 */
int NetCommunication::receiveData(NetDataResult* rp) {
  if (NETSTARTED != 1) {
    printErrorMsg("Error in netcommunication - unable to receive data");
    return ERROR;
  }
  // named so as not to shadow the member array 'status'
  MPI_Status firstStatus, otherStatus;
  MPI_Recv(&rp->tag, 1, MPI_INT, MPI_ANY_SOURCE, pvmConst->getMasterReceiveDataTag(), intercomm, &firstStatus);
  int source = firstStatus.MPI_SOURCE;
  MPI_Recv(&rp->result, 1, MPI_DOUBLE, source, pvmConst->getMasterReceiveDataTag(), intercomm, &otherStatus);
  MPI_Recv(&rp->who, 1, MPI_INT, source, pvmConst->getMasterReceiveDataTag(), intercomm, &otherStatus);
  MPI_Recv(&rp->x_id, 1, MPI_INT, source, pvmConst->getMasterReceiveDataTag(), intercomm, &otherStatus);
  return SUCCESS;
}

// ********************************************************
// Functions which set/return information about netcommunication
// ********************************************************
int NetCommunication::getNumHosts() {
  return nhost;
}

int NetCommunication::getNumProcesses() {
  return numProcesses;
}

int NetCommunication::getNumVar() {
  return numVar;
}

int NetCommunication::getNumRunningProcesses() {
  return numGoodProcesses;
}

int NetCommunication::netCommStarted() {
  return NETSTARTED;
}

/*
 * Sets the number of variables sent per message; exits if nVar <= 0.
 */
void NetCommunication::setNumInSendVar(int nVar) {
  if (nVar <= 0) {
    cerr << "Error in netcommunication - number of variables must be positive\n";
    exit(EXIT_FAILURE);
  }
  numVar = nVar;
}

/*
 * Reports errorMsg on both stdout and stderr. Both streams are used on
 * purpose so the message appears in either stream's captured output.
 */
void NetCommunication::printErrorMsg(const char* errorMsg) {
  cout << errorMsg << "\n";
  cerr << errorMsg << endl;
}

int NetCommunication::netError() {
  return ERROR;
}

int NetCommunication::netSuccess() {
  return SUCCESS;
}

/*
 * Master side of the communication. A wait value of -1 means no timeout
 * (tmout == NULL); a value >= 0 is stored as a timeout in seconds.
 * Exits on any other value.
 */
MasterCommunication::MasterCommunication(CommandLineInfo* info)
  : NetCommunication(info->getFunction(), info->getNumProc()) {

  int wait = info->getWaitMaster();
  tmout = NULL;
  if (wait == -1) {
    // no timeout; nothing to allocate
  } else if (wait >= 0) {
    tmout = new timeval;
    tmout->tv_sec = wait;
    tmout->tv_usec = 0;
  } else {
    cerr << "Error in netcommunication - invalid value for wait " << wait << "\n";
    exit(EXIT_FAILURE);
  }
}

MasterCommunication::~MasterCommunication() {
  delete tmout;
}

/*
 * Receives one result, exactly as NetCommunication::receiveData does.
 * NOTE(review): tmout is not honoured - the PVM version used a timed
 * receive, but this MPI receive blocks indefinitely.
 */
int MasterCommunication::receiveData(NetDataResult* rp) {
  return NetCommunication::receiveData(rp);
}