From 3eed367d71ec24a628614138ab7712494641f118 Mon Sep 17 00:00:00 2001
From: Gustavo Freitas
Date: Tue, 25 May 2021 16:35:12 -0300
Subject: [PATCH 1/5] format and disable host_ip

---
 cmreslogging/handlers.py | 193 +++++++++++++++++++++++----------------
 1 file changed, 113 insertions(+), 80 deletions(-)

diff --git a/cmreslogging/handlers.py b/cmreslogging/handlers.py
index 52e250a..9f04328 100644
--- a/cmreslogging/handlers.py
+++ b/cmreslogging/handlers.py
@@ -11,12 +11,14 @@
 try:
     from requests_kerberos import HTTPKerberosAuth, DISABLED
+
     CMR_KERBEROS_SUPPORTED = True
 except ImportError:
     CMR_KERBEROS_SUPPORTED = False
 
 try:
     from requests_aws4auth import AWS4Auth
+
     AWS4AUTH_SUPPORTED = True
 except ImportError:
     AWS4AUTH_SUPPORTED = False
@@ -25,45 +27,47 @@
 class CMRESHandler(logging.Handler):
-    """ Elasticsearch log handler
+    """Elasticsearch log handler
 
     Allows to log to elasticsearch into json format.
     All LogRecord fields are serialised and inserted
     """
 
     class AuthType(Enum):
-        """ Authentication types supported
+        """Authentication types supported
 
         The handler supports
         - No authentication
         - Basic authentication
         - Kerberos or SSO authentication (on windows and linux)
         """
+
         NO_AUTH = 0
         BASIC_AUTH = 1
         KERBEROS_AUTH = 2
         AWS_SIGNED_AUTH = 3
 
     class IndexNameFrequency(Enum):
-        """ Index type supported
+        """Index type supported
 
         the handler supports
         - Daily indices
         - Weekly indices
        - Monthly indices
         - Year indices
         """
+
         DAILY = 0
         WEEKLY = 1
         MONTHLY = 2
         YEARLY = 3
 
     # Defaults for the class
-    __DEFAULT_ELASTICSEARCH_HOST = [{'host': 'localhost', 'port': 9200}]
-    __DEFAULT_AUTH_USER = ''
-    __DEFAULT_AUTH_PASSWD = ''
-    __DEFAULT_AWS_ACCESS_KEY = ''
-    __DEFAULT_AWS_SECRET_KEY = ''
-    __DEFAULT_AWS_REGION = ''
+    __DEFAULT_ELASTICSEARCH_HOST = [{"host": "localhost", "port": 9200}]
+    __DEFAULT_AUTH_USER = ""
+    __DEFAULT_AUTH_PASSWD = ""
+    __DEFAULT_AWS_ACCESS_KEY = ""
+    __DEFAULT_AWS_SECRET_KEY = ""
+    __DEFAULT_AWS_REGION = ""
     __DEFAULT_USE_SSL = False
     __DEFAULT_VERIFY_SSL = True
     __DEFAULT_AUTH_TYPE = AuthType.NO_AUTH
@@ -71,75 +75,84 @@ class IndexNameFrequency(Enum):
     __DEFAULT_BUFFER_SIZE = 1000
     __DEFAULT_FLUSH_FREQ_INSEC = 1
     __DEFAULT_ADDITIONAL_FIELDS = {}
-    __DEFAULT_ES_INDEX_NAME = 'python_logger'
-    __DEFAULT_ES_DOC_TYPE = 'python_log'
+    __DEFAULT_ES_INDEX_NAME = "python_logger"
+    __DEFAULT_ES_DOC_TYPE = "python_log"
     __DEFAULT_RAISE_ON_EXCEPTION = False
     __DEFAULT_TIMESTAMP_FIELD_NAME = "timestamp"
 
-    __LOGGING_FILTER_FIELDS = ['msecs',
-                               'relativeCreated',
-                               'levelno',
-                               'created']
+    __LOGGING_FILTER_FIELDS = ["msecs", "relativeCreated", "levelno", "created"]
 
     @staticmethod
     def _get_daily_index_name(es_index_name):
-        """ Returns elasticearch index name
+        """Returns elasticsearch index name
         :param: index_name the prefix to be used in the index
         :return: A srting containing the elasticsearch indexname used
             which should include the date.
""" - return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y.%m.%d')) + return "{0!s}-{1!s}".format( + es_index_name, datetime.datetime.now().strftime("%Y.%m.%d") + ) @staticmethod def _get_weekly_index_name(es_index_name): - """ Return elasticsearch index name + """Return elasticsearch index name :param: index_name the prefix to be used in the index :return: A srting containing the elasticsearch indexname used which should include the date and specific week """ current_date = datetime.datetime.now() - start_of_the_week = current_date - datetime.timedelta(days=current_date.weekday()) - return "{0!s}-{1!s}".format(es_index_name, start_of_the_week.strftime('%Y.%m.%d')) + start_of_the_week = current_date - datetime.timedelta( + days=current_date.weekday() + ) + return "{0!s}-{1!s}".format( + es_index_name, start_of_the_week.strftime("%Y.%m.%d") + ) @staticmethod def _get_monthly_index_name(es_index_name): - """ Return elasticsearch index name + """Return elasticsearch index name :param: index_name the prefix to be used in the index :return: A srting containing the elasticsearch indexname used which should include the date and specific moth """ - return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y.%m')) + return "{0!s}-{1!s}".format( + es_index_name, datetime.datetime.now().strftime("%Y.%m") + ) @staticmethod def _get_yearly_index_name(es_index_name): - """ Return elasticsearch index name + """Return elasticsearch index name :param: index_name the prefix to be used in the index :return: A srting containing the elasticsearch indexname used which should include the date and specific year """ - return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y')) + return "{0!s}-{1!s}".format( + es_index_name, datetime.datetime.now().strftime("%Y") + ) _INDEX_FREQUENCY_FUNCION_DICT = { IndexNameFrequency.DAILY: _get_daily_index_name, IndexNameFrequency.WEEKLY: _get_weekly_index_name, IndexNameFrequency.MONTHLY: _get_monthly_index_name, - IndexNameFrequency.YEARLY: _get_yearly_index_name + IndexNameFrequency.YEARLY: _get_yearly_index_name, } - def __init__(self, - hosts=__DEFAULT_ELASTICSEARCH_HOST, - auth_details=(__DEFAULT_AUTH_USER, __DEFAULT_AUTH_PASSWD), - aws_access_key=__DEFAULT_AWS_ACCESS_KEY, - aws_secret_key=__DEFAULT_AWS_SECRET_KEY, - aws_region=__DEFAULT_AWS_REGION, - auth_type=__DEFAULT_AUTH_TYPE, - use_ssl=__DEFAULT_USE_SSL, - verify_ssl=__DEFAULT_VERIFY_SSL, - buffer_size=__DEFAULT_BUFFER_SIZE, - flush_frequency_in_sec=__DEFAULT_FLUSH_FREQ_INSEC, - es_index_name=__DEFAULT_ES_INDEX_NAME, - index_name_frequency=__DEFAULT_INDEX_FREQUENCY, - es_doc_type=__DEFAULT_ES_DOC_TYPE, - es_additional_fields=__DEFAULT_ADDITIONAL_FIELDS, - raise_on_indexing_exceptions=__DEFAULT_RAISE_ON_EXCEPTION, - default_timestamp_field_name=__DEFAULT_TIMESTAMP_FIELD_NAME): - """ Handler constructor + def __init__( + self, + hosts=__DEFAULT_ELASTICSEARCH_HOST, + auth_details=(__DEFAULT_AUTH_USER, __DEFAULT_AUTH_PASSWD), + aws_access_key=__DEFAULT_AWS_ACCESS_KEY, + aws_secret_key=__DEFAULT_AWS_SECRET_KEY, + aws_region=__DEFAULT_AWS_REGION, + auth_type=__DEFAULT_AUTH_TYPE, + use_ssl=__DEFAULT_USE_SSL, + verify_ssl=__DEFAULT_VERIFY_SSL, + buffer_size=__DEFAULT_BUFFER_SIZE, + flush_frequency_in_sec=__DEFAULT_FLUSH_FREQ_INSEC, + es_index_name=__DEFAULT_ES_INDEX_NAME, + index_name_frequency=__DEFAULT_INDEX_FREQUENCY, + es_doc_type=__DEFAULT_ES_DOC_TYPE, + es_additional_fields=__DEFAULT_ADDITIONAL_FIELDS, + 
+        raise_on_indexing_exceptions=__DEFAULT_RAISE_ON_EXCEPTION,
+        default_timestamp_field_name=__DEFAULT_TIMESTAMP_FIELD_NAME,
+    ):
+        """Handler constructor
 
         :param hosts: The list of hosts that elasticsearch clients will connect.
             The list can be provided in the format
             ```[{'host':'host1','port':9200}, {'host':'host2','port':9200}]``` to
@@ -190,8 +203,11 @@ def __init__(self,
         self.index_name_frequency = index_name_frequency
         self.es_doc_type = es_doc_type
         self.es_additional_fields = es_additional_fields.copy()
-        self.es_additional_fields.update({'host': socket.gethostname(),
-                                          'host_ip': socket.gethostbyname(socket.gethostname())})
+
+        # disable it from elasticsearch log
+        # self.es_additional_fields.update({'host': socket.gethostname(),
+        #                                   'host_ip': socket.gethostbyname(socket.gethostname())})
+
         self.raise_on_indexing_exceptions = raise_on_indexing_exceptions
         self.default_timestamp_field_name = default_timestamp_field_name
 
@@ -199,7 +215,9 @@ def __init__(self,
         self._buffer = []
         self._buffer_lock = Lock()
         self._timer = None
-        self._index_name_func = CMRESHandler._INDEX_FREQUENCY_FUNCION_DICT[self.index_name_frequency]
+        self._index_name_func = CMRESHandler._INDEX_FREQUENCY_FUNCION_DICT[
+            self.index_name_frequency
+        ]
         self.serializer = CMRESSerializer()
 
     def __schedule_flush(self):
@@ -211,53 +229,65 @@ def __get_es_client(self):
         if self.auth_type == CMRESHandler.AuthType.NO_AUTH:
             if self._client is None:
-                self._client = Elasticsearch(hosts=self.hosts,
-                                             use_ssl=self.use_ssl,
-                                             verify_certs=self.verify_certs,
-                                             connection_class=RequestsHttpConnection,
-                                             serializer=self.serializer)
+                self._client = Elasticsearch(
+                    hosts=self.hosts,
+                    use_ssl=self.use_ssl,
+                    verify_certs=self.verify_certs,
+                    connection_class=RequestsHttpConnection,
+                    serializer=self.serializer,
+                )
             return self._client
 
         if self.auth_type == CMRESHandler.AuthType.BASIC_AUTH:
             if self._client is None:
-                return Elasticsearch(hosts=self.hosts,
-                                     http_auth=self.auth_details,
-                                     use_ssl=self.use_ssl,
-                                     verify_certs=self.verify_certs,
-                                     connection_class=RequestsHttpConnection,
-                                     serializer=self.serializer)
+                return Elasticsearch(
+                    hosts=self.hosts,
+                    http_auth=self.auth_details,
+                    use_ssl=self.use_ssl,
+                    verify_certs=self.verify_certs,
+                    connection_class=RequestsHttpConnection,
+                    serializer=self.serializer,
+                )
             return self._client
 
         if self.auth_type == CMRESHandler.AuthType.KERBEROS_AUTH:
             if not CMR_KERBEROS_SUPPORTED:
-                raise EnvironmentError("Kerberos module not available. Please install \"requests-kerberos\"")
+                raise EnvironmentError(
+                    'Kerberos module not available. Please install "requests-kerberos"'
+                )
             # For kerberos we return a new client each time to make sure the tokens are up to date
-            return Elasticsearch(hosts=self.hosts,
-                                 use_ssl=self.use_ssl,
-                                 verify_certs=self.verify_certs,
-                                 connection_class=RequestsHttpConnection,
-                                 http_auth=HTTPKerberosAuth(mutual_authentication=DISABLED),
-                                 serializer=self.serializer)
+            return Elasticsearch(
+                hosts=self.hosts,
+                use_ssl=self.use_ssl,
+                verify_certs=self.verify_certs,
+                connection_class=RequestsHttpConnection,
+                http_auth=HTTPKerberosAuth(mutual_authentication=DISABLED),
+                serializer=self.serializer,
+            )
 
         if self.auth_type == CMRESHandler.AuthType.AWS_SIGNED_AUTH:
             if not AWS4AUTH_SUPPORTED:
-                raise EnvironmentError("AWS4Auth not available. Please install \"requests-aws4auth\"")
+                raise EnvironmentError(
+                    'AWS4Auth not available. Please install "requests-aws4auth"'
+                )
             if self._client is None:
-                awsauth = AWS4Auth(self.aws_access_key, self.aws_secret_key, self.aws_region, 'es')
+                awsauth = AWS4Auth(
+                    self.aws_access_key, self.aws_secret_key, self.aws_region, "es"
+                )
                 self._client = Elasticsearch(
                     hosts=self.hosts,
                     http_auth=awsauth,
                     use_ssl=self.use_ssl,
                     verify_certs=True,
                     connection_class=RequestsHttpConnection,
-                    serializer=self.serializer
+                    serializer=self.serializer,
                 )
             return self._client
 
         raise ValueError("Authentication method not supported")
 
     def test_es_source(self):
-        """ Returns True if the handler can ping the Elasticsearch servers
+        """Returns True if the handler can ping the Elasticsearch servers
 
         Can be used to confirm the setup of a handler has been properly done and confirm
         that things like the authentication is working properly
@@ -268,16 +298,19 @@
 
     @staticmethod
     def __get_es_datetime_str(timestamp):
-        """ Returns elasticsearch utc formatted time for an epoch timestamp
+        """Returns elasticsearch utc formatted time for an epoch timestamp
 
         :param timestamp: epoch, including milliseconds
         :return: A string valid for elasticsearch time record
         """
         current_date = datetime.datetime.utcfromtimestamp(timestamp)
-        return "{0!s}.{1:03d}Z".format(current_date.strftime('%Y-%m-%dT%H:%M:%S'), int(current_date.microsecond / 1000))
+        return "{0!s}.{1:03d}Z".format(
+            current_date.strftime("%Y-%m-%dT%H:%M:%S"),
+            int(current_date.microsecond / 1000),
+        )
 
     def flush(self):
-        """ Flushes the buffer into ES
+        """Flushes the buffer into ES
         :return: None
         """
         if self._timer is not None and self._timer.is_alive():
@@ -291,23 +324,21 @@ def flush(self):
             self._buffer = []
             actions = (
                 {
-                    '_index': self._index_name_func.__func__(self.es_index_name),
-                    '_type': self.es_doc_type,
-                    '_source': log_record
+                    "_index": self._index_name_func.__func__(self.es_index_name),
+                    "_type": self.es_doc_type,
+                    "_source": log_record,
                 }
                 for log_record in logs_buffer
             )
             eshelpers.bulk(
-                client=self.__get_es_client(),
-                actions=actions,
-                stats_only=True
+                client=self.__get_es_client(), actions=actions, stats_only=True
             )
         except Exception as exception:
             if self.raise_on_indexing_exceptions:
                 raise exception
 
     def close(self):
-        """ Flushes the buffer and release any outstanding resource
+        """Flushes the buffer and release any outstanding resource
 
         :return: None
         """
@@ -316,7 +347,7 @@ def close(self):
         self._timer = None
 
     def emit(self, record):
-        """ Emit overrides the abstract logging.Handler logRecord emit method
+        """Emit overrides the abstract logging.Handler logRecord emit method
 
         Format and records the log
 
@@ -331,7 +362,9 @@ def emit(self, record):
             if key == "args":
                 value = tuple(str(arg) for arg in value)
             rec[key] = "" if value is None else value
-        rec[self.default_timestamp_field_name] = self.__get_es_datetime_str(record.created)
+        rec[self.default_timestamp_field_name] = self.__get_es_datetime_str(
+            record.created
+        )
         with self._buffer_lock:
             self._buffer.append(rec)
 

From e95d2e7291a52b4846bb1dee71c6dd8add383a7f Mon Sep 17 00:00:00 2001
From: Gustavo Freitas
Date: Tue, 25 May 2021 16:49:23 -0300
Subject: [PATCH 2/5] test remove fields

---
 cmreslogging/handlers.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/cmreslogging/handlers.py b/cmreslogging/handlers.py
index 9f04328..97a4306 100644
--- a/cmreslogging/handlers.py
+++ b/cmreslogging/handlers.py
@@ -151,6 +151,7 @@ def __init__(
         es_additional_fields=__DEFAULT_ADDITIONAL_FIELDS,
         raise_on_indexing_exceptions=__DEFAULT_RAISE_ON_EXCEPTION,
         default_timestamp_field_name=__DEFAULT_TIMESTAMP_FIELD_NAME,
+        disabled_fields=[],
     ):
         """Handler constructor
 
@@ -185,6 +186,7 @@ def __init__(
             to the logs, such the application, environment, etc.
         :param raise_on_indexing_exceptions: A boolean, True only for debugging purposes to raise exceptions caused when
+        :param disabled_fields: A array, by default empty, but if you want to remove any key , put here
         :return: A ready to be used CMRESHandler.
         """
         logging.Handler.__init__(self)
@@ -203,6 +205,7 @@ def __init__(
         self.index_name_frequency = index_name_frequency
         self.es_doc_type = es_doc_type
         self.es_additional_fields = es_additional_fields.copy()
+        self.disabled_fields = disabled_fields
 
         # disable it from elasticsearch log
         # self.es_additional_fields.update({'host': socket.gethostname(),
         #                                   'host_ip': socket.gethostbyname(socket.gethostname())})
@@ -321,6 +324,8 @@ def flush(self):
             try:
                 with self._buffer_lock:
                     logs_buffer = self._buffer
+                    print(f"gustavo: => {logs_buffer}")
+                    print(f"disable fields => {self.disabled_fields}")
                     self._buffer = []
             actions = (
 
From ca5748c47921a24e5a3e7eb14e77986b457c79ce Mon Sep 17 00:00:00 2001
From: Gustavo Freitas
Date: Tue, 25 May 2021 17:14:53 -0300
Subject: [PATCH 3/5] add disabled fields

---
 cmreslogging/handlers.py | 23 ++++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git a/cmreslogging/handlers.py b/cmreslogging/handlers.py
index 97a4306..99260ab 100644
--- a/cmreslogging/handlers.py
+++ b/cmreslogging/handlers.py
@@ -151,7 +151,7 @@ def __init__(
         es_additional_fields=__DEFAULT_ADDITIONAL_FIELDS,
         raise_on_indexing_exceptions=__DEFAULT_RAISE_ON_EXCEPTION,
         default_timestamp_field_name=__DEFAULT_TIMESTAMP_FIELD_NAME,
-        disabled_fields=[],
+        disabled_fields: tuple = (),
     ):
         """Handler constructor
 
@@ -186,7 +186,7 @@ def __init__(
             to the logs, such the application, environment, etc.
         :param raise_on_indexing_exceptions: A boolean, True only for debugging purposes to raise exceptions caused when
-        :param disabled_fields: A array, by default empty, but if you want to remove any key , put here
+        :param disabled_fields: A tuple, empty by default; any record keys listed here are removed before indexing
         :return: A ready to be used CMRESHandler.
         """
         logging.Handler.__init__(self)
@@ -323,9 +323,22 @@ def flush(self):
         if self._buffer:
             try:
                 with self._buffer_lock:
-                    logs_buffer = self._buffer
-                    print(f"gustavo: => {logs_buffer}")
-                    print(f"disable fields => {self.disabled_fields}")
+                    if not self.disabled_fields:
+                        logs_buffer = self._buffer
+                    else:
+                        logs_buffer = []
+
+                        for d in self._buffer:
+                            result = map(
+                                d.__delitem__,
+                                filter(d.__contains__, self.disabled_fields),
+                            )
+                            logs_buffer.append(result)
+                            # print(d)
+
+                        # print(f"gustavo: => {logs_buffer}")
+                        # print(f"disable fields => {self.disabled_fields}")
+
                     self._buffer = []
             actions = (
 
From ae2556f9f4a4f3bedb81825eaddf426b1fac5e49 Mon Sep 17 00:00:00 2001
From: Gustavo Freitas
Date: Tue, 25 May 2021 17:15:59 -0300
Subject: [PATCH 4/5] 1.0.1

---
 setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index 08baacc..bb0c350 100644
--- a/setup.py
+++ b/setup.py
@@ -34,7 +34,7 @@
     # Versions should comply with PEP440. For a discussion on single-sourcing
     # the version across setup.py and the project code, see
     # https://packaging.python.org/en/latest/single_source_version.html
-    version='1.0.0',
+    version='1.0.1',
 
     description='Elasticsearch Log handler for the logging library',
     long_description=long_description,
 
From 5e50f2ef882013ac399bcdcb28987e411121ba6b Mon Sep 17 00:00:00 2001
From: Gustavo Freitas
Date: Wed, 26 May 2021 07:04:28 -0300
Subject: [PATCH 5/5] fix del_all

---
 cmreslogging/handlers.py | 20 ++++++++------------
 1 file changed, 8 insertions(+), 12 deletions(-)

diff --git a/cmreslogging/handlers.py b/cmreslogging/handlers.py
index 99260ab..4935f44 100644
--- a/cmreslogging/handlers.py
+++ b/cmreslogging/handlers.py
@@ -312,6 +312,13 @@ def __get_es_datetime_str(timestamp):
             int(current_date.microsecond / 1000),
         )
 
+    def del_all(self, mapping):
+        """Remove the disabled fields from mapping.
+        :return: the mapping, without the disabled fields"""
+        for key in self.disabled_fields:
+            mapping.pop(key, None)  # skip fields that are absent from this record
+        return mapping
+
     def flush(self):
         """Flushes the buffer into ES
         :return: None
@@ -326,18 +333,7 @@ def flush(self):
         if self._buffer:
             try:
                 with self._buffer_lock:
                     if not self.disabled_fields:
                         logs_buffer = self._buffer
                     else:
-                        logs_buffer = []
-
-                        for d in self._buffer:
-                            result = map(
-                                d.__delitem__,
-                                filter(d.__contains__, self.disabled_fields),
-                            )
-                            logs_buffer.append(result)
-                            # print(d)
-
-                        # print(f"gustavo: => {logs_buffer}")
-                        # print(f"disable fields => {self.disabled_fields}")
+                        logs_buffer = [self.del_all(i) for i in self._buffer]
 
                     self._buffer = []
             actions = (
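
A minimal usage sketch of the `disabled_fields` option introduced by this series. The logger and index names are hypothetical, and it assumes the patches are applied on top of cmreslogging 1.0.x; it is an illustration, not part of the patch:

```python
import logging

from cmreslogging.handlers import CMRESHandler

# disabled_fields strips the listed LogRecord keys from every document
# before it is bulk-indexed; keys absent from a record are simply skipped.
handler = CMRESHandler(
    hosts=[{"host": "localhost", "port": 9200}],
    auth_type=CMRESHandler.AuthType.NO_AUTH,
    es_index_name="my_python_index",
    disabled_fields=("funcName", "lineno", "filename"),
)

log = logging.getLogger("PythonTest")
log.setLevel(logging.INFO)
log.addHandler(handler)
log.info("indexed without funcName/lineno/filename, and without host/host_ip")
```

Note that with this series applied, `host` and `host_ip` are no longer attached automatically; anything like them has to be passed explicitly through `es_additional_fields`.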