From 1de6db1fa30954abe01fa9e02e0c39a0aa138985 Mon Sep 17 00:00:00 2001 From: Hector Castro Date: Tue, 9 Jun 2015 22:17:23 -0400 Subject: [PATCH 1/4] Add channel support to Redis transport This changeset adds support for publishing log entries to a Redis channel, which is also supported by Logstash's Redis input. Beaver configuration files can now supply a `redis_data_type` key. Valid values for this key are `list` and `channel`. If left unset, the default is `list`. Attempts to resolve #266. --- beaver/config.py | 1 + beaver/transports/redis_transport.py | 20 ++++++++++++++++---- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/beaver/config.py b/beaver/config.py index bb37d150..8c46f02c 100644 --- a/beaver/config.py +++ b/beaver/config.py @@ -87,6 +87,7 @@ def __init__(self, args, logger=None): 'rabbitmq_delivery_mode': 1, 'redis_url': os.environ.get('REDIS_URL', 'redis://localhost:6379/0'), 'redis_namespace': os.environ.get('REDIS_NAMESPACE', 'logstash:beaver'), + 'redis_data_type': os.environ.get('REDIS_DATA_TYPE', 'list'), 'redis_password': '', 'sqs_aws_access_key': '', 'sqs_aws_secret_key': '', diff --git a/beaver/transports/redis_transport.py b/beaver/transports/redis_transport.py index 06062c90..dfe97ae1 100644 --- a/beaver/transports/redis_transport.py +++ b/beaver/transports/redis_transport.py @@ -24,6 +24,7 @@ def __init__(self, beaver_config, logger=None): ) self._namespace = beaver_config.get('redis_namespace') + self._data_type = beaver_config.get('redis_data_type') self._current_server_index = 0 self._check_connections() @@ -75,16 +76,27 @@ def callback(self, filename, lines, **kwargs): namespace = self._namespace self._logger.debug('Got namespace: ' + namespace) + data_type = self._data_type + self._logger.debug('Got data type: ' + data_type) + server = self._get_next_server() self._logger.debug('Got redis server: ' + server['url']) pipeline = server['redis'].pipeline(transaction=False) for line in lines: - pipeline.rpush( - namespace, - self.format(filename, line, timestamp, **kwargs) - ) + if data_type == 'list': + pipeline.rpush( + namespace, + self.format(filename, line, timestamp, **kwargs) + ) + elif data_type == 'channel': + pipeline.publish( + namespace, + self.format(filename, line, timestamp, **kwargs) + ) + else: + raise TransportException('Unknown Redis data type') try: pipeline.execute() From b970bda7e6592f88f9cd6a16a4d572c3578c06cf Mon Sep 17 00:00:00 2001 From: Hector Castro Date: Wed, 10 Jun 2015 08:50:53 -0400 Subject: [PATCH 2/4] Update Redis transport usage documentation --- docs/user/usage.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/user/usage.rst b/docs/user/usage.rst index 6192634e..27b5ae77 100644 --- a/docs/user/usage.rst +++ b/docs/user/usage.rst @@ -75,6 +75,7 @@ Beaver can optionally get data from a ``configfile`` using the ``-c`` flag. This * rabbitmq_delivery_mode: Default ``1``. Message deliveryMode. 1: non persistent 2: persistent * redis_url: Default ``redis://localhost:6379/0``. Comma separated redis URLs * redis_namespace: Default ``logstash:beaver``. Redis key namespace +* redis_data_type: Default ``list``, but can also be ``channel``. Redis data type used for transporting log messages * sqs_aws_access_key: Can be left blank to use IAM Roles or AWS_ACCESS_KEY_ID environment variable (see: https://github.com/boto/boto#getting-started-with-boto) * sqs_aws_secret_key: Can be left blank to use IAM Roles or AWS_SECRET_ACCESS_KEY environment variable (see: https://github.com/boto/boto#getting-started-with-boto) * sqs_aws_region: Default ``us-east-1``. AWS Region From 94812db5e13a95bf20da26bc616e6f12973deff4 Mon Sep 17 00:00:00 2001 From: Hector Castro Date: Wed, 10 Jun 2015 13:09:04 -0400 Subject: [PATCH 3/4] Move data type method conditional outside of loop --- beaver/transports/redis_transport.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/beaver/transports/redis_transport.py b/beaver/transports/redis_transport.py index dfe97ae1..b02db5e6 100644 --- a/beaver/transports/redis_transport.py +++ b/beaver/transports/redis_transport.py @@ -84,19 +84,18 @@ def callback(self, filename, lines, **kwargs): pipeline = server['redis'].pipeline(transaction=False) + if data_type == 'list': + data_type_method = pipeline.rpush + elif data_type == 'channel': + data_type_method = pipeline.publish + else: + raise TransportException('Unknown Redis data type') + for line in lines: - if data_type == 'list': - pipeline.rpush( - namespace, - self.format(filename, line, timestamp, **kwargs) - ) - elif data_type == 'channel': - pipeline.publish( - namespace, - self.format(filename, line, timestamp, **kwargs) - ) - else: - raise TransportException('Unknown Redis data type') + data_type_method( + namespace, + self.format(filename, line, timestamp, **kwargs) + ) try: pipeline.execute() From 39ee609fd9e4d273eb9f3b68a09514c6f9dedf45 Mon Sep 17 00:00:00 2001 From: Hector Castro Date: Wed, 10 Jun 2015 16:05:05 -0400 Subject: [PATCH 4/4] Add constants for data types, validate in init, use callback map --- beaver/transports/redis_transport.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/beaver/transports/redis_transport.py b/beaver/transports/redis_transport.py index b02db5e6..47ae0f49 100644 --- a/beaver/transports/redis_transport.py +++ b/beaver/transports/redis_transport.py @@ -8,6 +8,8 @@ class RedisTransport(BaseTransport): + LIST_DATA_TYPE = 'list' + CHANNEL_DATA_TYPE = 'channel' def __init__(self, beaver_config, logger=None): super(RedisTransport, self).__init__(beaver_config, logger=logger) @@ -24,9 +26,13 @@ def __init__(self, beaver_config, logger=None): ) self._namespace = beaver_config.get('redis_namespace') - self._data_type = beaver_config.get('redis_data_type') self._current_server_index = 0 + self._data_type = beaver_config.get('redis_data_type') + if self._data_type not in [self.LIST_DATA_TYPE, + self.CHANNEL_DATA_TYPE]: + raise TransportException('Unknown Redis data type') + self._check_connections() def _check_connections(self): @@ -84,15 +90,14 @@ def callback(self, filename, lines, **kwargs): pipeline = server['redis'].pipeline(transaction=False) - if data_type == 'list': - data_type_method = pipeline.rpush - elif data_type == 'channel': - data_type_method = pipeline.publish - else: - raise TransportException('Unknown Redis data type') + callback_map = { + self.LIST_DATA_TYPE: pipeline.rpush, + self.CHANNEL_DATA_TYPE: pipeline.publish, + } + callback_method = callback_map[data_type] for line in lines: - data_type_method( + callback_method( namespace, self.format(filename, line, timestamp, **kwargs) )