
Source Code for Module backend.dispatcher

#!/usr/bin/python -tt


import os
import sys
import multiprocessing
import time
import Queue
import json
import mockremote
from bunch import Bunch
import errors
import ansible
import ansible.playbook
import ansible.errors
from ansible import callbacks
import requests


class SilentPlaybookCallbacks(callbacks.PlaybookCallbacks):
    ''' playbook callbacks - quietly! '''

    def __init__(self, verbose=False):
        self.verbose = verbose

    def on_start(self):
        callbacks.call_callback_module('playbook_on_start')

    def on_notify(self, host, handler):
        callbacks.call_callback_module('playbook_on_notify', host, handler)

    def on_no_hosts_matched(self):
        callbacks.call_callback_module('playbook_on_no_hosts_matched')

    def on_no_hosts_remaining(self):
        callbacks.call_callback_module('playbook_on_no_hosts_remaining')

    def on_task_start(self, name, is_conditional):
        callbacks.call_callback_module('playbook_on_task_start', name, is_conditional)

    def on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None,
                       confirm=False, salt_size=None, salt=None):
        result = None
        print "***** VARS_PROMPT WILL NOT BE RUN IN THIS KIND OF PLAYBOOK *****"
        callbacks.call_callback_module('playbook_on_vars_prompt', varname,
                                       private=private, prompt=prompt,
                                       encrypt=encrypt, confirm=confirm,
                                       salt_size=salt_size, salt=None)
        return result

    def on_setup(self):
        callbacks.call_callback_module('playbook_on_setup')

    def on_import_for_host(self, host, imported_file):
        callbacks.call_callback_module('playbook_on_import_for_host', host, imported_file)

    def on_not_import_for_host(self, host, missing_file):
        callbacks.call_callback_module('playbook_on_not_import_for_host', host, missing_file)

    def on_play_start(self, pattern):
        callbacks.call_callback_module('playbook_on_play_start', pattern)

    def on_stats(self, stats):
        callbacks.call_callback_module('playbook_on_stats', stats)
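
# Usage sketch: the Worker below wires these silent callbacks into an ansible
# PlayBook run exactly like this (the playbook path here is a placeholder; the
# real paths come from opts.spawn_playbook and opts.terminate_playbook):
#
#     stats = callbacks.AggregateStats()
#     play = ansible.playbook.PlayBook(stats=stats,
#                                      playbook='/path/to/playbook.yml',
#                                      callbacks=SilentPlaybookCallbacks(verbose=False),
#                                      runner_callbacks=callbacks.DefaultRunnerCallbacks(),
#                                      remote_user='root')
#     play.run()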


class WorkerCallback(object):

    def __init__(self, logfile=None):
        self.logfile = logfile

    def log(self, msg):
        if self.logfile:
            now = time.strftime('%F %T')
            try:
                open(self.logfile, 'a').write(str(now) + ': ' + msg + '\n')
            except (IOError, OSError), e:
                print >>sys.stderr, 'Could not write to logfile %s - %s' % (self.logfile, str(e))
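
# Usage sketch (the log path below is a placeholder; Worker derives the real
# path from opts.worker_logdir and the worker number):
#
#     cb = WorkerCallback(logfile='/tmp/worker-1.log')
#     cb.log('starting build')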


class Worker(multiprocessing.Process):

    def __init__(self, opts, jobs, events, worker_num, ip=None, create=True, callback=None):

        # base class initialization
        multiprocessing.Process.__init__(self, name="worker-builder")

        # job management stuff
        self.jobs = jobs
        self.events = events  # event queue for communicating back to dispatcher
        self.worker_num = worker_num
        self.ip = ip
        self.opts = opts
        self.kill_received = False
        self.callback = callback
        self.create = create
        if not self.callback:
            self.logfile = self.opts.worker_logdir + '/worker-%s.log' % self.worker_num
            self.callback = WorkerCallback(logfile=self.logfile)

        if ip:
            self.callback.log('creating worker: %s' % ip)
            self.event('creating worker: %s' % ip)
        else:
            self.callback.log('creating worker: dynamic ip')
            self.event('creating worker: dynamic ip')

    def event(self, what):
        if self.ip:
            who = 'worker-%s-%s' % (self.worker_num, self.ip)
        else:
            who = 'worker-%s' % (self.worker_num)

        self.events.put({'when': time.time(), 'who': who, 'what': what})
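
    # Each event put on the queue is a plain dict:
    #     {'when': <unix timestamp>, 'who': 'worker-<num>[-<ip>]', 'what': <message>}
    # The dispatcher that drains this queue lives outside this module; a
    # minimal consumer would look roughly like this (illustrative only):
    #
    #     while True:
    #         e = events.get()
    #         print '%s %s: %s' % (time.ctime(e['when']), e['who'], e['what'])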

    def spawn_instance(self):
        """call the spawn playbook to startup/provision a building instance"""

        self.callback.log('spawning instance begin')
        start = time.time()

        stats = callbacks.AggregateStats()
        playbook_cb = SilentPlaybookCallbacks(verbose=False)
        runner_cb = callbacks.DefaultRunnerCallbacks()
        # fixme - extra_vars to include ip as a var if we need to specify ips
        # also to include info for instance type to handle the memory requirements of builds
        play = ansible.playbook.PlayBook(stats=stats, playbook=self.opts.spawn_playbook,
                                         callbacks=playbook_cb, runner_callbacks=runner_cb,
                                         remote_user='root')

        play.run()
        self.callback.log('spawning instance end')
        self.callback.log('Instance spawn/provision took %s sec' % (time.time() - start))

        if self.ip:
            return self.ip

        # no fixed ip - the freshly spawned host shows up in the playbook's
        # SETUP_CACHE; return the first entry that is not localhost
        for i in play.SETUP_CACHE:
            if i == 'localhost':
                continue
            return i

        # if we get here we're in trouble
        self.callback.log('No IP back from spawn_instance - dumping cache output')
        self.callback.log(str(play.SETUP_CACHE))
        self.callback.log(str(play.stats.summarize('localhost')))
        self.callback.log('Test spawn_instance playbook manually')

        return None

    def terminate_instance(self, ip):
        """call the terminate playbook to destroy the building instance"""
        self.callback.log('terminate instance begin')

        stats = callbacks.AggregateStats()
        playbook_cb = SilentPlaybookCallbacks(verbose=False)
        runner_cb = callbacks.DefaultRunnerCallbacks()
        play = ansible.playbook.PlayBook(host_list=ip + ',', stats=stats,
                                         playbook=self.opts.terminate_playbook,
                                         callbacks=playbook_cb, runner_callbacks=runner_cb,
                                         remote_user='root')

        play.run()
        self.callback.log('terminate instance end')

    def parse_job(self, jobfile):
        # read in the job's json, pull out what we need and return it as a Bunch
        build = json.load(open(jobfile))
        jobdata = Bunch()
        jobdata.pkgs = build['pkgs'].split(' ')
        jobdata.repos = [r for r in build['repos'].split(' ') if r.strip()]
        jobdata.chroots = build['chroots'].split(' ')
        jobdata.memory_reqs = build['memory_reqs']
        jobdata.timeout = build['timeout']
        jobdata.destdir = self.opts.destdir + '/' + build['copr']['owner']['name'] + '/' + build['copr']['name'] + '/'
        jobdata.build_id = build['id']
        jobdata.results = self.opts.results_baseurl + '/' + build['copr']['owner']['name'] + '/' + build['copr']['name'] + '/'
        jobdata.copr_id = build['copr']['id']
        jobdata.user_id = build['user_id']
        jobdata.user_name = build['copr']['owner']['name']
        jobdata.copr_name = build['copr']['name']
        return jobdata
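
    # Illustrative shape of a jobfile as parse_job() reads it; the keys match
    # the lookups above, the values here are made up:
    #
    #     {
    #       "id": 42,
    #       "pkgs": "http://example.com/foo-1.0-1.src.rpm",
    #       "repos": "",
    #       "chroots": "fedora-18-x86_64 fedora-18-i386",
    #       "memory_reqs": 2048,
    #       "timeout": 3600,
    #       "user_id": 1,
    #       "copr": {"id": 7, "name": "myproject",
    #                "owner": {"name": "someuser"}}
    #     }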

    # maybe we move this to the callback?
    def post_to_frontend(self, data):
        """send data to frontend"""

        headers = {'content-type': 'application/json'}
        url = '%s/update_builds/' % self.opts.frontend_url
        auth = ('user', self.opts.frontend_auth)

        msg = None
        try:
            r = requests.post(url, data=json.dumps(data), auth=auth,
                              headers=headers)
            if r.status_code != 200:
                msg = 'Failed to submit to frontend: %s: %s' % (r.status_code, r.text)
        except requests.RequestException, e:
            msg = 'Post request failed: %s' % e

        if msg:
            self.callback.log(msg)
            return False

        return True
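
    # post_to_frontend() expects a dict of the form {'builds': [ ... ]}; see
    # mark_started() and return_results() below. An illustrative payload
    # (id, url and timestamp made up):
    #
    #     {"builds": [{"id": 42,
    #                  "started_on": 1357048800.0,
    #                  "results": "http://example.com/results/someuser/myproject/"}]}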

    # maybe we move this to the callback?
    def mark_started(self, job):

        build = {'id': job.build_id,
                 'started_on': job.started_on,
                 'results': job.results,
                 }
        data = {'builds': [build]}

        if not self.post_to_frontend(data):
            raise errors.CoprWorkerError, "Could not communicate to front end to submit status info"

    # maybe we move this to the callback?
    def return_results(self, job):

        self.callback.log('%s status %s. Took %s seconds' % (job.build_id, job.status, job.ended_on - job.started_on))
        build = {'id': job.build_id,
                 'ended_on': job.ended_on,
                 'status': job.status,
                 }
        data = {'builds': [build]}

        if not self.post_to_frontend(data):
            raise errors.CoprWorkerError, "Could not communicate to front end to submit results"

        os.unlink(job.jobfile)

    def run(self):
        # worker should start up and check if it can function
        # for each job it takes from the jobs queue
        # run opts.spawn_playbook to create the instance
        # do the build (mockremote)
        # terminate the instance

        while not self.kill_received:
            try:
                jobfile = self.jobs.get()
            except Queue.Empty:
                break

            # parse the job json into our info
            job = self.parse_job(jobfile)

            # FIXME
            # this is our best place to sanity check the job before starting
            # up any longer process

            job.jobfile = jobfile

            # spin up our build instance
            if self.create:
                try:
                    ip = self.spawn_instance()
                    if not ip:
                        raise errors.CoprWorkerError, "No IP found from creating instance"

                except ansible.errors.AnsibleError, e:
                    self.callback.log('failure to setup instance: %s' % e)
                    raise
            else:
                # not creating an instance - build on the fixed ip this worker was given
                ip = self.ip

            status = 1
            job.started_on = time.time()
            self.mark_started(job)

            self.event('build start: user:%s copr:%s build:%s ip:%s pid:%s' % (job.user_name, job.copr_name, job.build_id, ip, self.pid))

            for chroot in job.chroots:
                self.event('chroot start: chroot:%s user:%s copr:%s build:%s ip:%s pid:%s' % (chroot, job.user_name, job.copr_name, job.build_id, ip, self.pid))
                chroot_destdir = job.destdir + '/' + chroot
                # set up our target dir locally
                if not os.path.exists(chroot_destdir):
                    try:
                        os.makedirs(chroot_destdir)
                    except (OSError, IOError), e:
                        msg = "Could not make results dir for job: %s - %s" % (chroot_destdir, str(e))
                        self.callback.log(msg)
                        status = 0
                        continue

                # FIXME
                # need a plugin hook or some mechanism to check random
                # info about the pkgs
                # this should use ansible to download the pkg on the remote system
                # and run a series of checks on the package before we
                # start the build - most importantly license checks.

                self.callback.log('Starting build: id=%r builder=%r timeout=%r destdir=%r chroot=%r repos=%r' % (job.build_id, ip, job.timeout, job.destdir, chroot, str(job.repos)))
                self.callback.log('building pkgs: %s' % ' '.join(job.pkgs))
                try:
                    chroot_repos = list(job.repos)
                    chroot_repos.append(job.results + '/' + chroot)
                    chrootlogfile = chroot_destdir + '/build-%s.log' % job.build_id
                    mr = mockremote.MockRemote(builder=ip, timeout=job.timeout,
                                               destdir=job.destdir, chroot=chroot,
                                               cont=True, recurse=True,
                                               repos=chroot_repos,
                                               callback=mockremote.CliLogCallBack(quiet=True, logfn=chrootlogfile))
                    mr.build_pkgs(job.pkgs)
                except mockremote.MockRemoteError, e:
                    # record and break
                    self.callback.log('%s - %s' % (ip, e))
                    status = 0  # failure
                else:
                    # we can't really trace back if we just fail normally
                    # check if any pkgs didn't build
                    if mr.failed:
                        status = 0
                self.callback.log('Finished build: id=%r builder=%r timeout=%r destdir=%r chroot=%r repos=%r' % (job.build_id, ip, job.timeout, job.destdir, chroot, str(job.repos)))
                job.ended_on = time.time()

            job.status = status
            self.return_results(job)
            self.callback.log('worker finished build: %s' % ip)
            self.event('build end: user:%s copr:%s build:%s ip:%s pid:%s status:%s' % (job.user_name, job.copr_name, job.build_id, ip, self.pid, job.status))
            # clean up the instance
            if self.create:
                self.terminate_instance(ip)
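
# How a Worker is typically started (illustrative; the dispatcher that owns the
# queues and the options Bunch lives elsewhere in the backend package, and the
# option values and paths below are placeholders):
#
#     opts = Bunch(worker_logdir='/tmp/copr-workers',
#                  destdir='/tmp/copr-results',
#                  results_baseurl='http://example.com/results',
#                  frontend_url='http://example.com/backend',
#                  frontend_auth='secretpassword',
#                  spawn_playbook='/etc/copr/builder_spawn.yml',
#                  terminate_playbook='/etc/copr/builder_terminate.yml')
#     jobs = multiprocessing.Queue()
#     events = multiprocessing.Queue()
#     jobs.put('/var/lib/copr/jobs/42.json')
#     w = Worker(opts, jobs, events, worker_num=1)
#     w.start()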