Log In | Get Help   
Home My Page Projects Code Snippets Project Openings Mareframe
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files
[mareframe] View of /trunk/paramin-beta/netcommunication.cc
[mareframe] / trunk / paramin-beta / netcommunication.cc Repository:
ViewVC logotype

View of /trunk/paramin-beta/netcommunication.cc

Parent Directory Parent Directory | Revision Log Revision Log


Revision 8 - (download) (annotate)
Mon May 11 11:12:20 2015 UTC (9 years ago) by ulcessvp
File size: 19049 byte(s)
Cambiado MPI_CHAR -> MPI_BYTE
#include "netcommunication.h"
#include <iostream>

NetCommunication::NetCommunication(const CharPtrVector& funcNameArgs, int nh) {
  	// pvmConst contains information about which tags and dataencoding to use
  	pvmConst = new PVMConstants();
  	nHostInn = nh;
  	int i;
  	if (funcNameArgs.Size() <= 0) 
	{
    	cerr << "Must have name of the function to start on slaves\n";
    	exit(EXIT_FAILURE);
  	}
  	slaveProgram = new char[strlen(funcNameArgs[0]) + 1];
  	strcpy(slaveProgram, funcNameArgs[0]);
  	numarg = funcNameArgs.Size() - 1;
  	slaveArguments = new char*[numarg + 1];
  	for (i = 0; i < numarg; i++) 
	{
    	slaveArguments[i] = new char[strlen(funcNameArgs[i + 1]) + 1];
    	strcpy(slaveArguments[i], funcNameArgs[i + 1]);
  	}
  	slaveArguments[numarg] = NULL;
	
	numVar = -1;
	mytid = -1;
	nhost = 0;
	narch = 0;
	numProcesses = 0;
	numGoodProcesses = 0;
	NETSTARTED = 0;
	tids = NULL;
	status = NULL;
	ERROR = -1;
	SUCCESS = 1;
	
	likelihoodHJ = 0.0;
	likelihoodSA = 0.0;
	likelihoodBFGS = 0.0;
	convergedSA = 0;
	convergedHJ = 0;
	convergedBFGS = 0;
	maxNumHosts = 500;
}

NetCommunication::~NetCommunication() 
{
  	int i;
  	if (tids != NULL)
    	delete[] tids;
  	if (status != NULL)
    	delete[] status;
  	if (hostTids != NULL)
    	delete[] hostTids;
  	if (dataIDs != NULL)
    	delete[] dataIDs;
  	delete[] slaveProgram;
  	for (i = 0; i < numarg; i++)
    	delete[] slaveArguments[i];
  	delete[] slaveArguments;
  	if (NETSTARTED == 1)
    	stopNetCommunication();
  	delete pvmConst;
}

// ********************************************************
// Functions for starting and stopping netcommunication
// ********************************************************
int NetCommunication::startPVM() 
{
	/*
		Þetta fall er afgreitt í bili!
		ATH: Þarf e.t.v. að skoða parametrana í MPI_Init.
	*/
	int info;
  	// Held að það sé í lagi að hafa bara NULL hér...
  	MPI_Init(NULL, NULL);
	if (mytid < 0) 
	{
    	MPI_Comm_rank(MPI_COMM_WORLD, &mytid);
    	if (mytid < 0) 
		{
      		printErrorMsg("Error in netcommunication - MPI not started");
      		return ERROR;
    	}
	
		int flag;
		flag = 0;
		// Checks whether MPI_Init has been called successfully.
		MPI_Initialized(&flag);
		if (!flag)
		{
			printErrorMsg("Error in netcommunication - MPI_Init has not been called!");
		}
		// nhost á að vera 1 hérna
		MPI_Comm_size(MPI_COMM_WORLD, &nhost);

    	tids = new int[maxNumHosts];
    	status = new int[maxNumHosts];
    	hostTids = new int[maxNumHosts];  //Added jongud
    	dataIDs = new int[maxNumHosts];   //Added jongud

    	if (nHostInn > 0)
      		nhost = nHostInn;
	}
	return 1;
}

int NetCommunication::startNetCommunication() 
{
	/*
		Þetta fall er afgreitt í bili!
	*/
	int i, OK, info;
  	if (NETSTARTED == 1 && mytid >= 0) 
	{
    	// have alredy enrolled in MPI and spawned program on slaves
    	cerr << "Warning in netcommunication - already enrolled in MPI and running " << slaveProgram << " on slaves\n";
    	return SUCCESS;
	} 
	else 
	{
    	if (numVar <= 0) 
		{
      		cerr << "Error in netcommunication - number of variables must be positive\n";
      		return ERROR;
    	}
    	OK = startPVM();
		int* errcodes = new int[nhost];
		if (OK == 1) 
		{
			/* 
				� PVM var notað pvm_catchout(stdin) til að fá output úr
				child processunum, í MPI setur maður flag á eftir mpirun
				til að fá sambærilega hegðun, þá búast til skrár fyrir hvert
				process sem skrifa út allt sem kemur úr stdout og stderr í þeim. 
			*/
			MPI_Comm_spawn(slaveProgram, slaveArguments, nhost, MPI_INFO_NULL, 
	          	0, MPI_COMM_WORLD, &intercomm, errcodes);
			int tidsCounter;
			for (i = 0; i < nhost; i++)
			{
				tidsCounter = i;
				if(errcodes[i] == 0)
				{
					numProcesses++;
					numGoodProcesses++;
					status[i] = 1;
					tids[i]=i;
				}
				else
				{
					cerr << "Error in netcommunication - unable to spawn process\n";
					return ERROR;
				}
			}
			delete [] errcodes;
		}

		/*
		Þýðandinn var eitthvað að kvarta hérna...
    	for (i = 0; i < nhost; i++) 
		{
      		
			//hosts to be monitored for deletion, suspension and resumption
      		hostTids[i] = hostp[i].hi_tid;
    	}

		*/
    	if (OK == 1) 
		{
      		// Have started slaveProgram slaveArguments on all nhost hosts.
      		// send initial info to all slave processes
      		OK = startProcesses();
      		if (OK == 1) 
			{
        		NETSTARTED = 1;
        		return SUCCESS;
      		} 
			else if (OK == -1) 
			{
        		return ERROR;
      		} 
			else 
			{
        		printErrorMsg("Error in netcommunication - unrecognised return value");
        		stopNetCommunication();
        		return ERROR;
      		}

    	} 
		else if (OK == 0) 
		{
      		stopNetCommunication();
      		return ERROR;
    	} 
		else 
		{
      		printErrorMsg("Error in netcommunication - unrecognised return value");
      		stopNetCommunication();
      		return ERROR;
    	}
  	}
}

void NetCommunication::stopNetCommunication() 
{
	/*
		Fínt í bili.
	*/
	int i, tid, info, numTasks;
  	int stopparam = -1;
	
  	MPI_Comm_rank(MPI_COMM_WORLD, &tid);
  	if (tid >= 0) 
	{
		for(int i=0; i<nhost; i++)
		{
			MPI_Send(&stopparam, 1, MPI_INT, i, pvmConst->getStopTag(), intercomm);
		}
		MPI_Finalize();
  	}
  	mytid = -1;
  	NETSTARTED = 0;
}

int NetCommunication::startProcesses() 
{
	/*
		Þetta fall er afgreitt í bili!
	*/
  //Send number of variables, group name and number of processes to spawned processes
  	int cansend = 1;
  	int i, info;

  	for (i = 0; i < nhost; i++) 
	{
    	// send initial message to all spawned processes
    	cansend = sendInitialMessage(i);
    	if (cansend == -1) 
		{
      		// Error occured in sending inital message to process with id = i
      		printErrorMsg("Error in netcommunication - unable to send message");
      		return ERROR;
		} 
		else if (cansend == 0) 
		{
      		printErrorMsg("Error in netcommunication - unable to send message");
      		status[i] = -1;
      		return ERROR;
    	} 
		else if (cansend == 1) 
		{
      		status[i] = 1;
    	} 
		else 
		{
      		printErrorMsg("Error in netcommunication - unrecognised return value");
      		stopNetCommunication();
      		return ERROR;
    	}
  	}
  	return SUCCESS;
}

int NetCommunication::sendInitialMessage(int id) 
{
  	int OK, info;
	
  	if (id < 0 || id >= nhost) 
	{
    	printErrorMsg("Error in netcommunication - invalid slave ID");
    	return 0;
  	}

  	// check if process with identity = id is up and running
  	// spurning að sleppa þessu í bili, leyfum þessu að vera á meðan hitt er klárað.
  	OK = checkProcess(id);
  	if (OK == 1) 
	{
		MPI_Comm parentcomm;
		MPI_Comm_get_parent( &parentcomm );
		if(parentcomm == MPI_COMM_NULL)
		{
			MPI_Send(&numVar, 1, MPI_INT, id, pvmConst->getStartTag(), intercomm);
			MPI_Send(&id, 1, MPI_INT, id, pvmConst->getStartTag(), intercomm);
		}
		else
		{
			printErrorMsg("Error in netcommunication - slave calling master send");
	    	stopNetCommunication();
		}
    	return SUCCESS;

  	} 
	else if (OK == -1) 
	{
    	printErrorMsg("Error in netcommunication - unable to check status");
    	stopNetCommunication();
    	return ERROR;
  	} 
	else if (OK == 0) 
	{
    	printErrorMsg("Error in netcommunication - unable to send initial message");
    	return OK;
  	} 
	else 
	{
    	printErrorMsg("Error in netcommunication - unrecognised return value");
    	stopNetCommunication();
    	return ERROR;
  	}
}

int NetCommunication::checkProcess(int id) {
	/*
		Þetta fall er ekki að gera neitt, því það sendir adrei nein
		út með tagginu getTaskDiedTag()
	*/
  	int info, bufID, recvTid, flag;
  	MPI_Status stats, recvstats;
  	assert(id >= 0);
  	assert(id < numProcesses);

  	//bufID = pvm_probe(tids[id], pvmConst->getTaskDiedTag());

  	// Non-blocking probe which checks for a message with this tag, if there is no message then
  	// flag is false, otherwise it is true, then something is maybe wrong with the process!
	
	// ATH: Þetta flag mun líklega alltaf vera false... það tékkar bara strax og heldur svo 
	// áfram, þetta virkar eins og pvm_probe, svo þetta ætti að vera í lagi hér.
	// Held samt að það sé irrelevant að vera með þetta checkprocess núna, það á allt að
	// vera í lagi... 
	bufID = MPI_Iprobe(id, pvmConst->getTaskDiedTag(), intercomm, &flag, &stats);
  	if (flag == true) {
    // message has arrived from tids[id] that has halted
    //info = pvm_recv(tids[id], pvmConst->getTaskDiedTag());

	// Blocking receive-message for bookkeeping of status of the process.
		MPI_Recv(&recvTid, 1, MPI_INT, stats.MPI_SOURCE, pvmConst->getTaskDiedTag(), intercomm, &recvstats);
  	//if (info < 0) {
    //  printErrorMsg("Error in netcommunication - unable to check process");
    //  return ERROR;
    //}

    //info = pvm_upkint(&recvTid, 1, 1);
    //if (info < 0) {
    //  printErrorMsg("Error in netcommunication - unable to check process");
    //  return ERROR;
    //}
	
	
    if (recvTid != tids[id])
      	return ERROR;

    status[id] = -1;
    numGoodProcesses--;
    return 0;

  	} 
  	else 
	{
    	return SUCCESS;
  	}
}

void NetCommunication::checkProcesses() {
	/*
		Þetta fall er komið í bili.
		Þetta fall erl íka í raun óþarfi...
	*/
  int i, info, tidDown, flag;
  MPI_Request req;
  MPI_Status nonb;
  MPI_Irecv(&tidDown,1,MPI_INT,MPI_ANY_SOURCE,pvmConst->getTaskDiedTag(),intercomm,&req);
  MPI_Test(&req, &flag, &nonb);
  while (flag == true) {
    // got message that task is down, receive it
    i = 0;
    while ((tids[i] != tidDown) && (i < numProcesses))
      i++;

    assert((i >= 0) && (i < numProcesses));
    status[i] = -1;
    numGoodProcesses--;
	MPI_Irecv (&tidDown,1,MPI_INT,MPI_ANY_SOURCE,pvmConst->getTaskDiedTag(),intercomm,&req);
	MPI_Test(&req, &flag, &nonb);
  }
}

void NetCommunication::getHealthOfProcesses(int* procTids) {
	/*
		Þetta fall er afgreitt.
		
	*/
  checkProcesses();
  int i;
  for (i = 0; i < numProcesses; i++)
    procTids[i] = status[i];
}

// ********************************************************
// Functions for sending and receiving messages
// ********************************************************
int NetCommunication::sendData(const ParameterVector& sendP) {
	/*
		Komið í bili, þarf samt að skoða MPI_PACK eða eitthvað álíka til að 
		raða inn í buffer og senda strengina, gæti verið að maður þurfi þá að
		pakka int með sem er lengd char fylkisins. Það er samt bara kallað á þetta
		fall einu sinni í byrjun til að senda switches, svo að það ætti að vera í lagi.
		!!! ATH !!!
		Passa að allir sem kalla á þetta sendi communicator !!!
		!!!
	*/
    // must absolutely check if this is possible or can not delete
    // stringValue now.}}}}}}}}}}}}}x
    int i, info;
  char** stringValue;
  if (NETSTARTED == 1) {
      stringValue = new char*[numVar];
      for (i = 0; i < numVar; i++) {
	  	stringValue[i] = new char(strlen(sendP[i].getName())+1);
        strcpy(stringValue[i], sendP[i].getName());
		// This was done with pvm_mcast in the old version, it works similar to this, but it might be
		// broadcasting the data via a tree structure, this should not create too much overhead.
		for(int j = 0; j<nhost; j++)
		{
			MPI_Send(stringValue[i],strlen(stringValue[i])+1, MPI_BYTE, j, pvmConst->getMasterSendStringTag(), intercomm);
		}
      };
    assert(sendP.Size() >= numVar);
    
	for (i = 0; i < numVar; i++)
	delete [] stringValue[i];
    delete [] stringValue;
    if (info < 0) {
      printErrorMsg("Error in netcommunication - unable to send data");
      stopNetCommunication();
      return ERROR;
    } else
      return SUCCESS;

  } else {
    printErrorMsg("Error in netcommunication - unable to send data");
    return ERROR;
  }
}

int NetCommunication::sendData(const ParameterVector& sendP, int processID) {
	/*
		Búið í bili...
	*/
	int i, info;
  	char** stringValue;
  	if (NETSTARTED == 1) 
	{
      	stringValue = new char*[numVar];
      	for (i = 0; i < numVar; i++) 
		{
	  		stringValue[i] = new char(strlen(sendP[i].getName())+1);
        	strcpy(stringValue[i], sendP[i].getName());
			// This could be causing some overhead, could consider packing it in a buffer
			// before I send it, like the old pvm version, let's see if this works ok.
			// I think this function is only called once.
			MPI_Send(stringValue[i],strlen(stringValue[i])+1, MPI_BYTE,tids[processID],pvmConst->getMasterSendStringTag(),intercomm);
      	};
    	assert(sendP.Size() >= numVar);

    	for (i = 0; i < numVar; i++)
			delete [] stringValue[i];
    	delete [] stringValue;
		return SUCCESS;
  	} 
	else 
	{
    	printErrorMsg("Error in netcommunication - unable to send data");
    	return ERROR;
  	}
}

int NetCommunication::sendBoundData(const DoubleVector& sendP) 
{
	/*
		Komið í bili...
	*/
  	int i, info;
  	double* temp;

  	if (NETSTARTED == 1) 
	{
    	temp = new double[numVar];
    	for (i = 0; i < numVar; i++)
      		temp[i] = sendP[i];
		for(int j = 0; j< nhost; j++)
		{
			// This was originally done with pvm_mcast, question if this is causing overhead.
			MPI_Send(temp, numVar , MPI_DOUBLE,j,pvmConst->getMasterSendBoundTag(),intercomm);
		}
    	delete[] temp;
		return SUCCESS;
  	} 
	else 
	{
    	printErrorMsg("Error in netcommunication - unable to send data");
    	return ERROR;
  	}
}

int NetCommunication::sendBoundData(const DoubleVector& sendP, int processID) 
{
	/*
		Komið í bili!
	*/
  	int i, info;
  	double* temp;

  	if (NETSTARTED == 1) 
	{
    	temp = new double[numVar];
    	for (i = 0; i < numVar; i++)
      		temp[i] = sendP[i];

		MPI_Send(temp,numVar, MPI_DOUBLE,tids[processID],pvmConst->getMasterSendBoundTag(),intercomm); 
    	delete[] temp;
    	return SUCCESS;
  	} 
	else 
	{
    	printErrorMsg("Error in netcommunication - unable to send data");
    	return ERROR;
  	}
}

int NetCommunication::sendData(NetDataVariables* sendP, int processID) 
{
	/*
		Komið í bili!
	*/
  	int info;
  	int cansend = 1;
  	assert(processID >= 0);
  	assert(processID < numProcesses);

  	if (NETSTARTED == 1) 
	{
    	// check is process with id = processID is up and running
    	cansend = checkProcess(processID);
    	if (cansend == -1) 
		{
      		printErrorMsg("Error in netcommunication - invalid process ID");
      		stopNetCommunication();
      		return ERROR;

    	} 
		else if (cansend == 0) 
		{
      		//process with id = processID is not up and running
      		return cansend;

    	} 
		else if (cansend == 1) 
		{
			MPI_Send(&sendP->tag,1,MPI_INT, tids[processID],pvmConst->getMasterSendVarTag(),intercomm);
			MPI_Send(&sendP->x_id,1,MPI_INT, tids[processID],pvmConst->getMasterSendVarTag(),intercomm);
			MPI_Send(sendP->x,numVar,MPI_DOUBLE, tids[processID],pvmConst->getMasterSendVarTag(),intercomm);
      		return SUCCESS;
		} 
		else 
		{
      		printErrorMsg("Error in netcommunication - unable to send data");
      		stopNetCommunication();
      		return ERROR;
    	}

 	} 
	else 
	{
    	printErrorMsg("Error in netcommunication - unable to send data");
    	return ERROR;
  	}
}

int NetCommunication::receiveData(NetDataResult* rp) 
{
	/*
		Komið í bili...
		Þarf að passa að kasta villu ef einhver af þessum nær ekki að receive-a, nota kannski
		MPI_Probe...
	*/
  	int info;
	MPI_Status status, status2;

  	if (NETSTARTED == 1) 
	{
		MPI_Recv(&rp->tag, 1, MPI_INT, MPI_ANY_SOURCE, pvmConst->getMasterReceiveDataTag(), intercomm, &status);
		MPI_Recv(&rp->result, 1, MPI_DOUBLE, status.MPI_SOURCE, pvmConst->getMasterReceiveDataTag(), intercomm, &status2);
		MPI_Recv(&rp->who, 1, MPI_INT, status.MPI_SOURCE, pvmConst->getMasterReceiveDataTag(), intercomm, &status2);
		MPI_Recv(&rp->x_id, 1, MPI_INT, status.MPI_SOURCE, pvmConst->getMasterReceiveDataTag(), intercomm, &status2);
		cout << "Skrifa result úr netcomm: " << rp->result << "\n";
    	return SUCCESS;
  	} 
	else 
	{
    	printErrorMsg("Error in netcommunication - unable to receive data");
    	return ERROR;
  	}
}

// ********************************************************
// Functions which set/return information about netcommunication
// ********************************************************
int NetCommunication::getNumHosts() 
{
  	return nhost;
}

int NetCommunication::getNumProcesses() 
{
  	return numProcesses;
}

int NetCommunication::getNumVar() 
{
  	return numVar;
}

int NetCommunication::getNumRunningProcesses() 
{
  	return numGoodProcesses;
}

int NetCommunication::netCommStarted() 
{
  	return NETSTARTED;
}

void NetCommunication::setNumInSendVar(int nVar) 
{
  	if (nVar <= 0) 
	{
    	cerr << "Error in netcommunication - number of variables must be positive\n";
    	exit(EXIT_FAILURE);
  	}
  	numVar = nVar;
}

void NetCommunication::printErrorMsg(const char* errorMsg) 
{
	/*
		Eina fallið sem ég virðist þurfa að eiga eitthvað við hér...
	*/
  	char* msg;
  	msg = new char[strlen(errorMsg) + 1];
  	strcpy(msg, errorMsg);
	// �kvað að gera þetta svona, vona að þetta flood-i ekki command line...
	cout << msg << "\n";
  	delete[] msg;
  	cerr << errorMsg << endl;
}

int NetCommunication::netError() {
  return ERROR;
}

int NetCommunication::netSuccess() {
  return SUCCESS;
}

MasterCommunication::MasterCommunication(CommandLineInfo* info)
  : NetCommunication(info->getFunction(), info->getNumProc()) 
{
  	int wait = info->getWaitMaster();
  	tmout = new timeval;
  	if (wait == -1)
    	tmout = NULL;
  	else if (wait >= 0) 
	{
    	tmout->tv_sec = wait;
    	tmout->tv_usec = 0;
  	} 
	else 
	{
    	cerr << "Error in netcommunication - invalid value for wait " << wait << "\n";
    	exit(EXIT_FAILURE);
  	}
}

MasterCommunication::~MasterCommunication() 
{
  	delete tmout;
}

int MasterCommunication::receiveData(NetDataResult* rp) 
{
	/*
		Komið í bili...
		Þarf að passa að kasta villu ef einhver af þessum nær ekki að receive-a, nota kannski
		MPI_Probe, þetta var gert með Timeout receive í gömlu útgáfunni...
	*/
  	int info;
	MPI_Status status, status2;

  	if (NETSTARTED == 1) 
	{
		MPI_Recv(&rp->tag, 1, MPI_INT, MPI_ANY_SOURCE, pvmConst->getMasterReceiveDataTag(), intercomm, &status);
		MPI_Recv(&rp->result, 1, MPI_DOUBLE, status.MPI_SOURCE, pvmConst->getMasterReceiveDataTag(), intercomm, &status2);
		MPI_Recv(&rp->who, 1, MPI_INT, status.MPI_SOURCE, pvmConst->getMasterReceiveDataTag(), intercomm, &status2);
		MPI_Recv(&rp->x_id, 1, MPI_INT, status.MPI_SOURCE, pvmConst->getMasterReceiveDataTag(), intercomm, &status2);
    	return SUCCESS;
  	} 
	else 
	{
    	printErrorMsg("Error in netcommunication - unable to receive data");
    	return ERROR;
  	}
}

root@forge.cesga.es
ViewVC Help
Powered by ViewVC 1.0.0  

Powered By FusionForge