--- ea/ea_bonfire.py 2012/08/27 11:46:35 1 +++ ea/ea_bonfire.py 2012/10/08 09:46:28 14 @@ -1,401 +1,414 @@ -''' -Created on 03/02/2012 -Experiment agent: Submits, monitors and deletes experiments -@author: R. Valin -''' -import time -import sub_pycurl as em -import xml_conv -import ssh_test -import save_time as save -import application as ap -import credentials as cr - - - -def ea_bf(exp,iter): - """It Submits and deletes experiments, checks if the VMs are available and saves the time""" - - #: Url experiment manager y data - #user = 'agomez' - #user = cr.getuser('~/.restfully/api.bonfire-project.eu.yml') - #passwd = 'elpais00' - #passwd = cr.getpass('~/.restfully/api.bonfire-project.eu.yml') - theurl = 'https://api.bonfire-project.eu:443/managed_experiments' - #theurl= 'https://api.integration.bonfire.grid5000.fr/managed_experiments' - raiz = 'https://api.bonfire-project.eu:443' - #raiz = 'https://api.integration.bonfire.grid5000.fr/' - - app=0 - ela=0 - - #Construimos el objeto job que nos permite enviar, borrar, comprobar logs y le pasamos las credenciales - job = em.sub() - job.user = cr.getuser('~/.restfully/api.bonfire-project.eu.yml') - job.passwd = cr.getpass('~/.restfully/api.bonfire-project.eu.yml') - - #Enviamos y procesamos el envio - - #Diccionario con tiempos - #sub_time = envio - #broker_time = registro en el broker from log info - #ssh_able = accesible por ssh - #del_time = borrado - #ssh_disable = no accesible por ssh - - - #Creamos experimento enviando json - stats_log = dict() - sub_time = time.time() - #print 'exp', exp - error_try=0 - while error_try==0: - try: - respuesta = job.submit(theurl, exp) - except Exception, inst: - print '\n-------------------' - print 'Warning: API is not reachable.' - print 'Trying again...' - time.sleep(5) - else: - try: - job_infor = xml_conv.xml_parse(respuesta) - except Exception, inst: - print 'API reply is not XML:' - print 'API reply', respuesta - time.sleep(5) - else: - error_try=1 - - - print '----------------' - print 'Experiment submitted: ', time.asctime(time.localtime(sub_time)) - print '----------------\n' - for i in job_infor: - print i+':\t', job_infor[i] - time.sleep(float(3)) - print '----------------\n' - - - #Pedimos el log y lo procesamos - compute=[] - deployed_log = '' - log_error=0 - #uris_log=dict() - #uris_log['deployed']='' - j=0 - print 'Deploying experiment...' - while('deployed' not in deployed_log and log_error is 0 ): - #print 'Comprobando log del experimento...', job_infor['experiment'] - #print - respuesta_log = job.log(raiz + job_infor['log']) - #print(respuesta_log) - uris_log = xml_conv.log_parsing(respuesta_log, raiz, compute) - j=j+1 - deployed_log = uris_log['deployed'] - log_error=uris_log['error'] - #if 'deployed' not in deployed_log: - # print 'Deploying experiment...', job_infor['experiment'] - - if log_error is 0 and 'deployed' in deployed_log: - broker_time = time.time() - #stats_log['broker_time'] = time.time() - print 'Experiment deployed \n', uris_log - stats_log['manager']=broker_time-sub_time - #save.s_time(stats_log['manager'],'manager') - else: - print 'Deploy error\n', uris_log, log_error - print '\n' - print 'Error reply\n', respuesta_log - print '\n' - - status='' - waiting_t = 0 - compute_uris=dict() - if log_error is 0 and 'deployed' in deployed_log: - status_cluster='' - #Tiempo de referencia para medir que todos los WN esten desplegados y cambio de estado - #t2=time.time() - waiting = 0 - statusini='' - wnstatus=dict() - while 'Failed' not in status and waiting < 1200 and 'Running' not in status_cluster and 'Failed' not in status_cluster: - j=0 #Para que cluster deployed todos los nodos tienen que estar running. - #print "Failed' not in status", 'Failed' not in status - #print "waiting < 1200", waiting < 1200 - #print "Running not in status_cluster", 'Running' not in status_cluster - #print "Failed not in status_cluster", 'Failed' not in status_cluster - """Recorre todos los nodos del cluster""" - for com in uris_log['computes']: - #print (('Running' or 'Failed') not in status and waiting < 1200, status) - #print 'status', 'Running' not in status and 'Failed' not in status and waiting < 1200, status - #print 'urlcomputes', com - respuesta_compute = job.computes(com) - t1=time.time() - #print 'Respuesta compute', respuesta_compute - #if '500 Internal Server Error' in respuesta_compute or 'Connection refused' in respuesta_compute or 'execution expired' in respuesta_compute and '502 Proxy Error' in respuesta_compute: - # print 'execution expired' not in respuesta_compute - # print 'Connection refused' not in respuesta_compute - # print '502 Proxy Error' not in respuesta_compute - error_try=0 - while error_try==0: - try: - compute_info = xml_conv.xml_compute_parse(respuesta_compute) - except Exception, inst: - print '\n-------------------' - print 'Warning: Compute status error.' - print compute_info - print 'Respuesta', respuesta_compute - print 'Trying again...' - time.sleep(10) - respuesta_compute = job.computes(com) - else: - #waiting=waiting+(time.time()-t1) - error_try=1 - status=compute_info['state'].capitalize() - if 'Failed' in status or 'Done' in status: - print ('Compute '+com+' has failed') - status_cluster='Failed' - elif status not in statusini and 'Running' not in status: - wnstatus[str(status)]=t1-sub_time - print (compute_info['hostname'], status) - elif 'Running' in status: - j=j+1 - #print ('Compute '+com+'is running'), j - print (compute_info['hostname'], status) - if 'client' in compute_info['hostname']: - #Nodo de referencia para elasticidad - ela_ref=com - - if j == len(uris_log['computes']): - print 'All cluster computes are running' - status_cluster='Running' - wnstatus[str(status)]=t1-sub_time - stats_log['cluster']=wnstatus[str(status)] - #waiting_t = waiting_t+(time.time()-t2) - compute_uris[com]=compute_info - print '\n' - - if log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster: - print '----------------' - print 'Cluster deployed' - print 'Configuring OGS...' - print '----------------' - - elif log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster: - print '----------------' - print 'Possible Broker Error', respuesta_compute - print '----------------' - - """Muestra los nodos desplegados y caracteristicas""" - nodes=0 - for i in compute_uris.keys(): - print compute_uris[i] - if 'client' in compute_uris[i]['hostname']: - nodes=nodes+1 - print 'Nodes:', nodes - elif 'master' in compute_uris[i]['hostname']: - master=compute_uris[i]['ip'] - print 'Master', master - print '----------------\n' - - #Probamos si se puede acceder por ssh - if log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster: - for uri in compute_uris.keys(): - compute_info=compute_uris[uri] - ssh_test.ssh_open(compute_info['ip'], compute_info['hostname']) - #stats_log['ssh_able'] = time.time() - stats_log['ssh']=time.time()-sub_time - #save.s_time(stats_log['ssh'],'ssh') - #time.sleep(float(1200)) - - if log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster: - print '----------------' - print "Cluster is ssh-available " - print '----------------\n' - - nodes_qhost=0 - while nodes_qhost != nodes: - out=ssh_test.ssh_qhost(master) - #print 'Salida qhost' - error_try=0 - try: - nodes_qhost=xml_conv.xml_qhost(out) - except Exception, inst: - print '\n-------------------' - print 'Warning: XML reply is not correct.' - print 'Qhost reply:', nodes_qhost - print 'Trying again...' - time.sleep(5) - else: - error_try=1 - - stats_log['ogs']=time.time()-sub_time - time.sleep(30) - print '----------------' - print "Cluster is ready. OGS configured" - print '----------------\n' - print ssh_test.ssh_qhost(master) - - #Ejecutamos aplicacion - if app == 1: - app_time=time.time() - ap.sub_app(str(master)) - stats_log['app']=time.time()-app_time - - #Elasticidad:add nodes - if ela == 1: - #Actualizamos nodos totales cluster - nodes=nodes+1 - #Pedimos info de un compute ya desplegado - ela_ref_info=compute_uris[ela_ref] - if '444' in uris_log['broker']: - #Montamos la url para solicitar al broker un compute - theurl_comp=uris_log['broker'].replace(':444','')+'/computes' - #print theurl_comp - #Desplegamos nuevo nodo - #print theurl_comp,ela_ref_info['name'],ela_ref_info['instance'],ela_ref_info['disk_ref'],ela_ref_info['net_ref'],ela_ref_info['loc_ref'] - ela_time=time.time() - node_output=job.submit_xml(theurl_comp,ela_ref_info['name'],ela_ref_info['instance'],ela_ref_info['disk_ref'],ela_ref_info['net_ref'],ela_ref_info['loc_ref']) - #print 'node_output', node_output - #Parseamos info y comprobamos estado - #compute_info = xml_conv.xml_compute_parse(node_output) - #status=compute_info['state'].capitalize() - status='' - while 'Running' not in status and 'Failed' not in status: - error_try=0 - while error_try==0: - try: - compute_info = xml_conv.xml_compute_parse(node_output) - except Exception, inst: - print '\n-------------------' - print 'Warning: Compute status error.' - print compute_info - print 'Respuesta', respuesta_compute - print 'Trying again...' - time.sleep(5) - node_output = job.computes(ela_ref) - else: - #waiting=waiting+(time.time()-t1) - error_try=1 - status=compute_info['state'].capitalize() - if 'Failed' in status or 'Done' in status: - print ('Compute '+com+' has failed') - status_cluster='Failed' - respuesta_compute = job.computes(ela_ref) - node_output=job.submit_xml(theurl_comp,ela_ref_info['name'],ela_ref_info['instance'],ela_ref_info['disk_ref'],ela_ref_info['net_ref'],ela_ref_info['loc_ref']) - elif status not in statusini and 'Running' not in status: - wnstatus[str(status)]=t1-sub_time - print (compute_info['hostname'], status) - elif 'Running' in status: - stats_log['ela_running']=time.time()-ela_time - print ('Compute '+ela_ref+'is running') - print (compute_info['hostname'], status) - #Registramos tiempo running - node_output = job.computes(ela_ref) - compute_uris[ela_ref]=compute_info - print '\n' - - #Parseamos la info del nuevo nodo y evaluamos cuando esta ssh-available - ssh_test.ssh_open(compute_info['ip'],compute_info['hostname']) - stats_log['ela_ssh']=time.time()-ela_time - print '------------------------' - print 'Node is ssh-available' - print '------------------------'+'\n' - #Evaluamos si se ha added el nodo al sistema de colas - nodes_qhost=0 - while nodes_qhost != nodes: - out=ssh_test.ssh_qhost(master) - #print 'Salida qhost', out - error_try=0 - try: - nodes_qhost=xml_conv.xml_qhost(out) - print 'Nodos contados y totales', nodes_qhost, nodes - except Exception, inst: - print '\n-------------------' - print 'Warning: XML reply is not correct.' - print 'Qhost reply:', nodes_qhost - print 'Trying again...' - time.sleep(5) - else: - error_try=1 - stats_log['ela_ogs']=time.time()-ela_time - - - if iter is 0: - for i in stats_log: - print i+'\t'+str(stats_log[str(i)]) - #Imprimimos tiempos - #Borramos el experimento - print '----------------' - print 'Deleting experiment...', raiz + job_infor['experiment'] - print '----------------\n' - print - del_time = time.time() - respuesta_del = job.delete(raiz + job_infor['experiment']) - #stats_log['disable'] = time.time() - print (respuesta_del) - - - - #Probamos si se puede acceder por ssh -# if log_error is 0 and 'deployed' in deployed_log and 'Running' in status: -# for uri in compute_uris.keys(): -# compute_info=compute_uris[uri] -# ssh_test.ssh_close(compute_info['ip'], compute_info['name']) -# stats_log['ssh_close'] = time.time() - - if log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster: - #t2=time.time() - waiting = 0 - status='' - sshdone=dict() - j=0 - while 'Done' not in status_cluster and waiting < 1200: - for com in uris_log['computes']: - #print 'urlcomputes', com - #t1=time.time() - respuesta_compute = job.computes(com) - t1=time.time() - #print respuesta_compute - compute_info = xml_conv.xml_compute_parse(respuesta_compute) - #waiting=waiting+(time.time()-t1) - status=compute_info['state'].capitalize() - #print 'Compute status', status, j, waiting - if 'Done' in status: - j=j+1 - sshdone[compute_info['name']]=t1-del_time - #print ('Compute'+com+'is done'), j - if j == len(uris_log['computes']): - print 'All cluster computes are Done' - status_cluster='Done' - stats_log['cluster_done']=t1-del_time - #print 'ssh_done', sshdone - print '----------------' - print 'Cluster_Done', stats_log['cluster_done'] - print '----------------' - #if log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster: - # print 'Tiempo borrado', stats_log['done'] - stats_log['del_time'] - # stats_log['borrado']= stats_log['done'] - stats_log['del_time'] - #save.s_time(stats_log['borrado'],'delete') - save.st_cluster(sub_time,stats_log,iter,job_infor['name'],uris_log['computes']) - if iter is 0: - for i in stats_log: - print i+'\t'+str(stats_log[str(i)]) - - - for j in stats_log: - print(str(stats_log[str(j)])+'\t') - - #print 'Tiempo ssh', stats_log['ssh_close'] - stats_log['disable'] - # print 'Borrado: ', time.asctime(time.localtime(stats_log['del_time'])) - - else: - print 'Experiment deleted. Broker api error.' - print 'Deleted repetition:', iter - iter=iter-1 - print 'Return repetition:', iter - time.sleep(30) - return iter - - +# +# BonFIRE Virtual Clusters on Federated Clouds Demonstration Kit +# +# Copyright (c) Fundacion Centro Tecnologico de Supercomputacion de Galicia 2012 +# +# License Apache Software +# +# The research leading to these results has received funding from +# the European Community's Seventh Framework Programme (FP7/2007-2013) +# under agreement number 257386 +# +# This software is provided with ABSOLUTELY NO WARRANTY +# +''' +Created on 03/02/2012 +Experiment agent: Submits, monitors and deletes experiments +@author: R. Valin +''' +import time +import sub_pycurl as em +import xml_conv +import ssh_test +import save_time as save +import application as ap +import credentials as cr + + + +def ea_bf(exp,iter): + """It Submits and deletes experiments, checks if the VMs are available and saves the time""" + + #: Url experiment manager y data + #user = 'agomez' + #user = cr.getuser('~/.restfully/api.bonfire-project.eu.yml') + #passwd = 'elpais00' + #passwd = cr.getpass('~/.restfully/api.bonfire-project.eu.yml') + theurl = 'https://api.bonfire-project.eu:443/managed_experiments' + #theurl= 'https://api.integration.bonfire.grid5000.fr/managed_experiments' + raiz = 'https://api.bonfire-project.eu:443' + #raiz = 'https://api.integration.bonfire.grid5000.fr/' + + app=0 + ela=0 + + #Construimos el objeto job que nos permite enviar, borrar, comprobar logs y le pasamos las credenciales + job = em.sub() + job.user = cr.getuser('~/.restfully/api.bonfire-project.eu.yml') + job.passwd = cr.getpass('~/.restfully/api.bonfire-project.eu.yml') + + #Enviamos y procesamos el envio + + #Diccionario con tiempos + #sub_time = envio + #broker_time = registro en el broker from log info + #ssh_able = accesible por ssh + #del_time = borrado + #ssh_disable = no accesible por ssh + + + #Creamos experimento enviando json + stats_log = dict() + sub_time = time.time() + #print 'exp', exp + error_try=0 + while error_try==0: + try: + respuesta = job.submit(theurl, exp) + except Exception, inst: + print '\n-------------------' + print 'Warning: API is not reachable.' + print 'Trying again...' + time.sleep(5) + else: + try: + job_infor = xml_conv.xml_parse(respuesta) + except Exception, inst: + print 'API reply is not XML:' + print 'API reply', respuesta + time.sleep(5) + else: + error_try=1 + + + print '----------------' + print 'Experiment submitted: ', time.asctime(time.localtime(sub_time)) + print '----------------\n' + for i in job_infor: + print i+':\t', job_infor[i] + time.sleep(float(3)) + print '----------------\n' + + + #Pedimos el log y lo procesamos + compute=[] + deployed_log = '' + log_error=0 + #uris_log=dict() + #uris_log['deployed']='' + j=0 + print 'Deploying experiment...' + while('deployed' not in deployed_log and log_error is 0 ): + #print 'Comprobando log del experimento...', job_infor['experiment'] + #print + respuesta_log = job.log(raiz + job_infor['log']) + #print(respuesta_log) + uris_log = xml_conv.log_parsing(respuesta_log, raiz, compute) + j=j+1 + deployed_log = uris_log['deployed'] + log_error=uris_log['error'] + #if 'deployed' not in deployed_log: + # print 'Deploying experiment...', job_infor['experiment'] + + if log_error is 0 and 'deployed' in deployed_log: + broker_time = time.time() + #stats_log['broker_time'] = time.time() + print 'Experiment deployed \n', uris_log + stats_log['manager']=broker_time-sub_time + #save.s_time(stats_log['manager'],'manager') + else: + print 'Deploy error\n', uris_log, log_error + print '\n' + print 'Error reply\n', respuesta_log + print '\n' + + status='' + waiting_t = 0 + compute_uris=dict() + if log_error is 0 and 'deployed' in deployed_log: + status_cluster='' + #Tiempo de referencia para medir que todos los WN esten desplegados y cambio de estado + #t2=time.time() + waiting = 0 + statusini='' + wnstatus=dict() + while 'Failed' not in status and waiting < 1200 and 'Running' not in status_cluster and 'Failed' not in status_cluster: + j=0 #Para que cluster deployed todos los nodos tienen que estar running. + #print "Failed' not in status", 'Failed' not in status + #print "waiting < 1200", waiting < 1200 + #print "Running not in status_cluster", 'Running' not in status_cluster + #print "Failed not in status_cluster", 'Failed' not in status_cluster + """Recorre todos los nodos del cluster""" + for com in uris_log['computes']: + #print (('Running' or 'Failed') not in status and waiting < 1200, status) + #print 'status', 'Running' not in status and 'Failed' not in status and waiting < 1200, status + #print 'urlcomputes', com + respuesta_compute = job.computes(com) + t1=time.time() + #print 'Respuesta compute', respuesta_compute + #if '500 Internal Server Error' in respuesta_compute or 'Connection refused' in respuesta_compute or 'execution expired' in respuesta_compute and '502 Proxy Error' in respuesta_compute: + # print 'execution expired' not in respuesta_compute + # print 'Connection refused' not in respuesta_compute + # print '502 Proxy Error' not in respuesta_compute + error_try=0 + while error_try==0: + try: + compute_info = xml_conv.xml_compute_parse(respuesta_compute) + except Exception, inst: + print '\n-------------------' + print 'Warning: Compute status error.' + print compute_info + print 'Respuesta', respuesta_compute + print 'Trying again...' + time.sleep(10) + respuesta_compute = job.computes(com) + else: + #waiting=waiting+(time.time()-t1) + error_try=1 + status=compute_info['state'].capitalize() + if 'Failed' in status or 'Done' in status: + print ('Compute '+com+' has failed') + status_cluster='Failed' + elif status not in statusini and 'Running' not in status: + wnstatus[str(status)]=t1-sub_time + print (compute_info['hostname'], status) + elif 'Running' in status: + j=j+1 + #print ('Compute '+com+'is running'), j + print (compute_info['hostname'], status) + if 'client' in compute_info['hostname']: + #Nodo de referencia para elasticidad + ela_ref=com + + if j == len(uris_log['computes']): + print 'All cluster computes are running' + status_cluster='Running' + wnstatus[str(status)]=t1-sub_time + stats_log['cluster']=wnstatus[str(status)] + #waiting_t = waiting_t+(time.time()-t2) + compute_uris[com]=compute_info + print '\n' + + if log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster: + print '----------------' + print 'Cluster deployed' + print 'Configuring OGS...' + print '----------------' + + elif log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster: + print '----------------' + print 'Possible Broker Error', respuesta_compute + print '----------------' + + """Muestra los nodos desplegados y caracteristicas""" + nodes=0 + for i in compute_uris.keys(): + print compute_uris[i] + if 'client' in compute_uris[i]['hostname']: + nodes=nodes+1 + print 'Nodes:', nodes + elif 'master' in compute_uris[i]['hostname']: + master=compute_uris[i]['ip'] + print 'Master', master + print '----------------\n' + + #Probamos si se puede acceder por ssh + if log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster: + for uri in compute_uris.keys(): + compute_info=compute_uris[uri] + ssh_test.ssh_open(compute_info['ip'], compute_info['hostname']) + #stats_log['ssh_able'] = time.time() + stats_log['ssh']=time.time()-sub_time + #save.s_time(stats_log['ssh'],'ssh') + #time.sleep(float(1200)) + + if log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster: + print '----------------' + print "Cluster is ssh-available " + print '----------------\n' + + nodes_qhost=0 + while nodes_qhost != nodes: + out=ssh_test.ssh_qhost(master) + #print 'Salida qhost' + error_try=0 + try: + nodes_qhost=xml_conv.xml_qhost(out) + except Exception, inst: + print '\n-------------------' + print 'Warning: XML reply is not correct.' + print 'Qhost reply:', nodes_qhost + print 'Trying again...' + time.sleep(5) + else: + error_try=1 + + stats_log['ogs']=time.time()-sub_time + time.sleep(30) + print '----------------' + print "Cluster is ready. OGS configured" + print '----------------\n' + print ssh_test.ssh_qhost(master) + + #Ejecutamos aplicacion + if app == 1: + app_time=time.time() + ap.sub_app(str(master)) + stats_log['app']=time.time()-app_time + + #Elasticidad:add nodes + if ela == 1: + #Actualizamos nodos totales cluster + nodes=nodes+1 + #Pedimos info de un compute ya desplegado + ela_ref_info=compute_uris[ela_ref] + if '444' in uris_log['broker']: + #Montamos la url para solicitar al broker un compute + theurl_comp=uris_log['broker'].replace(':444','')+'/computes' + #print theurl_comp + #Desplegamos nuevo nodo + #print theurl_comp,ela_ref_info['name'],ela_ref_info['instance'],ela_ref_info['disk_ref'],ela_ref_info['net_ref'],ela_ref_info['loc_ref'] + ela_time=time.time() + node_output=job.submit_xml(theurl_comp,ela_ref_info['name'],ela_ref_info['instance'],ela_ref_info['disk_ref'],ela_ref_info['net_ref'],ela_ref_info['loc_ref']) + #print 'node_output', node_output + #Parseamos info y comprobamos estado + #compute_info = xml_conv.xml_compute_parse(node_output) + #status=compute_info['state'].capitalize() + status='' + while 'Running' not in status and 'Failed' not in status: + error_try=0 + while error_try==0: + try: + compute_info = xml_conv.xml_compute_parse(node_output) + except Exception, inst: + print '\n-------------------' + print 'Warning: Compute status error.' + print compute_info + print 'Respuesta', respuesta_compute + print 'Trying again...' + time.sleep(5) + node_output = job.computes(ela_ref) + else: + #waiting=waiting+(time.time()-t1) + error_try=1 + status=compute_info['state'].capitalize() + if 'Failed' in status or 'Done' in status: + print ('Compute '+com+' has failed') + status_cluster='Failed' + respuesta_compute = job.computes(ela_ref) + node_output=job.submit_xml(theurl_comp,ela_ref_info['name'],ela_ref_info['instance'],ela_ref_info['disk_ref'],ela_ref_info['net_ref'],ela_ref_info['loc_ref']) + elif status not in statusini and 'Running' not in status: + wnstatus[str(status)]=t1-sub_time + print (compute_info['hostname'], status) + elif 'Running' in status: + stats_log['ela_running']=time.time()-ela_time + print ('Compute '+ela_ref+'is running') + print (compute_info['hostname'], status) + #Registramos tiempo running + node_output = job.computes(ela_ref) + compute_uris[ela_ref]=compute_info + print '\n' + + #Parseamos la info del nuevo nodo y evaluamos cuando esta ssh-available + ssh_test.ssh_open(compute_info['ip'],compute_info['hostname']) + stats_log['ela_ssh']=time.time()-ela_time + print '------------------------' + print 'Node is ssh-available' + print '------------------------'+'\n' + #Evaluamos si se ha added el nodo al sistema de colas + nodes_qhost=0 + while nodes_qhost != nodes: + out=ssh_test.ssh_qhost(master) + #print 'Salida qhost', out + error_try=0 + try: + nodes_qhost=xml_conv.xml_qhost(out) + print 'Nodos contados y totales', nodes_qhost, nodes + except Exception, inst: + print '\n-------------------' + print 'Warning: XML reply is not correct.' + print 'Qhost reply:', nodes_qhost + print 'Trying again...' + time.sleep(5) + else: + error_try=1 + stats_log['ela_ogs']=time.time()-ela_time + + + if iter is 0: + for i in stats_log: + print i+'\t'+str(stats_log[str(i)]) + #Imprimimos tiempos + #Borramos el experimento + print '----------------' + print 'Deleting experiment...', raiz + job_infor['experiment'] + print '----------------\n' + print + del_time = time.time() + respuesta_del = job.delete(raiz + job_infor['experiment']) + #stats_log['disable'] = time.time() + print (respuesta_del) + + + + #Probamos si se puede acceder por ssh +# if log_error is 0 and 'deployed' in deployed_log and 'Running' in status: +# for uri in compute_uris.keys(): +# compute_info=compute_uris[uri] +# ssh_test.ssh_close(compute_info['ip'], compute_info['name']) +# stats_log['ssh_close'] = time.time() + + if log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster: + #t2=time.time() + waiting = 0 + status='' + sshdone=dict() + j=0 + while 'Done' not in status_cluster and waiting < 1200: + for com in uris_log['computes']: + #print 'urlcomputes', com + #t1=time.time() + respuesta_compute = job.computes(com) + t1=time.time() + #print respuesta_compute + compute_info = xml_conv.xml_compute_parse(respuesta_compute) + #waiting=waiting+(time.time()-t1) + status=compute_info['state'].capitalize() + #print 'Compute status', status, j, waiting + if 'Done' in status: + j=j+1 + sshdone[compute_info['name']]=t1-del_time + #print ('Compute'+com+'is done'), j + if j == len(uris_log['computes']): + print 'All cluster computes are Done' + status_cluster='Done' + stats_log['cluster_done']=t1-del_time + #print 'ssh_done', sshdone + print '----------------' + print 'Cluster_Done', stats_log['cluster_done'] + print '----------------' + #if log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster: + # print 'Tiempo borrado', stats_log['done'] - stats_log['del_time'] + # stats_log['borrado']= stats_log['done'] - stats_log['del_time'] + #save.s_time(stats_log['borrado'],'delete') + save.st_cluster(sub_time,stats_log,iter,job_infor['name'],uris_log['computes']) + if iter is 0: + for i in stats_log: + print i+'\t'+str(stats_log[str(i)]) + + + for j in stats_log: + print(str(stats_log[str(j)])+'\t') + + #print 'Tiempo ssh', stats_log['ssh_close'] - stats_log['disable'] + # print 'Borrado: ', time.asctime(time.localtime(stats_log['del_time'])) + + else: + print 'Experiment deleted. Broker api error.' + print 'Deleted repetition:', iter + iter=iter-1 + print 'Return repetition:', iter + time.sleep(30) + return iter + +