Log In | Get Help   
Home My Page Projects Code Snippets Project Openings BonFIRE VCOC Demonstration Kit
Summary Activity SCM Files Wiki
[bonfiredemokit] View of /ea/ea_bonfire.py
[bonfiredemokit] / ea / ea_bonfire.py Repository:
ViewVC logotype

View of /ea/ea_bonfire.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 18 - (download) (as text) (annotate)
Tue Oct 9 16:17:03 2012 UTC (11 years, 7 months ago) by agomez
File size: 16331 byte(s)
Changed comments to English and test
#
# BonFIRE Virtual Clusters on Federated Clouds Demonstration Kit
#
# Copyright (c) Fundacion Centro Tecnologico de Supercomputacion de Galicia 2012
# 
# License Apache Software
#
# The research leading to these results has received funding from 
# the European Community's Seventh Framework Programme (FP7/2007-2013) 
# under agreement number 257386
#
# This software is provided with ABSOLUTELY NO WARRANTY
# 
'''
Created on 03/02/2012
Experiment agent: Submits, monitors and deletes experiments
@author: R. Valin
'''
import time
import sub_pycurl as em
import xml_conv
import ssh_test
import save_time as save
import application as ap
import credentials as cr



def ea_bf(exp,iter):
    """Run one complete experiment life-cycle against the BonFIRE API.

    Steps performed (all sequential, all blocking):
      1. Submit the JSON experiment description to the managed-experiments
         endpoint, retrying until the API answers with parseable XML.
      2. Poll the experiment log until it reports 'deployed' (or an error).
      3. Poll every compute node until all are 'Running' (or one 'Failed').
      4. Wait until every node is ssh-reachable and until OGS (qhost on the
         master) reports all of them.
      5. Optionally run an application (app flag) and add one elastic node
         (ela flag) — both flags are hard-coded to 0 below.
      6. Delete the experiment and wait until every compute reaches 'Done'.

    Timing milestones are accumulated in the stats_log dict and persisted
    through save.st_cluster().

    Parameters:
        exp  -- JSON document describing the experiment; sent verbatim to
                the Experiment Manager.
        iter -- repetition counter (NOTE(review): shadows the builtin
                `iter`).  Returned unchanged on success; decremented on a
                broker error so the caller repeats this run.

    Returns:
        iter -- possibly decremented, see above.
    """
    
    #: Experiment-manager submission URL and API root (production endpoints;
    #: the commented alternatives point at the integration testbed).
    
    theurl = 'https://api.bonfire-project.eu:443/managed_experiments'
    #theurl= 'https://api.integration.bonfire.grid5000.fr/managed_experiments'
    raiz = 'https://api.bonfire-project.eu:443'
    #raiz = 'https://api.integration.bonfire.grid5000.fr/'
    
    # Feature flags: app=1 would run the benchmark application on the master,
    # ela=1 would exercise elasticity by adding one node.  Both disabled here.
    app=0 
    ela=0 

    #Create the experiment manager object which allows submission, deletion and log checking.
    job = em.sub()

    #Load user credentials from the local restfully configuration file.
    job.user =  cr.getuser('~/.restfully/api.bonfire-project.eu.yml') 
    job.passwd = cr.getpass('~/.restfully/api.bonfire-project.eu.yml') 
    
    #Send the request
    
    # Dictionary of timing milestones (seconds relative to submission):
    #sub_time = time of the submission
    #broker_time = registry time from broker, taken from the log info
    #ssh_able = ssh accessible
    #del_time = deleted
    #ssh_disable = can no longer be reached over ssh
    
    
    #Create the experiment by sending the JSON file; retry until the API is
    #reachable AND its reply parses as XML.
    stats_log = dict()
    sub_time = time.time()
    #print 'exp', exp
    error_try=0
    while error_try==0:
        try:
            respuesta = job.submit(theurl, exp)
        except Exception, inst:
            print '\n-------------------'
            print 'Warning: API is not reachable.'
            print 'Trying again...'
            time.sleep(5)
        else:
            try:
                job_infor = xml_conv.xml_parse(respuesta)
            except Exception, inst:
                print 'API reply is not XML:'
                print 'API reply', respuesta
                time.sleep(5)
            else:
                error_try=1
            

    print '----------------'
    print 'Experiment submitted: ', time.asctime(time.localtime(sub_time))
    print '----------------\n'
    for i in job_infor:
        print i+':\t', job_infor[i]
        time.sleep(float(3))
    print '----------------\n'

    
    # Poll the experiment-manager log until it reports 'deployed' or an error.
    compute=[]
    deployed_log = ''
    log_error=0
    #uris_log=dict()
    #uris_log['deployed']=''
    j=0
    print 'Deploying experiment...'
    # NOTE(review): 'log_error is 0' relies on CPython small-int caching;
    # '== 0' would be the safe comparison (same applies below).
    while('deployed' not in deployed_log and log_error is 0 ):
        #print 'Checking experiment log...', job_infor['experiment']
        #print
        respuesta_log = job.log(raiz + job_infor['log'])
        #print(respuesta_log)
        uris_log = xml_conv.log_parsing(respuesta_log, raiz, compute)
        j=j+1
        deployed_log = uris_log['deployed']
        log_error=uris_log['error']
        #if 'deployed' not in deployed_log:
        #    print 'Deploying experiment...', job_infor['experiment']
            
    if log_error is 0 and 'deployed' in deployed_log:
        # Record time from submission until the broker reports deployment.
        broker_time = time.time()
        #stats_log['broker_time'] = time.time()
        print 'Experiment deployed \n', uris_log
        stats_log['manager']=broker_time-sub_time
        #save.s_time(stats_log['manager'],'manager')
    else:
        print 'Deploy error\n', uris_log, log_error 
        print '\n'
        print 'Error reply\n', respuesta_log
        print '\n'

    status=''    
    waiting_t = 0
    compute_uris=dict()
    # Poll every compute node until the whole cluster is Running (or a node
    # Failed, or the 1200 s budget is exhausted — see NOTE on 'waiting' below).
    if log_error is 0 and 'deployed' in deployed_log:
        status_cluster=''
        # Reference time to measure that all WN have been deployed and change state
        #t2=time.time()
        # NOTE(review): 'waiting' is never incremented inside this loop, so the
        # 'waiting < 1200' guard can never trip — the loop relies solely on the
        # status conditions to terminate.
        waiting = 0
        statusini=''
        wnstatus=dict()
        while 'Failed' not in status and waiting < 1200 and 'Running' not in status_cluster and 'Failed' not in status_cluster:
            j=0 # A deployed cluster means that all nodes must be running. 
            #print "Failed' not in status", 'Failed' not in status 
            #print "waiting < 1200", waiting < 1200 
            #print "Running not in status_cluster", 'Running' not in status_cluster 
            #print "Failed not in status_cluster", 'Failed' not in status_cluster
            """Loop on all of cluster nodes"""
            for com in uris_log['computes']:
            #print (('Running' or 'Failed') not in status and waiting < 1200, status)
            #print 'status', 'Running' not in status and 'Failed' not in status and waiting < 1200, status
                #print 'urlcomputes', com
                respuesta_compute = job.computes(com)
                t1=time.time()
            #print 'Respuesta compute', respuesta_compute
                 #if '500 Internal Server Error' in respuesta_compute or 'Connection refused' in respuesta_compute or 'execution expired' in  respuesta_compute and '502 Proxy Error' in respuesta_compute:
                 #   print 'execution expired' not in  respuesta_compute
                 #   print 'Connection refused' not in respuesta_compute
                 #   print '502 Proxy Error' not in respuesta_compute
                # Retry until the compute description parses as XML.
                error_try=0    
                while error_try==0:
                    try:
                        compute_info = xml_conv.xml_compute_parse(respuesta_compute)
                    except Exception, inst:
                        print '\n-------------------'
                        print 'Warning: Compute status error.'
                        print compute_info
                        print 'Respuesta', respuesta_compute
                        print 'Trying again...'
                        time.sleep(10)
                        respuesta_compute = job.computes(com)
                    else:
                #waiting=waiting+(time.time()-t1)
                        error_try=1
                        status=compute_info['state'].capitalize()
                if 'Failed' in status or 'Done' in status:
                    print ('Compute '+com+' has failed')
                    status_cluster='Failed'
                elif status not in statusini and 'Running' not in status:                  
                    # First time we see this intermediate state: record when.
                    wnstatus[str(status)]=t1-sub_time
                    print (compute_info['hostname'], status)
                elif 'Running' in status:
                    j=j+1
                    #print ('Compute '+com+'is running'), j
                    print (compute_info['hostname'], status)
		    if 'client' in compute_info['hostname']:
			# Elasticity reference node
			ela_ref=com
				
                    if j == len(uris_log['computes']):
                        print 'All cluster computes are running'
                        status_cluster='Running'
                        wnstatus[str(status)]=t1-sub_time
                        stats_log['cluster']=wnstatus[str(status)]
                #waiting_t = waiting_t+(time.time()-t2)
                compute_uris[com]=compute_info
            print '\n'
                            
    if  log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster:
        print '----------------'
        print 'Cluster deployed'
	print 'Configuring OGS...'
        print '----------------'
       
    # NOTE(review): this condition is identical to the branch above, so this
    # elif can never execute — probably meant "'Running' not in status_cluster".
    elif log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster:
        print '----------------'
        print 'Possible Broker Error', respuesta_compute
        print '----------------'

    """Show the deployed nodes and their carasteristics"""
    # Count worker ('client') nodes and remember the master's IP for ssh/qhost.
    nodes=0
    for i in compute_uris.keys():        
        print compute_uris[i]
        if 'client' in compute_uris[i]['hostname']:
            nodes=nodes+1
            print 'Nodes:', nodes
        elif 'master' in compute_uris[i]['hostname']:
            master=compute_uris[i]['ip']
            print 'Master', master
    print '----------------\n'
        
    # Check that every node is ssh-available (blocks inside ssh_open).
    if log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster:
        for uri in compute_uris.keys():
            compute_info=compute_uris[uri]
            ssh_test.ssh_open(compute_info['ip'], compute_info['hostname'])
        #stats_log['ssh_able'] = time.time()
        stats_log['ssh']=time.time()-sub_time
            #save.s_time(stats_log['ssh'],'ssh')    
        #time.sleep(float(1200))
    
    if log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster:
        print '----------------'    
        print "Cluster is ssh-available "
        print '----------------\n'

        # Poll qhost on the master until OGS reports all worker nodes.
        nodes_qhost=0
        while nodes_qhost != nodes:
            out=ssh_test.ssh_qhost(master)
            #print 'qhost output'
            error_try=0
            try:
                nodes_qhost=xml_conv.xml_qhost(out)
            except Exception, inst:
                print '\n-------------------'
                print 'Warning: XML reply is not correct.'
                print 'Qhost reply:', nodes_qhost
                print 'Trying again...'
                time.sleep(5)
            else:
                error_try=1
            
        stats_log['ogs']=time.time()-sub_time
        time.sleep(30)
        print '----------------'  
        print "Cluster is ready. OGS configured"
        print '----------------\n'  
        print ssh_test.ssh_qhost(master)
        
	#Run the application on the master (disabled: app=0 above)
	if app == 1:
		app_time=time.time()
		ap.sub_app(str(master))
		stats_log['app']=time.time()-app_time
	
	#Elasticity: add one node (disabled: ela=0 above)
	if ela == 1:
		#Update the total number of nodes
		nodes=nodes+1
		#Ask for info about one compute node to clone its settings
		ela_ref_info=compute_uris[ela_ref]
		if '444' in uris_log['broker']:
			#Create the URL to request a new node
			theurl_comp=uris_log['broker'].replace(':444','')+'/computes'
			#print theurl_comp
		#Deploy a new node
		#print theurl_comp,ela_ref_info['name'],ela_ref_info['instance'],ela_ref_info['disk_ref'],ela_ref_info['net_ref'],ela_ref_info['loc_ref']
		ela_time=time.time()
		node_output=job.submit_xml(theurl_comp,ela_ref_info['name'],ela_ref_info['instance'],ela_ref_info['disk_ref'],ela_ref_info['net_ref'],ela_ref_info['loc_ref'])
		#print 'node_output', node_output
		#Parse return and check status
		#compute_info = xml_conv.xml_compute_parse(node_output)
		#status=compute_info['state'].capitalize()
		status=''
		while 'Running' not in status and 'Failed' not in status:
			error_try=0
		        while error_try==0:
                	    try:
		               compute_info = xml_conv.xml_compute_parse(node_output)
                	    except Exception, inst:
		               print '\n-------------------'
                	       print 'Warning: Compute status error.'
		               print compute_info
                	       print 'Answer', respuesta_compute
		               print 'Trying again...'
                	       time.sleep(5)
		               node_output = job.computes(ela_ref)
                	    else:
		               #waiting=waiting+(time.time()-t1)
                	       error_try=1
		               status=compute_info['state'].capitalize()
                	if 'Failed' in status or 'Done' in status:
	                    print ('Compute '+com+' has failed')
        	            status_cluster='Failed'
			    respuesta_compute = job.computes(ela_ref)
			    node_output=job.submit_xml(theurl_comp,ela_ref_info['name'],ela_ref_info['instance'],ela_ref_info['disk_ref'],ela_ref_info['net_ref'],ela_ref_info['loc_ref'])
                	elif status not in statusini and 'Running' not in status:
	                    wnstatus[str(status)]=t1-sub_time
        	            print (compute_info['hostname'], status)
                	elif 'Running' in status:
			    stats_log['ela_running']=time.time()-ela_time
	                    print ('Compute '+ela_ref+'is running')
        	            print (compute_info['hostname'], status)
                	    #Record the time at which it reached Running
			node_output = job.computes(ela_ref)
	                compute_uris[ela_ref]=compute_info
	                print '\n'

		#Parse information about the new node and check if ssh-available
            	ssh_test.ssh_open(compute_info['ip'],compute_info['hostname'])
		stats_log['ela_ssh']=time.time()-ela_time
		print '------------------------'
		print 'Node is ssh-available'
		print '------------------------'+'\n'
		#Evaluate if the node has been added to OGS
		nodes_qhost=0
        	while nodes_qhost != nodes:
            		out=ssh_test.ssh_qhost(master)
            		#print 'qhost output', out
            		error_try=0
            		try:
                		nodes_qhost=xml_conv.xml_qhost(out)
				print 'Counted nodes and total', nodes_qhost, nodes
        	    	except Exception, inst:
                		print '\n-------------------'
                		print 'Warning: XML reply is not correct.'
	                	print 'Qhost reply:', nodes_qhost
	                	print 'Trying again...'
        	        	time.sleep(5)
            		else:
                		error_try=1
		stats_log['ela_ogs']=time.time()-ela_time	
		

        # On the first repetition, dump the collected timings.
        if iter is 0:
                for i in stats_log:
                        print i+'\t'+str(stats_log[str(i)])
        #Print times
    #Delete experiment
    print '----------------'
    print 'Deleting experiment...', raiz + job_infor['experiment']
    print '----------------\n'
    print 
    del_time = time.time()
    respuesta_del = job.delete(raiz + job_infor['experiment'])
    #stats_log['disable'] = time.time()
    print (respuesta_del)
    

    
    #Check if ssh-available
#   if log_error is 0 and 'deployed' in deployed_log and 'Running' in status:
#        for uri in compute_uris.keys():
#            compute_info=compute_uris[uri]
#            ssh_test.ssh_close(compute_info['ip'], compute_info['name'])
#            stats_log['ssh_close'] = time.time()

    # After deletion, poll every compute until the whole cluster reports Done,
    # then persist the collected statistics.
    if log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster:
        #t2=time.time()
        # NOTE(review): 'waiting' is never updated here either, so the
        # 'waiting < 1200' guard is ineffective (see the deployment loop above).
        waiting = 0
        status='' 
        sshdone=dict()
        j=0
        while 'Done' not in status_cluster and waiting < 1200:
            for com in uris_log['computes']:
                #print 'urlcomputes', com
                #t1=time.time()
                respuesta_compute = job.computes(com)
                t1=time.time()
                #print respuesta_compute
                compute_info = xml_conv.xml_compute_parse(respuesta_compute)
                #waiting=waiting+(time.time()-t1)
                status=compute_info['state'].capitalize()
                #print 'Compute status', status, j, waiting
                if 'Done' in status:
                    j=j+1
                    sshdone[compute_info['name']]=t1-del_time
                    #print ('Compute'+com+'is done'), j
                    if j == len(uris_log['computes']):
                        print 'All cluster computes are Done'
                        status_cluster='Done'
                        stats_log['cluster_done']=t1-del_time    
        #print 'ssh_done', sshdone
        print '----------------'
        print 'Cluster_Done', stats_log['cluster_done']
        print '----------------'
    #if log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster:
    #    print 'Tiempo borrado', stats_log['done'] - stats_log['del_time']
    #    stats_log['borrado']= stats_log['done'] - stats_log['del_time']
        #save.s_time(stats_log['borrado'],'delete')
        save.st_cluster(sub_time,stats_log,iter,job_infor['name'],uris_log['computes'])
        if iter is 0:
		for i in stats_log:
			print i+'\t'+str(stats_log[str(i)])


    	for j in stats_log:
        	print(str(stats_log[str(j)])+'\t')

        #print 'Time ssh', stats_log['ssh_close'] - stats_log['disable']
    #    print 'Delete: ', time.asctime(time.localtime(stats_log['del_time']))

    else:
        # Broker error: decrement the repetition counter so the caller
        # retries this iteration.
        print 'Experiment deleted. Broker api error.'
	print 'Deleted repetition:', iter
	iter=iter-1
	print 'Return repetition:', iter
        time.sleep(30)
    return iter



root@forge.cesga.es
ViewVC Help
Powered by ViewVC 1.0.0  

Powered By FusionForge