[bonfiredemokit] /ea/ea_bonfire.py (Revision 1)
'''
Created on 03/02/2012
Experiment agent: submits, monitors and deletes experiments
@author: R. Valin
'''
import time
import sub_pycurl as em
import xml_conv
import ssh_test
import save_time as save
import application as ap
import credentials as cr


def ea_bf(exp, iter):
    """Submits and deletes experiments, checks whether the VMs are available and saves the timings."""

    #: Experiment manager URL and data
    #user = 'agomez'
    #user = cr.getuser('~/.restfully/api.bonfire-project.eu.yml')
    #passwd = 'elpais00'
    #passwd = cr.getpass('~/.restfully/api.bonfire-project.eu.yml')
    theurl = 'https://api.bonfire-project.eu:443/managed_experiments'
    #theurl = 'https://api.integration.bonfire.grid5000.fr/managed_experiments'
    raiz = 'https://api.bonfire-project.eu:443'
    #raiz = 'https://api.integration.bonfire.grid5000.fr/'

    app = 0
    ela = 0

    # Build the job object used to submit, delete and check logs, and give it the credentials
    job = em.sub()
    job.user = cr.getuser('~/.restfully/api.bonfire-project.eu.yml')
    job.passwd = cr.getpass('~/.restfully/api.bonfire-project.eu.yml')

    # Submit the experiment and process the reply

    # Dictionary with timings:
    #   sub_time    = submission
    #   broker_time = registration in the broker (from the log info)
    #   ssh_able    = reachable over ssh
    #   del_time    = deletion
    #   ssh_disable = no longer reachable over ssh

    # Create the experiment by sending the JSON
    stats_log = dict()
    sub_time = time.time()
    #print 'exp', exp
    error_try = 0
    while error_try == 0:
        try:
            respuesta = job.submit(theurl, exp)
        except Exception, inst:
            print '\n-------------------'
            print 'Warning: API is not reachable.'
            print 'Trying again...'
            time.sleep(5)
        else:
            try:
                job_infor = xml_conv.xml_parse(respuesta)
            except Exception, inst:
                print 'API reply is not XML:'
                print 'API reply', respuesta
                time.sleep(5)
            else:
                error_try = 1

    print '----------------'
    print 'Experiment submitted: ', time.asctime(time.localtime(sub_time))
    print '----------------\n'
    for i in job_infor:
        print i + ':\t', job_infor[i]
    time.sleep(float(3))
    print '----------------\n'

    # Request the log and process it
    compute = []
    deployed_log = ''
    log_error = 0
    #uris_log = dict()
    #uris_log['deployed'] = ''
    j = 0
    print 'Deploying experiment...'
    while 'deployed' not in deployed_log and log_error == 0:
        #print 'Checking the experiment log...', job_infor['experiment']
        #print
        respuesta_log = job.log(raiz + job_infor['log'])
        #print(respuesta_log)
        uris_log = xml_conv.log_parsing(respuesta_log, raiz, compute)
        j = j + 1
        deployed_log = uris_log['deployed']
        log_error = uris_log['error']
        #if 'deployed' not in deployed_log:
        #    print 'Deploying experiment...', job_infor['experiment']

    if log_error == 0 and 'deployed' in deployed_log:
        broker_time = time.time()
        #stats_log['broker_time'] = time.time()
        print 'Experiment deployed \n', uris_log
        stats_log['manager'] = broker_time - sub_time
        #save.s_time(stats_log['manager'], 'manager')
    else:
        print 'Deploy error\n', uris_log, log_error
        print '\n'
        print 'Error reply\n', respuesta_log
        print '\n'

    status = ''
    waiting_t = 0
    compute_uris = dict()
    if log_error == 0 and 'deployed' in deployed_log:
        status_cluster = ''
        # Reference time used to check that all the worker nodes are deployed and change state
        #t2 = time.time()
        waiting = 0
        statusini = ''
        wnstatus = dict()
        while 'Failed' not in status and waiting < 1200 and 'Running' not in status_cluster and 'Failed' not in status_cluster:
            j = 0  # For the cluster to count as deployed, every node has to be running.
            #print "'Failed' not in status", 'Failed' not in status
            #print "waiting < 1200", waiting < 1200
            #print "'Running' not in status_cluster", 'Running' not in status_cluster
            #print "'Failed' not in status_cluster", 'Failed' not in status_cluster
            # Walk over every node of the cluster
            for com in uris_log['computes']:
                #print (('Running' or 'Failed') not in status and waiting < 1200, status)
                #print 'status', 'Running' not in status and 'Failed' not in status and waiting < 1200, status
                #print 'urlcomputes', com
                respuesta_compute = job.computes(com)
                t1 = time.time()
                #print 'Compute reply', respuesta_compute
                #if '500 Internal Server Error' in respuesta_compute or 'Connection refused' in respuesta_compute or 'execution expired' in respuesta_compute and '502 Proxy Error' in respuesta_compute:
                #    print 'execution expired' not in respuesta_compute
                #    print 'Connection refused' not in respuesta_compute
                #    print '502 Proxy Error' not in respuesta_compute
                error_try = 0
                while error_try == 0:
                    try:
                        compute_info = xml_conv.xml_compute_parse(respuesta_compute)
                    except Exception, inst:
                        print '\n-------------------'
                        print 'Warning: Compute status error.'
                        print compute_info
                        print 'Reply', respuesta_compute
                        print 'Trying again...'
                        time.sleep(10)
                        respuesta_compute = job.computes(com)
                    else:
                        #waiting = waiting + (time.time() - t1)
                        error_try = 1
                status = compute_info['state'].capitalize()
                if 'Failed' in status or 'Done' in status:
                    print ('Compute ' + com + ' has failed')
                    status_cluster = 'Failed'
                elif status not in statusini and 'Running' not in status:
                    wnstatus[str(status)] = t1 - sub_time
                    print (compute_info['hostname'], status)
                elif 'Running' in status:
                    j = j + 1
                    #print ('Compute ' + com + ' is running'), j
                    print (compute_info['hostname'], status)
                    if 'client' in compute_info['hostname']:
                        # Reference node for elasticity
                        ela_ref = com

                if j == len(uris_log['computes']):
                    print 'All cluster computes are running'
                    status_cluster = 'Running'
                    wnstatus[str(status)] = t1 - sub_time
                    stats_log['cluster'] = wnstatus[str(status)]
                    #waiting_t = waiting_t + (time.time() - t2)
                compute_uris[com] = compute_info
            print '\n'

    if log_error == 0 and 'deployed' in deployed_log and 'Running' in status_cluster:
        print '----------------'
        print 'Cluster deployed'
        print 'Configuring OGS...'
        print '----------------'

    elif log_error == 0 and 'deployed' in deployed_log and 'Running' not in status_cluster:
        print '----------------'
        print 'Possible Broker Error', respuesta_compute
        print '----------------'

    # Show the deployed nodes and their characteristics
    nodes = 0
    for i in compute_uris.keys():
        print compute_uris[i]
        if 'client' in compute_uris[i]['hostname']:
            nodes = nodes + 1
            print 'Nodes:', nodes
        elif 'master' in compute_uris[i]['hostname']:
            master = compute_uris[i]['ip']
            print 'Master', master
    print '----------------\n'

    # Check whether the nodes can be reached over ssh
    if log_error == 0 and 'deployed' in deployed_log and 'Running' in status_cluster:
        for uri in compute_uris.keys():
            compute_info = compute_uris[uri]
            ssh_test.ssh_open(compute_info['ip'], compute_info['hostname'])
        #stats_log['ssh_able'] = time.time()
        stats_log['ssh'] = time.time() - sub_time
        #save.s_time(stats_log['ssh'], 'ssh')
        #time.sleep(float(1200))

    if log_error == 0 and 'deployed' in deployed_log and 'Running' in status_cluster:
        print '----------------'
        print "Cluster is ssh-available"
        print '----------------\n'

        nodes_qhost = 0
        while nodes_qhost != nodes:
            out = ssh_test.ssh_qhost(master)
            #print 'qhost output'
            error_try = 0
            try:
                nodes_qhost = xml_conv.xml_qhost(out)
            except Exception, inst:
                print '\n-------------------'
                print 'Warning: XML reply is not correct.'
                print 'Qhost reply:', nodes_qhost
                print 'Trying again...'
                time.sleep(5)
            else:
                error_try = 1

        stats_log['ogs'] = time.time() - sub_time
        time.sleep(30)
        print '----------------'
        print "Cluster is ready. OGS configured"
        print '----------------\n'
        print ssh_test.ssh_qhost(master)

        # Run the application
        if app == 1:
            app_time = time.time()
            ap.sub_app(str(master))
            stats_log['app'] = time.time() - app_time

        # Elasticity: add nodes
        if ela == 1:
            # Update the total number of cluster nodes
            nodes = nodes + 1
            # Request the info of an already deployed compute
            ela_ref_info = compute_uris[ela_ref]
            if '444' in uris_log['broker']:
                # Build the URL used to request a compute from the broker
                theurl_comp = uris_log['broker'].replace(':444', '') + '/computes'
            #print theurl_comp
            # Deploy the new node
            #print theurl_comp, ela_ref_info['name'], ela_ref_info['instance'], ela_ref_info['disk_ref'], ela_ref_info['net_ref'], ela_ref_info['loc_ref']
            ela_time = time.time()
            node_output = job.submit_xml(theurl_comp, ela_ref_info['name'], ela_ref_info['instance'], ela_ref_info['disk_ref'], ela_ref_info['net_ref'], ela_ref_info['loc_ref'])
            #print 'node_output', node_output
            # Parse the info and check the state
            #compute_info = xml_conv.xml_compute_parse(node_output)
            #status = compute_info['state'].capitalize()
            status = ''
            while 'Running' not in status and 'Failed' not in status:
                error_try = 0
                while error_try == 0:
                    try:
                        compute_info = xml_conv.xml_compute_parse(node_output)
                    except Exception, inst:
                        print '\n-------------------'
                        print 'Warning: Compute status error.'
                        print compute_info
                        print 'Reply', respuesta_compute
                        print 'Trying again...'
                        time.sleep(5)
                        node_output = job.computes(ela_ref)
                    else:
                        #waiting = waiting + (time.time() - t1)
                        error_try = 1
                status = compute_info['state'].capitalize()
                if 'Failed' in status or 'Done' in status:
                    print ('Compute ' + ela_ref + ' has failed')
                    status_cluster = 'Failed'
                    respuesta_compute = job.computes(ela_ref)
                    node_output = job.submit_xml(theurl_comp, ela_ref_info['name'], ela_ref_info['instance'], ela_ref_info['disk_ref'], ela_ref_info['net_ref'], ela_ref_info['loc_ref'])
                elif status not in statusini and 'Running' not in status:
                    wnstatus[str(status)] = t1 - sub_time
                    print (compute_info['hostname'], status)
                elif 'Running' in status:
                    stats_log['ela_running'] = time.time() - ela_time
                    print ('Compute ' + ela_ref + ' is running')
                    print (compute_info['hostname'], status)
                    # Record the time to Running
                    node_output = job.computes(ela_ref)
                    compute_uris[ela_ref] = compute_info
                print '\n'

            # Parse the new node's info and measure when it becomes ssh-available
            ssh_test.ssh_open(compute_info['ip'], compute_info['hostname'])
            stats_log['ela_ssh'] = time.time() - ela_time
            print '------------------------'
            print 'Node is ssh-available'
            print '------------------------' + '\n'
            # Check whether the node has been added to the queue system
            nodes_qhost = 0
            while nodes_qhost != nodes:
                out = ssh_test.ssh_qhost(master)
                #print 'qhost output', out
                error_try = 0
                try:
                    nodes_qhost = xml_conv.xml_qhost(out)
                    print 'Counted and total nodes', nodes_qhost, nodes
                except Exception, inst:
                    print '\n-------------------'
                    print 'Warning: XML reply is not correct.'
                    print 'Qhost reply:', nodes_qhost
                    print 'Trying again...'
                    time.sleep(5)
                else:
                    error_try = 1
            stats_log['ela_ogs'] = time.time() - ela_time

    if iter == 0:
        for i in stats_log:
            print i + '\t' + str(stats_log[str(i)])
    # Print the timings
    # Delete the experiment
    print '----------------'
    print 'Deleting experiment...', raiz + job_infor['experiment']
    print '----------------\n'
    print
    del_time = time.time()
    respuesta_del = job.delete(raiz + job_infor['experiment'])
    #stats_log['disable'] = time.time()
    print (respuesta_del)

    # Check whether the nodes are still reachable over ssh
    #if log_error == 0 and 'deployed' in deployed_log and 'Running' in status:
    #    for uri in compute_uris.keys():
    #        compute_info = compute_uris[uri]
    #        ssh_test.ssh_close(compute_info['ip'], compute_info['name'])
    #    stats_log['ssh_close'] = time.time()

    if log_error == 0 and 'deployed' in deployed_log and 'Running' in status_cluster:
        #t2 = time.time()
        waiting = 0
        status = ''
        sshdone = dict()
        j = 0
        while 'Done' not in status_cluster and waiting < 1200:
            for com in uris_log['computes']:
                #print 'urlcomputes', com
                #t1 = time.time()
                respuesta_compute = job.computes(com)
                t1 = time.time()
                #print respuesta_compute
                compute_info = xml_conv.xml_compute_parse(respuesta_compute)
                #waiting = waiting + (time.time() - t1)
                status = compute_info['state'].capitalize()
                #print 'Compute status', status, j, waiting
                if 'Done' in status:
                    j = j + 1
                    sshdone[compute_info['name']] = t1 - del_time
                    #print ('Compute ' + com + ' is done'), j
                if j == len(uris_log['computes']):
                    print 'All cluster computes are Done'
                    status_cluster = 'Done'
                    stats_log['cluster_done'] = t1 - del_time
                    #print 'ssh_done', sshdone
        print '----------------'
        print 'Cluster_Done', stats_log['cluster_done']
        print '----------------'
        #if log_error == 0 and 'deployed' in deployed_log and 'Running' in status_cluster:
        #    print 'Deletion time', stats_log['done'] - stats_log['del_time']
        #    stats_log['borrado'] = stats_log['done'] - stats_log['del_time']
        #save.s_time(stats_log['borrado'], 'delete')
        save.st_cluster(sub_time, stats_log, iter, job_infor['name'], uris_log['computes'])
        if iter == 0:
            for i in stats_log:
                print i + '\t' + str(stats_log[str(i)])

        for j in stats_log:
            print(str(stats_log[str(j)]) + '\t')

        #print 'ssh time', stats_log['ssh_close'] - stats_log['disable']
        #print 'Deleted: ', time.asctime(time.localtime(stats_log['del_time']))

    else:
        print 'Experiment deleted. Broker api error.'
        print 'Deleted repetition:', iter
        iter = iter - 1
        print 'Return repetition:', iter
        time.sleep(30)

    return iter
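
Usage note: the function returns the repetition counter, decremented when a broker error forced the experiment to be deleted early, so a driver can retry that repetition instead of counting it. The following is a minimal driver sketch, not part of the repository; the module import, the experiment file name cluster_exp.json and the repetition count are illustrative assumptions only.

# Hypothetical driver for ea_bf (illustrative sketch; names below are assumptions).
import ea_bonfire   # assumes the script is run from the ea/ directory

if __name__ == '__main__':
    # exp: managed-experiment description (JSON) accepted by the BonFIRE broker,
    # read from a hypothetical file.
    exp = open('cluster_exp.json').read()
    repetitions = 5                      # illustrative repetition count
    it = 0
    while it < repetitions:
        # On success ea_bf returns the counter unchanged; on a broker error it
        # returns the counter minus one, so the same repetition is retried.
        it = ea_bonfire.ea_bf(exp, it) + 1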
