Woodchuck: 785656a245efadfeb5603997dd69041a9e132f85
1: #! /usr/bin/env python
2:
3: # Copyright 2011 Neal H. Walfield <neal@walfield.org>
4: #
5: # This file is part of Woodchuck.
6: #
7: # Woodchuck is free software; you can redistribute it and/or modify it
8: # under the terms of the GNU General Public License as published by
9: # the Free Software Foundation; either version 3 of the License, or
10: # (at your option) any later version.
11: #
12: # Woodchuck is distributed in the hope that it will be useful, but WITHOUT
13: # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14: # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
15: # License for more details.
16: #
17: # You should have received a copy of the GNU General Public License
18: # along with this program. If not, see
19: # <http://www.gnu.org/licenses/>.
20:
21: from __future__ import with_statement
22:
23: import sys
24: import os
25: import subprocess
26: import collections
27: import gobject
28: import json
29: from optparse import OptionParser
30: import tempfile
31: import smtplib
32: import traceback
33: from email.mime.text import MIMEText
34: import atexit
35: import dbus
36:
37: from dbus.mainloop.glib import DBusGMainLoop
38: DBusGMainLoop(set_as_default=True)
39:
# Basic configuration parameters.

# How many synchronization operations may run at the same time.
parallel_transfers = 3

# Target object freshness (in hours) for actions that do not set one.
default_freshness = 24

# Load and configure the logging facilities.
import logging
logger = logging.getLogger(__name__)

# Log output is appended to a file in the user's home directory.  Once
# that file has grown past logfile_max_size bytes it is rotated: it is
# renamed to "<logfile>.old", replacing any previous old file.
logfile = os.path.expanduser("~/.vcssync.log")
logfile_max_size = 1024 * 1024
try:
    logfile_size = os.path.getsize(logfile)
except OSError:
    # Most likely the file does not exist yet.
    logfile_size = 0

if logfile_size > logfile_max_size:
    old_logfile = logfile + ".old"
    try:
        os.rename(logfile, old_logfile)
    except OSError as e:
        print("Renaming %s to %s: %s" % (logfile, old_logfile, str(e)))

# The desired logging level (use logging.DEBUG for verbose output).
logging_level = logging.INFO
logging.basicConfig(
    level=logging_level,
    # The pid is baked into the format once, at start up.
    format=('%%(asctime)s (pid: %d) %%(levelname)-8s %%(message)s'
            % os.getpid()),
    filename=logfile,
    filemode='a')
78:
def print_and_log(*args):
    """
    Print the arguments to stdout and send them to the log file.

    args are strings.  They are joined with single spaces and the
    resulting line is both logged (at warning level) and written to
    stdout followed by a newline.

    The message is passed to the logger as data rather than as a
    format string, so stdout and the log always show the same text,
    even when several arguments are given or when the text contains
    '%' characters.
    """
    message = ' '.join(args)
    logger.warning("%s", message)
    sys.stdout.write(message + "\n")
85:
logger.info("VCS Sync started (%s).", repr(sys.argv))

# Log uncaught exceptions.  The hook that was installed before ours is
# remembered so that it can still be run after the exception has been
# logged.
original_excepthook = sys.excepthook


def my_excepthook(exctype, value, tb):
    """Log an uncaught exception, then hand it to the original hook."""
    formatted = ''.join(traceback.format_exception(exctype, value, tb))
    logger.error("Uncaught exception: %s", formatted)
    original_excepthook(exctype, value, tb)


sys.excepthook = my_excepthook
98:
def exit_handler():
    """Note in the log that the interpreter is exiting."""
    logger.info("Exiting.")


# Run the handler at interpreter shutdown.
atexit.register(exit_handler)
102:
# Load woodchuck.  If the bindings cannot be imported, report the
# problem and substitute a stub whose available() is always False, so
# that the rest of the program keeps working without Woodchuck.
try:
    from pywoodchuck import PyWoodchuck, __file__ as pywoodchuck_file
    import woodchuck
except ImportError as e:
    # If Woodchuck is not found, don't die horribly.
    print_and_log("Loading pywoodchuck failed: %s." % (str(e),))

    class PyWoodchuck(object):
        """Inert replacement for pywoodchuck.PyWoodchuck."""

        def __init__(self, *args):
            # Accept and ignore whatever positional arguments are given.
            pass

        def available(self):
            """Return False: there is no Woodchuck server to talk to."""
            return False
else:
    logging.debug("pywoodchuck loaded successfully (%s, %s)."
                  % (pywoodchuck_file, woodchuck.__file__))
117:
# A Remote describes one synchronization action: a repository and what
# to do with it.
#
#   - sync is the direction: either 'push' or 'pull'.
#   - directory is the root directory of the repository.
#   - remote is the repository to push to or pull from.  If None, the
#     default remote is used.
#   - refs is the set of refs to push or pull.  If None, the default
#     set of references is used.
#   - freshness is how often to perform the action, in hours.
Remote = collections.namedtuple(
    'Remote', 'sync directory remote refs freshness')
130:
def remote_to_id(remote):
    """
    Encode a remote as a Woodchuck identifier.

    The identifier is the sync direction and the directory, followed
    by the remote name and the refs when they are set, all separated
    by single spaces.  None of the components may contain a space.

    Refs can only be encoded after a remote name, so a remote without
    a remote name must not have refs either.
    """
    parts = [remote.sync, remote.directory]

    if not remote.remote:
        # Without a remote name there is no position left for refs.
        assert remote.refs is None
        return ' '.join(parts)

    parts.append(remote.remote)
    if remote.refs:
        parts.append(remote.refs)
    return ' '.join(parts)
146:
def id_to_remote(id):
    """
    Decode an identifier produced by remote_to_id into a Remote.

    The identifier is split on spaces into at most four fields: sync,
    directory, remote and refs.  A missing remote or refs field
    becomes None.  The freshness is not part of the identifier and is
    always None in the result.
    """
    fields = id.split(" ", 3)
    # Pad so that the two optional trailing fields are always present.
    fields.extend([None, None])
    sync, directory, remote, refs = fields[:4]
    return Remote(sync, directory, remote, refs, None)
154:
# Class that receives and processes Woodchuck upcalls.
class mywoodchuck(PyWoodchuck):
    """
    Claims our DBus service name, registers with Woodchuck and handles
    the stream update and object transfer upcalls.
    """

    def __init__(self, human_readable_name, dbus_service_name,
                 request_feedback):
        """
        Claim dbus_service_name on the session bus, then initialize
        the PyWoodchuck base class with the three arguments.

        Exits the process with status 1 if the name is already owned,
        i.e., another instance is running.
        """
        # dbus.service is a submodule that "import dbus" does not
        # load.  Import it explicitly rather than relying on some
        # other module having loaded it as a side effect.
        import dbus.service

        # We need to claim the name before we register with Woodchuck.
        #
        # Consider: the reason we started is that Woodchuck might have
        # made an upcall.  The DBus daemon will only queue the message
        # for 25 seconds, after which point it will drop the message
        # on the floor.  Registering with Woodchuck means using DBus.
        # Indeed, it means making a blocking call to Woodchuck.  If
        # Woodchuck is currently running the scheduler (which it
        # likely is if it just made an upcall to us), then we could
        # block long enough that the message is dropped.
        try:
            self.bus_name = dbus.service.BusName(dbus_service_name,
                                                 bus=dbus.SessionBus(),
                                                 do_not_queue=True)
        except dbus.exceptions.NameExistsException as e:
            print_and_log("Already running (Unable to claim %s: %s)."
                          % (dbus_service_name, str(e)))
            sys.exit(1)

        PyWoodchuck.__init__(self, human_readable_name, dbus_service_name,
                             request_feedback)
        if not self.available():
            print_and_log("Woodchuck server not running.")

    # Streams don't have any content to be updated.  Immediately
    # indicate that the update was success, but that we didn't find
    # anything new.
    def stream_update_cb(self, stream):
        """Report an empty, successful update for stream."""
        remote = id_to_remote(stream.identifier)
        logger.debug("stream update called on %s)" % (str(remote),))

        self[stream.identifier].transferred(
            transferred_up=0, transferred_down=0,
            transfer_time=0, transfer_duration=0, new_objects=0)

    # Objects are updated (unlike streams).  If the object is still in
    # the configuration file, start the update.  Otherwise, unregister
    # the object (or, rather, its stream as each stream has exactly
    # one object).
    def object_transfer_cb(self, stream, object,
                           version, filename, quality):
        """
        Start the transfer configured for stream, or report the object
        as gone and unregister it if it is no longer configured.
        """
        logger.debug("object transfer called on %s)"
                     % (stream.identifier,))

        # Reload the configuration.
        load_config()

        for remote in remotes:
            if remote_to_id(remote) == stream.identifier:
                transfer(remote)
                return

        # User doesn't want to update this object any more--it's
        # no longer in the config, but we haven't removed it yet.
        logger.debug("object transfer: %s no longer in config."
                     % (stream.identifier,))
        self[stream.identifier][object.identifier].transfer_failed(
            woodchuck.TransferStatus.FailureGone)
        del self[stream.identifier][object.identifier]
219:
# Bookkeeping for the fetchers/pushers.
#
# A Transferer records one running synchronization command: the child
# process, the command line it was started with, the Remote it works
# on, the file collecting its output and a list that receives its
# results once it has finished.
Transferer = collections.namedtuple(
    'Transferer', 'process command_line remote output results')

# The transferers that are currently running.
transferers = []
# Remotes waiting for one of the parallel transfer slots.
transferers_queued = []

# Becomes non-zero as soon as any transfer fails.
some_transfer_failed = 0


def idle():
    """Return True if no transfer is running and none is queued."""
    return not (transferers or transferers_queued)
230:
def transfer(remote):
    """
    Start synchronizing remote, or queue it if all slots are in use.

    If parallel_transfers transfers are already running, remote is
    appended to transferers_queued and nothing else happens.
    Otherwise the repository type is detected from the contents of
    the directory (git or Mercurial), the matching push/fetch/pull
    command is started with its output collected in a temporary file,
    a Transferer is appended to transferers and polling is started.

    If the directory does not contain a supported repository, this is
    reported, some_transfer_failed is set and nothing is started.
    """
    # Without this declaration the assignment below would only create
    # a local variable and the failure would never reach the exit
    # status.
    global some_transfer_failed

    logger.debug("transfer: transferring %s." % (str(remote),))

    if len(transferers) >= parallel_transfers:
        logger.debug("Maximum transfer parallelism reached (%d). Enqueuing %s"
                     % (parallel_transfers, str(remote),))
        transferers_queued.append(remote)
        return

    directory = os.path.expanduser(remote.directory)

    # This should be ensured by the configuration loader.
    assert remote.sync in ('push', 'pull')

    # git: Look for a .git directory or the usual files in a bare
    # repository.
    is_git = (
        os.path.exists(os.path.join(directory, '.git'))
        or all(os.path.exists(os.path.join(directory, d))
               for d in ['branches', 'config', 'HEAD', 'hooks',
                         'info', 'objects', 'refs']))
    # Mercurial: Look for a .hg directory.
    is_hg = os.path.exists(os.path.join(directory, '.hg'))

    if is_git:
        # A 'pull' only fetches; nothing is merged.
        args = ['/usr/bin/env', 'git',
                'push' if remote.sync == 'push' else 'fetch']

        if remote.remote is not None:
            args.append(remote.remote)

        if remote.refs is not None:
            args.append(remote.refs)
    elif is_hg:
        args = ['/usr/bin/env', 'hg', remote.sync]

        if remote.refs is not None:
            args += ["-r", remote.refs]

        if remote.remote is not None:
            args.append(remote.remote)
    else:
        # Unknown repository.
        print_and_log("%s: %s does not contain a supported repository."
                      % (remote_to_id(remote), directory))
        some_transfer_failed = 1
        return

    logger.debug("Running %s (remote: %s)"
                 % (str(args), str(remote)))

    # The child's stdout and stderr are both collected in this file,
    # which poll() reads and closes once the child has exited.
    output = tempfile.TemporaryFile(mode='w+b')
    try:
        process = subprocess.Popen(
            args=args,
            cwd=directory,
            stdout=output.fileno(),
            stderr=subprocess.STDOUT)
    except Exception:
        # Nobody else will close the file if the child never started.
        output.close()
        raise
    transferers.append(Transferer(process, args, remote, output, []))

    poll_start()
296:
# The id of the glib timeout source that runs poll(), or None if
# polling is not active.
poll_id = None

# Transferers that exited with a non-zero status and have not yet
# been reported by mail.
transferers_failed = []


def _format_failure_report(failed):
    """Return the body of the failure mail for the transferers in failed."""
    message = """
The following errors occured while synchronizing your repositories:

"""
    for transferer in failed:
        exit_code, output = transferer.results[0], transferer.results[1]
        message += (
            "Transfering %s failed.\n %s -> exit code %d.\n%s\n"
            % (remote_to_id(transferer.remote),
               ' '.join("'" + a + "'"
                        for a in transferer.command_line),
               exit_code,
               # Indent every line of the command's output.
               ''.join(" " + l for l in output.splitlines(True))))
    return message


def _mail_failure_report(message):
    """Mail message to the current user via the SMTP server on localhost."""
    try:
        msg = MIMEText(message)
        msg['Subject'] = 'VCSSync: Synchronization failed.'
        email = os.environ['USER'] + '@localhost'
        msg['From'] = '"VCSSync Daemon" <%s>' % email
        msg['To'] = '<%s>' % email

        s = smtplib.SMTP('localhost')
        s.sendmail(email, [email], msg.as_string())
        s.quit()
    except (KeyError, smtplib.SMTPException, EnvironmentError) as e:
        # The failures are already in the log.  Failing to mail them
        # must not stop poll() from resetting its state.
        logger.error("Failed to mail the failure report: %s" % (str(e),))


def poll():
    """
    Collect finished transfers; called periodically by the main loop.

    For every transferer whose process has exited, the exit status
    and the output are appended to its results, the outcome is
    printed and logged, and, if Woodchuck is available, the outcome
    is registered with it.  Failed transferers are remembered in
    transferers_failed and set some_transfer_failed.  Afterwards at
    most one queued remote is started if a slot is free.

    Once nothing is running or queued, the collected failures are
    mailed (daemon mode only) and forgotten, polling stops and the
    inactivity handling is started.

    Returns True to be called again, False to stop being called.
    """
    global transferers
    global some_transfer_failed
    global poll_id

    not_finished = []
    for transferer in transferers:
        ret = transferer.process.poll()
        if ret is None:
            # Still running.
            not_finished.append(transferer)
            continue

        remote = remote_to_id(transferer.remote)

        # Read in any output.
        transferer.output.seek(0)
        output = transferer.output.read()
        transferer.output.close()

        transferer.results.append(ret)
        transferer.results.append(output)

        if ret != 0:
            transferers_failed.append(transferer)
            some_transfer_failed = 1

        print_and_log("%s: %s%s"
                      % (remote,
                         "ok." if ret == 0 else "failed!",
                         ("\n" + output) if ret != 0 else ""))
        try:
            if wc.available():
                try:
                    stream = wc[remote]
                except KeyError:
                    stream = wc.stream_register(
                        human_readable_name=remote,
                        stream_identifier=remote,
                        freshness=woodchuck.never_updated)

                if ret == 0:
                    stream.object_transferred(remote)
                else:
                    stream.object_transfer_failed(
                        remote,
                        woodchuck.TransferStatus.TransientOther)
        except woodchuck.Error as e:
            logger.warning(
                "Failed to register transfer result with Woodchuck: %s"
                % (str(e)))

    transferers = not_finished

    if len(transferers) < parallel_transfers and transferers_queued:
        transfer(transferers_queued.pop(0))

    if not idle():
        return True

    if transferers_failed:
        if options.daemon:
            _mail_failure_report(_format_failure_report(transferers_failed))

        transferers_failed[:] = []

    poll_id = None
    inactive_timeout_start()
    return False
389:
def poll_start():
    """Have poll() called every two seconds, unless it already is."""
    global poll_id
    if poll_id:
        # Polling is already active.
        return
    poll_id = gobject.timeout_add(2000, poll)
394:
# How long (in milliseconds) the daemon may stay without activity
# before it quits.
inactive_duration = 5 * 60 * 1000

# The id of the pending inactivity timeout source, or None.
inactive_timeout_id = None


def inactive_timeout():
    """
    Quit the main loop if no transfer is running or queued.

    Any pending inactivity timeout is removed.  Always returns False
    so that the main loop does not call this function again.
    """
    nothing_to_do = idle()
    if nothing_to_do:
        logger.debug("Inactive, quitting.")
        mainloop.quit()

    inactive_timeout_remove()
    return False
407:
def inactive_timeout_remove():
    """Cancel the pending inactivity timeout, if there is one."""
    global inactive_timeout_id
    if not inactive_timeout_id:
        # No timeout is pending.
        return
    gobject.source_remove(inactive_timeout_id)
    inactive_timeout_id = None
413:
def inactive_timeout_start():
    """
    (Re)start the inactivity handling.

    Any pending timeout is cancelled first.  In daemon mode a new
    timeout of inactive_duration milliseconds is scheduled.
    Otherwise the inactivity check is run right away.
    """
    global inactive_timeout_id

    inactive_timeout_remove()

    if options.daemon:
        logger.debug("Started inactivity timer.")
        inactive_timeout_id = gobject.timeout_add(
            inactive_duration, inactive_timeout)
        return

    inactive_timeout()
425:
# Command line options.  Both options are plain flags that default to
# off.
parser = OptionParser()
for short_name, long_name, description in (
        ("-d", "--daemon",
         "don't automatically synchronize all files."),
        ("-r", "--reload-config",
         "just read the configuration file, then quit.")):
    parser.add_option(short_name, long_name, action="store_true",
                      default=False, help=description)
options, args = parser.parse_args()

# Positional arguments are not supported.
if args:
    print("Unknown arguments: %s" % (str(args)))
    parser.print_help()
    sys.exit(1)
438:
# Config file parsing.

# The configured synchronization actions (a list of Remote tuples),
# filled in by load_config.
remotes = []

config_file = "~/.vcssync"
config_file_expanded = os.path.expanduser(config_file)
# The modification time of the configuration file when it was last
# loaded, or None if it has not been loaded yet.
config_file_loaded = None

# The contents of the configuration file that is created when the
# user does not have one.  Lines starting with // are removed before
# the file is handed to the json parser; everything else must be
# valid json, which in particular means that all keys are quoted.
config_file_example = """\
// This file is a json file.
//
// vcssync expects a single data structure, a hash mapping repository
// directories to an array of hashes. The array is a list of
// synchronization actions to perform on the repository. Each inner
// hash contains of up to four keys:
//
//   'sync': The direction to synchronize: either 'pull' or 'push',
//     (default: 'pull').
//   'remote': The remote to sync with (default: the VCS's default,
//     typically 'origin').
//   'refs': The set of references to synchronize (default: all
//     references).
//   'freshness': How out of data to allow the copy to become, in hours
//     (default: 24). In other words, approximately how often to perform
//     the action.
{
  "~/src/some-project": [
    {"sync": "pull"},
    {"sync": "pull", "remote": "gitorious-alice", "freshness": 48},
    {"sync": "pull", "remote": "gitorious-bob", "freshness": 48},
    // Push to the backup server approximately every two hours.
    {"sync": "push", "remote": "backup-server", "freshness": 2}
  ],
  "~/src/other-project": [
    {"sync": "pull"},
    {"sync": "push", "remote": "backup-server", "freshness": 2}
  ]
}
"""
def load_config():
    """
    (Re)load the configuration file into the global list remotes.

    If the file does not exist, an example file is created and the
    process exits with status 1.  If the file's modification time is
    the one seen by the previous load, nothing is done.

    Lines starting with // are treated as comments; the rest is
    parsed as json.  The result has to be a hash mapping repository
    directories to lists of hashes, each of which may contain the
    keys 'remote', 'sync' ('pull' or 'push', default 'pull'), 'refs'
    and 'freshness' (default default_freshness).

    The process exits with status 1 if the file cannot be read, is
    still the unmodified example, cannot be parsed or contains an
    invalid stanza, and with status 0 if it defines no actions.
    """
    global remotes
    global config_file_loaded

    if not os.path.isfile(config_file_expanded):
        print_and_log("Configuration file (%s) does not exist."
                      % (config_file,))

        with open(config_file_expanded, "w") as fhandle:
            fhandle.write(config_file_example)

        print_and_log("Example configuration file created.")

        sys.exit(1)

    mtime = os.stat(config_file_expanded).st_mtime
    if config_file_loaded is not None:
        if config_file_loaded == mtime:
            logger.debug("Config file unchanged, not reloading.")
            return
        logger.debug("Config file changed, reloading.")

    config_file_loaded = mtime

    try:
        with open(config_file_expanded) as fhandle:
            raw_lines = fhandle.readlines()
    except EnvironmentError as e:
        # EnvironmentError covers both IOError, which open() raises
        # on Python 2, and OSError.
        print_and_log("Error opening %s: %s" % (config_file, str(e)))
        sys.exit(1)

    if ''.join(raw_lines) == config_file_example:
        print_and_log("vcssync unconfigured. Please edit %s."
                      % (config_file,))
        sys.exit(1)

    # Strip comments, which the json parser does not support, but
    # preserve line numbers to improve error messages.
    stripped = (line.strip() for line in raw_lines)
    data = ''.join("\n" if line[:2] == "//" else line + "\n"
                   for line in stripped)

    try:
        config = json.loads(data)
    except ValueError as e:
        print_and_log("Error parsing %s: %s" % (config_file, str(e)))
        sys.exit(1)

    if not isinstance(config, dict):
        print_and_log("Error parsing %s: the top-level value must be a hash."
                      % (config_file,))
        sys.exit(1)

    error = False
    loaded = []
    for repository, lines in config.items():
        if not isinstance(lines, list):
            error = True
            print_and_log("Error processing %s's stanza: %s"
                          % (repository, str(lines)))
            continue

        for line in lines:
            try:
                remote = line.get('remote', None)
                sync = line.get('sync', 'pull')
                refs = line.get('refs', None)
                freshness = line.get('freshness', default_freshness)
            except AttributeError:
                # The stanza is not a hash.
                error = True
                print_and_log("Error processing %s's stanza: %s"
                              % (repository, str(line)))
                continue

            if sync not in ('pull', 'push'):
                # transfer() relies on the loader to reject these.
                error = True
                print_and_log(("Unsupported sync value for %s/%s: %s"
                               + " (either push or pull)")
                              % (repository, remote, sync))
                continue

            loaded.append(Remote(sync, repository, remote, refs,
                                 freshness))

    if error:
        print_and_log("Error parsing %s." % (config_file,))
        sys.exit(1)

    remotes = loaded

    if not remotes:
        print_and_log("No repositories defined in %s." % (config_file,))
        sys.exit(0)

    for i, remote in enumerate(remotes):
        logger.debug("%d: Configured action: %s" % (i, str(remote)))


load_config()
570:
# Woodchuck synchronization.

# The connection to Woodchuck; set up by main.
wc = None


def register(remote):
    """
    Make sure that remote is registered with Woodchuck.

    The stream and its single object both use the remote's id as
    their identifier and name.  It is not an error if either of them
    is already registered; in that case the object's transfer
    frequency is brought in line with the remote's freshness.

    Returns True if something was registered or updated.  False,
    otherwise.
    """
    ident = remote_to_id(remote)

    try:
        wc.stream_register(human_readable_name=ident,
                           stream_identifier=ident,
                           freshness=woodchuck.never_updated)
    except woodchuck.ObjectExistsError:
        # The stream is already known; only the object is of interest.
        pass

    # The freshness is configured in hours; convert it to seconds.
    frequency = remote.freshness * 60 * 60

    try:
        wc[ident].object_register(human_readable_name=ident,
                                  object_identifier=ident,
                                  transfer_frequency=frequency)
    except woodchuck.ObjectExistsError:
        if wc[ident][ident].transfer_frequency == frequency:
            # Already registered with the right frequency.
            return False
        wc[ident][ident].transfer_frequency = frequency
        return True

    return True
602:
def main():
    """
    Program entry point; run once from the main loop.

    Connects to Woodchuck.  In daemon or reload-config mode the
    configured actions are synchronized with the streams registered
    with Woodchuck: unknown ones are registered, changed ones are
    updated and those no longer configured are unregistered.  In
    reload-config mode the process then exits with status 0, in
    daemon mode the inactivity timer is started.

    In any other case all configured actions are transferred right
    away.

    Returns False so that the main loop does not call it again.
    """
    global wc
    # Only request feedback if we are running as a daemon.
    wc = mywoodchuck("VCS Sync", "org.woodchuck.vcssync",
                     request_feedback=options.daemon)

    if not (options.daemon or options.reload_config):
        # We are not running in daemon mode.  Update everything.
        for remote in remotes:
            transfer(remote)

        if idle():
            # Not a single transfer could be started, so poll() will
            # never run and notice that we are done.  Trigger the
            # shutdown here; otherwise the main loop would run
            # forever.
            inactive_timeout_start()
        return False

    # Only bother synchronizing the configuration and the Woodchuck
    # configuration if we are running as a daemon.
    if wc.available():
        # Register any previously unknown streams.
        streams = wc.streams_list()
        stream_ids = [s.identifier for s in streams]

        logger.debug("Registered streams: %s" % (str(stream_ids),))
        logger.debug("Configured streams: %s" % (str(remotes),))

        changes = 0
        for remote in remotes:
            remote_id = remote_to_id(remote)
            if remote_id in stream_ids:
                # Still configured; whatever remains in stream_ids
                # after this loop is stale.
                stream_ids.remove(remote_id)

            if register(remote):
                changes += 1

        # Unregister streams that the user is no longer interested in
        # (i.e., removed from the configuration file).
        for stale_id in stream_ids:
            logger.info("Unregistering %s" % stale_id)
            del wc[stale_id]

        if options.reload_config or changes or stream_ids:
            print_and_log("%d synchronization actions updated, %d pruned."
                          % (changes, len(stream_ids)))

    if options.reload_config:
        sys.exit(0)

    inactive_timeout_start()
    return False
647:
# main() is run from within the main loop.  The loop runs until it is
# quit by the inactivity handling.
mainloop = gobject.MainLoop()
gobject.idle_add(main)
mainloop.run()

# Signal failed transfers through the exit status.
if some_transfer_failed:
    sys.exit(1)
Generated by git2html.