Log In | Get Help   
Home My Page Projects Code Snippets Project Openings BonFIRE VCOC Demonstration Kit
Summary Activity SCM Files Wiki
[bonfiredemokit] Annotation of /ea/ea_bonfire.py
[bonfiredemokit] / ea / ea_bonfire.py Repository:
ViewVC logotype

Annotation of /ea/ea_bonfire.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 15 - (view) (download) (as text)

1 : agomez 10 #
2 :     # BonFIRE Virtual Clusters on Federated Clouds Demonstration Kit
3 :     #
4 :     # Copyright (c) Fundacion Centro Tecnologico de Supercomputacion de Galicia 2012
5 :     #
6 : agomez 14 # License Apache Software
7 : agomez 10 #
8 :     # The research leading to these results has received funding from
9 :     # the European Community's Seventh Framework Programme (FP7/2007-2013)
10 :     # under agreement number 257386
11 :     #
12 :     # This software is provided with ABSOLUTELY NO WARRANTY
13 :     #
14 :     '''
15 :     Created on 03/02/2012
16 :     Experiment agent: Submits, monitors and deletes experiments
17 :     @author: R. Valin
18 :     '''
19 :     import time
20 :     import sub_pycurl as em
21 :     import xml_conv
22 :     import ssh_test
23 :     import save_time as save
24 :     import application as ap
25 :     import credentials as cr
26 :    
27 :    
28 :    
# Separator used in the console progress banners.
_SEPARATOR = '----------------'

# Restfully credentials file holding the BonFIRE user name and password.
_CREDENTIALS_FILE = '~/.restfully/api.bonfire-project.eu.yml'


def _banner(*lines):
    """Print *lines* between two separator lines, then a blank line."""
    print(_SEPARATOR)
    for line in lines:
        print(line)
    print(_SEPARATOR + '\n')


def _print_stats(stats_log):
    """Print every recorded timing as ``name<TAB>seconds``."""
    for key in stats_log:
        print(key + '\t' + str(stats_log[key]))


def _parse_with_retry(reply, parse, refetch, warning, delay):
    """Parse an API reply, re-fetching it until parsing succeeds.

    Any exception raised by ``parse(reply)`` is treated as a transient API
    problem: *warning* and the offending reply are printed, the call sleeps
    *delay* seconds and a fresh reply is obtained from ``refetch()``.
    Retries indefinitely. Exceptions raised by ``refetch`` itself propagate.

    Returns ``(parsed, reply)`` for the reply that finally parsed.
    """
    while True:
        try:
            return parse(reply), reply
        except Exception:
            print('\n-------------------')
            print(warning)
            # Print the raw reply: no parsed value exists at this point.
            print('Reply: %s' % (reply,))
            print('Trying again...')
            time.sleep(delay)
            reply = refetch()


def _submit_experiment(job, theurl, exp):
    """Submit *exp* to *theurl* until the API answers with parseable XML.

    Both an unreachable API (``job.submit`` raises) and a non-XML reply
    lead to a new submission after 5 s.
    Returns the mapping produced by ``xml_conv.xml_parse``.
    """
    while True:
        try:
            respuesta = job.submit(theurl, exp)
        except Exception:
            print('\n-------------------')
            print('Warning: API is not reachable.')
            print('Trying again...')
            time.sleep(5)
            continue
        try:
            return xml_conv.xml_parse(respuesta)
        except Exception:
            # NOTE(review): the experiment is re-submitted here; if the first
            # submission actually succeeded this may create a duplicate.
            print('API reply is not XML:')
            print('API reply %s' % (respuesta,))
            time.sleep(5)


def _wait_deployed(job, raiz, job_infor):
    """Poll the experiment log until it reports 'deployed' or an error.

    Returns ``(uris_log, respuesta_log)`` from the last poll; ``uris_log``
    is the mapping built by ``xml_conv.log_parsing``.
    """
    # The same list is handed to every log_parsing call, as before.
    compute = []
    while True:
        respuesta_log = job.log(raiz + job_infor['log'])
        uris_log = xml_conv.log_parsing(respuesta_log, raiz, compute)
        if uris_log['error'] != 0 or 'deployed' in uris_log['deployed']:
            return uris_log, respuesta_log


def _wait_running(job, computes, sub_time, stats_log, timeout):
    """Poll all computes until every one is 'Running', one fails, or *timeout* s pass.

    Stores the time (since submission) at which the whole cluster was
    running in ``stats_log['cluster']``.

    Returns ``(status_cluster, compute_uris, ela_ref, respuesta_compute)``:
    status_cluster is 'Running', 'Failed' or '' (timed out); compute_uris
    maps each compute URI to its last parsed description; ela_ref is the
    URI of a running 'client' node (None if none was seen);
    respuesta_compute is the last raw reply (None if nothing was polled).
    """
    status_cluster = ''
    compute_uris = dict()
    ela_ref = None
    respuesta_compute = None
    start = time.time()
    while status_cluster not in ('Running', 'Failed') and time.time() - start < timeout:
        # A deployed cluster means that all nodes must be running.
        running = 0
        for com in computes:
            respuesta_compute = job.computes(com)
            t1 = time.time()
            # The lambda is called synchronously, so it always sees this `com`.
            compute_info, respuesta_compute = _parse_with_retry(
                respuesta_compute, xml_conv.xml_compute_parse,
                lambda: job.computes(com), 'Warning: Compute status error.', 10)
            status = compute_info['state'].capitalize()
            if 'Failed' in status or 'Done' in status:
                print('Compute ' + com + ' has failed')
                status_cluster = 'Failed'
            elif 'Running' in status:
                running += 1
                print('%s %s' % (compute_info['hostname'], status))
                if 'client' in compute_info['hostname']:
                    # Elasticity reference node.
                    ela_ref = com
            elif status:
                print('%s %s' % (compute_info['hostname'], status))

            if running == len(computes):
                print('All cluster computes are running')
                status_cluster = 'Running'
                stats_log['cluster'] = t1 - sub_time
            compute_uris[com] = compute_info
        print('\n')
    return status_cluster, compute_uris, ela_ref, respuesta_compute


def _summarize_nodes(compute_uris):
    """Print each compute and return ``(client node count, master IP or None)``."""
    nodes = 0
    master = None
    for compute_info in compute_uris.values():
        print(compute_info)
        if 'client' in compute_info['hostname']:
            nodes += 1
            print('Nodes: %s' % nodes)
        elif 'master' in compute_info['hostname']:
            master = compute_info['ip']
            print('Master %s' % master)
    print(_SEPARATOR + '\n')
    return nodes, master


def _wait_ogs_nodes(master, nodes, verbose=False):
    """Poll ``qhost`` on *master* until ``xml_conv.xml_qhost`` of its output equals *nodes*.

    Unparseable replies are reported and retried after 5 s.
    """
    nodes_qhost = 0
    while nodes_qhost != nodes:
        out = ssh_test.ssh_qhost(master)
        try:
            nodes_qhost = xml_conv.xml_qhost(out)
        except Exception:
            print('\n-------------------')
            print('Warning: XML reply is not correct.')
            print('Qhost reply: %s' % (out,))
            print('Trying again...')
            time.sleep(5)
            continue
        if verbose:
            print('Counted nodes and total %s %s' % (nodes_qhost, nodes))


def _add_elastic_node(job, uris_log, compute_uris, ela_ref, master, nodes,
                      stats_log):
    """Deploy one extra worker cloned from *ela_ref* and wait until OGS counts it.

    *nodes* is the total number of client nodes expected in qhost once the
    new worker has joined. Times relative to the request are stored in
    ``stats_log['ela_running']``, ``['ela_ssh']`` and ``['ela_ogs']``.
    """
    if ela_ref is None:
        raise RuntimeError('No running client node to clone for elasticity')
    ref = compute_uris[ela_ref]
    # Computes collection of the broker, dropping the ':444' port if present.
    theurl_comp = uris_log['broker'].replace(':444', '') + '/computes'

    ela_time = time.time()
    node_output = job.submit_xml(theurl_comp, ref['name'], ref['instance'],
                                 ref['disk_ref'], ref['net_ref'], ref['loc_ref'])
    # NOTE(review): after the submission reply, the state is refreshed from
    # the *reference* node (ela_ref), not from the newly submitted compute,
    # whose URI is never extracted from node_output. Confirm against
    # xml_conv / the broker API before trusting the ela_* timings.
    status = ''
    while 'Running' not in status:
        compute_info, node_output = _parse_with_retry(
            node_output, xml_conv.xml_compute_parse,
            lambda: job.computes(ela_ref), 'Warning: Compute status error.', 5)
        status = compute_info['state'].capitalize()
        print('%s %s' % (compute_info['hostname'], status))
        if 'Failed' in status or 'Done' in status:
            print('Compute ' + compute_info['hostname'] + ' has failed')
            return
        if 'Running' in status:
            stats_log['ela_running'] = time.time() - ela_time
            print('Compute ' + ela_ref + ' is running')
        else:
            node_output = job.computes(ela_ref)
    print('\n')

    # Check that the new node is ssh-available.
    ssh_test.ssh_open(compute_info['ip'], compute_info['hostname'])
    stats_log['ela_ssh'] = time.time() - ela_time
    print('------------------------')
    print('Node is ssh-available')
    print('------------------------\n')

    # Evaluate whether the node has been added to OGS.
    _wait_ogs_nodes(master, nodes, verbose=True)
    stats_log['ela_ogs'] = time.time() - ela_time


def _prepare_cluster(job, uris_log, compute_uris, ela_ref, nodes, master,
                     sub_time, stats_log, app, ela):
    """Wait for SSH and OGS on a running cluster, then run the optional extras."""
    for compute_info in compute_uris.values():
        ssh_test.ssh_open(compute_info['ip'], compute_info['hostname'])
    stats_log['ssh'] = time.time() - sub_time
    _banner('Cluster is ssh-available ')

    if master is None:
        raise RuntimeError('No master node found among the deployed computes')
    _wait_ogs_nodes(master, nodes)
    stats_log['ogs'] = time.time() - sub_time
    # NOTE(review): fixed 30 s pause after OGS reports all nodes; purpose undocumented.
    time.sleep(30)
    _banner('Cluster is ready. OGS configured')
    print(ssh_test.ssh_qhost(master))

    # Run the application.
    if app:
        app_time = time.time()
        ap.sub_app(str(master))
        stats_log['app'] = time.time() - app_time

    # Elasticity: add one node, so OGS must then count one more host.
    if ela:
        _add_elastic_node(job, uris_log, compute_uris, ela_ref, master,
                          nodes + 1, stats_log)


def _delete_experiment(job, raiz, job_infor):
    """Request deletion of the experiment and return the request time."""
    url = raiz + job_infor['experiment']
    print(_SEPARATOR)
    print('Deleting experiment... %s' % url)
    print(_SEPARATOR + '\n')
    print('')
    del_time = time.time()
    print(job.delete(url))
    return del_time


def _wait_done(job, computes, del_time, stats_log, timeout):
    """Poll the computes after deletion until all report 'Done' or *timeout* s pass.

    Stores the time (since the delete request) in ``stats_log['cluster_done']``.
    """
    if not computes:
        return
    start = time.time()
    while time.time() - start < timeout:
        # Count per pass: a node seen 'Done' twice must not be counted twice.
        done = 0
        for com in computes:
            respuesta_compute = job.computes(com)
            t1 = time.time()
            try:
                compute_info = xml_conv.xml_compute_parse(respuesta_compute)
            except Exception:
                # Treat an unparseable reply as "not done yet"; the timeout bounds the wait.
                print('Warning: Compute status error. Reply: %s' % (respuesta_compute,))
                continue
            if 'Done' in compute_info['state'].capitalize():
                done += 1
        if done == len(computes):
            print('All cluster computes are Done')
            stats_log['cluster_done'] = t1 - del_time
            _banner('Cluster_Done %s' % stats_log['cluster_done'])
            return


def ea_bf(exp, iter, app=0, ela=0, timeout=1200):
    """Submit an experiment, wait until its cluster is usable, then delete it.

    It submits and deletes experiments, checks whether the VMs are available
    and records elapsed times (seconds) in a stats dictionary handed to
    ``save.st_cluster``:

    - manager: experiment log reported 'deployed' (since submission)
    - cluster: every compute reported 'Running' (since submission)
    - ssh: ``ssh_test.ssh_open`` returned for every compute (since submission)
    - ogs: qhost on the master counted all client nodes (since submission)
    - app: duration of ``ap.sub_app`` (only if *app*)
    - ela_running / ela_ssh / ela_ogs: elasticity timings since the extra
      node request (only if *ela*)
    - cluster_done: every compute reported 'Done' (since the delete request)

    :param exp: experiment description passed unchanged to ``job.submit``.
    :param iter: repetition number (name kept for callers); 0 also prints
        the stats table.
    :param app: when true, run the application on the master node.
    :param ela: when true, add one elastic worker node and time it.
    :param timeout: seconds to wait for the cluster to become Running, and
        again for it to become Done after deletion.
    :returns: *iter* when the repetition completed, ``iter - 1`` when the
        cluster never became usable.

    The experiment is deleted even if an exception (or Ctrl-C) escapes
    while the cluster is being prepared; the exception is then re-raised.
    """
    # Experiment manager URL and API root.
    # Integration testbed alternative: https://api.integration.bonfire.grid5000.fr
    theurl = 'https://api.bonfire-project.eu:443/managed_experiments'
    raiz = 'https://api.bonfire-project.eu:443'

    # Experiment manager client: submission, deletion and log/compute queries.
    job = em.sub()
    job.user = cr.getuser(_CREDENTIALS_FILE)
    job.passwd = cr.getpass(_CREDENTIALS_FILE)

    stats_log = dict()
    sub_time = time.time()
    job_infor = _submit_experiment(job, theurl, exp)

    print(_SEPARATOR)
    print('Experiment submitted:  %s' % time.asctime(time.localtime(sub_time)))
    print(_SEPARATOR + '\n')
    for key in job_infor:
        print('%s:\t %s' % (key, job_infor[key]))
    time.sleep(3)
    print(_SEPARATOR + '\n')

    cluster_running = False
    uris_log = None
    try:
        print('Deploying experiment...')
        uris_log, respuesta_log = _wait_deployed(job, raiz, job_infor)
        deployed = uris_log['error'] == 0 and 'deployed' in uris_log['deployed']
        compute_uris = dict()
        if not deployed:
            print('Deploy error\n%s %s' % (uris_log, uris_log['error']))
            print('\n')
            print('Error reply\n%s' % (respuesta_log,))
            print('\n')
        else:
            stats_log['manager'] = time.time() - sub_time
            print('Experiment deployed \n%s' % (uris_log,))
            status_cluster, compute_uris, ela_ref, respuesta_compute = _wait_running(
                job, uris_log['computes'], sub_time, stats_log, timeout)
            cluster_running = status_cluster == 'Running'
            if cluster_running:
                _banner('Cluster deployed', 'Configuring OGS...')
            else:
                _banner('Possible Broker Error %s' % (respuesta_compute,))

        # Show the deployed nodes and their characteristics.
        nodes, master = _summarize_nodes(compute_uris)
        if cluster_running:
            _prepare_cluster(job, uris_log, compute_uris, ela_ref, nodes,
                             master, sub_time, stats_log, app, ela)
    except BaseException:
        # Never leave the experiment running on the testbed after a failure.
        _delete_experiment(job, raiz, job_infor)
        raise

    if iter == 0:
        _print_stats(stats_log)
    del_time = _delete_experiment(job, raiz, job_infor)

    if not cluster_running:
        print('Experiment deleted. Broker api error.')
        print('Deleted repetition: %s' % iter)
        iter = iter - 1
        print('Return repetition: %s' % iter)
        time.sleep(30)
        return iter

    _wait_done(job, uris_log['computes'], del_time, stats_log, timeout)
    save.st_cluster(sub_time, stats_log, iter, job_infor['name'], uris_log['computes'])
    if iter == 0:
        _print_stats(stats_log)
    for value in stats_log.values():
        print(str(value) + '\t')
    return iter
412 :    
413 :    

root@forge.cesga.es
ViewVC Help
Powered by ViewVC 1.0.0  

Powered By FusionForge