Log In | Get Help   
Home My Page Projects Code Snippets Project Openings BonFIRE VCOC Demonstration Kit
Summary Activity SCM Files Wiki
[bonfiredemokit] Annotation of /ea/ea_bonfire.py
[bonfiredemokit] / ea / ea_bonfire.py Repository:
ViewVC logotype

Annotation of /ea/ea_bonfire.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 10 - (view) (download) (as text)

1 : agomez 10 #
2 :     # BonFIRE Virtual Clusters on Federated Clouds Demonstration Kit
3 :     #
4 :     # Copyright (c) Fundacion Centro Tecnologico de Supercomputacion de Galicia 2012
5 :     #
6 :     # License GPL Version 3
7 :     #
8 :     # The research leading to these results has received funding from
9 :     # the European Community's Seventh Framework Programme (FP7/2007-2013)
10 :     # under agreement number 257386
11 :     #
12 :     # This software is provided with ABSOLUTELY NO WARRANTY
13 :     #
14 :     '''
15 :     Created on 03/02/2012
16 :     Experiment agent: Submits, monitors and deletes experiments
17 :     @author: R. Valin
18 :     '''
19 :     import time
20 :     import sub_pycurl as em
21 :     import xml_conv
22 :     import ssh_test
23 :     import save_time as save
24 :     import application as ap
25 :     import credentials as cr
26 :    
27 :    
28 :    
# Root URL of the BonFIRE API; resource paths returned by the API are appended to it.
_API_ROOT = 'https://api.bonfire-project.eu:443'
# File the API user name and password are read from (via the credentials module).
_CREDENTIALS_FILE = '~/.restfully/api.bonfire-project.eu.yml'
# Maximum time, in seconds, to wait for all computes to change state.
_POLL_TIMEOUT = 1200


def _parse_compute(job, uri, reply, retry_delay):
    """Parse a compute description, re-fetching it until it can be parsed.

    `reply` is handed to xml_conv.xml_compute_parse. Whenever parsing raises,
    a warning is printed, the function sleeps `retry_delay` seconds and asks
    the API for `uri` again. There is no retry limit (as in the original
    code). Returns the tuple (parsed_info, reply_that_parsed).
    """
    while True:
        try:
            # The exception types raised by the parser are not visible from
            # this file, so every Exception is treated as "retry".
            return xml_conv.xml_compute_parse(reply), reply
        except Exception:
            print('\n-------------------')
            print('Warning: Compute status error.')
            print('Respuesta %s' % (reply,))
            print('Trying again...')
            time.sleep(retry_delay)
            reply = job.computes(uri)


def _wait_for_ogs(master, expected_nodes, show_count=False):
    """Poll qhost on `master` until it reports `expected_nodes` nodes.

    The qhost output is parsed with xml_conv.xml_qhost, whose result is
    compared against `expected_nodes` (presumably a count of execution
    hosts -- confirm against xml_conv). Unparseable replies are reported
    and retried after 5 seconds.
    """
    nodes_qhost = 0
    while nodes_qhost != expected_nodes:
        out = ssh_test.ssh_qhost(master)
        try:
            nodes_qhost = xml_conv.xml_qhost(out)
        except Exception:
            print('\n-------------------')
            print('Warning: XML reply is not correct.')
            # Show the reply that failed to parse (the original printed the
            # node counter here, which says nothing about the reply).
            print('Qhost reply: %s' % (out,))
            print('Trying again...')
            time.sleep(5)
        else:
            if show_count:
                print('Nodos contados y totales %s %s'
                      % (nodes_qhost, expected_nodes))


def ea_bf(exp, iter):
    """Submit an experiment, time its deployment, delete it and save the timings.

    Steps performed:
      1. Submit `exp` to the managed-experiments API (retrying until the
         reply can be parsed).
      2. Poll the experiment log until it reports 'deployed' or an error.
      3. Poll every compute until all are 'Running', one has failed, or
         _POLL_TIMEOUT seconds have elapsed.
      4. If the cluster is running: wait for ssh access and for qhost on the
         master to list every client node, optionally run the application
         and the elasticity test (both disabled by the local `app`/`ela`
         flags).
      5. Delete the experiment (always).
      6. If the cluster was running: wait for the computes to reach 'Done'
         and store the timings with save.st_cluster.

    Parameters:
      exp  -- experiment description handed unchanged to job.submit
              (presumably a JSON document -- confirm against the caller).
      iter -- repetition index; the collected timings are printed when it
              is 0. The name shadows the builtin but is kept for callers.

    Returns `iter` unchanged when the repetition succeeded and `iter - 1`
    when the deployment failed. Judging by the printed messages the caller
    then repeats the run -- confirm against the caller.

    Timings are collected in a dict with the keys 'manager', 'cluster',
    'ssh', 'ogs', 'cluster_done' and, when enabled, 'app', 'ela_running',
    'ela_ssh', 'ela_ogs'.
    """
    theurl = _API_ROOT + '/managed_experiments'
    raiz = _API_ROOT

    # Feature switches: 1 runs the application / the elasticity test.
    app = 0
    ela = 0

    # Job object used to submit, delete and query; it carries the credentials.
    job = em.sub()
    job.user = cr.getuser(_CREDENTIALS_FILE)
    job.passwd = cr.getpass(_CREDENTIALS_FILE)

    # --- 1. Submit the experiment -------------------------------------
    stats_log = dict()
    sub_time = time.time()
    while True:
        try:
            respuesta = job.submit(theurl, exp)
        except Exception:
            print('\n-------------------')
            print('Warning: API is not reachable.')
            print('Trying again...')
            time.sleep(5)
            continue
        try:
            job_infor = xml_conv.xml_parse(respuesta)
        except Exception:
            # As in the original, an unparseable reply triggers a new submit.
            print('API reply is not XML:')
            print('API reply %s' % (respuesta,))
            time.sleep(5)
        else:
            break

    print('----------------')
    print('Experiment submitted:  %s' % time.asctime(time.localtime(sub_time)))
    print('----------------\n')
    for key in job_infor:
        print('%s:\t%s' % (key, job_infor[key]))
    time.sleep(float(3))
    print('----------------\n')

    # --- 2. Wait for the experiment log to report deployment ----------
    compute = []
    deployed_log = ''
    log_error = 0
    print('Deploying experiment...')
    while 'deployed' not in deployed_log and log_error == 0:
        respuesta_log = job.log(raiz + job_infor['log'])
        uris_log = xml_conv.log_parsing(respuesta_log, raiz, compute)
        deployed_log = uris_log['deployed']
        log_error = uris_log['error']

    # Equality, not identity: `is 0` only works by accident of int caching.
    deployed_ok = log_error == 0 and 'deployed' in deployed_log
    if deployed_ok:
        broker_time = time.time()
        print('Experiment deployed \n%s' % (uris_log,))
        stats_log['manager'] = broker_time - sub_time
    else:
        print('Deploy error\n%s %s' % (uris_log, log_error))
        print('\n')
        print('Error reply\n%s' % (respuesta_log,))
        print('\n')

    # --- 3. Wait until every compute is running -----------------------
    status = ''
    status_cluster = ''
    statusini = ''
    wnstatus = dict()
    compute_uris = dict()
    respuesta_compute = ''
    ela_ref = None
    if deployed_ok:
        computes = uris_log['computes']
        wait_start = time.time()
        # The elapsed-time test makes the 1200 s limit effective (the
        # original counter was never incremented); `computes` guards against
        # spinning on an empty compute list.
        while (computes
               and 'Failed' not in status
               and 'Running' not in status_cluster
               and 'Failed' not in status_cluster
               and time.time() - wait_start < _POLL_TIMEOUT):
            # The cluster counts as deployed only when every node is running
            # within the same pass, so the counter restarts on each pass.
            running = 0
            for com in computes:
                respuesta_compute = job.computes(com)
                t1 = time.time()
                compute_info, respuesta_compute = _parse_compute(
                    job, com, respuesta_compute, 10)
                status = compute_info['state'].capitalize()
                if 'Failed' in status or 'Done' in status:
                    print('Compute ' + com + ' has failed')
                    status_cluster = 'Failed'
                elif status not in statusini and 'Running' not in status:
                    # Intermediate state: remember when it was observed.
                    wnstatus[str(status)] = t1 - sub_time
                    # Printed as a tuple, exactly like the Python 2 original.
                    print((compute_info['hostname'], status))
                elif 'Running' in status:
                    running += 1
                    print((compute_info['hostname'], status))
                    if 'client' in compute_info['hostname']:
                        # Reference node for the elasticity test.
                        ela_ref = com

                if running == len(computes):
                    print('All cluster computes are running')
                    status_cluster = 'Running'
                    wnstatus[str(status)] = t1 - sub_time
                    stats_log['cluster'] = wnstatus[str(status)]
                compute_uris[com] = compute_info
            print('\n')

    if deployed_ok and 'Running' in status_cluster:
        print('----------------')
        print('Cluster deployed')
        print('Configuring OGS...')
        print('----------------')
    elif deployed_ok:
        # Deployed according to the log, but the computes never all reached
        # 'Running'. (The original repeated the condition above here, which
        # made this branch unreachable.)
        print('----------------')
        print('Possible Broker Error %s' % (respuesta_compute,))
        print('----------------')

    # Show the deployed nodes; count the clients and locate the master.
    nodes = 0
    master = None
    for info in compute_uris.values():
        print(info)
        if 'client' in info['hostname']:
            nodes = nodes + 1
            print('Nodes: %s' % (nodes,))
        elif 'master' in info['hostname']:
            master = info['ip']
            print('Master %s' % (master,))
    print('----------------\n')

    if deployed_ok and 'Running' in status_cluster and master is None:
        # Without a master the qhost checks below cannot run; treat the
        # repetition as failed so the experiment is still deleted.
        print('Possible Broker Error: no master compute found')
        status_cluster = 'Failed'

    # --- 4. ssh availability, OGS configuration, optional tests -------
    if deployed_ok and 'Running' in status_cluster:
        for info in compute_uris.values():
            ssh_test.ssh_open(info['ip'], info['hostname'])
        stats_log['ssh'] = time.time() - sub_time

        print('----------------')
        print("Cluster is ssh-available ")
        print('----------------\n')

        _wait_for_ogs(master, nodes)

        stats_log['ogs'] = time.time() - sub_time
        time.sleep(30)
        print('----------------')
        print("Cluster is ready. OGS configured")
        print('----------------\n')
        print(ssh_test.ssh_qhost(master))

        # Run the application.
        if app == 1:
            app_time = time.time()
            ap.sub_app(str(master))
            stats_log['app'] = time.time() - app_time

        # Elasticity: add one node to the running cluster.
        # NOTE(review): this branch is disabled (ela is 0) and its logic is
        # kept as found. It polls the URI of the reference node (ela_ref),
        # not of the newly submitted compute, and fails with KeyError when
        # no client node was seen -- confirm the intent before enabling it.
        if ela == 1:
            # One more node is expected in the cluster from now on.
            nodes = nodes + 1
            # Description of an already deployed compute, used as template.
            ela_ref_info = compute_uris[ela_ref]
            # URL used to request a compute from the broker, with ':444'
            # stripped. The original only set it when '444' was present and
            # raised NameError otherwise; whether the plain URL is right in
            # that case is an assumption -- TODO confirm.
            theurl_comp = uris_log['broker'].replace(':444', '') + '/computes'
            ela_time = time.time()
            node_output = job.submit_xml(
                theurl_comp, ela_ref_info['name'], ela_ref_info['instance'],
                ela_ref_info['disk_ref'], ela_ref_info['net_ref'],
                ela_ref_info['loc_ref'])
            status = ''
            while 'Running' not in status and 'Failed' not in status:
                compute_info, node_output = _parse_compute(
                    job, ela_ref, node_output, 5)
                status = compute_info['state'].capitalize()
                if 'Failed' in status or 'Done' in status:
                    # NOTE(review): `com` is left over from the cluster loop.
                    print('Compute ' + com + ' has failed')
                    status_cluster = 'Failed'
                    respuesta_compute = job.computes(ela_ref)
                    node_output = job.submit_xml(
                        theurl_comp, ela_ref_info['name'],
                        ela_ref_info['instance'], ela_ref_info['disk_ref'],
                        ela_ref_info['net_ref'], ela_ref_info['loc_ref'])
                elif status not in statusini and 'Running' not in status:
                    wnstatus[str(status)] = time.time() - sub_time
                    print((compute_info['hostname'], status))
                elif 'Running' in status:
                    # Record the time needed to reach the running state.
                    stats_log['ela_running'] = time.time() - ela_time
                    print('Compute ' + ela_ref + 'is running')
                    print((compute_info['hostname'], status))
                node_output = job.computes(ela_ref)
                compute_uris[ela_ref] = compute_info
            print('\n')

            # Measure when the node becomes reachable over ssh.
            ssh_test.ssh_open(compute_info['ip'], compute_info['hostname'])
            stats_log['ela_ssh'] = time.time() - ela_time
            print('------------------------')
            print('Node is ssh-available')
            print('------------------------' + '\n')
            # Measure when the node shows up in the queue system.
            _wait_for_ogs(master, nodes, show_count=True)
            stats_log['ela_ogs'] = time.time() - ela_time

    # --- 5. Print the timings and delete the experiment ---------------
    if iter == 0:
        for key in stats_log:
            print(key + '\t' + str(stats_log[key]))
    print('----------------')
    print('Deleting experiment... %s' % (raiz + job_infor['experiment'],))
    print('----------------\n')
    print('')
    del_time = time.time()
    respuesta_del = job.delete(raiz + job_infor['experiment'])
    print(respuesta_del)

    # --- 6. Wait for the computes to disappear and save the timings ---
    if deployed_ok and 'Running' in status_cluster:
        computes = uris_log['computes']
        wait_start = time.time()
        while ('Done' not in status_cluster
               and time.time() - wait_start < _POLL_TIMEOUT):
            # Count the computes that are Done in THIS pass only; the
            # original counter accumulated across passes and could reach the
            # total before every compute was actually Done.
            done = 0
            for com in computes:
                respuesta_compute = job.computes(com)
                t1 = time.time()
                compute_info = xml_conv.xml_compute_parse(respuesta_compute)
                status = compute_info['state'].capitalize()
                if 'Done' in status:
                    done += 1
            if done == len(computes):
                print('All cluster computes are Done')
                status_cluster = 'Done'
                stats_log['cluster_done'] = t1 - del_time
        if 'Done' not in status_cluster:
            # Timed out: keep the key present for the code below and store
            # the elapsed time, which is only a lower bound.
            print('Warning: computes not Done after %s seconds'
                  % (_POLL_TIMEOUT,))
            stats_log['cluster_done'] = time.time() - del_time
        print('----------------')
        print('Cluster_Done %s' % (stats_log['cluster_done'],))
        print('----------------')
        save.st_cluster(sub_time, stats_log, iter, job_infor['name'],
                        uris_log['computes'])
        if iter == 0:
            for key in stats_log:
                print(key + '\t' + str(stats_log[key]))

        for key in stats_log:
            print(str(stats_log[key]) + '\t')
    else:
        print('Experiment deleted. Broker api error.')
        print('Deleted repetition: %s' % (iter,))
        iter = iter - 1
        print('Return repetition: %s' % (iter,))
        time.sleep(30)
    return iter
413 :    
414 :    

root@forge.cesga.es
ViewVC Help
Powered by ViewVC 1.0.0  

Powered By FusionForge