
Source Code for Script copr-be.py

#!/usr/bin/python -ttu


from backend import errors
from backend.dispatcher import Worker
from backend.actions import Action
from bunch import Bunch
import ConfigParser
import daemon
import glob
import grp
import json
import lockfile
import logging
import multiprocessing
import optparse
import os
import pwd
import requests
import setproctitle
import signal
import sys
import time


def _get_conf(cp, section, option, default):
    """
    To make returning items from config parser less irritating
    """

    if cp.has_section(section) and cp.has_option(section, option):
        return cp.get(section, option)
    return default
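
# Illustrative use (mirrors read_conf() below): the supplied default is
# returned whenever the section or the option is absent from the parsed file:
#
#   opts.sleeptime = int(_get_conf(cp, "backend", "sleeptime", 10))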


class CoprJobGrab(multiprocessing.Process):

    """
    Fetch jobs from the Frontend
    - submit them to the jobs queue for workers
    """

    def __init__(self, opts, events, jobs, lock):
        # base class initialization
        multiprocessing.Process.__init__(self, name="jobgrab")

        self.opts = opts
        self.events = events
        self.jobs = jobs
        self.added_jobs = []
        self.lock = lock

    def event(self, what):
        self.events.put({"when": time.time(), "who": "jobgrab", "what": what})

    def fetch_jobs(self):
        try:
            r = requests.get(
                "{0}/waiting/".format(self.opts.frontend_url),
                auth=("user", self.opts.frontend_auth))

        except requests.RequestException as e:
            self.event("Error retrieving jobs from {0}: {1}".format(
                self.opts.frontend_url, e))
        else:
            try:
                r_json = json.loads(r.content)  # using old requests on el6 :(
            except ValueError as e:
                self.event("Error getting JSON build list from FE {0}"
                           .format(e))
                return

            if "builds" in r_json and r_json["builds"]:
                self.event("{0} jobs returned".format(len(r_json["builds"])))
                count = 0
                for b in r_json["builds"]:
                    if "id" in b:
                        extended_id = "{0}-{1}".format(b["id"], b["chroot"])
                        jobfile = os.path.join(
                            self.opts.jobsdir,
                            "{0}.json".format(extended_id))

                        if (not os.path.exists(jobfile) and
                                extended_id not in self.added_jobs):

                            count += 1
                            # close the job file promptly rather than leaking
                            # the descriptor
                            with open(jobfile, 'w') as jf:
                                jf.write(json.dumps(b))
                            self.event("Wrote job: {0}".format(extended_id))
                if count:
                    self.event("New jobs: {0}".format(count))

            if "actions" in r_json and r_json["actions"]:
                self.event("{0} actions returned".format(
                    len(r_json["actions"])))

                for action in r_json["actions"]:
                    ao = Action(self.opts, self.events, action, self.lock)
                    ao.run()
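
    # The frontend's /waiting/ endpoint is expected to answer with JSON
    # shaped roughly as below (an illustrative sketch inferred from the
    # parsing in fetch_jobs(); the field values are hypothetical):
    #
    #   {"builds": [{"id": 42, "chroot": "fedora-20-x86_64", ...}],
    #    "actions": [...]}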

    def run(self):
        setproctitle.setproctitle("CoprJobGrab")
        abort = False
        try:
            while not abort:
                self.fetch_jobs()
                for f in sorted(glob.glob(
                        os.path.join(self.opts.jobsdir, "*.json"))):

                    n = os.path.basename(f).replace(".json", "")
                    if n not in self.added_jobs:
                        self.jobs.put(f)
                        self.added_jobs.append(n)
                        self.event("adding to work queue id {0}".format(n))
                time.sleep(self.opts.sleeptime)
        except KeyboardInterrupt:
            return
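
    # Note: job files written by fetch_jobs() are named
    # "<build-id>-<chroot>.json" under opts.jobsdir; run() queues each file
    # once and remembers already-seen names in self.added_jobs.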


class CoprLog(multiprocessing.Process):

    """log mechanism where items from the events queue get recorded"""

    def __init__(self, opts, events):

        # base class initialization
        multiprocessing.Process.__init__(self, name="logger")

        self.opts = opts
        self.events = events

        logdir = os.path.dirname(self.opts.logfile)
        if not os.path.exists(logdir):
            os.makedirs(logdir, mode=0750)

        # setup a log file to write to
        logging.basicConfig(filename=self.opts.logfile, level=logging.DEBUG)

    def log(self, event):

        when = time.strftime("%F %T", time.gmtime(event["when"]))
        msg = "{0} : {1}: {2}".format(when,
                                      event["who"],
                                      event["what"].strip())

        try:
            if self.opts.verbose:
                sys.stderr.write("{0}\n".format(msg))
                sys.stderr.flush()
            logging.debug(msg)
        except (IOError, OSError) as e:
            # the logfile path lives on self.opts; this class never sets a
            # logfile attribute on itself
            sys.stderr.write("Could not write to logfile {0} - {1}\n".format(
                self.opts.logfile, e))

    # event format is a dict {when:time, who:[worker|logger|job|main],
    # what:str}
    def run(self):
        setproctitle.setproctitle("CoprLog")
        abort = False
        try:
            while not abort:
                e = self.events.get()
                if "when" in e and "who" in e and "what" in e:
                    self.log(e)
        except KeyboardInterrupt:
            return
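
    # An illustrative event as consumed by run() above (the values are
    # hypothetical):
    #   {"when": 1397000000.0, "who": "worker", "what": "build 42: started"}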


class CoprBackend(object):

    """
    Core process - starts/stops/initializes workers
    """

    def __init__(self, config_file=None, ext_opts=None):
        # read in config file
        # put all the config items into a single self.opts bunch

        if not config_file:
            raise errors.CoprBackendError("Must specify config_file")

        self.config_file = config_file
        self.ext_opts = ext_opts  # to stow our cli options for read_conf()
        self.opts = self.read_conf()
        self.lock = multiprocessing.Lock()

        # job is a path to a jobfile on the localfs
        self.jobs = multiprocessing.Queue()
        self.events = multiprocessing.Queue()
        # event format is a dict {when:time, who:[worker|logger|job|main],
        # what:str}

        # create logger
        self._logger = CoprLog(self.opts, self.events)
        self._logger.start()

        self.event("Starting up Job Grabber")
        # create job grabber
        self._jobgrab = CoprJobGrab(self.opts, self.events, self.jobs,
                                    self.lock)
        self._jobgrab.start()

        if not os.path.exists(self.opts.worker_logdir):
            os.makedirs(self.opts.worker_logdir, mode=0750)

        self.workers = []

    def event(self, what):
        self.events.put({"when": time.time(), "who": "main", "what": what})

    def read_conf(self):
        "read in config file - return Bunch of config data"
        opts = Bunch()
        cp = ConfigParser.ConfigParser()
        try:
            cp.read(self.config_file)
            opts.results_baseurl = _get_conf(
                cp, "backend", "results_baseurl", "http://copr")
            opts.frontend_url = _get_conf(
                cp, "backend", "frontend_url", "http://coprs/rest/api")
            opts.frontend_auth = _get_conf(
                cp, "backend", "frontend_auth", "PASSWORDHERE")

            opts.spawn_playbook = _get_conf(
                cp, "backend", "spawn_playbook",
                "/etc/copr/builder_playbook.yml")

            opts.terminate_playbook = _get_conf(
                cp, "backend", "terminate_playbook",
                "/etc/copr/terminate_playbook.yml")

            opts.jobsdir = _get_conf(cp, "backend", "jobsdir", None)
            opts.destdir = _get_conf(cp, "backend", "destdir", None)
            opts.exit_on_worker = _get_conf(
                cp, "backend", "exit_on_worker", False)
            opts.fedmsg_enabled = _get_conf(
                cp, "backend", "fedmsg_enabled", False)
            opts.sleeptime = int(_get_conf(cp, "backend", "sleeptime", 10))
            opts.num_workers = int(_get_conf(cp, "backend", "num_workers", 8))
            opts.timeout = int(_get_conf(cp, "builder", "timeout", 1800))
            opts.logfile = _get_conf(
                cp, "backend", "logfile", "/var/log/copr/backend.log")
            opts.verbose = _get_conf(cp, "backend", "verbose", False)
            opts.worker_logdir = _get_conf(
                cp, "backend", "worker_logdir", "/var/log/copr/workers/")
            # thoughts for later
            # ssh key for connecting to builders?
            # cloud key stuff?
            #
        except ConfigParser.Error as e:
            raise errors.CoprBackendError(
                "Error parsing config file: {0}: {1}".format(
                    self.config_file, e))

        if not opts.jobsdir or not opts.destdir:
            raise errors.CoprBackendError(
                "Incomplete Config - must specify"
                " jobsdir and destdir in configuration")

        if self.ext_opts:
            for v in self.ext_opts:
                setattr(opts, v, self.ext_opts.get(v))
        return opts
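
    # A minimal /etc/copr/copr-be.conf matching the options read above (an
    # illustrative sketch, not shipped with this source; the paths are
    # hypothetical, and any option left out falls back to the defaults
    # hard-coded in read_conf()). jobsdir and destdir are mandatory:
    #
    #   [backend]
    #   frontend_url=http://coprs/rest/api
    #   frontend_auth=PASSWORDHERE
    #   jobsdir=/var/lib/copr/jobs
    #   destdir=/var/lib/copr/public_html/results
    #   sleeptime=10
    #   num_workers=8
    #
    #   [builder]
    #   timeout=1800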

    def run(self):
        self.abort = False
        while not self.abort:
            # re-read config into opts
            self.opts = self.read_conf()

            if self.jobs.qsize():
                self.event("# jobs in queue: {0}".format(self.jobs.qsize()))
            # this handles starting/growing the number of workers
            if len(self.workers) < self.opts.num_workers:
                self.event("Spinning up more workers for jobs")
                for _ in range(self.opts.num_workers - len(self.workers)):
                    worker_num = len(self.workers) + 1
                    w = Worker(
                        self.opts, self.jobs, self.events, worker_num,
                        lock=self.lock)
                    self.workers.append(w)
                    w.start()
                self.event("Finished starting worker processes")
            # FIXME - prune out workers
            # if len(self.workers) > self.opts.num_workers:
            #     killnum = len(self.workers) - self.opts.num_workers
            #     for w in self.workers[:killnum]:
            #         insert a poison pill? Kill after something? I dunno.
            # FIXME - if a worker bombs out - we need to check them
            # and startup a new one if it happens

            # check for dead workers and abort; iterate over a copy so that
            # remove() does not skip the entry after each removed worker
            for w in self.workers[:]:
                if not w.is_alive():
                    self.event("Worker {0} died unexpectedly".format(
                        w.worker_num))
                    if self.opts.exit_on_worker:
                        raise errors.CoprBackendError(
                            "Worker died unexpectedly, exiting")
                    else:
                        self.workers.remove(w)  # it is not working anymore
                        w.terminate()  # kill it with a fire

            time.sleep(self.opts.sleeptime)

    def terminate(self):
        """
        Cleanup backend processes (just workers for now)
        """

        self.abort = True
        # iterate over a copy: removing entries from the list being iterated
        # would leave every other worker running
        for w in self.workers[:]:
            self.workers.remove(w)
            w.terminate()


def parse_args(args):
    parser = optparse.OptionParser("\ncopr-be [options]")
    parser.add_option("-c", "--config", default="/etc/copr/copr-be.conf",
                      dest="config_file",
                      help="config file to use for copr-be run")
    parser.add_option("-d", "--daemonize", default=False, dest="daemonize",
                      action="store_true", help="daemonize or not")
    parser.add_option("-p", "--pidfile",
                      default="/var/run/copr-backend/copr-be.pid",
                      dest="pidfile",
                      help="pid file to use for copr-be if daemonized")
    parser.add_option("-x", "--exit", default=False, dest="exit_on_worker",
                      action="store_true", help="exit on worker failure")
    parser.add_option("-v", "--verbose", default=False, dest="verbose",
                      action="store_true", help="be more verbose")

    opts, args = parser.parse_args(args)
    if not os.path.exists(opts.config_file):
        sys.stderr.write("No config file found at: {0}\n".format(
            opts.config_file))
        sys.exit(1)
    opts.config_file = os.path.abspath(opts.config_file)

    ret_opts = Bunch()
    for o in ("daemonize", "exit_on_worker", "pidfile", "config_file"):
        setattr(ret_opts, o, getattr(opts, o))

    return ret_opts
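
# Typical invocations (illustrative; uses only the options defined above):
#
#   copr-be                        # run in the foreground, default config
#   copr-be -c ./copr-be.conf -d   # daemonize, using a local config file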


def main(args):
    opts = parse_args(args)

    try:
        context = daemon.DaemonContext(
            pidfile=lockfile.FileLock(opts.pidfile),
            gid=grp.getgrnam("copr").gr_gid,
            uid=pwd.getpwnam("copr").pw_uid,
            detach_process=opts.daemonize,
            umask=022,
            stderr=sys.stderr,
            signal_map={
                signal.SIGTERM: "terminate",
                signal.SIGHUP: "terminate",
            },
        )
        with context:
            cbe = CoprBackend(opts.config_file, ext_opts=opts)
            cbe.run()
    except (Exception, KeyboardInterrupt):
        sys.stderr.write("Killing/Dying\n")
        if "cbe" in locals():
            cbe.terminate()
        raise

if __name__ == "__main__":
    try:
        main(sys.argv[1:])
    except KeyboardInterrupt:
        sys.stderr.write("\nUser cancelled, may need cleanup\n")
        sys.exit(0)
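
# A minimal sketch of driving the backend without the daemon context
# (assumptions: a valid config file exists at the path below, and the
# "copr" user/group that main() switches to is not needed when running in
# the foreground as the current user). ext_opts is any dict-like object
# whose keys override the config file in read_conf():
#
#   from bunch import Bunch
#   cli = Bunch(daemonize=False, exit_on_worker=False,
#               pidfile="/var/run/copr-backend/copr-be.pid",
#               config_file="/etc/copr/copr-be.conf")
#   cbe = CoprBackend(cli.config_file, ext_opts=cli)
#   cbe.run()  # loops until terminate() sets cbe.abort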