#include "netinterface.h"
// ********************************************************
// Functions for sending/receiving
// ********************************************************
void NetInterface::sendOne() {
int cansend, tid, dataid;
//tid = pManager->getNextTidToSend(net);
tid = pManager->getNextTidToSend(dctrl->getNumLeftToSend(), net);
while (tid == waitForBetterProcesses())
tid = pManager->getNextTidToSend(dctrl->getNumLeftToSend(), net);
if (tid < 0) {
cerr << "Error in netinterface - no processes available\n";
stopNetComm();
exit(EXIT_FAILURE);
}
assert(!dataSet->isEmpty());
dataid = dataSet->get();
cansend = sendOne(tid, dataid);
if (cansend != 1) {
// could not send data with id = dataid sendDataso update queue
dataSet->putFirst(dataid);
cerr << "Error in netinterface - failed to send data\n";
stopNetComm();
exit(EXIT_FAILURE);
}
}
int NetInterface::sendOne(int processID, int x_id) {
// Function returns ERROR if error occures while trying to send
// Function returns SUCCESS if successfully sent data
// Function return 0 if process with process identity = processID can not be used.
int i, cansend;
// Vector vec;
DoubleVector vecSend;
if (dctrl == NULL) {
cerr << "Error in netinterface - no valid datagroup\n";
exit(EXIT_FAILURE);
}
if (processID < 0 || (processID >= net->getNumProcesses())) {
cerr << "Error in netinterface - invalid process id\n";
stopNetComm();
return net->netError();
}
if (x_id < 0 || (x_id >= dctrl->getTotalSet())) {
cerr << "Error in netinterface - invalid vector id\n";
stopNetComm();
return net->netError();
}
// prepare data to be sent
assert(numVarToSend > 0);
NetDataVariables* sp = new NetDataVariables(numVarToSend);
// vec = dctrl->getX(x_id);
vecSend = prepareVectorToSend(dctrl->getX(x_id));
for (i = 0; i < numVarToSend; i++) {
sp->x[i] = vecSend[i];
}
sp->x_id = x_id;
sp->tag = dctrl->getTag();
cansend = net->sendData(sp, processID);
delete sp;
if (cansend == 1) {
//successfully sent sp using processID
dctrl->sentOne(x_id);
pManager->sent(processID);
return net->netSuccess();
} else if (cansend == -1) {
cerr << "Error in netinterface - failed to send data\n";
return net->netError();
} else if (cansend == 0) {
// process with id = processID is down
pManager->processDown(processID);
return cansend;
} else {
cerr << "Error in netinterface - unrecognised return value\n";
stopNetComm();
return net->netError();
}
}
// Resends the data item returned by dctrl->getNextXToResend() to a free
// process. If net->sendData reports 0 the send is retried on another
// process for as long as pManager->isFreeProc() holds.
// Returns net->netSuccess() when the item was resent and net->netError()
// in every other case.
int NetInterface::resend() {
  int i, canresend;
  int tid = -1;
  int sendID;
  DoubleVector vecSend;

  if (dctrl == NULL) {
    cerr << "Error in netinterface - no valid datagroup\n";
    exit(EXIT_FAILURE);
  }
  if (!pManager->isFreeProc()) {
    cerr << "Error in netinterface - no free processes\n";
    stopNetComm();
    return net->netError();
  }
  if (dctrl->allReceived()) {
    cerr << "Error in netinterface - no data to resend\n";
    stopNetComm();
    return net->netError();
  }

  // first choice is getNextTidToSend; fall back to getTidToSend
  tid = pManager->getNextTidToSend(dctrl->getNumLeftToSend(), net);
  if (tid < 0)
    tid = pManager->getTidToSend(net);
  if (tid < 0) {
    // there are no free processes
    cerr << "Error in netinterface - no free processes\n";
    stopNetComm();
    return net->netError();
  }

  sendID = dctrl->getNextXToResend();
  vecSend = prepareVectorToSend(dctrl->getX(sendID));
  assert(numVarToSend > 0);
  NetDataVariables* sp = new NetDataVariables(numVarToSend);
  for (i = 0; i < numVarToSend; i++)
    sp->x[i] = vecSend[i];
  sp->x_id = sendID;
  sp->tag = dctrl->getTag();

  canresend = net->sendData(sp, tid);
  while (canresend == 0 && pManager->isFreeProc()) {
    tid = pManager->getNextTidToSend(dctrl->getNumLeftToSend(), net);
    if (tid < 0)
      tid = pManager->getTidToSend(net);
    if (tid < 0) {
      // Same check as for the first selection above: never hand a negative
      // process id to net->sendData.
      cerr << "Error in netinterface - no free processes\n";
      delete sp;
      stopNetComm();
      return net->netError();
    }
    // NOTE(review): unlike sendOne(int, int), a 0 status here does not call
    // pManager->processDown(tid) - confirm whether that is intended.
    canresend = net->sendData(sp, tid);
  }
  delete sp;

  if (canresend == 1) {
    dctrl->resentOne(sendID);
    pManager->sent(tid);
    return net->netSuccess();
  } else if (canresend == -1) {
    cerr << "Error in netinterface - failed to resend data\n";
    return net->netError();
  } else if (canresend == 0 && !pManager->isFreeProc()) {
    cerr << "Error in netinterface - failed to resend data\n";
    stopNetComm();
    return net->netError();
  } else {
    cerr << "Error in netinterface - unrecognised return value\n";
    stopNetComm();
    return net->netError();
  }
}
int NetInterface::receiveOne() {
// function returns SUCCESS if successfully could receive data
// function return ERROR if netcommunication is down or error occurs
// functions returns 0 if cannot receive data but netcommunication is OK
int received;
double res;
if (dctrl == NULL) {
cerr << "Error in netinterface - no valid datagroup\n";
exit(EXIT_FAILURE);
}
NetDataResult* dp = new NetDataResult;
received = net->receiveData(dp);
if (received == 1 && (dp->who >= 0)) {
// received data successfully
pManager->setFreeProc(dp->who);
if (dctrl->getTag() == dp->tag) {
// received data belonging to datagroup
if (scaler != NULL) {
// Vector tempV;
// tempV = makeVector(dctrl->getX(dp->x_id));
// res = scaler->scaleResult(dp->result, dp->x_id, tempV);
res = scaler->scaleResult(dp->result, dp->x_id, makeVector(dctrl->getX(dp->x_id)));
} else {
res = dp->result;
}
if (dctrl->hasAnswer(dp->x_id)) {
receiveID = -1;
} else {
receiveID = dp->x_id;
dctrl->setY(dp->x_id, res);
}
} else {
// received data which does not belong to current datagroup
receiveID = -1;
}
delete dp;
return received;
} else if (received == 1 && dp->who == -1) {
// Was not able to receive but can continue netcommunication,
cerr << "Error in netinterface - could not receive data\n";
receiveID = -1;
delete dp;
// probably timeout occured so should check for health of processes.
net->getHealthOfProcesses(pManager->getStatus());
return !received;
} else if (received == -1) {
cerr << "Error in netinterface - failed to receive data\n";
receiveID = -1;
delete dp;
return net->netError();
} else {
cerr << "Error in netinterface - unrecognised return value\n";
receiveID = -1;
stopNetComm();
delete dp;
return net->netError();
}
}
// Keeps sending queued data until the datagroup reports allSent(), the
// selected process id equals noAvailableProcesses(), or sendOne returns
// something other than 0 or 1. When the selected id equals
// waitForBetterProcesses(), pManager->getTidToSend is asked instead.
// Returns net->netError() if the last sendOne status was -1 or outside
// [-1, 1], otherwise net->netSuccess().
int NetInterface::sendToAllIdleHosts() {
  int target = 0;
  int status = 1;

  if (dctrl == NULL) {
    cerr << "Error in netinterface - no valid datagroup\n";
    exit(EXIT_FAILURE);
  }

  if (!dctrl->allSent())
    target = pManager->getNextTidToSend(dctrl->getNumLeftToSend(), net);

  for (;;) {
    if (target == this->noAvailableProcesses())
      break;
    if (dctrl->allSent())
      break;
    if (status != 1 && status != 0)
      break;

    // there is still unsent data belonging to dctrl
    if (target == this->waitForBetterProcesses())
      target = pManager->getTidToSend(net);
    if (target == this->noAvailableProcesses())
      continue;  // the check at the top of the loop then ends it

    assert(target >= 0);
    assert(!dataSet->isEmpty());
    const int item = dataSet->get();
    status = sendOne(target, item);
    if (status != 1)
      dataSet->putFirst(item);
    if (!dctrl->allSent() && (status == 1 || status == 0))
      target = pManager->getNextTidToSend(dctrl->getNumLeftToSend(), net);
  }

  if (status == -1)
    return net->netError();
  if (status == 0 || status == 1)
    return net->netSuccess();

  cerr << "Error in netinterface - unrecognised return value\n";
  stopNetComm();
  return net->netError();
}
// Keeps sending queued data while pManager->getNextTidToSend yields a
// process id that is neither noAvailableProcesses() nor
// waitForBetterProcesses(), the datagroup is not allSent(), and sendOne
// returns 0 or 1.
// Returns net->netError() if the last sendOne status was -1 or outside
// [-1, 1], otherwise net->netSuccess().
int NetInterface::sendToIdleHosts() {
  int target = 0;
  int status = 1;

  if (dctrl == NULL) {
    cerr << "Error in netinterface - no valid datagroup\n";
    exit(EXIT_FAILURE);
  }

  if (!dctrl->allSent()) {
    // dctrl->getNumLeftToSend() is fine, the trouble is in getNextTidToSend
    target = pManager->getNextTidToSend(dctrl->getNumLeftToSend(), net);
  }

  for (;;) {
    if (target == this->noAvailableProcesses())
      break;
    if (target == this->waitForBetterProcesses())
      break;
    if (dctrl->allSent())
      break;
    if (status != 1 && status != 0)
      break;

    // not everything is sent and a usable process id was selected
    assert(target >= 0);
    assert(!dataSet->isEmpty());
    const int item = dataSet->get();
    status = sendOne(target, item);
    if (status != 1)
      dataSet->putFirst(item);
    if ((status == 1 || status == 0) && !dctrl->allSent())
      target = pManager->getNextTidToSend(dctrl->getNumLeftToSend(), net);
  }

  if (status == -1)
    return net->netError();
  if (status == 0 || status == 1)
    return net->netSuccess();

  cerr << "Error in netinterface - unrecognised return value\n";
  stopNetComm();
  return net->netError();
}
// Sends with sendToIdleHosts() and then alternates receiving and sending
// through receiveAndSend() until the datagroup reports allSent() or a call
// returns something other than 1. Returns the status of the last call.
int NetInterface::sendAll() {
  assert(dctrl != NULL);
  int status = sendToIdleHosts();
  for (;;) {
    if (dctrl->allSent() || status != 1)
      break;
    // data is left to send, so receive to free a process and send again
    status = receiveAndSend();
  }
  return status;
}
int NetInterface::receiveAndSend() {
// This function tries to keep all processes busy by first trying to
// receive and then sending available data to all suitable processes
// It is assumed that there is an outstanding message to be received.
int counter = 0;
int cansend = 1;
int canreceive = 0;
while (counter < numTries && canreceive == 0) {
// Try to receive again from a process, netcommunication is OK but
// could not receive probably because there are no messages coming in
canreceive = receiveOne();
counter++;
}
if (canreceive == 1) {
// Received successfully, at least one process is free
cansend = sendToIdleHosts();
return cansend;
} else if (canreceive == -1) {
// Error occured while trying to receive, netcommunication is down
return net->netError();
} else if (canreceive == 0) {
// Did not receive any data
if (!pManager->isFreeProc()) {
// no free process after trying to receive
cerr << "Error in netinterface - not receiving from busy process\n";
return net->netError();
} else {
// There is a free processs that can be used, will try to send
cansend = sendToAllIdleHosts();
return cansend;
}
} else {
cerr << "Error in netinterface - unrecognised return value\n";
stopNetComm();
return net->netError();
}
}
// Receives results until the datagroup reports allReceived(), the value of
// con->computeCondition() becomes non-zero, or receiving stops returning 1.
// Each receive is retried up to numTries - 1 extra times while it returns 0.
// Returns 1 if the condition was met, 0 if everything was received without
// meeting it, and net->netError() otherwise.
int NetInterface::receiveOnCondition(Condition* con) {
  int met = 0;
  int status = 1;

  while (!dctrl->allReceived() && !met && status == 1) {
    status = receiveOne();
    for (int retry = 0; retry < (numTries - 1) && status == 0; retry++)
      status = receiveOne();

    if (status == 1) {
      // only evaluate the condition when a result of this group arrived
      const int id = getReceiveID();
      if (id >= 0)
        met = con->computeCondition();
    }
  }

  if (status == -1)
    return net->netError();

  if (status == 0) {
    cerr << "Error in netinterface - no received data\n";
    stopNetComm();
    return net->netError();
  }

  if (met == 1) {
    // condition has been met
    return met;
  }

  if (dctrl->allReceived() && met == 0) {
    // condition has not been met and there is nothing left to receive
    return met;
  }

  cerr << "Error in netinterface - unrecognised return value\n";
  stopNetComm();
  return net->netError();
}
int NetInterface::receiveAll() {
int counter = 0;
int canreceive = 1;
int cansend = 1;
while (!(dctrl->allReceived()) && (canreceive == 0 || canreceive == 1) && cansend == 1) {
canreceive = receiveOne();
while (counter < (numTries - 1) && canreceive == 0) {
canreceive = receiveOne();
counter++;
}
if (canreceive == 0) {
// Did not receive anything - try to resend data
if (!pManager->isFreeProc()) {
cerr << "Error in netinterface - no received data\n";
canreceive = -1;
} else if (!dctrl->allReceived() && pManager->isFreeProc()) {
// have not received all so will resend data
cansend = resend();
}
}
counter = 0;
}
if (canreceive == -1 || cansend == -1) {
return net->netError();
} else if (canreceive > 1 || canreceive < -1) {
stopNetComm();
return net->netError();
} else if (cansend > 1 || cansend < -1) {
stopNetComm();
return net->netError();
} else if (dctrl->allReceived()) {
return net->netSuccess();
} else {
cerr << "Error in netinterface - unrecognised return value\n";
stopNetComm();
return net->netError();
}
}
// Runs sendAll() and, if that returns 1, receiveAll().
// Returns net->netSuccess() only when both returned 1, otherwise
// net->netError().
int NetInterface::sendAndReceiveAllData() {
  // receiveAll() is only attempted after a successful sendAll()
  if (sendAll() == 1 && receiveAll() == 1)
    return net->netSuccess();
  return net->netError();
}
// Sends data with sendToIdleHosts() and then repeats receiveAndSend() until
// the datagroup reports allSent(), con->computeCondition() returns
// non-zero, or a call returns something other than 1.
// Returns net->netError() if the last status was not 1, otherwise the last
// condition value.
int NetInterface::sendAllOnCondition(Condition* con) {
  int met = 0;
  // start by sending data to all free suitable hosts
  int status = sendToIdleHosts();

  for (;;) {
    if (dctrl->allSent() || met != 0 || status != 1)
      break;
    // try to receive data from a host and send new data
    status = receiveAndSend();
    if (status != 1)
      continue;
    // check the condition only if a result of this datagroup was received
    const int id = getReceiveID();
    if (id >= 0)
      met = con->computeCondition();
  }

  if (status == 1)
    return met;
  return net->netError();
}
// Repeatedly sends data with sendToAllIdleHosts() and receives one result,
// evaluating con->computeCondition() after every successful receive, until
// the condition value is non-zero or a step returns -1.
// Each receive is retried up to numTries - 1 extra times while receiveOne()
// returns 0. If still nothing arrived and no process is free, communication
// is stopped; if a process is free, everything has been sent and fewer
// results are outstanding than there are processes, resend() is called.
// Returns 1 if the condition was met and net->netError() otherwise.
int NetInterface::sendAndReceiveSetData(Condition* con) {
  int cond = 0;
  int sendreceive = 1;
  // Retry counter. The original declared a local named numTries, which hid
  // the retry limit of the same name and made the loop test
  // "numTries < numTries - 1" always false, so no retry was ever made.
  int tries;
  int counter, newreturns, numLeftToReceive, totalNumHosts;

  while ((sendreceive != -1) && cond == 0) {
    counter = getNumDataItemsSet();
    newreturns = getNumDataItemsAnswered();
    numLeftToReceive = counter - newreturns;
    totalNumHosts = getTotalNumProc();

    sendreceive = sendToAllIdleHosts();
    if (sendreceive == 1) {
      // resending is only allowed while waiting for the last data to come in
      const bool canResend = (numLeftToReceive < totalNumHosts) && dctrl->allSent();

      sendreceive = receiveOne();
      for (tries = 0; tries < (numTries - 1) && sendreceive == 0; tries++)
        sendreceive = receiveOne();

      if (sendreceive == 0) {
        if (!pManager->isFreeProc()) {
          cerr << "Error in netinterface - received no data\n";
          sendreceive = -1;
          stopNetComm();
        } else if (canResend && !dctrl->allReceived() && pManager->isFreeProc()) {
          sendreceive = resend();
        }
      }

      if (sendreceive == 1)
        cond = con->computeCondition();
    }
  }

  if (sendreceive == -1)
    return net->netError();
  else if (cond == 1)
    return cond;
  else
    return net->netError();
}
// Runs sendAllOnCondition(con); if that returns 0, continues with
// receiveOnCondition(con). Returns the result of the last call made.
int NetInterface::sendAndReceiveTillCondition(Condition* con) {
  const int afterSend = sendAllOnCondition(con);
  if (afterSend != 0)
    return afterSend;
  return receiveOnCondition(con);
}
// ********************************************************
// Functions concerning netcommunication
// ********************************************************
// Forwards to net->startNetCommunication() and returns its result.
int NetInterface::startNetComm() {
  const int started = net->startNetCommunication();
  return started;
}
// Sends the switches through net->sendData(switches). The program is
// terminated if the result differs from netSuccess().
void NetInterface::sendStringValue() {
  assert(numVarToSend > 0);
  assert(switches.Size() == numVarToSend);

  const int status = net->sendData(switches);
  if (status == netSuccess())
    return;

  cerr << "Error in netinterface - failed to send switches\n";
  exit(EXIT_FAILURE);
}
// Sends the switches to the process with identity processID through
// net->sendData(switches, processID). The program is terminated if the
// result differs from netSuccess().
void NetInterface::sendStringValue(int processID) {
  assert(numVarToSend > 0);
  assert(switches.Size() == numVarToSend);

  const int status = net->sendData(switches, processID);
  if (status == netSuccess())
    return;

  cerr << "Error in netinterface - failed to send switches\n";
  exit(EXIT_FAILURE);
}
// Sends lowerBound and then upperBound through net->sendBoundData. The
// program is terminated as soon as one of the sends does not return
// netSuccess().
void NetInterface::sendBoundValues() {
  assert(numVarToSend > 0);

  assert(lowerBound.Size() == numVarToSend);
  const int lowerStatus = net->sendBoundData(lowerBound);
  if (lowerStatus != netSuccess()) {
    cerr << "Error in netinterface - failed to send lowerbounds\n";
    exit(EXIT_FAILURE);
  }

  assert(upperBound.Size() == numVarToSend);
  const int upperStatus = net->sendBoundData(upperBound);
  if (upperStatus != netSuccess()) {
    cerr << "Error in netinterface - failed to send upperbounds\n";
    exit(EXIT_FAILURE);
  }
}
// Sends lowerBound and then upperBound to the process with identity
// processID through net->sendBoundData. The program is terminated as soon
// as one of the sends does not return netSuccess().
void NetInterface::sendBoundValues(int processID) {
  assert(numVarToSend > 0);

  assert(lowerBound.Size() == numVarToSend);
  const int lowerStatus = net->sendBoundData(lowerBound, processID);
  if (lowerStatus != netSuccess()) {
    cerr << "Error in netinterface - failed to send lowerbounds\n";
    exit(EXIT_FAILURE);
  }

  assert(upperBound.Size() == numVarToSend);
  const int upperStatus = net->sendBoundData(upperBound, processID);
  if (upperStatus != netSuccess()) {
    cerr << "Error in netinterface - failed to send upperbounds\n";
    exit(EXIT_FAILURE);
  }
}
// Stops the network communication through net->stopNetCommunication() and
// then calls pManager->noProcessesRunning().
void NetInterface::stopNetComm() {
net->stopNetCommunication();
pManager->noProcessesRunning();
}
// Returns pManager->getNumFreeProc().
int NetInterface::getNumFreeProcesses() {
  const int freeCount = pManager->getNumFreeProc();
  return freeCount;
}
// Returns net->getNumProcesses().
int NetInterface::getTotalNumProc() {
  const int total = net->getNumProcesses();
  return total;
}
// Returns the current value of numberOfTags.
int NetInterface::getNumTags() {
  const int tags = numberOfTags;
  return tags;
}
// Returns the current value of numberOfTags and then increments it, so
// consecutive calls return consecutive tags.
int NetInterface::getNextMsgTag() {
  return numberOfTags++;
}
// Returns net->netCommStarted().
int NetInterface::isUpAndRunning() {
  const int started = net->netCommStarted();
  return started;
}
// Returns the error code defined by net->netError().
int NetInterface::netError() {
  const int code = net->netError();
  return code;
}
// Returns the success code defined by net->netSuccess().
int NetInterface::netSuccess() {
  const int code = net->netSuccess();
  return code;
}
// Returns pManager->noAvailableProcesses(), the value the send functions
// compare a selected process id against.
int NetInterface::noAvailableProcesses() {
  const int marker = pManager->noAvailableProcesses();
  return marker;
}
// Returns pManager->waitForBetterProcesses(), the value the send functions
// compare a selected process id against.
int NetInterface::waitForBetterProcesses() {
  const int marker = pManager->waitForBetterProcesses();
  return marker;
}