diff --git a/cmreslogging/handlers.py b/cmreslogging/handlers.py index 52e250a..08a70f1 100644 --- a/cmreslogging/handlers.py +++ b/cmreslogging/handlers.py @@ -75,6 +75,7 @@ class IndexNameFrequency(Enum): __DEFAULT_ES_DOC_TYPE = 'python_log' __DEFAULT_RAISE_ON_EXCEPTION = False __DEFAULT_TIMESTAMP_FIELD_NAME = "timestamp" + __DEFAULT_ES_INGEST_PIPELINE = None __LOGGING_FILTER_FIELDS = ['msecs', 'relativeCreated', @@ -138,7 +139,8 @@ def __init__(self, es_doc_type=__DEFAULT_ES_DOC_TYPE, es_additional_fields=__DEFAULT_ADDITIONAL_FIELDS, raise_on_indexing_exceptions=__DEFAULT_RAISE_ON_EXCEPTION, - default_timestamp_field_name=__DEFAULT_TIMESTAMP_FIELD_NAME): + default_timestamp_field_name=__DEFAULT_TIMESTAMP_FIELD_NAME, + es_ingest_pipeline=__DEFAULT_ES_INGEST_PIPELINE): """ Handler constructor :param hosts: The list of hosts that elasticsearch clients will connect. The list can be provided @@ -172,6 +174,8 @@ def __init__(self, to the logs, such the application, environment, etc. :param raise_on_indexing_exceptions: A boolean, True only for debugging purposes to raise exceptions caused when + :param es_ingest_pipeline: A string identifying an ingest pipeline on the elasticsearch host. No pipeline + is used by default. :return: A ready to be used CMRESHandler. 
""" logging.Handler.__init__(self) @@ -194,6 +198,7 @@ def __init__(self, 'host_ip': socket.gethostbyname(socket.gethostname())}) self.raise_on_indexing_exceptions = raise_on_indexing_exceptions self.default_timestamp_field_name = default_timestamp_field_name + self.es_ingest_pipeline = es_ingest_pipeline self._client = None self._buffer = [] @@ -266,6 +271,15 @@ def test_es_source(self): """ return self.__get_es_client().ping() + def __get_bulk_metadata(self): + metadata = { + '_index': self._index_name_func.__func__(self.es_index_name), + '_type': self.es_doc_type + } + if self.es_ingest_pipeline is not None: + metadata['pipeline'] = self.es_ingest_pipeline + return metadata + @staticmethod def __get_es_datetime_str(timestamp): """ Returns elasticsearch utc formatted time for an epoch timestamp @@ -289,12 +303,12 @@ def flush(self): with self._buffer_lock: logs_buffer = self._buffer self._buffer = [] + metadata = self.__get_bulk_metadata() actions = ( - { - '_index': self._index_name_func.__func__(self.es_index_name), - '_type': self.es_doc_type, - '_source': log_record - } + dict( + _source=log_record, + **metadata + ) for log_record in logs_buffer ) eshelpers.bulk( diff --git a/tests/test_cmreshandler.py b/tests/test_cmreshandler.py index d07dfdd..6526b89 100644 --- a/tests/test_cmreshandler.py +++ b/tests/test_cmreshandler.py @@ -5,6 +5,7 @@ import sys sys.path.insert(0, os.path.abspath('.')) from cmreslogging.handlers import CMRESHandler +import elasticsearch class CMRESHandlerTestCase(unittest.TestCase): @@ -169,6 +170,37 @@ def test_index_name_frequency_functions(self): CMRESHandler._get_yearly_index_name(index_name) ) + def test_pipelined_insert(self): + index_name = 'pythontest' + handler = CMRESHandler(hosts=[{'host': self.getESHost(), 'port': self.getESPort()}], + auth_type=CMRESHandler.AuthType.NO_AUTH, + es_index_name=index_name, + use_ssl=False, + index_name_frequency=CMRESHandler.IndexNameFrequency.DAILY, + raise_on_indexing_exceptions=True, + 
es_ingest_pipeline='pythontestpipeline') + log = logging.getLogger("PythonTest") + log.setLevel(logging.DEBUG) + log.addHandler(handler) + + pipeline_definition = { + 'description': 'split multiline message pipeline', + 'processors': [ + { + 'split': { + 'field': 'message', + 'separator': '\n' + } + } + ] + } + + ingest_client = elasticsearch.client.IngestClient(handler._CMRESHandler__get_es_client()) + ingest_client.put_pipeline(id='pythontestpipeline', body=pipeline_definition) + log.info('multiline\nlog') + handler.flush() + self.assertEqual(0, len(handler._buffer)) + if __name__ == '__main__': unittest.main()