diff --git a/VERSION b/VERSION index 9d6c175..f1c0aef 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.5.12 +0.6.dev diff --git a/setup.py b/setup.py old mode 100644 new mode 100755 index b7d8913..0380fc3 --- a/setup.py +++ b/setup.py @@ -9,6 +9,10 @@ # setuptools borrowed and adapted from GitPython. try: from setuptools import setup, find_packages + + # Silence pyflakes + assert setup + assert find_packages except ImportError: from ez_setup import use_setuptools use_setuptools() @@ -46,6 +50,9 @@ def run(self): # Just to trigger exception import xdist + # Silence pyflakes + assert xdist + retcode = subprocess.call('py.test') except ImportError: print >>sys.stderr, ( diff --git a/tests/test_log_help.py b/tests/test_log_help.py index 5a27f40..432a1f8 100644 --- a/tests/test_log_help.py +++ b/tests/test_log_help.py @@ -1,6 +1,54 @@ -import wal_e.log_help +import re + +import wal_e.log_help as log_help + + +def sanitize_log(log): + return re.sub(r'time=[0-9T:\-\.]+ pid=\d+', + 'time=2012-01-01T00.1234-00 pid=1234', + log) + def test_nonexisting_socket(tmpdir): # Must not raise an exception, silently failing is preferred for # now. 
- wal_e.log_help.configure(syslog_address=tmpdir.join('bogus')) + log_help.configure(syslog_address=tmpdir.join('bogus')) + + +def test_format_structured_info(): + zero = {}, 'time=2012-01-01T00.1234-00 pid=1234' + + one = ({'hello': 'world'}, + u'time=2012-01-01T00.1234-00 pid=1234 hello=world') + + many = ({'hello': 'world', 'goodbye': 'world'}, + u'time=2012-01-01T00.1234-00 pid=1234 goodbye=world hello=world') + + otherTyps = ({1: None, frozenset([1, ' ']): 7.0, '': ''}, + u"time=2012-01-01T00.1234-00 pid=1234 " + "1=None = frozenset([1, ' '])=7.0") + + for d, expect in [zero, one, many, otherTyps]: + result = log_help.WalELogger._fmt_structured(d) + assert sanitize_log(result) == expect + + +def test_fmt_logline_simple(): + out = log_help.WalELogger.fmt_logline( + 'The message', 'The detail', 'The hint', {'structured-data': 'yes'}) + out = sanitize_log(out) + + assert out == """MSG: The message +DETAIL: The detail +HINT: The hint +STRUCTURED: time=2012-01-01T00.1234-00 pid=1234 structured-data=yes""" + + # Try without structured data + out = log_help.WalELogger.fmt_logline( + 'The message', 'The detail', 'The hint') + out = sanitize_log(out) + + assert out == """MSG: The message +DETAIL: The detail +HINT: The hint +STRUCTURED: time=2012-01-01T00.1234-00 pid=1234""" diff --git a/tests/test_pipeline_sanity.py b/tests/test_pipeline_sanity.py index 3926853..33e3742 100644 --- a/tests/test_pipeline_sanity.py +++ b/tests/test_pipeline_sanity.py @@ -1,15 +1,53 @@ -import pytest +import wal_e.pipeline as pipeline -from wal_e.pipeline import * - -def test_rate_limit(tmpdir): - payload = 'abcd' * 1048576 - payload_file = tmpdir.join('payload') +def create_bogus_payload(dirname): + payload = 'abcd' * 1048576 + payload_file = dirname.join('payload') payload_file.write(payload) + return payload, payload_file + + +def test_rate_limit(tmpdir): + payload, payload_file = create_bogus_payload(tmpdir) - pl = PipeViwerRateLimitFilter(1048576 * 100, - stdin=payload_file.open()) + 
pl = pipeline.PipeViwerRateLimitFilter(1048576 * 100, + stdin=payload_file.open()) + pl.start() round_trip = pl.stdout.read() pl.finish() assert round_trip == payload + + +def test_upload_download_pipeline(tmpdir, rate_limit): + payload, payload_file = create_bogus_payload(tmpdir) + + # Upload section + test_upload = tmpdir.join('upload') + with open(unicode(test_upload), 'w') as upload: + with open(unicode(payload_file)) as inp: + pl = pipeline.get_upload_pipeline( + inp, upload, rate_limit=rate_limit) + pl.finish() + + with open(unicode(test_upload)) as completed: + round_trip = completed.read() + + # Download section + test_download = tmpdir.join('download') + with open(unicode(test_upload)) as upload: + with open(unicode(test_download), 'w') as download: + pl = pipeline.get_download_pipeline(upload, download) + pl.finish() + + with open(unicode(test_download)) as completed: + round_trip = completed.read() + + assert round_trip == payload + + +def pytest_generate_tests(metafunc): + # Test both with and without rate limiting if there is a rate_limit + # parameter. 
+ if "rate_limit" in metafunc.funcargnames: + metafunc.parametrize("rate_limit", [None, int(2 ** 25)]) diff --git a/wal_e/cmd.py b/wal_e/cmd.py index 64456c6..12a1be4 100755 --- a/wal_e/cmd.py +++ b/wal_e/cmd.py @@ -35,9 +35,8 @@ def gevent_monkey(*args, **kwargs): from wal_e.pipeline import LZOP_BIN, PV_BIN, GPG_BIN from wal_e.worker.pg_controldata_worker import CONFIG_BIN, PgControlDataParser -# TODO: Make controllable from userland log_help.configure( - format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s') + format='%(name)-12s %(levelname)-8s %(message)s') logger = log_help.WalELogger('wal_e.main', level=logging.INFO) @@ -66,8 +65,8 @@ def psql_err_handler(popen): 'note that superuser access is required')) # Bogus error message that is re-caught and re-raised - raise Exception('INTERNAL: Had problems running psql ' - 'from external_program_check') + raise EnvironmentError('INTERNAL: Had problems running psql ' + 'from external_program_check') with open(os.devnull, 'w') as nullf: for program in to_check: @@ -136,11 +135,11 @@ def main(argv=None): 'Can also be defined via environment variable ' 'WALE_S3_PREFIX') - parser.add_argument('--gpg-key-id', - help='GPG key ID to encrypt to. (Also needed when decrypting.) ' - 'Can also be defined via environment variable ' - 'WALE_GPG_KEY_ID') - + parser.add_argument( + '--gpg-key-id', + help='GPG key ID to encrypt to. (Also needed when decrypting.) ' + 'Can also be defined via environment variable ' + 'WALE_GPG_KEY_ID') subparsers = parser.add_subparsers(title='subcommands', dest='subcommand') @@ -238,14 +237,15 @@ def main(argv=None): help='A WAL segment number or base backup name') # delete old versions operator - delete_old_versions_parser = delete_subparsers.add_parser( + delete_subparsers.add_parser( 'old-versions', help=('Delete all old versions of WAL-E backup files. One probably ' - 'wants to ensure that they take a new backup with the new format ' - 'first. 
This is useful after a WAL-E major release upgrade.')) + 'wants to ensure that they take a new backup with the new ' + 'format first. ' + 'This is useful after a WAL-E major release upgrade.')) # delete *everything* operator - delete_old_versions_parser = delete_subparsers.add_parser( + delete_subparsers.add_parser( 'everything', help=('Delete all data in the current WAL-E context. ' 'Typically this is only appropriate when decommissioning an ' @@ -288,7 +288,8 @@ def main(argv=None): # This will be None if we're not encrypting gpg_key_id = args.gpg_key_id or os.getenv('WALE_GPG_KEY_ID') - backup_cxt = s3_operator.S3Backup(aws_access_key_id, secret_key, s3_prefix, gpg_key_id) + backup_cxt = s3_operator.S3Backup(aws_access_key_id, secret_key, s3_prefix, + gpg_key_id) subcommand = args.subcommand @@ -329,7 +330,8 @@ def main(argv=None): pool_size=args.pool_size) elif subcommand == 'wal-fetch': external_program_check([LZOP_BIN]) - res = backup_cxt.wal_s3_restore(args.WAL_SEGMENT, args.WAL_DESTINATION) + res = backup_cxt.wal_s3_restore(args.WAL_SEGMENT, + args.WAL_DESTINATION) if not res: sys.exit(1) elif subcommand == 'wal-push': diff --git a/wal_e/exception.py b/wal_e/exception.py index d4113d9..87ac7be 100644 --- a/wal_e/exception.py +++ b/wal_e/exception.py @@ -13,20 +13,21 @@ class UserException(Exception): http://developer.postgresql.org/pgdocs/postgres/error-style-guide.html - If it is necessary to trap these exceptions. use a subclass. + If it is necessary to trap these exceptions, use a subclass. >>> raise UserException(msg='foo', detail='bar') Traceback (most recent call last): ... UserException: ERROR: MSG: foo DETAIL: bar - + STRUCTURED: time=... pid=... >>> raise UserException(msg='foo', detail='bar', hint='hello') Traceback (most recent call last): ... UserException: ERROR: MSG: foo DETAIL: bar HINT: hello + STRUCTURED: time=... pid=... 
""" def __init__(self, msg=None, detail=None, hint=None): diff --git a/wal_e/log_help.py b/wal_e/log_help.py index 74eccbf..d368060 100644 --- a/wal_e/log_help.py +++ b/wal_e/log_help.py @@ -2,7 +2,7 @@ A module to assist with using the Python logging module """ - +import datetime import errno import logging import logging.handlers @@ -10,33 +10,7 @@ import time -class UTCFormatter(logging.Formatter): - - # Undocumented, seemingly still in 2.7 (see - # http://od-eon.com/blogs/stefan/logging-utc-timestamps-python/) - converter = time.gmtime - - def formatTime(self, record, datefmt=None): - """ - Return the creation time of the specified LogRecord as formatted text. - - Base taken from logging.Formatter, but modified very slightly - to produce a more standard ISO8601 millisecond-including - timestamp. At the very least, it was chosen to very carefully - be parsable with PostgreSQL's timestamptz datatype. - - It also avoids the representation of ISO8601 with spaces. - - """ - - ct = self.converter(record.created) - - if datefmt: - s = time.strftime(datefmt, ct) - else: - t = time.strftime("%Y-%m-%dT%H:%M:%S", ct) - s = "%s.%03d+00 pid=%d" % (t, record.msecs, os.getpid()) - return s +class IndentFormatter(logging.Formatter): def format(self, record, *args, **kwargs): """ @@ -51,10 +25,33 @@ def format(self, record, *args, **kwargs): def configure(*args, **kwargs): + """Guards configuring logging to enable retry + + Logging is rather important to start up properly, so try very hard + to make this happen: without it is difficult to report sane and + well-formatted error messages to the log. 
+ """ + def terrible_log_output(s): + import sys + + print >>sys.stderr, s + + while True: + try: + return configure_guts(*args, **kwargs) + except EnvironmentError, e: + if e.errno == errno.EACCES: + terrible_log_output('wal-e: Could not set up logger because ' + 'of EACCES, connection refused issue: ' + 'retrying') + time.sleep(1) + + +def configure_guts(*args, **kwargs): """ Borrowed from logging.basicConfig - Uses the UTCFormatter instead of the regular Formatter + Uses the IndentFormatter instead of the regular Formatter Also, opts you into syslogging. @@ -74,9 +71,7 @@ def configure(*args, **kwargs): try: # Nobody can escape syslog, for now, and this default only - # works on Linux. See: - # - # http://docs.python.org/library/logging.handlers.html#sysloghandler + # works on Linux. handlers.append(logging.handlers.SysLogHandler(syslog_address)) except EnvironmentError, e: if e.errno == errno.ENOENT: @@ -88,8 +83,7 @@ def configure(*args, **kwargs): fs = kwargs.get("format", logging.BASIC_FORMAT) dfs = kwargs.get("datefmt", None) - style = kwargs.get("style", '%') - fmt = UTCFormatter(fs, dfs) + fmt = IndentFormatter(fs, dfs) for handler in handlers: handler.setFormatter(fmt) @@ -113,7 +107,23 @@ def __init__(self, *args, **kwargs): self._logger.setLevel(level) @staticmethod - def fmt_logline(msg, detail=None, hint=None): + def _fmt_structured(d): + """Formats '{k1:v1, k2:v2}' => 'time=... pid=... k1=v1 k2=v2' + + Output is lexically sorted, *except* the time and pid always + come first, to assist with human scanning of the data. 
+ """ + timeEntry = datetime.datetime.utcnow().strftime( + "time=%Y-%m-%dT%H:%M:%S.%f-00") + pidEntry = "pid=" + str(os.getpid()) + + rest = sorted('='.join([unicode(k), unicode(v)]) + for (k, v) in d.items()) + + return ' '.join([timeEntry, pidEntry] + rest) + + @staticmethod + def fmt_logline(msg, detail=None, hint=None, structured=None): msg_parts = ['MSG: ' + msg] if detail is not None: @@ -121,15 +131,26 @@ def fmt_logline(msg, detail=None, hint=None): if hint is not None: msg_parts.append('HINT: ' + hint) + # Initialize a fresh dictionary if structured is not passed, + # because default argument values are not re-evaluated on each + # call and it's okay for callees to mutate their passed + # dictionary. + if structured is None: + structured = {} + + msg_parts.append('STRUCTURED: ' + + WalELogger._fmt_structured(structured)) + return '\n'.join(msg_parts) def log(self, level, msg, *args, **kwargs): detail = kwargs.pop('detail', None) hint = kwargs.pop('hint', None) + structured = kwargs.pop('structured', None) self._logger.log( level, - self.fmt_logline(msg, detail, hint), + self.fmt_logline(msg, detail, hint, structured), *args, **kwargs) # Boilerplate convenience shims to different logging levels. 
One diff --git a/wal_e/operator/s3_operator.py b/wal_e/operator/s3_operator.py index 3f80daf..b416b1c 100644 --- a/wal_e/operator/s3_operator.py +++ b/wal_e/operator/s3_operator.py @@ -1,21 +1,10 @@ -import argparse -import contextlib -import csv -import datetime -import errno import functools import gevent.pool -import glob import itertools import json import logging import os -import re -import signal -import subprocess import sys -import tempfile -import textwrap import wal_e.worker.s3_worker as s3_worker import wal_e.tar_partition as tar_partition @@ -24,9 +13,8 @@ from cStringIO import StringIO from wal_e.exception import UserException, UserCritical -from wal_e.piper import popen_sp from wal_e.storage import s3_storage -from wal_e.worker.psql_worker import PSQL_BIN, PgBackupStatements +from wal_e.worker.psql_worker import PgBackupStatements from wal_e.worker.pg_controldata_worker import PgControlDataParser @@ -45,7 +33,8 @@ class S3Backup(object): """ def __init__(self, - aws_access_key_id, aws_secret_access_key, s3_prefix, gpg_key_id): + aws_access_key_id, aws_secret_access_key, s3_prefix, + gpg_key_id): self.aws_access_key_id = aws_access_key_id self.aws_secret_access_key = aws_secret_access_key self.gpg_key_id = gpg_key_id @@ -70,9 +59,6 @@ def backup_list(self, query, detail): """ import csv - from boto.s3.connection import OrdinaryCallingFormat - from boto.s3.connection import S3Connection - from wal_e.storage.s3_storage import BackupInfo s3_conn = self.new_connection() @@ -193,9 +179,10 @@ def raise_walk_error(e): # Make an attempt to upload extended version metadata extended_version_url = backup_s3_prefix + '/extended_version.txt' - logger.info(msg='start upload postgres version metadata', - detail=('Uploading to {extended_version_url}.' - .format(extended_version_url=extended_version_url))) + logger.info( + msg='start upload postgres version metadata', + detail=('Uploading to {extended_version_url}.' 
+ .format(extended_version_url=extended_version_url))) s3_worker.uri_put_file(extended_version_url, StringIO(version), content_encoding='text/plain') logger.info(msg='postgres version metadata upload complete') @@ -208,7 +195,8 @@ def raise_walk_error(e): total_size += tpart.total_member_size uploads.append(pool.apply_async( s3_worker.do_partition_put, - [backup_s3_prefix, tpart, per_process_limit, self.gpg_key_id])) + [backup_s3_prefix, tpart, per_process_limit, + self.gpg_key_id])) finally: while uploads: uploads.pop().get() @@ -298,19 +286,15 @@ def database_s3_fetch(self, pg_cluster_dir, backup_name, pool_size): p.join(raise_error=True) - def database_s3_backup(self, data_directory, *args, **kwargs): - """ - Uploads a PostgreSQL file cluster to S3 + """Uploads a PostgreSQL file cluster to S3 Mechanism: just wraps _s3_upload_pg_cluster_dir with start/stop backup actions with exception handling. In particular there is a 'finally' block to stop the backup in most situations. - """ - upload_good = False backup_stop_good = False while_offline = False @@ -323,19 +307,24 @@ def database_s3_backup(self, data_directory, *args, **kwargs): start_backup_info = PgBackupStatements.run_start_backup() version = PgBackupStatements.pg_version()['version'] else: - if os.path.exists(os.path.join(data_directory, 'postmaster.pid')): + if os.path.exists(os.path.join(data_directory, + 'postmaster.pid')): + hint = ('Shut down postgres. ' + 'If there is a stale lockfile, ' + 'then remove it after being very sure postgres ' + 'is not running.') raise UserException( msg='while_offline set, but pg looks to be running', detail='Found a postmaster.pid lockfile, and aborting', - hint='Shut down postgres. 
If there is a stale lockfile, ' - 'then remove it after being very sure postgres is not ' - 'running.') + hint=hint) controldata = PgControlDataParser(data_directory) - start_backup_info = controldata.last_xlog_file_name_and_offset() + start_backup_info = \ + controldata.last_xlog_file_name_and_offset() version = controldata.pg_version() uploaded_to, expanded_size_bytes = self._s3_upload_pg_cluster_dir( - start_backup_info, data_directory, version=version, *args, **kwargs) + start_backup_info, data_directory, version=version, + *args, **kwargs) upload_good = True finally: if not upload_good: @@ -391,17 +380,35 @@ def wal_s3_archive(self, wal_path): This code is intended to typically be called from Postgres's archive_command feature. - """ wal_file_name = os.path.basename(wal_path) - - # It's okay-ish for this to blow up in event of problems: - # Postgres will retry archiving *forever* - s3_worker.do_lzop_s3_put( - '{0}/wal_{1}/{2}'.format(self.s3_prefix, - FILE_STRUCTURE_VERSION, - wal_file_name), - wal_path, self.gpg_key_id) + s3_url = '{0}/wal_{1}/{2}'.format( + self.s3_prefix, FILE_STRUCTURE_VERSION, wal_file_name) + + logger.info(msg='begin archiving a file', + detail=('Uploading "{wal_path}" to "{s3_url}".' + .format(wal_path=wal_path, s3_url=s3_url)), + structured={'action': 'push-wal', + 'key': s3_url, + 'seg': wal_file_name, + 'prefix': self.s3_prefix, + 'state': 'begin'}) + + # Upload and record the rate at which it happened. + kib_per_second = s3_worker.do_lzop_s3_put(s3_url, wal_path, + self.gpg_key_id) + + logger.info( + msg='completed archiving to a file ', + detail=('Archiving to "{s3_url}" complete at ' + '{kib_per_second}KiB/s. 
') + .format(s3_url=s3_url, kib_per_second=kib_per_second), + structured={'action': 'push-wal', + 'key': s3_url, + 'rate': kib_per_second, + 'seg': wal_file_name, + 'prefix': self.s3_prefix, + 'state': 'complete'}) def wal_s3_restore(self, wal_name, wal_destination): """ @@ -414,11 +421,30 @@ def wal_s3_restore(self, wal_name, wal_destination): basename(wal_path), so both are required. """ - return s3_worker.do_lzop_s3_get( - '{0}/wal_{1}/{2}.lzo'.format(self.s3_prefix, - FILE_STRUCTURE_VERSION, - wal_name), - wal_destination, (self.gpg_key_id is not None)) + + s3_url = '{0}/wal_{1}/{2}.lzo'.format( + self.s3_prefix, FILE_STRUCTURE_VERSION, wal_name) + + logger.info( + msg='begin wal restore', + structured={'action': 'wal-fetch', + 'key': s3_url, + 'seg': wal_name, + 'prefix': self.s3_prefix, + 'state': 'begin'}) + + ret = s3_worker.do_lzop_s3_get( + s3_url, wal_destination, self.gpg_key_id is not None) + + logger.info( + msg='complete wal restore', + structured={'action': 'wal-fetch', + 'key': s3_url, + 'seg': wal_name, + 'prefix': self.s3_prefix, + 'state': 'complete'}) + + return ret def delete_old_versions(self, dry_run): assert s3_storage.CURRENT_VERSION not in s3_storage.OBSOLETE_VERSIONS diff --git a/wal_e/pipeline.py b/wal_e/pipeline.py index 4ec0ecc..201bc45 100644 --- a/wal_e/pipeline.py +++ b/wal_e/pipeline.py @@ -1,4 +1,7 @@ -""" Primitives to manage and construct pipelines for compression/encryption. """ +"""Primitives to manage and construct pipelines for +compression/encryption. +""" + from gevent import sleep from wal_e.exception import UserCritical @@ -15,38 +18,56 @@ BUFSIZE_HT = 128 * 8192 -def get_upload_pipeline(in_fd, out_fd, gpg_key=None): + +def get_upload_pipeline(in_fd, out_fd, rate_limit=None, + gpg_key=None): """ Create a UNIX pipeline to process a file for uploading. 
(Compress, and optionally encrypt) """ + commands = [] + if rate_limit is not None: + commands.append(PipeViwerRateLimitFilter(rate_limit)) + commands.append(LZOCompressionFilter()) + if gpg_key is not None: - compress = LZOCompressionFilter(stdin=in_fd) - encrypt = GPGEncryptionFilter(gpg_key, stdin=compress.stdout, stdout=out_fd) - commands = [compress, encrypt] - else: - commands = [LZOCompressionFilter(stdin=in_fd, stdout=out_fd)] + commands.append(GPGEncryptionFilter(gpg_key)) + + return Pipeline(commands, in_fd, out_fd) - return Pipeline(commands) def get_download_pipeline(in_fd, out_fd, gpg=False): """ Create a pipeline to process a file after downloading. (Optionally decrypt, then decompress) """ - if gpg == True: - decrypt = GPGDecryptionFilter(stdin=in_fd) - decompress = LZODecompressionFilter(stdin=decrypt.stdout, stdout=out_fd) - commands = [decrypt, decompress] - else: - commands = [LZODecompressionFilter(stdin=in_fd, stdout=out_fd)] + commands = [] + if gpg: + commands.append(GPGDecryptionFilter()) + commands.append(LZODecompressionFilter()) - return Pipeline(commands) + return Pipeline(commands, in_fd, out_fd) class Pipeline(object): """ Represent a pipeline of commands. stdin and stdout are wrapped to be non-blocking. """ - def __init__(self, commands): + def __init__(self, commands, in_fd, out_fd): self.commands = commands + # Teach the first command to take input specially + commands[0].stdinSet = in_fd + last_command = commands[0] + + # Connect all interior commands to one another via stdin/stdout + for command in commands[1:]: + last_command.start() + command.stdinSet = last_command.stdout + last_command = command + + # Teach the last command to spill output to out_fd rather than to + # its default, which is typically stdout. 
+ assert last_command is commands[-1] + last_command.stdoutSet = out_fd + last_command.start() + @property def stdin(self): return NonBlockPipeFileWrap(self.commands[0].stdin) @@ -56,34 +77,69 @@ def stdout(self): return NonBlockPipeFileWrap(self.commands[-1].stdout) def finish(self): - [command.finish() for command in self.commands] + for command in self.commands: + command.finish() class PipelineCommand(object): - """ A pipeline command. Stdin and stdout are *blocking* because you - want them to be if you're piping them to another command. + """A pipeline command - (If you need a gevent-compatible stdin/out, wrap it in NonBlockPipeFileWrap.) - """ - def __init__(self, stdin=PIPE, stdout=PIPE): - pass + Stdin and stdout are *blocking*, as tools that want to use + non-blocking pipes will set that on their own. - def start(self, command, stdin, stdout): + If one needs a gevent-compatible stdin/out, wrap it in + NonBlockPipeFileWrap. + """ + def __init__(self, command, stdin=PIPE, stdout=PIPE): self._command = command - self._process = popen_sp(command, stdin=stdin, stdout=stdout, - bufsize=BUFSIZE_HT, close_fds=True) + self._stdin = stdin + self._stdout = stdout + + self._process = None + + def start(self): + if self._process is not None: + raise StandardError( + 'BUG: Tried to .start on a PipelineCommand twice') + + self._process = popen_sp(self._command, + stdin=self._stdin, stdout=self._stdout, + bufsize=BUFSIZE_HT, close_fds=True) @property def stdin(self): return self._process.stdin + @stdin.setter + def stdinSet(self, value): + # Use the grotesque name 'stdinSet' to suppress pyflakes. + if self._process is not None: + raise StandardError( + 'BUG: Trying to set stdin on PipelineCommand ' + 'after it has already been .start-ed') + + self._stdin = value + @property def stdout(self): return self._process.stdout + @stdout.setter + def stdoutSet(self, value): + # Use the grotesque name 'stdoutSet' to suppress pyflakes. 
+ if self._process is not None: + raise StandardError( + 'BUG: Trying to set stdout on PipelineCommand ' + 'after it has already been .start-ed') + + self._stdout = value + @property def returncode(self): - return self._process.returncode + if self._process is None: + return None + else: + return self._process.returncode def finish(self): while True: @@ -110,29 +166,34 @@ def finish(self): class PipeViwerRateLimitFilter(PipelineCommand): """ Limit the rate of transfer through a pipe using pv """ def __init__(self, rate_limit, stdin=PIPE, stdout=PIPE): - self.start([PV_BIN, '--rate-limit=' + unicode(rate_limit)], - stdin, stdout) + PipelineCommand.__init__( + self, + [PV_BIN, '--rate-limit=' + unicode(rate_limit)], stdin, stdout) class LZOCompressionFilter(PipelineCommand): """ Compress using LZO. """ def __init__(self, stdin=PIPE, stdout=PIPE): - self.start([LZOP_BIN, '--stdout'], stdin, stdout) + PipelineCommand.__init__( + self, [LZOP_BIN, '--stdout'], stdin, stdout) class LZODecompressionFilter(PipelineCommand): """ Decompress using LZO. """ def __init__(self, stdin=PIPE, stdout=PIPE): - self.start([LZOP_BIN, '-d', '--stdout', '-'], stdin, stdout) + PipelineCommand.__init__( + self, [LZOP_BIN, '-d', '--stdout', '-'], stdin, stdout) class GPGEncryptionFilter(PipelineCommand): """ Encrypt using GPG, using the provided public key ID. """ def __init__(self, key, stdin=PIPE, stdout=PIPE): - self.start([GPG_BIN, '-e', '-z', '0', '-r', key], stdin, stdout) + PipelineCommand.__init__( + self, [GPG_BIN, '-e', '-z', '0', '-r', key], stdin, stdout) class GPGDecryptionFilter(PipelineCommand): """ Decrypt using GPG (the private key must exist and be unpassworded). 
""" def __init__(self, stdin=PIPE, stdout=PIPE): - self.start([GPG_BIN, '-d', '-q'], stdin, stdout) + PipelineCommand.__init__( + self, [GPG_BIN, '-d', '-q'], stdin, stdout) diff --git a/wal_e/piper.py b/wal_e/piper.py index 94100dd..c34900f 100644 --- a/wal_e/piper.py +++ b/wal_e/piper.py @@ -15,8 +15,12 @@ import subprocess import sys -from subprocess import PIPE from cStringIO import StringIO +from subprocess import PIPE + +# This is not used in this module, but is imported by dependent +# modules, so do this to quiet pyflakes. +assert PIPE class NonBlockPipeFileWrap(object): @@ -93,7 +97,6 @@ def subprocess_setup(f=None): then restores SIGPIPE to what most Unix processes expect. http://bugs.python.org/issue1652 - http://www.chiark.greenend.org.uk/ucgi/~cjwatson/blosxom/2009-07-02-python-sigpipe.html """ diff --git a/wal_e/storage/s3_storage.py b/wal_e/storage/s3_storage.py index f2045c7..b888e47 100644 --- a/wal_e/storage/s3_storage.py +++ b/wal_e/storage/s3_storage.py @@ -14,7 +14,7 @@ from urlparse import urlparse -CURRENT_VERSION = 'dev-version' +CURRENT_VERSION = '005' SEGMENT_REGEXP = (r'(?P(?P[0-9A-F]{8,8})(?P[0-9A-F]{8,8})' '(?P[0-9A-F]{8,8}))') @@ -27,10 +27,12 @@ VOLUME_REGEXP = (r'part_(\d+)\.tar\.lzo') + # A representation of a log number and segment, naive of timeline. # This number always increases, even when diverging into two # timelines, so it's useful for conservative garbage collection. 
-class SegmentNumber(collections.namedtuple('SegmentNumber', ['log', 'seg'])): +class SegmentNumber(collections.namedtuple('SegmentNumber', + ['log', 'seg'])): @property def as_an_integer(self): @@ -55,6 +57,7 @@ def as_an_integer(self): OBSOLETE_VERSIONS = frozenset(('004', '003', '002', '001', '000')) + class StorageLayout(object): """ Encapsulates and defines S3 URL path manipulations for WAL-E @@ -62,18 +65,18 @@ class StorageLayout(object): Without a trailing slash >>> sl = StorageLayout('s3://foo/bar') >>> sl.basebackups() - 'bar/basebackups_dev-version/' + 'bar/basebackups_005/' >>> sl.wal_directory() - 'bar/wal_dev-version/' + 'bar/wal_005/' >>> sl.bucket_name() 'foo' With a trailing slash >>> sl = StorageLayout('s3://foo/bar/') >>> sl.basebackups() - 'bar/basebackups_dev-version/' + 'bar/basebackups_005/' >>> sl.wal_directory() - 'bar/wal_dev-version/' + 'bar/wal_005/' >>> sl.bucket_name() 'foo' diff --git a/wal_e/tar_partition.py b/wal_e/tar_partition.py index e66fae6..ce8e2a4 100644 --- a/wal_e/tar_partition.py +++ b/wal_e/tar_partition.py @@ -40,11 +40,9 @@ import collections import errno import os -import sys import tarfile import wal_e.log_help as log_help -import wal_e.pipeline logger = log_help.WalELogger(__name__) @@ -120,19 +118,12 @@ def __init__(self, name, *args, **kwargs): list.__init__(self, *args, **kwargs) @staticmethod - def _padded_tar_add(tar, et_info, rate_limit=None): + def _padded_tar_add(tar, et_info): try: with open(et_info.submitted_path, 'rb') as raw_file: - if rate_limit is not None: - pv = wal_e.pipeline.PipeViwerRateLimitFilter(rate_limit, stdin=raw_file) - - with StreamPadFileObj(pv.stdout, - et_info.tarinfo.size) as f: - tar.addfile(et_info.tarinfo, f) - else: - with StreamPadFileObj(raw_file, - et_info.tarinfo.size) as f: - tar.addfile(et_info.tarinfo, f) + with StreamPadFileObj(raw_file, + et_info.tarinfo.size) as f: + tar.addfile(et_info.tarinfo, f) except EnvironmentError, e: if (e.errno == errno.ENOENT and @@ -146,7 
+137,7 @@ def _padded_tar_add(tar, et_info, rate_limit=None): else: raise - def tarfile_write(self, fileobj, rate_limit=None): + def tarfile_write(self, fileobj): tar = None try: tar = tarfile.open(fileobj=fileobj, mode='w|') @@ -155,7 +146,7 @@ def tarfile_write(self, fileobj, rate_limit=None): # Treat files specially because they may grow, shrink, # or may be unlinked in the meanwhile. if et_info.tarinfo.isfile(): - self._padded_tar_add(tar, et_info, rate_limit=rate_limit) + self._padded_tar_add(tar, et_info) else: tar.addfile(et_info.tarinfo) finally: diff --git a/wal_e/worker/pg_controldata_worker.py b/wal_e/worker/pg_controldata_worker.py index 76dfb08..aca084d 100644 --- a/wal_e/worker/pg_controldata_worker.py +++ b/wal_e/worker/pg_controldata_worker.py @@ -5,6 +5,7 @@ CONTROLDATA_BIN = 'pg_controldata' CONFIG_BIN = 'pg_config' + class PgControlDataParser(object): """ When we're backing up a PG cluster that is not @@ -33,8 +34,8 @@ def __init__(self, data_directory): self._pg_version = val def _read_controldata(self): - controldata_proc = popen_sp([self._controldata_bin, self.data_directory], - stdout=PIPE) + controldata_proc = popen_sp( + [self._controldata_bin, self.data_directory], stdout=PIPE) stdout = controldata_proc.communicate()[0] controldata = {} for line in stdout.split('\n'): @@ -52,7 +53,8 @@ def pg_version(self): def last_xlog_file_name_and_offset(self): controldata = self._read_controldata() - last_checkpoint_offset = controldata["Latest checkpoint's REDO location"] + last_checkpoint_offset = \ + controldata["Latest checkpoint's REDO location"] current_timeline = controldata["Latest checkpoint's TimeLineID"] x, offset = last_checkpoint_offset.split('/') timeline = current_timeline.zfill(8) diff --git a/wal_e/worker/s3_worker.py b/wal_e/worker/s3_worker.py index 8562310..b35ad38 100644 --- a/wal_e/worker/s3_worker.py +++ b/wal_e/worker/s3_worker.py @@ -12,7 +12,6 @@ import logging import re import socket -import subprocess import sys import 
tarfile import tempfile @@ -22,7 +21,7 @@ import wal_e.storage.s3_storage as s3_storage import wal_e.log_help as log_help -from wal_e.exception import UserException, UserCritical +from wal_e.exception import UserException from wal_e.pipeline import get_upload_pipeline, get_download_pipeline from wal_e.piper import PIPE @@ -167,6 +166,7 @@ def format_kib_per_second(start, finish, amount_in_bytes): except ZeroDivisionError: return 'NaN' + def do_partition_put(backup_s3_prefix, tpart, rate_limit, gpg_key): """ Synchronous version of the s3-upload wrapper @@ -176,8 +176,9 @@ def do_partition_put(backup_s3_prefix, tpart, rate_limit, gpg_key): detail='Building volume {name}.'.format(name=tpart.name)) with tempfile.NamedTemporaryFile(mode='rwb') as tf: - pipeline = get_upload_pipeline(PIPE, tf, gpg_key=gpg_key) - tpart.tarfile_write(pipeline.stdin, rate_limit=rate_limit) + pipeline = get_upload_pipeline(PIPE, tf, + rate_limit=rate_limit, gpg_key=gpg_key) + tpart.tarfile_write(pipeline.stdin) pipeline.stdin.flush() pipeline.stdin.close() pipeline.finish() @@ -242,7 +243,6 @@ def put_file_helper(): .format(s3_url=s3_url, kib_per_second=kib_per_second)) - def do_lzop_s3_put(s3_url, local_path, gpg_key): """ Compress and upload a given local path. @@ -259,15 +259,12 @@ def do_lzop_s3_put(s3_url, local_path, gpg_key): s3_url += '.lzo' with tempfile.NamedTemporaryFile(mode='rwb') as tf: - pipeline = get_upload_pipeline(file(local_path, 'r'), tf, gpg_key=gpg_key) + pipeline = get_upload_pipeline( + open(local_path, 'r'), tf, gpg_key=gpg_key) pipeline.finish() tf.flush() - logger.info(msg='begin archiving a file', - detail=('Uploading "{local_path}" to "{s3_url}".' 
- .format(**locals()))) - clock_start = time.clock() tf.seek(0) k = uri_put_file(s3_url, tf) @@ -275,11 +272,9 @@ def do_lzop_s3_put(s3_url, local_path, gpg_key): kib_per_second = format_kib_per_second(clock_start, clock_finish, k.size) - logger.info( - msg='completed archiving to a file ', - detail=('Archiving to "{s3_url}" complete at ' - '{kib_per_second}KiB/s. ') - .format(s3_url=s3_url, kib_per_second=kib_per_second)) + + return kib_per_second + def write_and_close_thread(key, stream): try: @@ -288,6 +283,7 @@ def write_and_close_thread(key, stream): stream.flush() stream.close() + def do_lzop_s3_get(s3_url, path, decrypt): """ Get and decompress a S3 URL