#
# BonFIRE Virtual Clusters on Federated Clouds Demonstration Kit
#
# Copyright (c) Fundacion Centro Tecnologico de Supercomputacion de Galicia 2012
#
# Licensed under the Apache Software License
#
# The research leading to these results has received funding from
# the European Community's Seventh Framework Programme (FP7/2007-2013)
# under agreement number 257386
#
# This software is provided with ABSOLUTELY NO WARRANTY
#
'''
Created on 03/02/2012
Experiment agent: Submits, monitors and deletes experiments
@author: R. Valin
'''
import time
import sub_pycurl as em
import xml_conv
import ssh_test
import save_time as save
import application as ap
import credentials as cr
def ea_bf(exp,iter):
"""It Submits and deletes experiments, checks if the VMs are available and saves the time"""
#: Url experiment manager y data
#user = 'agomez'
#user = cr.getuser('~/.restfully/api.bonfire-project.eu.yml')
#passwd = 'elpais00'
#passwd = cr.getpass('~/.restfully/api.bonfire-project.eu.yml')
theurl = 'https://api.bonfire-project.eu:443/managed_experiments'
#theurl= 'https://api.integration.bonfire.grid5000.fr/managed_experiments'
raiz = 'https://api.bonfire-project.eu:443'
#raiz = 'https://api.integration.bonfire.grid5000.fr/'
app=0
ela=0
#Construimos el objeto job que nos permite enviar, borrar, comprobar logs y le pasamos las credenciales
job = em.sub()
job.user = cr.getuser('~/.restfully/api.bonfire-project.eu.yml')
job.passwd = cr.getpass('~/.restfully/api.bonfire-project.eu.yml')
#Enviamos y procesamos el envio
#Diccionario con tiempos
#sub_time = envio
#broker_time = registro en el broker from log info
#ssh_able = accesible por ssh
#del_time = borrado
#ssh_disable = no accesible por ssh
#Creamos experimento enviando json
stats_log = dict()
sub_time = time.time()
#print 'exp', exp
error_try=0
while error_try==0:
try:
respuesta = job.submit(theurl, exp)
except Exception, inst:
print '\n-------------------'
print 'Warning: API is not reachable.'
print 'Trying again...'
time.sleep(5)
else:
try:
job_infor = xml_conv.xml_parse(respuesta)
except Exception, inst:
print 'API reply is not XML:'
print 'API reply', respuesta
time.sleep(5)
else:
error_try=1
print '----------------'
print 'Experiment submitted: ', time.asctime(time.localtime(sub_time))
print '----------------\n'
for i in job_infor:
print i+':\t', job_infor[i]
time.sleep(float(3))
print '----------------\n'
#Pedimos el log y lo procesamos
compute=[]
deployed_log = ''
log_error=0
#uris_log=dict()
#uris_log['deployed']=''
j=0
print 'Deploying experiment...'
while('deployed' not in deployed_log and log_error is 0 ):
#print 'Comprobando log del experimento...', job_infor['experiment']
#print
respuesta_log = job.log(raiz + job_infor['log'])
#print(respuesta_log)
uris_log = xml_conv.log_parsing(respuesta_log, raiz, compute)
j=j+1
deployed_log = uris_log['deployed']
log_error=uris_log['error']
#if 'deployed' not in deployed_log:
# print 'Deploying experiment...', job_infor['experiment']
if log_error is 0 and 'deployed' in deployed_log:
broker_time = time.time()
#stats_log['broker_time'] = time.time()
print 'Experiment deployed \n', uris_log
stats_log['manager']=broker_time-sub_time
#save.s_time(stats_log['manager'],'manager')
else:
print 'Deploy error\n', uris_log, log_error
print '\n'
print 'Error reply\n', respuesta_log
print '\n'
status=''
waiting_t = 0
compute_uris=dict()
if log_error is 0 and 'deployed' in deployed_log:
status_cluster=''
#Tiempo de referencia para medir que todos los WN esten desplegados y cambio de estado
#t2=time.time()
waiting = 0
statusini=''
wnstatus=dict()
while 'Failed' not in status and waiting < 1200 and 'Running' not in status_cluster and 'Failed' not in status_cluster:
j=0 #Para que cluster deployed todos los nodos tienen que estar running.
#print "Failed' not in status", 'Failed' not in status
#print "waiting < 1200", waiting < 1200
#print "Running not in status_cluster", 'Running' not in status_cluster
#print "Failed not in status_cluster", 'Failed' not in status_cluster
"""Recorre todos los nodos del cluster"""
for com in uris_log['computes']:
#print (('Running' or 'Failed') not in status and waiting < 1200, status)
#print 'status', 'Running' not in status and 'Failed' not in status and waiting < 1200, status
#print 'urlcomputes', com
respuesta_compute = job.computes(com)
t1=time.time()
#print 'Respuesta compute', respuesta_compute
#if '500 Internal Server Error' in respuesta_compute or 'Connection refused' in respuesta_compute or 'execution expired' in respuesta_compute and '502 Proxy Error' in respuesta_compute:
# print 'execution expired' not in respuesta_compute
# print 'Connection refused' not in respuesta_compute
# print '502 Proxy Error' not in respuesta_compute
error_try=0
while error_try==0:
try:
compute_info = xml_conv.xml_compute_parse(respuesta_compute)
except Exception, inst:
print '\n-------------------'
print 'Warning: Compute status error.'
print compute_info
print 'Respuesta', respuesta_compute
print 'Trying again...'
time.sleep(10)
respuesta_compute = job.computes(com)
else:
#waiting=waiting+(time.time()-t1)
error_try=1
status=compute_info['state'].capitalize()
if 'Failed' in status or 'Done' in status:
print ('Compute '+com+' has failed')
status_cluster='Failed'
elif status not in statusini and 'Running' not in status:
wnstatus[str(status)]=t1-sub_time
print (compute_info['hostname'], status)
elif 'Running' in status:
j=j+1
#print ('Compute '+com+'is running'), j
print (compute_info['hostname'], status)
if 'client' in compute_info['hostname']:
#Nodo de referencia para elasticidad
ela_ref=com
if j == len(uris_log['computes']):
print 'All cluster computes are running'
status_cluster='Running'
wnstatus[str(status)]=t1-sub_time
stats_log['cluster']=wnstatus[str(status)]
#waiting_t = waiting_t+(time.time()-t2)
compute_uris[com]=compute_info
print '\n'
if log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster:
print '----------------'
print 'Cluster deployed'
print 'Configuring OGS...'
print '----------------'
elif log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster:
print '----------------'
print 'Possible Broker Error', respuesta_compute
print '----------------'
"""Muestra los nodos desplegados y caracteristicas"""
nodes=0
for i in compute_uris.keys():
print compute_uris[i]
if 'client' in compute_uris[i]['hostname']:
nodes=nodes+1
print 'Nodes:', nodes
elif 'master' in compute_uris[i]['hostname']:
master=compute_uris[i]['ip']
print 'Master', master
print '----------------\n'
#Probamos si se puede acceder por ssh
if log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster:
for uri in compute_uris.keys():
compute_info=compute_uris[uri]
ssh_test.ssh_open(compute_info['ip'], compute_info['hostname'])
#stats_log['ssh_able'] = time.time()
stats_log['ssh']=time.time()-sub_time
#save.s_time(stats_log['ssh'],'ssh')
#time.sleep(float(1200))
if log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster:
print '----------------'
print "Cluster is ssh-available "
print '----------------\n'
nodes_qhost=0
while nodes_qhost != nodes:
out=ssh_test.ssh_qhost(master)
#print 'Salida qhost'
error_try=0
try:
nodes_qhost=xml_conv.xml_qhost(out)
except Exception, inst:
print '\n-------------------'
print 'Warning: XML reply is not correct.'
print 'Qhost reply:', nodes_qhost
print 'Trying again...'
time.sleep(5)
else:
error_try=1
stats_log['ogs']=time.time()-sub_time
time.sleep(30)
print '----------------'
print "Cluster is ready. OGS configured"
print '----------------\n'
print ssh_test.ssh_qhost(master)
#Ejecutamos aplicacion
if app == 1:
app_time=time.time()
ap.sub_app(str(master))
stats_log['app']=time.time()-app_time
#Elasticidad:add nodes
if ela == 1:
#Actualizamos nodos totales cluster
nodes=nodes+1
#Pedimos info de un compute ya desplegado
ela_ref_info=compute_uris[ela_ref]
if '444' in uris_log['broker']:
#Montamos la url para solicitar al broker un compute
theurl_comp=uris_log['broker'].replace(':444','')+'/computes'
#print theurl_comp
#Desplegamos nuevo nodo
#print theurl_comp,ela_ref_info['name'],ela_ref_info['instance'],ela_ref_info['disk_ref'],ela_ref_info['net_ref'],ela_ref_info['loc_ref']
ela_time=time.time()
node_output=job.submit_xml(theurl_comp,ela_ref_info['name'],ela_ref_info['instance'],ela_ref_info['disk_ref'],ela_ref_info['net_ref'],ela_ref_info['loc_ref'])
#print 'node_output', node_output
#Parseamos info y comprobamos estado
#compute_info = xml_conv.xml_compute_parse(node_output)
#status=compute_info['state'].capitalize()
status=''
while 'Running' not in status and 'Failed' not in status:
error_try=0
while error_try==0:
try:
compute_info = xml_conv.xml_compute_parse(node_output)
except Exception, inst:
print '\n-------------------'
print 'Warning: Compute status error.'
print compute_info
print 'Respuesta', respuesta_compute
print 'Trying again...'
time.sleep(5)
node_output = job.computes(ela_ref)
else:
#waiting=waiting+(time.time()-t1)
error_try=1
status=compute_info['state'].capitalize()
if 'Failed' in status or 'Done' in status:
print ('Compute '+com+' has failed')
status_cluster='Failed'
respuesta_compute = job.computes(ela_ref)
node_output=job.submit_xml(theurl_comp,ela_ref_info['name'],ela_ref_info['instance'],ela_ref_info['disk_ref'],ela_ref_info['net_ref'],ela_ref_info['loc_ref'])
elif status not in statusini and 'Running' not in status:
wnstatus[str(status)]=t1-sub_time
print (compute_info['hostname'], status)
elif 'Running' in status:
stats_log['ela_running']=time.time()-ela_time
print ('Compute '+ela_ref+'is running')
print (compute_info['hostname'], status)
#Registramos tiempo running
node_output = job.computes(ela_ref)
compute_uris[ela_ref]=compute_info
print '\n'
#Parseamos la info del nuevo nodo y evaluamos cuando esta ssh-available
ssh_test.ssh_open(compute_info['ip'],compute_info['hostname'])
stats_log['ela_ssh']=time.time()-ela_time
print '------------------------'
print 'Node is ssh-available'
print '------------------------'+'\n'
#Evaluamos si se ha added el nodo al sistema de colas
nodes_qhost=0
while nodes_qhost != nodes:
out=ssh_test.ssh_qhost(master)
#print 'Salida qhost', out
error_try=0
try:
nodes_qhost=xml_conv.xml_qhost(out)
print 'Nodos contados y totales', nodes_qhost, nodes
except Exception, inst:
print '\n-------------------'
print 'Warning: XML reply is not correct.'
print 'Qhost reply:', nodes_qhost
print 'Trying again...'
time.sleep(5)
else:
error_try=1
stats_log['ela_ogs']=time.time()-ela_time
if iter is 0:
for i in stats_log:
print i+'\t'+str(stats_log[str(i)])
#Imprimimos tiempos
#Borramos el experimento
print '----------------'
print 'Deleting experiment...', raiz + job_infor['experiment']
print '----------------\n'
print
del_time = time.time()
respuesta_del = job.delete(raiz + job_infor['experiment'])
#stats_log['disable'] = time.time()
print (respuesta_del)
#Probamos si se puede acceder por ssh
# if log_error is 0 and 'deployed' in deployed_log and 'Running' in status:
# for uri in compute_uris.keys():
# compute_info=compute_uris[uri]
# ssh_test.ssh_close(compute_info['ip'], compute_info['name'])
# stats_log['ssh_close'] = time.time()
if log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster:
#t2=time.time()
waiting = 0
status=''
sshdone=dict()
j=0
while 'Done' not in status_cluster and waiting < 1200:
for com in uris_log['computes']:
#print 'urlcomputes', com
#t1=time.time()
respuesta_compute = job.computes(com)
t1=time.time()
#print respuesta_compute
compute_info = xml_conv.xml_compute_parse(respuesta_compute)
#waiting=waiting+(time.time()-t1)
status=compute_info['state'].capitalize()
#print 'Compute status', status, j, waiting
if 'Done' in status:
j=j+1
sshdone[compute_info['name']]=t1-del_time
#print ('Compute'+com+'is done'), j
if j == len(uris_log['computes']):
print 'All cluster computes are Done'
status_cluster='Done'
stats_log['cluster_done']=t1-del_time
#print 'ssh_done', sshdone
print '----------------'
print 'Cluster_Done', stats_log['cluster_done']
print '----------------'
#if log_error is 0 and 'deployed' in deployed_log and 'Running' in status_cluster:
# print 'Tiempo borrado', stats_log['done'] - stats_log['del_time']
# stats_log['borrado']= stats_log['done'] - stats_log['del_time']
#save.s_time(stats_log['borrado'],'delete')
save.st_cluster(sub_time,stats_log,iter,job_infor['name'],uris_log['computes'])
if iter is 0:
for i in stats_log:
print i+'\t'+str(stats_log[str(i)])
for j in stats_log:
print(str(stats_log[str(j)])+'\t')
#print 'Tiempo ssh', stats_log['ssh_close'] - stats_log['disable']
# print 'Borrado: ', time.asctime(time.localtime(stats_log['del_time']))
else:
print 'Experiment deleted. Broker api error.'
print 'Deleted repetition:', iter
iter=iter-1
print 'Return repetition:', iter
time.sleep(30)
return iter