#!/usr/bin/python

from backend import errors
from backend.dispatcher import Worker
from backend.actions import Action
from bunch import Bunch
import ConfigParser
import daemon
import glob
import grp
import json
import lockfile
import logging
import multiprocessing
import optparse
import os
import pwd
import requests
import setproctitle
import signal
import sys
import time


def _get_conf(cp, section, option, default):
    """
    To make returning items from config parser less irritating
    """

    if cp.has_section(section) and cp.has_option(section, option):
        return cp.get(section, option)
    return default

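# Example (hypothetical config values): with a config file containing
#
#   [backend]
#   sleeptime = 30
#
# _get_conf(cp, "backend", "sleeptime", 10) returns the string "30" and
# _get_conf(cp, "backend", "missing_option", 10) falls back to the default
# 10; callers below cast numeric options with int() themselves.
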

class CoprJobGrab(multiprocessing.Process):

    """
    Fetch jobs from the Frontend
    - submit them to the jobs queue for workers
    """

    def __init__(self, opts, events, jobs, lock):

        multiprocessing.Process.__init__(self, name="jobgrab")

        self.opts = opts
        self.events = events
        self.jobs = jobs
        self.added_jobs = []
        self.lock = lock

    def event(self, what):
        # events are dicts carrying the "when"/"who"/"what" keys that
        # CoprLog.run() checks for before logging
        self.events.put({"when": time.time(), "who": "jobgrab", "what": what})

    def fetch_jobs(self):
        try:
            r = requests.get(
                "{0}/waiting/".format(self.opts.frontend_url),
                auth=("user", self.opts.frontend_auth))

        except requests.RequestException as e:
            self.event("Error retrieving jobs from {0}: {1}".format(
                self.opts.frontend_url, e))
        else:
            try:
                r_json = json.loads(r.content)
            except ValueError as e:
                self.event("Error getting JSON build list from FE {0}"
                           .format(e))
                return

            if "builds" in r_json and r_json["builds"]:
                self.event("{0} jobs returned".format(len(r_json["builds"])))
                count = 0
                for b in r_json["builds"]:
                    if "id" in b:
                        extended_id = "{0}-{1}".format(b["id"], b["chroot"])
                        jobfile = os.path.join(
                            self.opts.jobsdir,
                            "{0}.json".format(extended_id))

                        if (not os.path.exists(jobfile) and
                                extended_id not in self.added_jobs):

                            count += 1
                            with open(jobfile, "w") as f:
                                f.write(json.dumps(b))
                            self.event("Wrote job: {0}".format(extended_id))
                if count:
                    self.event("New jobs: {0}".format(count))
            if "actions" in r_json and r_json["actions"]:
                self.event("{0} actions returned".format(
                    len(r_json["actions"])))

                for action in r_json["actions"]:
                    ao = Action(self.opts, self.events, action, self.lock)
                    ao.run()

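    # Illustrative shape of the frontend response handled above; only the
    # "builds"/"actions" keys and the "id"/"chroot" fields are consumed
    # here, and the example values are made up:
    #
    #   {"builds": [{"id": 42, "chroot": "fedora-18-x86_64", ...}],
    #    "actions": [{...}]}
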
    def run(self):
        setproctitle.setproctitle("CoprJobGrab")
        abort = False
        try:
            while not abort:
                self.fetch_jobs()
                for f in sorted(glob.glob(
                        os.path.join(self.opts.jobsdir, "*.json"))):

                    n = os.path.basename(f).replace(".json", "")
                    if n not in self.added_jobs:
                        self.jobs.put(f)
                        self.added_jobs.append(n)
                        self.event("adding to work queue id {0}".format(n))
                time.sleep(self.opts.sleeptime)
        except KeyboardInterrupt:
            return


class CoprLog(multiprocessing.Process):

    """log mechanism where items from the events queue get recorded"""

    def __init__(self, opts, events):

        multiprocessing.Process.__init__(self, name="logger")

        self.opts = opts
        self.events = events

        logdir = os.path.dirname(self.opts.logfile)
        if not os.path.exists(logdir):
            os.makedirs(logdir, mode=0750)

        # everything gets logged to the one logfile from the config
        logging.basicConfig(filename=self.opts.logfile, level=logging.DEBUG)

    def log(self, event):

        when = time.strftime("%F %T", time.gmtime(event["when"]))
        msg = "{0} : {1}: {2}".format(when,
                                      event["who"],
                                      event["what"].strip())

        try:
            if self.opts.verbose:
                sys.stderr.write("{0}\n".format(msg))
                sys.stderr.flush()
            logging.debug(msg)
        except (IOError, OSError) as e:
            sys.stderr.write("Could not write to logfile {0} - {1}\n".format(
                self.opts.logfile, e))

    def run(self):
        setproctitle.setproctitle("CoprLog")
        abort = False
        try:
            while not abort:
                e = self.events.get()
                if "when" in e and "who" in e and "what" in e:
                    self.log(e)
        except KeyboardInterrupt:
            return

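    # Example of the formatting done by log() above (the epoch value is
    # illustrative): the event
    #   {"when": 1357921855.0, "who": "main", "what": "Starting up Job Grabber"}
    # is written as the line
    #   2013-01-11 16:30:55 : main: Starting up Job Grabber
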

class CoprBackend(object):

    """
    Core process - starts/stops/initializes workers
    """

    def __init__(self, config_file=None, ext_opts=None):

        if not config_file:
            raise errors.CoprBackendError("Must specify config_file")

        self.config_file = config_file
        self.ext_opts = ext_opts
        self.opts = self.read_conf()
        self.lock = multiprocessing.Lock()

        # a job on this queue is a path to a job file on the local fs
        self.jobs = multiprocessing.Queue()
        self.events = multiprocessing.Queue()

        self._logger = CoprLog(self.opts, self.events)
        self._logger.start()

        self.event("Starting up Job Grabber")

        self._jobgrab = CoprJobGrab(
            self.opts, self.events, self.jobs, self.lock)
        self._jobgrab.start()

        if not os.path.exists(self.opts.worker_logdir):
            os.makedirs(self.opts.worker_logdir, mode=0750)

        self.workers = []

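    # Note on process layout: after __init__ the backend consists of this
    # main process plus one CoprLog and one CoprJobGrab child, and - once
    # run() has spun them up - num_workers Worker processes, all talking
    # through the jobs and events queues created above.
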
    def event(self, what):
        self.events.put({"when": time.time(), "who": "main", "what": what})

209 "read in config file - return Bunch of config data"
210 opts = Bunch()
211 cp = ConfigParser.ConfigParser()
212 try:
213 cp.read(self.config_file)
214 opts.results_baseurl = _get_conf(
215 cp, "backend", "results_baseurl", "http://copr")
216 opts.frontend_url = _get_conf(
217 cp, "backend", "frontend_url", "http://coprs/rest/api")
218 opts.frontend_auth = _get_conf(
219 cp, "backend", "frontend_auth", "PASSWORDHERE")
220
221 opts.spawn_playbook = _get_conf(
222 cp, "backend", "spawn_playbook",
223 "/etc/copr/builder_playbook.yml")
224
225 opts.terminate_playbook = _get_conf(
226 cp, "backend", "terminate_playbook",
227 "/etc/copr/terminate_playbook.yml")
228
229 opts.jobsdir = _get_conf(cp, "backend", "jobsdir", None)
230 opts.destdir = _get_conf(cp, "backend", "destdir", None)
231 opts.exit_on_worker = _get_conf(
232 cp, "backend", "exit_on_worker", False)
233 opts.fedmsg_enabled = _get_conf(
234 cp, "backend", "fedmsg_enabled", False)
235 opts.sleeptime = int(_get_conf(cp, "backend", "sleeptime", 10))
236 opts.num_workers = int(_get_conf(cp, "backend", "num_workers", 8))
237 opts.timeout = int(_get_conf(cp, "builder", "timeout", 1800))
238 opts.logfile = _get_conf(
239 cp, "backend", "logfile", "/var/log/copr/backend.log")
240 opts.verbose = _get_conf(cp, "backend", "verbose", False)
241 opts.worker_logdir = _get_conf(
242 cp, "backend", "worker_logdir", "/var/log/copr/workers/")
243
244
245
246
247 except ConfigParser.Error as e:
248 raise errors.CoprBackendError(
249 "Error parsing config file: {0}: {1}".format(
250 self.config_file, e))
251
252 if not opts.jobsdir or not opts.destdir:
253 raise errors.CoprBackendError(
254 "Incomplete Config - must specify"
255 " jobsdir and destdir in configuration")
256
257 if self.ext_opts:
258 for v in self.ext_opts:
259 setattr(opts, v, self.ext_opts.get(v))
260 return opts
261
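    # Illustrative minimal config for read_conf() above (paths and values
    # are examples only; jobsdir and destdir are the two required options):
    #
    #   [backend]
    #   jobsdir = /var/lib/copr/jobs
    #   destdir = /var/lib/copr/public_html/results
    #   frontend_url = http://localhost/rest/api
    #   frontend_auth = PASSWORDHERE
    #
    #   [builder]
    #   timeout = 1800
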
    def run(self):
        self.abort = False
        while not self.abort:
            # re-read the config on each pass so changes take effect live
            self.opts = self.read_conf()

            if self.jobs.qsize():
                self.event("# jobs in queue: {0}".format(self.jobs.qsize()))

            if len(self.workers) < self.opts.num_workers:
                self.event("Spinning up more workers for jobs")
                for _ in range(self.opts.num_workers - len(self.workers)):
                    worker_num = len(self.workers) + 1
                    w = Worker(
                        self.opts, self.jobs, self.events, worker_num,
                        lock=self.lock)
                    self.workers.append(w)
                    w.start()
                self.event("Finished starting worker processes")

            # walk a copy of the worker list; removing entries from the
            # list being iterated would skip every other worker
            for w in self.workers[:]:
                if not w.is_alive():
                    self.event("Worker {0} died unexpectedly".format(
                        w.worker_num))
                    if self.opts.exit_on_worker:
                        raise errors.CoprBackendError(
                            "Worker died unexpectedly, exiting")
                    else:
                        self.workers.remove(w)
                        w.terminate()

            time.sleep(self.opts.sleeptime)

303 """
304 Cleanup backend processes (just workers for now)
305 """
306
307 self.abort = True
308 for w in self.workers:
309 self.workers.remove(w)
310 w.terminate()
311
312

def parse_args(args):
    parser = optparse.OptionParser("\ncopr-be [options]")
    parser.add_option("-c", "--config", default="/etc/copr/copr-be.conf",
                      dest="config_file",
                      help="config file to use for copr-be run")
    parser.add_option("-d", "--daemonize", default=False, dest="daemonize",
                      action="store_true", help="daemonize or not")
    parser.add_option("-p", "--pidfile",
                      default="/var/run/copr-backend/copr-be.pid",
                      dest="pidfile",
                      help="pid file to use for copr-be if daemonized")
    parser.add_option("-x", "--exit", default=False, dest="exit_on_worker",
                      action="store_true", help="exit on worker failure")
    parser.add_option("-v", "--verbose", default=False, dest="verbose",
                      action="store_true", help="be more verbose")

    opts, args = parser.parse_args(args)
    if not os.path.exists(opts.config_file):
        sys.stderr.write("No config file found at: {0}\n".format(
            opts.config_file))
        sys.exit(1)
    opts.config_file = os.path.abspath(opts.config_file)

    # "verbose" is forwarded too so that -v actually takes effect in
    # read_conf(); note this means the cli value overrides the config file
    ret_opts = Bunch()
    for o in ("daemonize", "exit_on_worker", "pidfile", "config_file",
              "verbose"):
        setattr(ret_opts, o, getattr(opts, o))

    return ret_opts

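# Example invocations (paths are the optparse defaults defined above):
#
#   copr-be -c /etc/copr/copr-be.conf -v          # foreground, verbose
#   copr-be --daemonize -p /var/run/copr-backend/copr-be.pid
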

def main(args):
    opts = parse_args(args)

    try:
        context = daemon.DaemonContext(
            pidfile=lockfile.FileLock(opts.pidfile),
            gid=grp.getgrnam("copr").gr_gid,
            uid=pwd.getpwnam("copr").pw_uid,
            detach_process=opts.daemonize,
            umask=022,
            stderr=sys.stderr,
            signal_map={
                signal.SIGTERM: "terminate",
                signal.SIGHUP: "terminate",
            },
        )
        with context:
            cbe = CoprBackend(opts.config_file, ext_opts=opts)
            cbe.run()
    except (Exception, KeyboardInterrupt):
        sys.stderr.write("Killing/Dying\n")
        if "cbe" in locals():
            cbe.terminate()
        raise


if __name__ == "__main__":
    try:
        main(sys.argv[1:])
    except KeyboardInterrupt:
        sys.stderr.write("\nUser cancelled, may need cleanup\n")
        sys.exit(0)