#
# BonFIRE Virtual Clusters on Federated Clouds Demonstration Kit
#
# Copyright (c) Fundacion Centro Tecnologico de Supercomputacion de Galicia 2012
#
# License Apache Software
#
# The research leading to these results has received funding from
# the European Community's Seventh Framework Programme (FP7/2007-2013)
# under agreement number 257386
#
# This software is provided with ABSOLUTELY NO WARRANTY
#
'''
Created on 03/02/2012

Experiment agent: Submits, monitors and deletes experiments

@author: R. Valin
'''

import time

import sub_pycurl as em
import xml_conv
import ssh_test
import save_time as save
import application as ap
import credentials as cr

# Seconds to wait for the computes to reach Running (after deployment) or
# Done (after deletion) before giving up on the repetition.
_STATE_TIMEOUT = 1200


def _submit_experiment(job, url, exp):
    """Submit *exp* to *url* and return the reply parsed by xml_conv.xml_parse.

    Retries every 5 seconds while ``job.submit`` raises or its reply cannot
    be parsed.

    NOTE(review): an unparsable reply triggers a new submission; if the
    first request did create an experiment, that one is never deleted here
    -- confirm whether this can happen.
    """
    while True:
        try:
            reply = job.submit(url, exp)
        except Exception:
            print('\n-------------------')
            print('Warning: API is not reachable.')
            print('Trying again...')
            time.sleep(5)
            continue
        try:
            return xml_conv.xml_parse(reply)
        except Exception:
            print('API reply is not XML:')
            print('API reply %s' % (reply,))
            time.sleep(5)


def _parse_compute(job, uri, reply, delay, label):
    """Parse a compute reply, re-fetching *uri* until it parses.

    :param job: experiment manager client (``em.sub()`` instance).
    :param uri: compute URI passed to ``job.computes`` on every retry.
    :param reply: raw reply to try first.
    :param delay: seconds to sleep before each re-fetch.
    :param label: text printed in front of an unparsable reply.
    :returns: ``(compute_info, reply)``: the parsed compute and the raw
        reply it came from (which may be a re-fetched one).
    """
    while True:
        try:
            return xml_conv.xml_compute_parse(reply), reply
        except Exception:
            # Only the raw reply exists here; nothing has been parsed yet.
            print('\n-------------------')
            print('Warning: Compute status error.')
            print('%s %s' % (label, reply))
            print('Trying again...')
            time.sleep(delay)
            reply = job.computes(uri)


def _wait_for_qhost(master, nodes, verbose=False):
    """Poll ``qhost`` on *master* until xml_conv.xml_qhost returns *nodes*.

    Unparsable output is retried after 5 seconds. There is no timeout.
    With *verbose*, every successfully parsed count is printed.
    """
    nodes_qhost = 0
    while nodes_qhost != nodes:
        out = ssh_test.ssh_qhost(master)
        try:
            nodes_qhost = xml_conv.xml_qhost(out)
        except Exception:
            print('\n-------------------')
            print('Warning: XML reply is not correct.')
            print('Qhost reply: %s' % (out,))
            print('Trying again...')
            time.sleep(5)
        else:
            if verbose:
                print('Counted nodes and total %s %s' % (nodes_qhost, nodes))


def ea_bf(exp, iter):
    """Submit an experiment, time its life cycle, then delete it.

    Phases, each timed into ``stats_log``:

    * the experiment log reports the deployment ('manager');
    * every compute reaches Running ('cluster');
    * every compute accepts SSH ('ssh');
    * OGS ``qhost`` counts every client node ('ogs').

    Optionally (local ``app``/``ela`` flags) an application is run ('app')
    and one extra client node is added ('ela_running', 'ela_ssh',
    'ela_ogs'). The experiment is then deleted, and the time until every
    compute reports Done is stored as 'cluster_done'. A measured run is
    then passed to ``save.st_cluster``.

    Time origins: 'manager', 'cluster', 'ssh' and 'ogs' count from
    submission. 'app' and 'ela_*' count from the start of their own step.
    'cluster_done' counts from the deletion request.

    :param exp: experiment description passed unchanged to ``job.submit``
        (the original code describes it as "the JSON file").
    :param iter: repetition counter supplied by the caller.
    :returns: *iter* when the run was measured and saved. Otherwise
        ``iter - 1`` (deployment error, failed compute or timeout),
        presumably so the caller repeats that repetition.
    """
    #: Experiment manager URL and API root
    theurl = 'https://api.bonfire-project.eu:443/managed_experiments'
    # theurl = 'https://api.integration.bonfire.grid5000.fr/managed_experiments'
    raiz = 'https://api.bonfire-project.eu:443'
    # raiz = 'https://api.integration.bonfire.grid5000.fr/'
    app = 0  # 1 -> run the application on the master once OGS is ready
    ela = 0  # 1 -> elasticity test: add one more client node

    # Experiment manager client: submission, deletion and log queries.
    job = em.sub()
    credentials_file = '~/.restfully/api.bonfire-project.eu.yml'
    job.user = cr.getuser(credentials_file)
    job.passwd = cr.getpass(credentials_file)

    stats_log = dict()
    sub_time = time.time()
    job_infor = _submit_experiment(job, theurl, exp)
    print('----------------')
    print('Experiment submitted:  ' + time.asctime(time.localtime(sub_time)))
    print('----------------\n')
    for key in job_infor:
        print('%s:\t%s' % (key, job_infor[key]))
    time.sleep(float(3))
    print('----------------\n')

    # Poll the experiment log until it reports the deployment or an error.
    # NOTE(review): there is no delay between log requests.
    compute = []
    deployed_log = ''
    log_error = 0
    print('Deploying experiment...')
    while 'deployed' not in deployed_log and log_error == 0:
        respuesta_log = job.log(raiz + job_infor['log'])
        uris_log = xml_conv.log_parsing(respuesta_log, raiz, compute)
        deployed_log = uris_log['deployed']
        log_error = uris_log['error']

    deployed_ok = log_error == 0 and 'deployed' in deployed_log
    if deployed_ok:
        broker_time = time.time()
        print('Experiment deployed \n%s' % (uris_log,))
        stats_log['manager'] = broker_time - sub_time
    else:
        print('Deploy error\n%s %s' % (uris_log, log_error))
        print('\n')
        print('Error reply\n%s' % (respuesta_log,))
        print('\n')

    # Wait until every compute is Running, any compute fails, or the
    # timeout expires.
    status = ''
    status_cluster = ''
    respuesta_compute = ''
    compute_uris = dict()
    ela_ref = None
    if deployed_ok:
        wnstatus = dict()  # state -> seconds since submission it was seen
        wait_start = time.time()
        waiting = 0
        while ('Failed' not in status and waiting < _STATE_TIMEOUT
               and 'Running' not in status_cluster
               and 'Failed' not in status_cluster):
            # A deployed cluster means that all nodes must be running, so
            # the Running count restarts on every pass.
            running = 0
            for com in uris_log['computes']:
                respuesta_compute = job.computes(com)
                t1 = time.time()
                compute_info, respuesta_compute = _parse_compute(
                    job, com, respuesta_compute, 10, 'Respuesta')
                status = compute_info['state'].capitalize()
                if 'Failed' in status or 'Done' in status:
                    print('Compute ' + com + ' has failed')
                    status_cluster = 'Failed'
                elif status and 'Running' not in status:
                    wnstatus[status] = t1 - sub_time
                    print('%s %s' % (compute_info['hostname'], status))
                elif 'Running' in status:
                    running += 1
                    print('%s %s' % (compute_info['hostname'], status))
                    if 'client' in compute_info['hostname']:
                        # Reference node for the elasticity test
                        ela_ref = com
                compute_uris[com] = compute_info
            if uris_log['computes'] and running == len(uris_log['computes']):
                print('All cluster computes are running')
                status_cluster = 'Running'
                wnstatus[status] = t1 - sub_time
                stats_log['cluster'] = wnstatus[status]
            waiting = time.time() - wait_start
        print('\n')

    cluster_running = deployed_ok and 'Running' in status_cluster
    if cluster_running:
        print('----------------')
        print('Cluster deployed')
        print('Configuring OGS...')
        print('----------------')
    elif deployed_ok:
        # Deployed according to the log, but the computes never all ran.
        print('----------------')
        print('Possible Broker Error %s' % (respuesta_compute,))
        print('----------------')

    # Show the deployed nodes, count the client nodes and find the master.
    nodes = 0
    for info in compute_uris.values():
        print(info)
        if 'client' in info['hostname']:
            nodes = nodes + 1
            print('Nodes: %s' % (nodes,))
        elif 'master' in info['hostname']:
            master = info['ip']
            print('Master %s' % (master,))
    print('----------------\n')

    if cluster_running:
        # Wait until every compute is reachable by SSH.
        for info in compute_uris.values():
            ssh_test.ssh_open(info['ip'], info['hostname'])
        stats_log['ssh'] = time.time() - sub_time

        print('----------------')
        print("Cluster is ssh-available ")
        print('----------------\n')
        _wait_for_qhost(master, nodes)
        stats_log['ogs'] = time.time() - sub_time
        time.sleep(30)
        print('----------------')
        print("Cluster is ready. OGS configured")
        print('----------------\n')
        print(ssh_test.ssh_qhost(master))

        # Run the application
        if app == 1:
            app_time = time.time()
            ap.sub_app(str(master))
            stats_log['app'] = time.time() - app_time

        # Elasticity: add one node cloned from the reference client node.
        if ela == 1:
            nodes = nodes + 1
            ela_ref_info = compute_uris[ela_ref]
            # Computes URL: broker URL without ':444' (replace is a no-op
            # when that port is absent) plus '/computes'.
            theurl_comp = uris_log['broker'].replace(':444', '') + '/computes'
            ela_time = time.time()
            node_output = job.submit_xml(
                theurl_comp, ela_ref_info['name'], ela_ref_info['instance'],
                ela_ref_info['disk_ref'], ela_ref_info['net_ref'],
                ela_ref_info['loc_ref'])
            status = ''
            while 'Running' not in status and 'Failed' not in status:
                compute_info, node_output = _parse_compute(
                    job, ela_ref, node_output, 5, 'Answer')
                status = compute_info['state'].capitalize()
                if 'Failed' in status or 'Done' in status:
                    print('Compute added for elasticity has failed')
                    status_cluster = 'Failed'
                    # NOTE(review): this resubmitted node is not tracked: its
                    # reply is overwritten below and, on 'Failed', the loop
                    # exits -- confirm the intended retry behaviour.
                    job.submit_xml(
                        theurl_comp, ela_ref_info['name'],
                        ela_ref_info['instance'], ela_ref_info['disk_ref'],
                        ela_ref_info['net_ref'], ela_ref_info['loc_ref'])
                elif status and 'Running' not in status:
                    print('%s %s' % (compute_info['hostname'], status))
                elif 'Running' in status:
                    stats_log['ela_running'] = time.time() - ela_time
                    print('Compute ' + ela_ref + ' is running')
                    print('%s %s' % (compute_info['hostname'], status))
                # NOTE(review): this polls the reference node (ela_ref), not
                # the node just submitted, so the new node's real state is
                # likely not what is observed -- confirm.
                node_output = job.computes(ela_ref)
            compute_uris[ela_ref] = compute_info
            print('\n')
            # A failed node would never appear in qhost; skip the waits.
            if 'Running' in status:
                ssh_test.ssh_open(compute_info['ip'], compute_info['hostname'])
                stats_log['ela_ssh'] = time.time() - ela_time
                print('------------------------')
                print('Node is ssh-available')
                print('------------------------' + '\n')
                _wait_for_qhost(master, nodes, verbose=True)
                stats_log['ela_ogs'] = time.time() - ela_time

        if iter == 0:
            for key in stats_log:
                print(key + '\t' + str(stats_log[key]))

    # Delete experiment
    print('----------------')
    print('Deleting experiment... %s' % (raiz + job_infor['experiment'],))
    print('----------------\n')
    print('')
    del_time = time.time()
    respuesta_del = job.delete(raiz + job_infor['experiment'])
    print(respuesta_del)

    saved = False
    if deployed_ok and 'Running' in status_cluster:
        # Wait until every compute reports Done (or the timeout expires).
        wait_start = time.time()
        waiting = 0
        while 'Done' not in status_cluster and waiting < _STATE_TIMEOUT:
            # Count per pass so a compute is never counted twice.
            done = 0
            for com in uris_log['computes']:
                respuesta_compute = job.computes(com)
                t1 = time.time()
                compute_info = xml_conv.xml_compute_parse(respuesta_compute)
                status = compute_info['state'].capitalize()
                if 'Done' in status:
                    done = done + 1
            if uris_log['computes'] and done == len(uris_log['computes']):
                print('All cluster computes are Done')
                status_cluster = 'Done'
                stats_log['cluster_done'] = t1 - del_time
            waiting = time.time() - wait_start

        if 'Done' in status_cluster:
            print('----------------')
            print('Cluster_Done %s' % (stats_log['cluster_done'],))
            print('----------------')
            save.st_cluster(sub_time, stats_log, iter, job_infor['name'],
                            uris_log['computes'])
            if iter == 0:
                for key in stats_log:
                    print(key + '\t' + str(stats_log[key]))
                for key in stats_log:
                    print(str(stats_log[key]) + '\t')
            saved = True
        else:
            print('Warning: computes not Done after %s s' % (_STATE_TIMEOUT,))

    if not saved:
        # The run could not be measured; report one repetition fewer.
        print('Experiment deleted. Broker api error.')
        print('Deleted repetition: %s' % (iter,))
        iter = iter - 1
        print('Return repetition: %s' % (iter,))
    time.sleep(30)
    return iter