diff --git a/datastore/datastore.py b/datastore/datastore.py index 4badd0f1..f9fefb38 100644 --- a/datastore/datastore.py +++ b/datastore/datastore.py @@ -150,7 +150,6 @@ def create(self, err_if_exists=True, **kwargs): self._connect() if self._engine == ELASTICSEARCH: - es_url = elastic_search.connect.get_es_url() create_map = [ # TODO: use namedtuples (True, ELASTICSEARCH_INDEX_1, ELASTICSEARCH_DOC_TYPE, self._store_name, self._check_doc_type_for_elasticsearch, elastic_search.create.create_entity_index), diff --git a/datastore/elastic_search/create.py b/datastore/elastic_search/create.py index 07a04e77..7b79e8e9 100644 --- a/datastore/elastic_search/create.py +++ b/datastore/elastic_search/create.py @@ -1,5 +1,5 @@ import logging -from typing import List, Dict, Any +from typing import List, Any from elasticsearch import Elasticsearch from elasticsearch.exceptions import NotFoundError @@ -9,6 +9,8 @@ log_prefix = 'datastore.elastic_search.create' +# TODO: `connection` should be called `client` + def exists(connection, index_name): # type: (Elasticsearch, str) -> bool """ @@ -58,16 +60,14 @@ def delete_index(connection, index_name, logger, err_if_does_not_exist=True, **k logger.debug('%s: Delete Index %s: Operation successfully completed', log_prefix, index_name) -def _create_index(connection, index_name, doc_type, logger, mapping_body, err_if_exists=True, **kwargs): - # type: (Elasticsearch, str, str, logging.Logger, Dict[str, Any], bool, **Any) -> None +def _create_index(connection, index_name, err_if_exists=True, **kwargs): + # type: (Elasticsearch, str, bool, **Any) -> None """ Creates an Elasticsearch index needed for similarity based searching + Args: connection: Elasticsearch client object index_name: The name of the index - doc_type: The type of the documents that will be indexed - logger: logging object to log at debug and exception level - mapping_body: dict, mappings to put on the index err_if_exists: if to raise error if the index already exists, defaults to True kwargs: master_timeout: Specify timeout for connection to master @@ -90,90 +90,74 @@ def _create_index(connection, index_name, doc_type, logger, mapping_body, err_if 'Datastore().delete()'.format(index_name)) else: return - try: - body = { - 'index': { - 'analysis': { - 'analyzer': { - 'my_analyzer': { - 'tokenizer': 'standard', - 'filter': ['standard', 'lowercase', 'my_stemmer'] - } - }, - 'filter': { - 'my_stemmer': { - 'type': 'stemmer', - 'name': 'english' - } + + body = { + 'index': { + 'analysis': { + 'analyzer': { + 'my_analyzer': { + 'tokenizer': 'standard', + 'filter': ['standard', 'lowercase', 'my_stemmer'] + } + }, + 'filter': { + 'my_stemmer': { + 'type': 'stemmer', + 'name': 'english' } } } } + } - # At this point in time, elasticsearch-py doesn't accept arbitrary kwargs, so we have to filter kwargs per - # method. Refer https://github.com/elastic/elasticsearch-py/blob/master/elasticsearch/client/indices.py - create_kwargs = filter_kwargs(kwargs=kwargs, - keep_kwargs_keys=['master_timeout', 'timeout', 'update_all_types', - 'wait_for_active_shards']) - connection.indices.create(index=index_name, body=body, **create_kwargs) - - put_mapping_kwargs = filter_kwargs(kwargs=kwargs, keep_kwargs_keys=['allow_no_indices', 'expand_wildcards', - 'ignore_unavailable', - 'master_timeout', 'timeout', - 'update_all_types']) - if doc_type: - connection.indices.put_mapping(body=mapping_body, index=index_name, doc_type=doc_type, - **put_mapping_kwargs) - else: - logger.debug('%s: doc_type not in arguments, skipping put_mapping on index ...' % log_prefix) - logger.debug('%s: Create Index %s: Operation successfully completed', log_prefix, index_name) - except Exception as e: - logger.exception('%s: Exception while creating index %s, Rolling back \n %s', log_prefix, index_name, e) - delete_index(connection=connection, index_name=index_name, logger=logger) - raise e + # At this point in time, elasticsearch-py doesn't accept arbitrary kwargs, so we have to filter kwargs per + # method. Refer https://github.com/elastic/elasticsearch-py/blob/master/elasticsearch/client/indices.py + valid_create_kwargs = ['master_timeout', 'timeout', 'update_all_types', 'wait_for_active_shards'] + create_kwargs = filter_kwargs(kwargs=kwargs, keep_kwargs_keys=valid_create_kwargs) + connection.indices.create(index=index_name, body=body, **create_kwargs) -def create_entity_index(connection, index_name, doc_type, logger, **kwargs): - # type: (Elasticsearch, str, str, logging.Logger, **Any) -> None +def _put_index_mapping(connection, index_name, doc_type, mapping_body, **kwargs): + # type: (Elasticsearch, str, str, Dict[str, Any], **Any) -> None """ - Creates an mapping specific to entity storage in elasticsearch and makes a call to create_index - to create the index with the given mapping body - Args: - connection: Elasticsearch client object - index_name: The name of the index - doc_type: The type of the documents that will be indexed + doc_type: The type of the documents that will be indexed logger: logging object to log at debug and exception level + mapping_body: dict, mappings to put on the index + Args: + connection: + index_name: + doc_type: **kwargs: - master_timeout: Specify timeout for connection to master - timeout: Explicit operation timeout - update_all_types: Whether to update the mapping for all fields with the same name across all types or not - wait_for_active_shards: Set the number of active shards to wait for before the operation returns. - doc_type: The name of the document type - allow_no_indices: Whether to ignore if a wildcard indices expression resolves into no concrete indices. - (This includes _all string or when no indices have been specified) - expand_wildcards: Whether to expand wildcard expression to concrete indices that are open, closed or both., - default 'open', valid choices are: 'open', 'closed', 'none', 'all' - ignore_unavailable: Whether specified concrete indices should be ignored when unavailable - (missing or closed) - Refer https://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.client.IndicesClient.create - Refer https://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.client.IndicesClient.put_mapping + Returns: + """ - mapping_body = { + + valid_mapping_kwargs = ['allow_no_indices', 'expand_wildcards', 'ignore_unavailable', 'master_timeout', + 'timeout', 'update_all_types'] + put_mapping_kwargs = filter_kwargs(kwargs=kwargs, keep_kwargs_keys=valid_mapping_kwargs) + connection.indices.put_mapping(body=mapping_body, index=index_name, doc_type=doc_type, **put_mapping_kwargs) + + +def _get_entity_index_mapping(doc_type): + return { doc_type: { 'properties': { + 'entity_data': { + 'type': 'text', + 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 32766}}, + }, 'language_script': { 'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}, }, 'value': { 'type': 'text', - 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}, + 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 32766}}, }, 'variants': { 'type': 'text', - 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}, - 'analyzer': 'my_analyzer', + 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 32766}}, 'norms': {'enabled': False}, # Needed if we want to give longer variants higher scores }, # other removed/unused fields, kept only for backward compatibility @@ -181,10 +165,6 @@ def create_entity_index(connection, index_name, doc_type, logger, **kwargs): 'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}, }, - 'entity_data': { - 'type': 'text', - 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}, - }, 'source_language': { 'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}, @@ -193,13 +173,12 @@ def create_entity_index(connection, index_name, doc_type, logger, **kwargs): } } - _create_index(connection, index_name, doc_type, logger, mapping_body, **kwargs) - -def create_crf_index(connection, index_name, doc_type, logger, **kwargs): +def create_entity_index(connection, index_name, doc_type, logger, **kwargs): # type: (Elasticsearch, str, str, logging.Logger, **Any) -> None """ - This method is used to create an index with mapping suited for story training_data + Creates an mapping specific to entity storage in elasticsearch and makes a call to create_index + to create the index with the given mapping body Args: connection: Elasticsearch client object index_name: The name of the index @@ -221,26 +200,76 @@ def create_crf_index(connection, index_name, doc_type, logger, **kwargs): Refer https://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.client.IndicesClient.create Refer https://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.client.IndicesClient.put_mapping """ - mapping_body = { + try: + _create_index(connection, index_name, **kwargs) + mapping = _get_entity_index_mapping(doc_type) + _put_index_mapping(connection, index_name, doc_type, mapping, **kwargs) + logger.debug('%s: Create Index %s: Operation successfully completed', log_prefix, index_name) + except Exception as e: + logger.exception('%s: Exception while creating index %s, Rolling back \n %s', log_prefix, index_name, e) + delete_index(connection=connection, index_name=index_name, logger=logger) + raise e + + +def _get_crf_index_mapping(doc_type): + return { doc_type: { 'properties': { 'entity_data': { - 'type': 'text' + 'type': 'text', + 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}, }, 'sentence': { - 'enabled': False + 'type': 'text', + 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 32766}}, }, 'entities': { - 'enabled': False + 'type': 'text', + 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 32766}}, }, 'language_script': { - 'type': 'text' + 'type': 'text', + 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}, } } } } - _create_index(connection, index_name, doc_type, logger, mapping_body, **kwargs) + +def create_crf_index(connection, index_name, doc_type, logger, **kwargs): + # type: (Elasticsearch, str, str, logging.Logger, **Any) -> None + """ + This method is used to create an index with mapping suited for story training_data + Args: + connection: Elasticsearch client object + index_name: The name of the index + doc_type: The type of the documents that will be indexed + logger: logging object to log at debug and exception level + **kwargs: + master_timeout: Specify timeout for connection to master + timeout: Explicit operation timeout + update_all_types: Whether to update the mapping for all fields with the same name across all types or not + wait_for_active_shards: Set the number of active shards to wait for before the operation returns. + doc_type: The name of the document type + allow_no_indices: Whether to ignore if a wildcard indices expression resolves into no concrete indices. + (This includes _all string or when no indices have been specified) + expand_wildcards: Whether to expand wildcard expression to concrete indices that are open, closed or both., + default 'open', valid choices are: 'open', 'closed', 'none', 'all' + ignore_unavailable: Whether specified concrete indices should be ignored when unavailable + (missing or closed) + + Refer https://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.client.IndicesClient.create + Refer https://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.client.IndicesClient.put_mapping + """ + try: + _create_index(connection, index_name, **kwargs) + mapping = _get_crf_index_mapping(doc_type) + _put_index_mapping(connection, index_name, doc_type, mapping, **kwargs) + logger.debug('%s: Create Index %s: Operation successfully completed', log_prefix, index_name) + except Exception as e: + logger.exception('%s: Exception while creating index %s, Rolling back \n %s', log_prefix, index_name, e) + delete_index(connection=connection, index_name=index_name, logger=logger) + raise e def create_alias(connection, index_list, alias_name, logger, **kwargs):