Woodchuck: 785656a245efadfeb5603997dd69041a9e132f85

     1: #! /usr/bin/env python
     2: 
     3: # Copyright 2011 Neal H. Walfield <neal@walfield.org>
     4: #
     5: # This file is part of Woodchuck.
     6: #
     7: # Woodchuck is free software; you can redistribute it and/or modify it
     8: # under the terms of the GNU General Public License as published by
     9: # the Free Software Foundation; either version 3 of the License, or
    10: # (at your option) any later version.
    11: #
    12: # Woodchuck is distributed in the hope that it will be useful, but WITHOUT
    13: # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
    14: # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public
    15: # License for more details.
    16: #
    17: # You should have received a copy of the GNU General Public License
    18: # along with this program.  If not, see
    19: # <http://www.gnu.org/licenses/>.
    20: 
    21: from __future__ import with_statement
    22: 
    23: import sys
    24: import os
    25: import subprocess
    26: import collections
    27: import gobject
    28: import json
    29: from optparse import OptionParser
    30: import tempfile
    31: import smtplib
    32: import traceback
    33: from email.mime.text import MIMEText
    34: import atexit
    35: import dbus
    36: 
    37: from dbus.mainloop.glib import DBusGMainLoop
    38: DBusGMainLoop(set_as_default=True)
    39: 
# Basic configuration parameters.

# The number of synchronization operations to run in parallel.
# Further requests are queued and started as slots free up.
parallel_transfers = 3

# The default target object freshness (in hours).  Used when a
# configuration stanza does not specify its own 'freshness'.
default_freshness = 24
    47: 
# Load and configure the logging facilities.
import logging
logger = logging.getLogger(__name__)

# Save the logging output to a file.  If the file exceeds
# logfile_max_size, rotate it (and remove the old file).
logfile = os.path.expanduser("~/.vcssync.log")
logfile_max_size = 1024 * 1024
try:
    logfile_size = os.stat(logfile).st_size
except OSError:
    # Most likely the file does not exist.
    logfile_size = 0

if logfile_size > logfile_max_size:
    try:
        old_logfile = logfile + ".old"
        os.rename(logfile, old_logfile)
    except OSError, e:
        # Rotation is best effort: report the problem and keep logging
        # to the oversized file.
        print "Renaming %s to %s: %s" % (logfile, old_logfile, str(e))

# The desired logging level.
#logging_level = logging.DEBUG
logging_level = logging.INFO
logging.basicConfig(
    level=logging_level,
    # Include the pid so that output from concurrent runs can be told
    # apart in the shared log file.
    format=('%(asctime)s (pid: ' + str(os.getpid()) + ') '
            + '%(levelname)-8s %(message)s'),
    filename=logfile,
    filemode='a')
    78: 
    79: def print_and_log(*args):
    80:     """
    81:     Print the arguments to stdout and send them to the log file.
    82:     """
    83:     logger.warn(*args)
    84:     sys.stdout.write(' '.join(args) + "\n")
    85: 
logger.info("VCS Sync started (%s)." % (repr(sys.argv)))

# Log uncaught exceptions.
original_excepthook = sys.excepthook

def my_excepthook(exctype, value, tb):
    """Log uncaught exceptions."""
    # Record the full traceback in the log file, then defer to the
    # original hook so the default stderr report still appears.
    logger.error(
        "Uncaught exception: %s"
        % (''.join(traceback.format_exception(exctype, value, tb)),))
    original_excepthook(exctype, value, tb)
sys.excepthook = my_excepthook
    98: 
@atexit.register
def exit_handler():
    """Record orderly interpreter shutdown in the log."""
    logger.info("Exiting.")
   102: 
# Load woodchuck.
try:
    from pywoodchuck import PyWoodchuck, __file__ as pywoodchuck_file
    import woodchuck
    logging.debug("pywoodchuck loaded successfully (%s, %s)."
                  % (pywoodchuck_file, woodchuck.__file__))
except ImportError, e:
    # If Woodchuck is not found, don't die horribly.
    print_and_log("Loading pywoodchuck failed: %s." % (str(e),))
    # Stub with a compatible constructor whose available() always
    # answers False, so the rest of the program degrades gracefully.
    class PyWoodchuck(object):
        def __init__(*args):
            pass
        def available(self):
            return False
   117: 
   118: # The data structure describing a remote repository and what to do
   119: # with it.
   120: #
   121: #  - directory is the root directory of the repository
   122: #  - sync is either 'push' or 'pull'
   123: #  - remote is the repository to push to or from.  If None, the
   124: #      default remote is used.
   125: #  - refs is the set of refs to push or pull.  If None, the default
   126: #      set of references is used.
   127: #  - freshness is how often to perform the action, in days.
   128: Remote = collections.namedtuple(
   129:     'Remote', ['sync', 'directory', 'remote', 'refs', 'freshness'])
   130: 
   131: def remote_to_id(remote):
   132:     """
   133:     Given a remote, convert it to a Woodchuck id by joining each field
   134:     together and separating them by spaces.
   135: 
   136:     We assume that none of the components contain a space.
   137:     """
   138:     id = [ remote.sync, remote.directory ]
   139:     if remote.remote:
   140:         id.append(remote.remote)
   141:         if remote.refs:
   142:             id.append(remote.refs)
   143:     else:
   144:         assert remote.refs is None
   145:     return ' '.join(id)
   146: 
   147: def id_to_remote(id):
   148:     """
   149:     Given an id that was encoded using remote_to_id, convert it to a
   150:     Remote.
   151:     """
   152:     (sync, directory, remote, refs) = (id.split(" ", 3) + [None, None])[:4]
   153:     return Remote(sync, directory, remote, refs, None)
   154: 
   155: # Class that receives and processes Woodchuck upcalls.
   156: class mywoodchuck(PyWoodchuck):
   157:     def __init__(self, human_readable_name, dbus_service_name,
   158:                  request_feedback):
   159:         # We need to claim the name before we register with Woodchuck.
   160:         #
   161:         # Consider: the reason we started is that Woodchuck might have
   162:         # made an upcall.  The DBus daemon will only queue the message
   163:         # for 25 seconds, after which point it will drop the message
   164:         # on the floor.  Registering with Woodchuck means using DBus.
   165:         # Indeed, it means making a blocking call to Woodchuck.  If
   166:         # Woodchuck is currently running the scheduler (which it
   167:         # likely is if it just made an upcall to us), then we could
   168:         # block long enough that the message is dropped.
   169:         try:
   170:             self.bus_name = dbus.service.BusName(dbus_service_name,
   171:                                                  bus=dbus.SessionBus(),
   172:                                                  do_not_queue=True)
   173:         except dbus.exceptions.NameExistsException, e:
   174:             print_and_log("Already running (Unable to claim %s: %s)."
   175:                           % (dbus_service_name, str(e)))
   176:             sys.exit(1)
   177: 
   178:         PyWoodchuck.__init__(self, human_readable_name, dbus_service_name,
   179:                              request_feedback)
   180:         if not self.available():
   181:             print_and_log("Woodchuck server not running.")
   182: 
   183:     # Streams don't have any content to be updated.  Immediately
   184:     # indicate that the update was success, but that we didn't find
   185:     # anything new.
   186:     def stream_update_cb(self, stream):
   187:         remote = id_to_remote(stream.identifier)
   188:         logger.debug("stream update called on %s)" % (str(remote),))
   189: 
   190:         self[stream.identifier].transferred(
   191:             transferred_up=0, transferred_down=0,
   192:             transfer_time=0, transfer_duration=0, new_objects=0)
   193: 
   194:     # Objects are updated (unlike streams).  If the object is still in
   195:     # the configuration file, start the update.  Otherwise, unregister
   196:     # the object (or, rather, its stream as each stream has exactly
   197:     # one object).
   198:     def object_transfer_cb(self, stream, object,
   199:                            version, filename, quality):
   200:         logger.debug("object transfer called on %s)"
   201:                      % (stream.identifier,))
   202: 
   203:         # Reload the configuration.
   204:         load_config()
   205: 
   206:         remote = id_to_remote(stream.identifier)
   207:         for remote in remotes:
   208:             if remote_to_id(remote) == stream.identifier:
   209:                 transfer(remote)
   210:                 break
   211:         else:
   212:             # User doesn't want to update this object any more--it's
   213:             # no longer in the config, but we haven't removed it yet.
   214:             logger.debug("object transfer: %s no longer in config."
   215:                          % (stream.identifier,))
   216:             self[stream.identifier][object.identifier].transfer_failed(
   217:                 woodchuck.TransferStatus.FailureGone)
   218:             del self[stream.identifier][object.identifier]
   219: 
   220: # The set of active fetchers/pushers.
   221: Transferer = collections.namedtuple(
   222:     'Transferer', ['process', 'command_line', 'remote', 'output', 'results'])
   223: transferers = []
   224: transferers_queued = []
   225: 
   226: some_transfer_failed = 0
   227: 
   228: def idle():
   229:     return not transferers and not transferers_queued
   230: 
   231: def transfer(remote):
   232:     global transferers
   233:     global transferers_queued
   234: 
   235:     logger.debug("transfer: transferring %s." % (str(remote),))
   236: 
   237:     if len(transferers) >= parallel_transfers:
   238:         logger.debug("Maximum transfer parallelism reached (%d).  Enqueuing %s"
   239:                      % (parallel_transfers, str(remote),))
   240:         transferers_queued.append(remote)
   241:         return
   242: 
   243:     directory = os.path.expanduser(remote.directory)
   244: 
   245:     # This should ensured by the configuration loader.
   246:     assert remote.sync in ('push', 'pull')
   247: 
   248:     args = None
   249:     if (# git: Look for a .git directory or the usual files in a bare
   250:         # repository.
   251:         os.path.exists(os.path.join(directory, '.git'))
   252:         or (all(os.path.exists(os.path.join(directory, d))
   253:                 for d in ['branches', 'config', 'HEAD', 'hooks',
   254:                           'info', 'objects', 'refs']))):
   255:         args = ['/usr/bin/env', 'git']
   256:     
   257:         if remote.sync == 'push':
   258:             args.append('push')
   259:         elif remote.sync == 'pull':
   260:             args.append('fetch')
   261:     
   262:         if remote.remote is not None:
   263:             args.append(remote.remote)
   264:     
   265:         if remote.refs is not None:
   266:             args.append(remote.refs)
   267:     elif (# Mercurial: Look for a .hg directory
   268:         os.path.exists(os.path.join(directory, '.hg'))):
   269:         args = ['/usr/bin/env', 'hg', remote.sync]
   270:     
   271:         if remote.refs is not None:
   272:             args += ("-r", remote.refs)
   273: 
   274:         if remote.remote is not None:
   275:             args.append(remote.remote)
   276:     else:
   277:         # Unknown repository.
   278:         print_and_log("%s: %s does not contain a supported repository."
   279:                       % (remote_to_id(remote), directory))
   280:         some_transfer_failed = 1
   281:         return
   282: 
   283:     logger.debug("Running %s (remote: %s)"
   284:                  % (str(args), str(remote)))
   285: 
   286:     output = tempfile.TemporaryFile(mode='w+b')
   287: 
   288:     process = subprocess.Popen(
   289:         args=args,
   290:         cwd=directory,
   291:         stdout=output.fileno(),
   292:         stderr=subprocess.STDOUT)
   293:     transferers.append(Transferer(process, args, remote, output, []))
   294: 
   295:     poll_start()
   296: 
# Source id of the 2-second poll() timeout; None while poll() is not
# scheduled.
poll_id = None

# Transferers whose subprocess exited non-zero; reported (and cleared)
# once everything is idle.
transferers_failed = []
   300: 
   301: def poll():
   302:     global transferers
   303:     global transferers_failed
   304:     global some_transfer_failed
   305:     global poll_id
   306: 
   307:     not_finished = []
   308:     for transferer in transferers:
   309:         ret = transferer.process.poll()
   310:         if ret is not None:
   311:             remote = remote_to_id(transferer.remote)
   312: 
   313:             # Read in any output.
   314:             transferer.output.seek(0)
   315:             output = transferer.output.read()
   316:             transferer.output.close()
   317: 
   318:             transferer.results.append(ret)
   319:             transferer.results.append(output)
   320: 
   321:             if ret != 0:
   322:                 transferers_failed.append(transferer)
   323:                 some_transfer_failed = 1
   324: 
   325:             print_and_log("%s: %s%s"
   326:                           % (remote,
   327:                              "ok." if ret == 0 else "failed!",
   328:                              ("\n" + output) if ret != 0 else ""))
   329:             try:
   330:                 if wc.available():
   331:                     try:
   332:                         stream = wc[remote]
   333:                     except KeyError:
   334:                         stream = wc.stream_register(
   335:                             human_readable_name=remote,
   336:                             stream_identifier=remote,
   337:                             freshness=woodchuck.never_updated)
   338:     
   339:                     if ret == 0:
   340:                         stream.object_transferred(remote)
   341:                     else:
   342:                         stream.object_transfer_failed(
   343:                             remote,
   344:                             woodchuck.TransferStatus.TransientOther)
   345:             except woodchuck.Error, e:
   346:                 logger.warn(
   347:                     "Failed to register transfer result with Woodchuck: %s"
   348:                     % (str(e)))
   349:         else:
   350:             not_finished.append(transferer)
   351: 
   352:     transferers = not_finished
   353: 
   354:     if len(transferers) < parallel_transfers and transferers_queued:
   355:         transfer(transferers_queued.pop(0))
   356: 
   357:     if idle():
   358:         if transferers_failed:
   359:             if options.daemon:
   360:                 message = """
   361: The following errors occured while synchronizing your repositories:
   362: 
   363: """
   364:                 for transferer in transferers_failed:
   365:                     message += (
   366:                         "Transfering %s failed.\n  %s -> exit code %d.\n%s\n"
   367:                         % (remote_to_id(transferer.remote),
   368:                            ' '.join("'" + a + "'"
   369:                                     for a in transferer.command_line),
   370:                            transferer.results[0],
   371:                            (("  " + l) for l in transferer.results[1])))
   372: 
   373:                 msg = MIMEText(message)
   374:                 msg['Subject'] = 'VCSSync: Synchronization failed.'
   375:                 email = os.environ['USER'] + '@localhost'
   376:                 msg['From'] = '"VCSSync Daemon" <%s>' % email
   377:                 msg['To'] = '<%s>' % email
   378: 
   379:                 s = smtplib.SMTP('localhost')
   380:                 s.sendmail(email, [email], msg.as_string())
   381:                 s.quit()
   382: 
   383:             transferers_failed[:] = []
   384:         poll_id = None
   385:         inactive_timeout_start()
   386:         return False
   387:     else:
   388:         return True
   389: 
   390: def poll_start():
   391:     global poll_id
   392:     if not poll_id:
   393:         poll_id = gobject.timeout_add(2000, poll)
   394: 
# If running in daemon mode and there is no activity for a while (in
# milliseconds), quit.
inactive_duration = 5 * 60 * 1000

# Source id of the armed inactivity timer; None when not armed.
inactive_timeout_id = None
   400: def inactive_timeout():
   401:     if idle():
   402:         logger.debug("Inactive, quitting.")
   403:         mainloop.quit()
   404: 
   405:     inactive_timeout_remove()
   406:     return False
   407: 
   408: def inactive_timeout_remove():
   409:     global inactive_timeout_id
   410:     if inactive_timeout_id:
   411:         gobject.source_remove(inactive_timeout_id)
   412:         inactive_timeout_id = None
   413: 
   414: def inactive_timeout_start():
   415:     global inactive_timeout_id
   416: 
   417:     inactive_timeout_remove()
   418: 
   419:     if not options.daemon:
   420:         inactive_timeout()
   421:     else:
   422:         logger.debug("Started inactivity timer.")
   423:         inactive_timeout_id \
   424:             = gobject.timeout_add(inactive_duration, inactive_timeout)
   425: 
# Command line options.
parser = OptionParser()
parser.add_option("-d", "--daemon", action="store_true", default=False,
                  help="don't automatically synchronize all files.")
parser.add_option("-r", "--reload-config", action="store_true", default=False,
                  help="just read the configuration file, then quit.")
(options, args) = parser.parse_args()

# This program takes no positional arguments.
if args:
    print "Unknown arguments: %s" % (str(args))
    parser.print_help()
    sys.exit(1)
   438: 
   439: # Config file parsing.
   440: remotes = []
   441: 
   442: config_file = "~/.vcssync"
   443: config_file_expanded = os.path.expanduser(config_file)
   444: config_file_loaded = None
   445: 
   446: config_file_example = """\
   447: // This file is a json file.
   448: //   
   449: //  vcssync expects a single data structure, a hash mapping repository
   450: //  directories to an array of hashes.  The array is a list of
   451: //  synchronization actions to perform on the repository.  Each inner
   452: //  hash contains of up to four keys:
   453: // 
   454: //   'sync': The direction to synchronize: either 'pull' or 'push',
   455: //     (default: 'pull').
   456: //   'remote': The remote to sync with (default: the VCS's default,
   457: //     typically 'origin').
   458: //   'refs': The set of references to synchronize (default: all
   459: //     references).
   460: //   'freshness': How out of data to allow the copy to become, in hours
   461: //     (default: 24).  In other words, approximately how often to perform
   462: //     the action.
   463: {
   464:     "~/src/some-project": [
   465: 	{"sync": "pull"},
   466: 	{"sync": "pull", "remote": "gitorious-alice", freshness:48},
   467: 	{"sync": "pull", "remote": "gitorious-bob", freshness:48},
   468:         // Push to the backup server approximately every two hours.
   469: 	{"sync": "push", "remote": "backup-server", freshness:2}
   470:     ],
   471:     "~/src/other-project": [
   472: 	{"sync": "pull"},
   473: 	{"sync": "push", "remote": "backup-server", freshness:2}
   474:     ]
   475: }
   476: """
   477: def load_config():
   478:     global remotes
   479:     global config_file_loaded
   480: 
   481:     if not os.path.isfile(config_file_expanded):
   482:         print_and_log("Configuration file (%s) does not exist."
   483:                       % (config_file,))
   484: 
   485:         with open(config_file_expanded, "w") as fhandle:
   486:             print >>fhandle, config_file_example,
   487: 
   488:         print_and_log("Example configuration file created.")
   489: 
   490:         sys.exit(1)
   491: 
   492:     mtime = os.stat(config_file_expanded).st_mtime
   493:     if config_file_loaded is not None:
   494:         if config_file_loaded == mtime:
   495:             logger.debug("Config file unchanged, not reloading.")
   496:             return
   497:         else:
   498:             logger.debug("Config file changed, reloading.")
   499: 
   500:     config_file_loaded = mtime
   501: 
   502:     remotes = []
   503:     config = None
   504:     try:
   505:         raw_data = ""
   506:         data = ""
   507:         with open(os.path.expanduser(config_file)) as fhandle:
   508:             for line in fhandle:
   509:                 raw_data += line
   510: 
   511:                 line = line.strip()
   512:                 if line[:2] == "//":
   513:                     # Strip comments, which the json parser does not
   514:                     # support, but, preserve lines numbers to improve
   515:                     # error messages.
   516:                     data += "\n"
   517:                     continue
   518:                 else:
   519:                     data += line + "\n"
   520: 
   521:             if raw_data == config_file_example:
   522:                 print_and_log("vcssync unconfigured.  Please edit %s."
   523:                               % (config_file,))
   524:                 sys.exit(1)
   525: 
   526:             config = json.loads(data)
   527:     except ValueError, e:
   528:         print_and_log("Error parsing %s: %s" % (config_file, str(e)))
   529:         sys.exit(1)
   530:     except OSError, e:
   531:         print_and_log("Error opening %s: %s" % (config_file, str(e)))
   532:         sys.exit(1)
   533:     
   534:     error = False
   535:     remotes = []
   536:     for repository, lines in config.items():
   537:         for line in lines:
   538:             try:
   539:                 remote = line.get('remote', None)
   540:     
   541:                 sync = line.get('sync', 'pull')
   542:                 if sync not in ('pull', 'push'):
   543:                     print_and_log(("Unsupported sync value for %s/%s: %s"
   544:                                    + " (either push or pull)")
   545:                                   % (repository, remote, sync))
   546:     
   547:                 refs = line.get('refs', None)
   548:     
   549:                 freshness = line.get('freshness', default_freshness)
   550: 
   551:                 remotes.append(Remote(sync, repository, remote, refs,
   552:                                       freshness))
   553:             except AttributeError:
   554:                 error = True
   555:                 print_and_log("Error processing %s's stanza: %s"
   556:                               % (repository, str(line)))
   557: 
   558:     if error:
   559:         print_and_log("Error parsing %s." % (config_file,))
   560:         sys.exit(1)
   561:     
   562:     if not remotes:
   563:         print_and_log("No repositories defined in %s." % (config_file,))
   564:         sys.exit(0)
   565:     
   566:     for i, remote in enumerate(remotes):
   567:         logger.debug("%d: Configured action: %s" % (i, str(remote)))
   568: 
# Load the configuration once at startup; upcalls reload it on demand.
load_config()

# Woodchuck synchronization.
# The mywoodchuck instance; created in main().
wc = None
   573: 
   574: def register(remote):
   575:     """
   576:     Register the remote.  Do not die if the remote is already
   577:     registered.
   578: 
   579:     Returns True if something was registered or updated.  False,
   580:     otherwise.
   581:     """
   582:     id = remote_to_id(remote)
   583:     try:
   584:         wc.stream_register(human_readable_name=id,
   585:                            stream_identifier=id,
   586:                            freshness=woodchuck.never_updated)
   587:     except woodchuck.ObjectExistsError:
   588:         pass
   589: 
   590:     freshness = remote.freshness * 60 * 60
   591:     try:
   592:         wc[id].object_register(human_readable_name=id,
   593:                                object_identifier=id,
   594:                                transfer_frequency=freshness)
   595:         return True
   596:     except woodchuck.ObjectExistsError:
   597:         if wc[id][id].transfer_frequency != freshness:
   598:             wc[id][id].transfer_frequency = freshness
   599:             return True
   600: 
   601:     return False
   602: 
   603: def main():
   604:     # Only request feedback if we are running as a daemon.
   605:     global wc
   606:     wc = mywoodchuck("VCS Sync", "org.woodchuck.vcssync",
   607:                      request_feedback=options.daemon)
   608: 
   609:     if options.daemon or options.reload_config:
   610:         # Only bother synchronizing the configuration and the Woodchuck
   611:         # configuration if we are running as a daemon.
   612:         if wc.available():
   613:             # Register any previously unknown streams.
   614:             streams = wc.streams_list()
   615:             stream_ids = [s.identifier for s in streams]
   616:         
   617:             logger.debug("Registered streams: %s" % (str(stream_ids),))
   618:             logger.debug("Configured streams: %s" % (str(remotes),))
   619:     
   620:             changes = 0
   621:             for remote in remotes:
   622:                 id = remote_to_id(remote)
   623:                 if id in stream_ids:
   624:                     stream_ids.remove(id)
   625:     
   626:                 if register(remote):
   627:                     changes += 1
   628: 
   629:             # Unregister streams that the user is no longer interested in
   630:             # (i.e., removed from the configuration file.
   631:             for id in stream_ids:
   632:                 logger.info("Unregistering %s" % id)
   633:                 del wc[id]
   634:     
   635:         if (options.reload_config or changes or len(stream_ids)):
   636:             print_and_log("%d synchronization actions updated, %d pruned."
   637:                           % (changes, len(stream_ids)))
   638: 
   639:         if options.reload_config:
   640:             sys.exit(0)
   641:     
   642:         inactive_timeout_start()
   643:     else:
   644:         # We are not running in daemon mode.  Update everything.
   645:         for remote in remotes:
   646:             transfer(remote)
   647: 
# Defer main() until the loop is running so Woodchuck upcalls (which
# need the loop) are serviced promptly.
gobject.idle_add(main)
mainloop = gobject.MainLoop()
mainloop.run()
# Propagate any transfer failure through the process exit status.
if some_transfer_failed:
    sys.exit(1)

Generated by git2html.