From 7e07dfccf0d43773e04ea8416ea25589be094a2b Mon Sep 17 00:00:00 2001 From: vitaliyok Date: Mon, 14 Jul 2025 12:42:19 +0100 Subject: [PATCH 01/10] Updated ES search functions and authentication --- cogstack.py | 564 +++++++++++++++++++++++++++++------ credentials.py | 13 +- search/search_template.ipynb | 121 ++++++-- 3 files changed, 565 insertions(+), 133 deletions(-) diff --git a/cogstack.py b/cogstack.py index 070c84d..3285705 100644 --- a/cogstack.py +++ b/cogstack.py @@ -1,158 +1,528 @@ import getpass -from typing import Dict, List, Any, Optional, Iterable, Tuple +import traceback +from typing import Dict, List, Any, Optional, Iterable, Sequence, Tuple import elasticsearch -import elasticsearch.helpers +import elasticsearch.helpers as es_helpers +from IPython.display import display, HTML import pandas as pd -from tqdm.notebook import tqdm -import eland as ed +import tqdm +#import eland as ed import warnings warnings.filterwarnings("ignore") from credentials import * - class CogStack(object): """ A class for interacting with Elasticsearch. - Args: - hosts (List[str]): A list of Elasticsearch host URLs. - username (str, optional): The username to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a username. - password (str, optional): The password to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a password. - api (bool, optional): A boolean value indicating whether to use API keys or basic authentication to connect to Elasticsearch. Defaults to False (i.e., use basic authentication). Elasticsearch 7.17. - api_key (str, optional): The API key to use when connecting to Elasticsearch. - When provided along with `api=True`, this takes precedence over username/password. Only available when using Elasticsearch 8.17. + Parameters + ------------ + hosts : List[str] + A list of Elasticsearch host URLs. + username : str, optional + The username to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a username. + password : str, optional + The password to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a password. + apiKey : Dict, optional + API key object with "id" and "api_key" or "encoded" strings as fields. Generated in Elasticseach or Kibana + + Example: + .. 
code-block:: json + { + "id": "API_KEY_ID", + "api_key": "API_KEY", + "encoded": "API_KEY_ENCODED_STRING" + } """ def __init__(self, hosts: List, username: Optional[str] = None, password: Optional[str] = None, - api: bool = False, timeout: Optional[int]=60, api_key: Optional[str] = None): - - if api_key and api: - self.elastic = elasticsearch.Elasticsearch(hosts=hosts, - api_key=api_key, - verify_certs=False, - timeout=timeout) + apiKey: Dict = None): - - elif api: - api_username, api_password = self._check_auth_details(username, password) + if apiKey is not None: + encoded = apiKey["encoded"] if "encoded" in apiKey.keys() else input("Encoded API key: ") + hasEncodedValue = encoded is not None and encoded != '' + + if(not hasEncodedValue): + + api_Id = apiKey["id"] if "id" in apiKey.keys() else input("API Id: ") + api_key = apiKey["api_key"] if "api_key" in apiKey.keys() else getpass.getpass("API Key: ") + + self.elastic = elasticsearch.Elasticsearch(hosts=hosts, - api_key=(api_username, api_password), - verify_certs=False, - timeout=timeout) - + api_key= encoded if hasEncodedValue else (api_Id, api_key), + verify_certs=False) else: username, password = self._check_auth_details(username, password) self.elastic = elasticsearch.Elasticsearch(hosts=hosts, basic_auth=(username, password), - verify_certs=False, - timeout=timeout) - + verify_certs=False) + self.elastic._request_timeout = 300 def _check_auth_details(self, username=None, password=None) -> Tuple[str, str]: """ Prompt the user for a username and password if the values are not provided as function arguments. - Args: - api_username (str, optional): The API username. If not provided, the user will be prompted to enter a username. - api_password (str, optional): The API password. If not provided, the user will be prompted to enter a password. + Parameters + ------------ + username : str, optional + The API username. If not provided, the user will be prompted to enter a username. + password : str, optional + The API password. If not provided, the user will be prompted to enter a password. - Returns: - Tuple[str, str]: A tuple containing the API username and password. + Returns + ---------- + Tuple[str, str]: + A tuple containing the API username and password. """ if username is None: username = input("Username: ") if password is None: password = getpass.getpass("Password: ") return username, password + + def get_indices_and_aliases(self): + """ + Retrieve indices and their aliases + + Returns: + --------- + A table of indices and aliases to use in subsequent queries + """ + all_aliases = self.elastic.indices.get_alias().body + index_aliases_coll = [] + for index in all_aliases: + index_aliases = dict() + index_aliases['Index'] = index + aliases=[] + for alias in all_aliases[index]['aliases']: + aliases.append(alias) + index_aliases['Aliases'] = ', '.join(aliases) + index_aliases_coll.append(index_aliases) + with pd.option_context('display.max_colwidth', None): + return pd.DataFrame(index_aliases_coll, columns=['Index', 'Aliases']) - def get_docs_generator(self, index: List, query: Dict, es_gen_size: int=800, request_timeout: Optional[int] = 300): + def get_index_fields(self, index: str | Sequence[str]): """ - Retrieve a generator object that can be used to iterate through documents in an Elasticsearch index. 
+        Retrieve indices and their fields with data type
+
+        Parameters
+        ----------
+        index: str | Sequence[str]
+            Name(s) of indices or aliases for which the list of fields is retrieved
+
+        Returns
+        ----------
+        pandas.DataFrame
+            A DataFrame containing index names and their fields with data types

-        Args:
-            index (List[str]): A list of Elasticsearch index names to search.
-            query (Dict): A dictionary containing the search query parameters.
-            es_gen_size (int, optional): The number of documents to retrieve per batch. Defaults to 800.
-            request_timeout (int, optional): The time in seconds to wait for a response from Elasticsearch before timing out. Defaults to 300.
+        Raises
+        ------
+        Exception
+            If the operation fails for any reason.
+        """
+        try:
+            all_mappings = self.elastic.indices.get_mapping(index=index, allow_no_indices=False).body
+            columns= ['Field', 'Type']
+            if (isinstance(index, List)):
+                columns.insert(0, 'Index')
+            index_mappings_coll = []
+            for index in all_mappings:
+                for property in all_mappings[index]['mappings']['properties']:
+                    index_mapping = dict()
+                    index_mapping['Index'] = index
+                    index_mapping['Field'] = property
+                    index_mapping['Type'] = all_mappings[index]['mappings']['properties'][property]['type'] if "type" in all_mappings[index]['mappings']['properties'][property].keys() else '?'
+                    index_mappings_coll.append(index_mapping)
+        except Exception as err:
+            raise Exception(f"Unexpected {err=}, {type(err)=}")
+        with pd.option_context('display.max_rows', len(index_mappings_coll) + 1):
+            return display(pd.DataFrame(data= index_mappings_coll, columns=columns))

-        Returns:
-            generator: A generator object that can be used to iterate through the documents in the specified Elasticsearch index.
-        """
-        docs_generator = elasticsearch.helpers.scan(self.elastic,
-                                                    query=query,
-                                                    index=index,
-                                                    size=es_gen_size,
-                                                    request_timeout=request_timeout)
-        return docs_generator
-
-    def cogstack2df(self, query: Dict, index: str, column_headers=None, es_gen_size: int=800, request_timeout: int=300,
-                    show_progress: bool = True):
+
+    def count_search_results(self, index: str | Sequence[str], query: dict):
+        """
+        Count the number of documents returned by the query
+
+        Parameters
+        ----------
+        index : str or Sequence[str]
+            The name(s) of the Elasticsearch indices or their aliases to search.
+
+        query : dict
+            A dictionary containing the search query parameters.
+            The query can start with a `query` key and contain other query options, which will be ignored
+
+            .. code-block:: json
+                {"query": {"match": {"title": "python"}}}
+            or consist only of the content of the `query` block
+            .. code-block:: json
+                {"match": {"title": "python"}}
+        """
+        query = self.__extract_query(query=query)
+        count = self.elastic.count(index=index, query=query)['count']
+        return f"Number of documents: {format(count, ',')}"
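# Illustrative usage sketch for count_search_results (not part of the changeset above;
# the index alias and field name below are hypothetical). The query may be passed with
# or without the top-level "query" key:
#
#     cs = CogStack(hosts, apiKey=api_key)
#     search_query = {"match": {"document_Content": "diabetes"}}  # hypothetical field
#     cs.count_search_results(index="epr_documents", query=search_query)
#     # -> e.g. "Number of documents: 7,834"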
+
+    def read_data_with_scan(self,
+                            index: str,
+                            query: dict,
+                            include_fields: list[str]=None,
+                            size: int=1000,
+                            request_timeout: int=300,
+                            show_progress: bool = True):
         """
-        Retrieve documents from an Elasticsearch index and convert them to a Pandas DataFrame.
+        Retrieve documents from an Elasticsearch index using search query and elasticsearch scan helper function.
+        The function converts search results to a Pandas DataFrame and does not return the current scroll id if the process fails.

-        Args:
-            query (Dict): A dictionary containing the search query parameters.
-            index (str): The name of the Elasticsearch index to search.
-            column_headers (List[str], optional): A list of column headers to use for the DataFrame. If not provided, the DataFrame will have default column names.
-            es_gen_size (int, optional): The number of documents to retrieve per batch. Defaults to 800.
-            request_timeout (int, optional): The time in seconds to wait for a response from Elasticsearch before timing out. Defaults to 300.
-            show_progress (bool, optional): Whether to show the progress in console. Defaults to true.
+        Parameters
+        ----------
+        index : str or Sequence[str]
+            The name(s) of the Elasticsearch indices or their aliases to search.
+        query : dict
+            A dictionary containing the search query parameters.
+            The query can start with a `query` key and contain other query options, which will be used in the search
+
+            .. code-block:: json
+                {"query": {"match": {"title": "python"}}}
+            or consist only of the content of the `query` block (preferred method to avoid clashing with other parameters)
+            .. code-block:: json
+                {"match": {"title": "python"}}
+
+        include_fields : list[str], optional
+            A list of fields to be included in search results and presented as columns in the DataFrame.
+            If not provided, only _index, _id and _score fields will be included.
+            Columns _index, _id, _score are present in all search results
+        size : int, optional, default = 1000
+            The number of documents to be returned by the query or scroll API during each iteration.
+            MAX: 10,000.
+        request_timeout : int, optional, default=300
+            The time in seconds to wait for a response from Elasticsearch before timing out.
+        show_progress : bool, optional, default=True
+            Whether to show the progress in console.
+        Returns
+        ------
+        pandas.DataFrame
+            A DataFrame containing the retrieved documents.
+
+        Raises
+        ------
+        Exception
+            If the search fails or is cancelled by the user.
+        """
+        try:
+
+            self.__validate_size(size=size)
+            if "query" not in query.keys():
+                temp_query = query.copy()
+                query.clear()
+                query["query"] = temp_query
+
+            docs_generator = es_helpers.scan(self.elastic,
+                                             index=index,
+                                             query=query,
+                                             size=size,
+                                             request_timeout=request_timeout,
+                                             source=False,
+                                             fields = include_fields)
+            all_mapped_results = []
+            results = self.elastic.count(index=index, query=query["query"], request_timeout=request_timeout)
+            pr_bar = tqdm.tqdm(docs_generator, total=results["count"], desc="CogStack retrieved...", disable=not show_progress, colour='green')
+            all_mapped_results = self.__map_search_results(hits=pr_bar)
+        except BaseException as err:
+            if(type(err) is KeyboardInterrupt):
+                pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", "\033[0;33m", "\033[0;33m")
+                pr_bar.set_description("CogStack read cancelled! Processed", refresh=True)
+                print("Request cancelled and current search_scroll_id deleted...")
+            else:
+                pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", "\033[0;31m", "\033[0;31m")
+                pr_bar.set_description("CogStack read failed! Processed", refresh=True)
+                print(Exception(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}"))
+        finally:
+            return self.__create_dataframe(all_mapped_results, include_fields)
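# Illustrative usage sketch for read_data_with_scan (not part of the changeset above;
# index and field names are hypothetical). The query is passed without a top-level
# "query" key, the preferred form per the docstring:
#
#     df = cs.read_data_with_scan(index="epr_documents",
#                                 query={"match": {"document_Content": "sepsis"}},
#                                 include_fields=["document_Content", "patient_TrustNumber"],
#                                 size=1000)
#     df.head()  # columns: _index, _id, _score plus the requested fields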
+
+    def read_data_with_scroll(self,
+                              index: str | Sequence[str],
+                              query: dict,
+                              include_fields:Optional[list[str]]=None,
+                              size: int=1000,
+                              search_scroll_id: Optional[str] = None,
+                              request_timeout: Optional[int]=300,
+                              show_progress: Optional[bool] = True):
+
+        """
+        Retrieves documents from an Elasticsearch index using search query and scroll API.
+        The function converts search results to a Pandas DataFrame.
+
+        Parameters
+        ----------
+        index : str or Sequence[str]
+            The name(s) of the Elasticsearch indices or their aliases to search.
+        query : dict
+            A dictionary containing the search query parameters.
+            The query can start with a `query` key and contain other query options, which will be ignored
+
+            .. code-block:: json
+                {"query": {"match": {"title": "python"}}}
+            or consist only of the content of the `query` block
+            .. code-block:: json
+                {"match": {"title": "python"}}
+        include_fields : list[str], optional
+            A list of fields to be included in search results and presented as columns in the DataFrame.
+            If not provided, only _index, _id and _score fields will be included.
+            Columns _index, _id, _score are present in all search results
+        size : int, optional, default = 1000
+            The number of documents to be returned by the query or scroll API during each iteration.
+            MAX: 10,000.
+        search_scroll_id : str, optional
+            The value of the last scroll_id returned by scroll API and used to continue the search if the current search fails.
+            The value of scroll_id times out after 10 minutes, after which the search will have to be restarted.
+            Note: Absence of this parameter indicates a new search.
+        request_timeout : int, optional, default=300
+            The time in seconds to wait for a response from Elasticsearch before timing out.
+        show_progress : bool, optional, default=True
+            Whether to show the progress in console.
+            IMPORTANT: The progress bar displays the total hits for the query even if continuing the search using `search_scroll_id`.
+        Returns
+        ------
+        pandas.DataFrame
+            A DataFrame containing the retrieved documents.
+
+        Raises
+        ------
+        Exception
+            If the search fails or is cancelled by the user.
+            If the search fails, the error message includes the value of the current `search_scroll_id`, which can be used as a function parameter to continue the search.
+            IMPORTANT: If the function fails after a `scroll` request, the subsequent request will skip results of the failed scroll by the value of the `size` parameter.
+        """
+        try:
+            self.__validate_size(size=size)
+            query = self.__extract_query(query=query)
+            result_count = size
+            all_mapped_results =[]
+            resp=None
+            pr_bar = tqdm.tqdm(desc="CogStack retrieved...", disable=not show_progress, colour='green')
+
+            if search_scroll_id is None:
+                resp = self.elastic.search(index=index,
+                                           size=size,
+                                           query=query,
+                                           fields=include_fields,
+                                           source=False,
+                                           scroll="10m",
+                                           timeout=f"{request_timeout}s",
+                                           rest_total_hits_as_int=True)
+
+                pr_bar.total = resp.body['hits']['total']
+                hits = resp.body['hits']['hits']
+                result_count = len(hits)
+                search_scroll_id = resp.body['_scroll_id']
+                all_mapped_results.extend(self.__map_search_results(hits=hits))
+                pr_bar.update(len(hits))
+
+            while search_scroll_id and result_count == size:
+                # Perform ES scroll request
+                resp = self.elastic.scroll(scroll_id=search_scroll_id, scroll="10m", rest_total_hits_as_int=True)
+                hits = resp.body['hits']['hits']
+                pr_bar.total = pr_bar.total if pr_bar.total else resp.body['hits']['total']
+                all_mapped_results.extend(self.__map_search_results(hits=hits))
+                search_scroll_id = resp.body['_scroll_id']
+                result_count = len(hits)
+                pr_bar.update(result_count)
+
+            self.elastic.clear_scroll(scroll_id = search_scroll_id)
+        except BaseException as err:
+            if(type(err) is KeyboardInterrupt):
+                pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", "\033[0;33m", "\033[0;33m")
+                pr_bar.set_description("CogStack read cancelled! Processed", refresh=True)
+                self.elastic.clear_scroll(scroll_id = search_scroll_id)
+                print("Request cancelled and current search_scroll_id deleted...")
+            else:
+                pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", "\033[0;31m", "\033[0;31m")
+                pr_bar.set_description("CogStack read failed! Processed", refresh=True)
+                print(Exception(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}"), f"{search_scroll_id=}", sep='\n')
+        finally:
+            return self.__create_dataframe(all_mapped_results, include_fields)
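# Illustrative usage sketch for read_data_with_scroll (not part of the changeset above;
# index name is hypothetical). If a run fails, the error message prints the current
# search_scroll_id; pass it back within 10 minutes to continue rather than restart:
#
#     df = cs.read_data_with_scroll(index="epr_documents", query=search_query,
#                                   include_fields=columns)
#     # resume a failed run (last_scroll_id is the value printed in the error message):
#     df_rest = cs.read_data_with_scroll(index="epr_documents", query=search_query,
#                                        include_fields=columns,
#                                        search_scroll_id=last_scroll_id)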
+
+    def read_data_with_sorting(self,
+                               index: str | Sequence[str],
+                               query: dict,
+                               include_fields: Optional[list[str]]=None,
+                               size: Optional[int]=1000,
+                               sort: Optional[dict|list[str]] = {"id": "asc"},
+                               search_after: Optional[list[str|int|float|Any|None]] = None,
+                               request_timeout: Optional[int]=300,
+                               show_progress: Optional[bool] = True):
+        """
+        Retrieve documents from an Elasticsearch index using search query and convert them to a Pandas DataFrame.
+
+        Parameters
+        ----------
+        index : str or Sequence[str]
+            The name(s) of the Elasticsearch indices or their aliases to search.
+        query : dict
+            A dictionary containing the search query parameters.
+            The query can start with a `query` key and contain other query options, which will be ignored
+
+            .. code-block:: json
+                {"query": {"match": {"title": "python"}}}
+            or consist only of the content of the `query` block
+            .. code-block:: json
+                {"match": {"title": "python"}}
+        include_fields : list[str], optional
+            A list of fields to be included in search results and presented as columns in the DataFrame.
+            If not provided, only _index, _id and _score fields will be included.
+            Columns _index, _id, _score are present in all search results
+        size : int, optional, default = 1000
+            The number of documents to be returned by the query or scroll API during each iteration.
+            MAX: 10,000.
+        sort : dict|list[str], optional, default = {"id": "asc"}
+            Sort field name(s) and order (`asc` or `desc`) in dictionary format or list of field names without order.
+            `{"id": "asc"}` (or `id`) is added as a tiebreaker field if not provided.
+            Default sorting order is `asc`
+            >Example:
+            - `dict : {"field_Name" : "desc", "id" : "asc"}`
+            - `list : ["field_Name", "id"]`
+        search_after : list[str|int|float|Any|None], optional
+            The sort value of the last record in search results.
+            Can be provided if a search fails and needs to be restarted from the last successful search.
+            Use the value of `search_after_value` from the error message
+        request_timeout : int, optional, default = 300
+            The time in seconds to wait for a response from Elasticsearch before timing out.
+        show_progress : bool, optional
+            Whether to show the progress in console. Defaults to True.
+
+        Returns
+        ------
+        pandas.DataFrame
+            A DataFrame containing the retrieved documents.
+
+        Raises
+        ------
+        Exception
+            If the search fails or is cancelled by the user.
+            The error message includes the value of the current `search_after_value`, which can be used as a function parameter to continue the search.
+        """
-        docs_generator = elasticsearch.helpers.scan(self.elastic,
-                                                    query=query,
-                                                    index=index,
-                                                    size=es_gen_size,
-                                                    request_timeout=request_timeout)
-        temp_results = []
-        results = self.elastic.count(index=index, query=query['query'], request_timeout=300)  # type: ignore
-        for hit in tqdm(docs_generator, total=results['count'], desc="CogStack retrieved...", disable=not show_progress):
+        try:
+            result_count = size
+            all_mapped_results =[]
+            search_after_value = search_after
+
+            self.__validate_size(size=size)
+            query = self.__extract_query(query=query)
+
+            if ((type(sort) is dict and 'id' not in sort.keys()) or (type(sort) is list and 'id' not in sort)):
+                if type(sort) is dict:
+                    sort['id'] = 'asc'
+                else:
+                    sort.append('id')
+
+            pr_bar = tqdm.tqdm(desc="CogStack retrieved...", disable=not show_progress, colour='green')
+
+            while result_count == size:
+                search_result = self.elastic.search(index=index,
+                                                    size=size,
+                                                    query=query,
+                                                    fields=include_fields,
+                                                    source=False,
+                                                    sort=sort,
+                                                    search_after=search_after_value,
+                                                    timeout=f"{request_timeout}s",
+                                                    track_scores=True,
+                                                    allow_no_indices=False,
+                                                    rest_total_hits_as_int=True)
+
+                hits = search_result['hits']['hits']
+                all_mapped_results.extend(self.__map_search_results(hits=hits))
+                result_count = len(hits)
+                pr_bar.update(result_count)
+                search_after_value = hits[-1]['sort']
+                pr_bar.total = pr_bar.total if pr_bar.total else search_result.body['hits']['total']
+        except BaseException as err:
+            if(type(err) is KeyboardInterrupt):
+                pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", "\033[0;33m", "\033[0;33m")
+                pr_bar.set_description("CogStack read cancelled! Processed", refresh=True)
+                print("Request cancelled.")
+            else:
+                pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", "\033[0;31m", "\033[0;31m")
+                pr_bar.set_description("CogStack read failed! Processed", refresh=True)
+                print(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}")
+                print(f"The last {search_after_value=}")
+        finally:
+            return self.__create_dataframe(all_mapped_results, include_fields)
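# Illustrative usage sketch for read_data_with_sorting (not part of the changeset above;
# "document_Created" is a hypothetical sort field). "id" is appended automatically as a
# tiebreaker when missing from `sort`; on failure, pass the printed search_after_value
# back via `search_after` to continue from the last successful page:
#
#     df = cs.read_data_with_sorting(index="epr_documents", query=search_query,
#                                    include_fields=columns,
#                                    sort={"document_Created": "desc", "id": "asc"})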
""" - docs_generator = elasticsearch.helpers.scan(self.elastic, - query=query, - index=index, - size=es_gen_size, - request_timeout=request_timeout) - temp_results = [] - results = self.elastic.count(index=index, query=query['query'], request_timeout=300) # type: ignore - for hit in tqdm(docs_generator, total=results['count'], desc="CogStack retrieved...", disable=not show_progress): + try: + result_count = size + all_mapped_results =[] + search_after_value = search_after + + self.__validate_size(size=size) + query = self.__extract_query(query=query) + + if ((type(sort) is dict and 'id' not in sort.keys()) or (type(sort) is list and 'id' not in sort)): + if type(sort) is dict: + sort['id'] = 'asc' + else: + sort.append('id') + + pr_bar = tqdm.tqdm(desc="CogStack retrieved...", disable=not show_progress, colour='green') + + while result_count == size: + search_result = self.elastic.search(index=index, + size=size, + query=query, + fields=include_fields, + source=False, + sort=sort, + search_after=search_after_value, + timeout=f"{request_timeout}s", + track_scores=True, + allow_no_indices=False, + rest_total_hits_as_int=True) + + hits = search_result['hits']['hits'] + all_mapped_results.extend(self.__map_search_results(hits=hits)) + result_count = len(hits) + pr_bar.update(result_count) + search_after_value = hits[-1]['sort'] + pr_bar.total = pr_bar.total if pr_bar.total else search_result.body['hits']['total'] + except BaseException as err: + if(type(err) is KeyboardInterrupt): + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", "\033[0;33m", "\033[0;33m") + pr_bar.set_description("CogStack read cancelled! Processed", refresh=True) + print("Request cancelled.") + else: + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", "\033[0;31m", "\033[0;31m") + pr_bar.set_description("CogStack read failed! Processed", refresh=True) + print(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}") + print(f"The last {search_after_value=}") + finally: + return self.__create_dataframe(all_mapped_results, include_fields) + + def __extract_query(self, query: dict): + if "query" in query.keys(): + query = query['query'] + return query + + def __validate_size(self, size): + if size > 10000: + raise ValueError('Size must not be greater then 10000') + + def __map_search_results(self, hits: Iterable): + hit: dict + for hit in hits: row = dict() row['_index'] = hit['_index'] row['_id'] = hit['_id'] row['_score'] = hit['_score'] - row.update(hit['_source']) - temp_results.append(row) - if column_headers: + if 'fields' in hit.keys(): + row.update({k: ', '.join(map(str, v)) for k,v in dict(hit['fields']).items()}) + yield row + + def __create_dataframe(self, all_mapped_results, column_headers): + """ + Create a Pandas DataFrame from the search results. + + Parameters + ---------- + all_mapped_results : list + The list of mapped search results. + column_headers : list or None + The list of column headers to include in the DataFrame. + + Returns + ------- + pandas.DataFrame + A DataFrame containing the search results. + """ df_headers = ['_index', '_id', '_score'] - df_headers.extend(column_headers) - df = pd.DataFrame(temp_results, columns=df_headers) - else: - df = pd.DataFrame(temp_results) - return df - - def DataFrame(self, index: str, columns: Optional[List[str]] = None): - """ - Fast method to return a pandas dataframe from a CogStack search. - - Args: - index (str): A list of indices to search. 
- columns (List[str], optional): A list of column names to include in the DataFrame. If not provided, all columns will be included. - - Returns: - DataFrame: A pd.DataFrame like object containing the retrieved documents. + if column_headers and "*" not in column_headers: + df_headers.extend(column_headers) + return pd.DataFrame(data=all_mapped_results, columns=df_headers) + return pd.DataFrame(data=all_mapped_results) + +def print_dataframe(df : pd.DataFrame, separator : str = '\\n'): """ - return ed.DataFrame(es_client=self.elastic, es_index_pattern=index, columns=columns) + Replace separator string with HTML <br/> tag for printing in Notebook + Parameters: + ----------- + df : DataFrame + Input DataFrame + separator : str + Separator to be replaced with HTML <br/> + """ + return display(HTML(df.to_html().replace(separator, '
<br/>')))

def list_chunker(user_list: List[Any], n: int) -> List[List[Any]]:
    """
    Divide a list into sublists of a specified size.

-    Args:
-        user_list (List[Any]): The list to be divided.
-        n (int): The size of the sublists.
+    Parameters:
+    ----------
+    user_list : List[Any]
+        The list to be divided.
+    n : int
+        The size of the sublists.

    Returns:
+    --------
    List[List[Any]]: A list of sublists containing the elements of the input list.
    """
    n=max(1, n)
    return [user_list[i:i+n] for i in range(0, len(user_list), n)]

-def _no_progress_bar(iterable: Iterable, **kwargs):
-    return iterable
-
diff --git a/credentials.py b/credentials.py
index 62193a6..b3ad46e 100644
--- a/credentials.py
+++ b/credentials.py
@@ -2,13 +2,20 @@
 # CogStack login details
 ## Any questions on what these details are please contact your local CogStack administrator.
-hosts: List[str] = [] # This is a list of your CogStack ElasticSearch instances.
+hosts: List[str] = [
+    # "https://cogstack-es-1:9200", # This is an example of a CogStack ElasticSearch instance.
+    ] # This is a list of your CogStack ElasticSearch instances.

 ## These are your login details (either via http_auth or API) Should be in str format
 username = None
 password = None
-
-api_key = None # Encoded api key issued by your cogstack administrator.
+# If you are using API key authentication
+# Use either "id" and "api_key" or "encoded" field, or both.
+api_key = {
+    "id": "", # This is the API key id issued by your cogstack administrator.
+    "api_key": "", # This is the api key issued by your cogstack administrator.
+    "encoded": "" # This is the encoded api key issued by your cogstack administrator.
+    }

 # NLM authentication
 # The UMLS REST API requires a UMLS account for the authentication described below.
diff --git a/search/search_template.ipynb b/search/search_template.ipynb
index 2d2e187..0810747 100644
--- a/search/search_template.ipynb
+++ b/search/search_template.ipynb
@@ -11,14 +11,15 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 2,
+   "execution_count": 1,
    "metadata": {},
    "outputs": [],
    "source": [
    "import sys\n",
    "sys.path.append('..')\n",
+    "import pandas as pd\n",
    "from credentials import *\n",
-    "from cogstack import CogStack"
+    "from cogstack import CogStack, print_dataframe"
   ]
  },
  {
@@ -30,11 +31,11 @@
  },
  {
   "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
-    "cs = CogStack(hosts, username=username, password=password, api=True)"
+    "cs = CogStack(hosts, apiKey=api_key)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Check the list of Indices and columns"
   ]
  },
+ {
+  "cell_type": "markdown",
+  "metadata": {},
+  "source": [
+   "View all indices and their aliases available to this user. 
Either index names or their aliases can be used to extract data" + ] + }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ - "for i in cs.elastic.indices.get_mapping().keys():\n", - " print(i)" + "print_dataframe(cs.get_indices_and_aliases(), ', ')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "View fields/columns and their data types for provided index names or aliases" ] }, { @@ -60,17 +74,14 @@ "metadata": {}, "outputs": [], "source": [ - "# Check the list of columns in that index\n", - "index = ''\n", - "for col in cs.elastic.indices.get_mapping(index=index)[index]['mappings']['properties'].keys():\n", - " print(col)" + "cs.get_index_fields([])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "# Set parameters" + "# Set search query parameters" ] }, { @@ -79,7 +90,7 @@ "metadata": {}, "outputs": [], "source": [ - "pt_list = [] # example list of patients' patient_TrustNumber here" + "pt_list = [ ] # example list of patients' patient_TrustNumber here" ] }, { @@ -117,24 +128,45 @@ "metadata": {}, "outputs": [], "source": [ - "# Example query structure\n", - "query = {\n", - " \"from\": 0,\n", - " \"size\": 10000,\n", - " \"query\": {\n", - " \"bool\": {\n", - " \"filter\": {\n", - " \"terms\": {\"patient_TrustNumber\": pt_list}\n", - " },\n", - " \"must\": [\n", - " {\"query_string\": {\n", - " \"query\": \"***YOUR LUCENE QUERY HERE***\"}\n", + "search_query = {\n", + " \"bool\": {\n", + " #\"filter\": {\n", + " # \"terms\": {\n", + " # \"patient_TrustNumber\": pt_list\n", + " # }\n", + " #},\n", + " \"must\": [\n", + " {\n", + " \"query_string\": {\n", + " \"query\": \"\",\n", + " \"default_field\":\"\"\n", + " }\n", + " }\n", + " ]\n", " }\n", - " ]\n", - " }\n", - " },\n", - " \"_source\": columns # This is a search column filter. 
remove if all columns are to be retrieved\n", - "}" + "}\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'Number of documents: 7,834'" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Count the number of documents matching the search query\n", + "cs.count_search_results(index=[], query=search_query)\n" ] }, { @@ -143,7 +175,28 @@ "tags": [] }, "source": [ - "# Search, Process, and Save" + "# Search, Process, and Save\n", + "Use either of the functions to extract search results" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df = cs.read_data_with_scan(index=[], query=search_query, include_fields=columns)\n", + "print(df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df = cs.read_data_with_scroll(index=[], query=search_query, include_fields=columns)\n", + "print(df)" ] }, { @@ -152,7 +205,9 @@ "metadata": {}, "outputs": [], "source": [ - "df = cs.cogstack2df(query=query, index=index, column_headers=columns)" + "df = cs.read_data_with_sorting(index=[], query=search_query, \n", + " include_fields=columns)\n", + "print(df)" ] }, { @@ -195,7 +250,7 @@ "metadata": {}, "outputs": [], "source": [ - "df.to_csv(path_to_results + '/' +file_name, index=False)" + "df.to_csv(path_to_results + '\\\\' + file_name, index=False)" ] } ], @@ -215,7 +270,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.6 (default, Sep 26 2022, 11:37:49) \n[Clang 14.0.0 (clang-1400.0.29.202)]" + "version": "3.13.5" }, "vscode": { "interpreter": { From dd3e19549f14689c02c9e5e1494078a8fba54720 Mon Sep 17 00:00:00 2001 From: vitaliyok Date: Mon, 21 Jul 2025 14:42:40 +0100 Subject: [PATCH 02/10] Updated API key parameter name to match naming convention --- cogstack.py | 43 ++++++++++++++++-------------------- search/search_template.ipynb | 4 ++-- 2 files changed, 21 insertions(+), 26 deletions(-) diff --git a/cogstack.py b/cogstack.py index 3285705..fe4f123 100644 --- a/cogstack.py +++ b/cogstack.py @@ -6,7 +6,6 @@ from IPython.display import display, HTML import pandas as pd import tqdm -#import eland as ed import warnings warnings.filterwarnings("ignore") @@ -37,29 +36,29 @@ class CogStack(object): } """ def __init__(self, hosts: List, username: Optional[str] = None, password: Optional[str] = None, - apiKey: Dict = None): + api_key: Dict = None): - if apiKey is not None: - encoded = apiKey["encoded"] if "encoded" in apiKey.keys() else input("Encoded API key: ") + if api_key is not None: + encoded = api_key["encoded"] if "encoded" in api_key.keys() and api_key["encoded"] != '' else input("Encoded API key: ") hasEncodedValue = encoded is not None and encoded != '' if(not hasEncodedValue): - api_Id = apiKey["id"] if "id" in apiKey.keys() else input("API Id: ") - api_key = apiKey["api_key"] if "api_key" in apiKey.keys() else getpass.getpass("API Key: ") + api_Id = api_key["id"] if "id" in api_key.keys() and api_key["id"] != '' else input("API Id: ") + api_key = api_key["api_key"] if "api_key" in api_key.keys() and api_key["api_key"] != '' else getpass.getpass("API Key: ") self.elastic = elasticsearch.Elasticsearch(hosts=hosts, api_key= encoded if hasEncodedValue else (api_Id, api_key), verify_certs=False) else: - username, password = self._check_auth_details(username, password) + username, 
password = self.__check_auth_details(username, password) self.elastic = elasticsearch.Elasticsearch(hosts=hosts, basic_auth=(username, password), verify_certs=False) self.elastic._request_timeout = 300 - - def _check_auth_details(self, username=None, password=None) -> Tuple[str, str]: + + def __check_auth_details(self, username=None, password=None) -> Tuple[str, str]: """ Prompt the user for a username and password if the values are not provided as function arguments. @@ -139,7 +138,6 @@ def get_index_fields(self, index: str | Sequence[str]): with pd.option_context('display.max_rows', len(index_mappings_coll) + 1): return display(pd.DataFrame(data= index_mappings_coll, columns=columns)) - def count_search_results(self, index: str | Sequence[str], query: dict): """ Count number of documents returned by the query @@ -210,14 +208,13 @@ def read_data_with_scan(self, If the search fails or cancelled by the user. """ try: - self.__validate_size(size=size) if "query" not in query.keys(): temp_query = query.copy() query.clear() query["query"] = temp_query - docs_generator = es_helpers.scan(self.elastic, + scan_results = es_helpers.scan(self.elastic, index=index, query=query, size=size, @@ -226,7 +223,7 @@ def read_data_with_scan(self, fields = include_fields) all_mapped_results = [] results = self.elastic.count(index=index, query=query["query"], request_timeout=request_timeout) - pr_bar = tqdm.tqdm(docs_generator, total=results["count"], desc="CogStack retrieved...", disable=not show_progress, colour='green') + pr_bar = tqdm.tqdm(scan_results, total=results["count"], desc="CogStack retrieved...", disable=not show_progress, colour='green') all_mapped_results = self.__map_search_results(hits=pr_bar) except BaseException as err: if(type(err) is KeyboardInterrupt): @@ -240,7 +237,6 @@ def read_data_with_scan(self, finally: return self.__create_dataframe(all_mapped_results, include_fields) - def read_data_with_scroll(self, index: str | Sequence[str], query: dict, @@ -300,11 +296,11 @@ def read_data_with_scroll(self, query = self.__extract_query(query=query) result_count = size all_mapped_results =[] - resp=None + search_result=None pr_bar = tqdm.tqdm(desc="CogStack retrieved...", disable=not show_progress, colour='green') if search_scroll_id is None: - resp = self.elastic.search(index=index, + search_result = self.elastic.search(index=index, size=size, query=query, fields=include_fields, @@ -313,20 +309,20 @@ def read_data_with_scroll(self, timeout=f"{request_timeout}s", rest_total_hits_as_int=True) - pr_bar.total = resp.body['hits']['total'] - hits = resp.body['hits']['hits'] + pr_bar.total = search_result.body['hits']['total'] + hits = search_result.body['hits']['hits'] result_count = len(hits) - search_scroll_id = resp.body['_scroll_id'] + search_scroll_id = search_result.body['_scroll_id'] all_mapped_results.extend(self.__map_search_results(hits=hits)) pr_bar.update(len(hits)) while search_scroll_id and result_count == size: # Perform ES scroll request - resp = self.elastic.scroll(scroll_id=search_scroll_id, scroll="10m", rest_total_hits_as_int=True) - hits = resp.body['hits']['hits'] - pr_bar.total = pr_bar.total if pr_bar.total else resp.body['hits']['total'] + search_result = self.elastic.scroll(scroll_id=search_scroll_id, scroll="10m", rest_total_hits_as_int=True) + hits = search_result.body['hits']['hits'] + pr_bar.total = pr_bar.total if pr_bar.total else search_result.body['hits']['total'] all_mapped_results.extend(self.__map_search_results(hits=hits)) - search_scroll_id = 
resp.body['_scroll_id'] + search_scroll_id = search_result.body['_scroll_id'] result_count = len(hits) pr_bar.update(result_count) @@ -344,7 +340,6 @@ def read_data_with_scroll(self, finally: return self.__create_dataframe(all_mapped_results, include_fields) - def read_data_with_sorting(self, index: str | Sequence[str], query: dict, diff --git a/search/search_template.ipynb b/search/search_template.ipynb index 0810747..bae2634 100644 --- a/search/search_template.ipynb +++ b/search/search_template.ipynb @@ -31,11 +31,11 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ - "cs = CogStack(hosts, apiKey=api_key)" + "cs = CogStack(hosts, api_key=api_key)" ] }, { From 6e84c710482860436044974ee513b16fba51e3d7 Mon Sep 17 00:00:00 2001 From: vitaliyok Date: Mon, 21 Jul 2025 14:44:32 +0100 Subject: [PATCH 03/10] Ignore __pycache__ directory during commit --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 57c267d..4542a74 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,6 @@ data/cogstack_search_results/ # Default environments venv + +# pythin cache folder +__pycache__ From 07368aa78fba2bfe9f12f1ecfb7844f9fb28eba8 Mon Sep 17 00:00:00 2001 From: vitaliyok Date: Thu, 7 Aug 2025 11:46:24 +0100 Subject: [PATCH 04/10] Moved authentication from constructor to separate methods for API and basic. --- cogstack.py | 111 +++++++++++++++++++++++------------ search/search_template.ipynb | 23 +++----- 2 files changed, 83 insertions(+), 51 deletions(-) diff --git a/cogstack.py b/cogstack.py index fe4f123..c921b97 100644 --- a/cogstack.py +++ b/cogstack.py @@ -20,12 +20,45 @@ class CogStack(object): ------------ hosts : List[str] A list of Elasticsearch host URLs. - username : str, optional + """ + def __init__(self, hosts: List[str]): + self.hosts = hosts + + def use_basic_auth(self, username: Optional[str] = None, password:Optional[str] = None) -> 'CogStack': + """ + Create an instance of CogStack using basic authentication. + If the `username` or `password` parameters are not provided, the user will be prompted to enter them. + + Parameters + ---------- + username : str, optional The username to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a username. - password : str, optional + password : str, optional The password to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a password. + + Returns + ------- + CogStack: An instance of the CogStack class. + """ + if username is None: + username = input("Username: ") + if password is None: + password = getpass.getpass("Password: ") + + return self.__connect(basic_auth=(username, password) if username and password else None) + + def use_api_key_auth(self, api_key: Optional[Dict] = None) -> 'CogStack': + """ + Create an instance of CogStack using API key authentication. + + Parameters + ---------- apiKey : Dict, optional + API key object with "id" and "api_key" or "encoded" strings as fields. Generated in Elasticseach or Kibana + and provided by your CogStack administrator. + + If not provided, the user will be prompted to enter API key "encoded" value. Example: .. 
code-block:: json @@ -34,52 +67,55 @@ class CogStack(object): "api_key": "API_KEY", "encoded": "API_KEY_ENCODED_STRING" } - """ - def __init__(self, hosts: List, username: Optional[str] = None, password: Optional[str] = None, - api_key: Dict = None): + + Returns + ------- + CogStack: An instance of the CogStack class. + """ + if not api_key: + api_key = {"encoded": input("Encoded API key: ")} if api_key is not None: - encoded = api_key["encoded"] if "encoded" in api_key.keys() and api_key["encoded"] != '' else input("Encoded API key: ") - hasEncodedValue = encoded is not None and encoded != '' + if isinstance(api_key, str): + # If api_key is a string, it is assumed to be the encoded API key + encoded = api_key + hasEncodedValue = True + elif isinstance(api_key, Dict): + # If api_key is a dictionary, check for "encoded", "id" and "api_key" keys + encoded = api_key["encoded"] if "encoded" in api_key.keys() and api_key["encoded"] != '' else input("Encoded API key: ") + hasEncodedValue = encoded is not None and encoded != '' if(not hasEncodedValue): - api_Id = api_key["id"] if "id" in api_key.keys() and api_key["id"] != '' else input("API Id: ") api_key = api_key["api_key"] if "api_key" in api_key.keys() and api_key["api_key"] != '' else getpass.getpass("API Key: ") - - self.elastic = elasticsearch.Elasticsearch(hosts=hosts, - api_key= encoded if hasEncodedValue else (api_Id, api_key), - verify_certs=False) - else: - username, password = self.__check_auth_details(username, password) - self.elastic = elasticsearch.Elasticsearch(hosts=hosts, - basic_auth=(username, password), - verify_certs=False) - self.elastic._request_timeout = 300 + return self.__connect(api_key=encoded if hasEncodedValue else (api_Id, api_key)) - def __check_auth_details(self, username=None, password=None) -> Tuple[str, str]: - """ - Prompt the user for a username and password if the values are not provided as function arguments. - + def __connect(self, basic_auth : Optional[Tuple[str,str]] = None, api_key: Optional[str | Tuple[str, str]] = None) -> 'CogStack': + """ Connect to Elasticsearch using the provided credentials. Parameters - ------------ - username : str, optional - The API username. If not provided, the user will be prompted to enter a username. - password : str, optional - The API password. If not provided, the user will be prompted to enter a password. - - Returns ---------- - Tuple[str, str]: - A tuple containing the API username and password. + basic_auth : Tuple[str, str], optional + A tuple containing the username and password for basic authentication. + api_key : str or Tuple[str, str], optional + The API key or a tuple containing the API key ID and API key for API key authentication. + Returns + ------- + CogStack: An instance of the CogStack class. + Raises + ------ + Exception: If the connection to Elasticsearch fails. """ - if username is None: - username = input("Username: ") - if password is None: - password = getpass.getpass("Password: ") - return username, password - + self.elastic = elasticsearch.Elasticsearch(hosts=hosts, + api_key=api_key, + basic_auth=basic_auth, + verify_certs=False, + request_timeout=300) + if not self.elastic.ping(): + raise Exception("CogStack connection failed. 
Please check your host list and credentials and try again.") + print("CogStack connection established successfully.") + return self + def get_indices_and_aliases(self): """ Retrieve indices and their aliases @@ -248,6 +284,7 @@ def read_data_with_scroll(self, """ Retrieves documents from an Elasticsearch index using search query and scroll API. + Default scroll timeout is set to 10 minutes. The function converts search results to a Pandas DataFrame. Parameters diff --git a/search/search_template.ipynb b/search/search_template.ipynb index bae2634..3f0bbe7 100644 --- a/search/search_template.ipynb +++ b/search/search_template.ipynb @@ -11,7 +11,7 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -35,7 +35,7 @@ "metadata": {}, "outputs": [], "source": [ - "cs = CogStack(hosts, api_key=api_key)" + "cs = CogStack(hosts).use_api_key_auth(api_key=api_key)" ] }, { @@ -152,18 +152,7 @@ "cell_type": "code", "execution_count": null, "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "'Number of documents: 7,834'" - ] - }, - "execution_count": 7, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "# Count the number of documents matching the search query\n", "cs.count_search_results(index=[], query=search_query)\n" @@ -185,6 +174,7 @@ "metadata": {}, "outputs": [], "source": [ + "# Read data using scan helper\n", "df = cs.read_data_with_scan(index=[], query=search_query, include_fields=columns)\n", "print(df)" ] @@ -195,6 +185,7 @@ "metadata": {}, "outputs": [], "source": [ + "# Read data with scroll API and get scroll id if search fails midway.\n", "df = cs.read_data_with_scroll(index=[], query=search_query, include_fields=columns)\n", "print(df)" ] @@ -205,6 +196,8 @@ "metadata": {}, "outputs": [], "source": [ + "# Read data with sorting and get search_after value if search fails midway.\n", + "# Note: Sorting requires a field to sort by, which should be present in the index. Default sorting is by _id.\n", "df = cs.read_data_with_sorting(index=[], query=search_query, \n", " include_fields=columns)\n", "print(df)" @@ -224,6 +217,7 @@ "outputs": [], "source": [ "# Whatever you want here\n", + "# For example, display the first few rows of the DataFrame\n", "df.head()" ] }, @@ -240,6 +234,7 @@ "metadata": {}, "outputs": [], "source": [ + "# Save the DataFrame to a CSV file\n", "path_to_results = \"../data/cogstack_search_results\"\n", "file_name = \"file_name.csv\"" ] From 45e5db83ff503d2236f5e63014905e5d71b72ac3 Mon Sep 17 00:00:00 2001 From: vitaliyok Date: Thu, 7 Aug 2025 16:07:45 +0100 Subject: [PATCH 05/10] Added class methods to initialize ES connection and some code refactoring --- .gitignore | 2 +- cogstack.py | 74 ++++++++++++++++++++++++++++++++---- search/search_template.ipynb | 6 ++- 3 files changed, 73 insertions(+), 9 deletions(-) diff --git a/.gitignore b/.gitignore index 4542a74..64c6b2a 100644 --- a/.gitignore +++ b/.gitignore @@ -12,5 +12,5 @@ data/cogstack_search_results/ # Default environments venv -# pythin cache folder +# python cache folder __pycache__ diff --git a/cogstack.py b/cogstack.py index c921b97..1c956b8 100644 --- a/cogstack.py +++ b/cogstack.py @@ -21,9 +21,63 @@ class CogStack(object): hosts : List[str] A list of Elasticsearch host URLs. 
""" + ES_TIMEOUT = 300 + def __init__(self, hosts: List[str]): self.hosts = hosts + @classmethod + def with_basic_auth(cls, hosts: List[str], username: Optional[str] = None, password: Optional[str] = None) -> 'CogStack': + """ + Create an instance of CogStack using basic authentication. + + Parameters + ---------- + hosts : List[str] + A list of Elasticsearch host URLs. + username : str, optional + The username to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a username. + password : str, optional + The password to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a password. + Returns + ------- + CogStack: An instance of the CogStack class. + """ + cs = cls(hosts) + cs.use_basic_auth(username, password) + return cs + + @classmethod + def with_api_key_auth(cls, hosts: List[str], api_key: Optional[Dict] = None) -> 'CogStack': + """ + Create an instance of CogStack using API key authentication. + + Parameters + ---------- + hosts : List[str] + A list of Elasticsearch host URLs. + apiKey : Dict, optional + + API key object with "id" and "api_key" or "encoded" strings as fields. Generated in Elasticseach or Kibana + and provided by your CogStack administrator. + + If not provided, the user will be prompted to enter API key "encoded" value. + + Example: + .. code-block:: json + { + "id": "API_KEY_ID", + "api_key": "API_KEY", + "encoded": "API_KEY_ENCODED_STRING" + } + Returns + ------- + CogStack: An instance of the CogStack class. + """ + cs = cls(hosts) + cs.use_api_key_auth(api_key) + return cs + def use_basic_auth(self, username: Optional[str] = None, password:Optional[str] = None) -> 'CogStack': """ Create an instance of CogStack using basic authentication. @@ -55,7 +109,7 @@ def use_api_key_auth(self, api_key: Optional[Dict] = None) -> 'CogStack': ---------- apiKey : Dict, optional - API key object with "id" and "api_key" or "encoded" strings as fields. Generated in Elasticseach or Kibana + API key object with "id" and "api_key" or "encoded" strings as fields. Generated in Elasticsearch or Kibana and provided by your CogStack administrator. If not provided, the user will be prompted to enter API key "encoded" value. @@ -82,8 +136,14 @@ def use_api_key_auth(self, api_key: Optional[Dict] = None) -> 'CogStack': hasEncodedValue = True elif isinstance(api_key, Dict): # If api_key is a dictionary, check for "encoded", "id" and "api_key" keys - encoded = api_key["encoded"] if "encoded" in api_key.keys() and api_key["encoded"] != '' else input("Encoded API key: ") - hasEncodedValue = encoded is not None and encoded != '' + if "id" in api_key.keys() and api_key["id"] != '' and "api_key" in api_key.keys() and api_key["api_key"] != '': + # If both "id" and "api_key" are present, use them + encoded = None + hasEncodedValue = False + else: + # If "encoded" is present, use it; otherwise prompt for it + encoded = api_key["encoded"] if "encoded" in api_key.keys() and api_key["encoded"] != '' else input("Encoded API key: ") + hasEncodedValue = encoded is not None and encoded != '' if(not hasEncodedValue): api_Id = api_key["id"] if "id" in api_key.keys() and api_key["id"] != '' else input("API Id: ") @@ -110,7 +170,7 @@ def __connect(self, basic_auth : Optional[Tuple[str,str]] = None, api_key: Optio api_key=api_key, basic_auth=basic_auth, verify_certs=False, - request_timeout=300) + request_timeout=self.ES_TIMEOUT) if not self.elastic.ping(): raise Exception("CogStack connection failed. 
Please check your host list and credentials and try again.") print("CogStack connection established successfully.") @@ -202,7 +262,7 @@ def read_data_with_scan(self, query: dict, include_fields: list[str]=None, size: int=1000, - request_timeout: int=300, + request_timeout: int=ES_TIMEOUT, show_progress: bool = True): """ Retrieve documents from an Elasticsearch index using search query and elasticsearch scan helper function. @@ -279,7 +339,7 @@ def read_data_with_scroll(self, include_fields:Optional[list[str]]=None, size: int=1000, search_scroll_id: Optional[str] = None, - request_timeout: Optional[int]=300, + request_timeout: Optional[int]=ES_TIMEOUT, show_progress: Optional[bool] = True): """ @@ -384,7 +444,7 @@ def read_data_with_sorting(self, size: Optional[int]=1000, sort: Optional[dict|list[str]] = {"id": "asc"}, search_after: Optional[list[str|int|float|Any|None]] = None, - request_timeout: Optional[int]=300, + request_timeout: Optional[int]=ES_TIMEOUT, show_progress: Optional[bool] = True): """ Retrieve documents from an Elasticsearch index using search query and convert them to a Pandas DataFrame. diff --git a/search/search_template.ipynb b/search/search_template.ipynb index 3f0bbe7..e6122a5 100644 --- a/search/search_template.ipynb +++ b/search/search_template.ipynb @@ -35,7 +35,11 @@ "metadata": {}, "outputs": [], "source": [ - "cs = CogStack(hosts).use_api_key_auth(api_key=api_key)" + "cs = CogStack.with_api_key_auth(hosts=hosts, api_key=api_key)\n", + "#cs = CogStack.with_basic_auth(hosts=hosts, username=username, password=password)\n", + "#cs = CogStack(hosts).use_api_key_auth(api_key=api_key)\n", + "#cs = CogStack(hosts).use_basic_auth(username=username, password=password)\n", + "#cs = CogStack(hosts).use_api_key_auth(\"\")\n" ] }, { From af556d3c0b145483766527d5619a608240812770 Mon Sep 17 00:00:00 2001 From: vitaliyok Date: Tue, 12 Aug 2025 11:16:45 +0100 Subject: [PATCH 06/10] Added reversed cogstack module to prev version, added new module cogstack2 and added deprecation warning --- cogstack.py | 684 ++++++---------------------------- cogstack2.py | 618 ++++++++++++++++++++++++++++++ search/search_template.ipynb | 114 ++---- search/search_template2.ipynb | 282 ++++++++++++++ 4 files changed, 1046 insertions(+), 652 deletions(-) create mode 100644 cogstack2.py create mode 100644 search/search_template2.ipynb diff --git a/cogstack.py b/cogstack.py index 1c956b8..cd7fc21 100644 --- a/cogstack.py +++ b/cogstack.py @@ -1,620 +1,168 @@ + import getpass -import traceback -from typing import Dict, List, Any, Optional, Iterable, Sequence, Tuple +from typing import Dict, List, Any, Optional, Iterable, Tuple import elasticsearch -import elasticsearch.helpers as es_helpers -from IPython.display import display, HTML +import elasticsearch.helpers import pandas as pd -import tqdm +from tqdm.notebook import tqdm +import eland as ed +# Suppress warnings related to security in Elasticsearch +# This is necessary to avoid warnings about insecure connections when using self-signed certificates or HTTP connections import warnings -warnings.filterwarnings("ignore") +from elastic_transport import SecurityWarning +from urllib3.exceptions import InsecureRequestWarning + +# Reset all filters +warnings.resetwarnings() + +warnings.filterwarnings("module", category=DeprecationWarning, module="cogstack") +warnings.filterwarnings('ignore', category=SecurityWarning) +warnings.filterwarnings('ignore', category=InsecureRequestWarning) from credentials import * class CogStack(object): + 
warnings.warn("cogstack module is deprecated, use cogstack2 instead.", DeprecationWarning) """ A class for interacting with Elasticsearch. - Parameters - ------------ - hosts : List[str] - A list of Elasticsearch host URLs. + Args: + hosts (List[str]): A list of Elasticsearch host URLs. + username (str, optional): The username to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a username. + password (str, optional): The password to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a password. + api (bool, optional): A boolean value indicating whether to use API keys or basic authentication to connect to Elasticsearch. Defaults to False (i.e., use basic authentication). Elasticsearch 7.17. + api_key (str, optional): The API key to use when connecting to Elasticsearch. + When provided along with `api=True`, this takes precedence over username/password. Only available when using Elasticsearch 8.17. """ - ES_TIMEOUT = 300 - - def __init__(self, hosts: List[str]): - self.hosts = hosts - - @classmethod - def with_basic_auth(cls, hosts: List[str], username: Optional[str] = None, password: Optional[str] = None) -> 'CogStack': - """ - Create an instance of CogStack using basic authentication. - - Parameters - ---------- - hosts : List[str] - A list of Elasticsearch host URLs. - username : str, optional - The username to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a username. - password : str, optional - The password to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a password. - Returns - ------- - CogStack: An instance of the CogStack class. - """ - cs = cls(hosts) - cs.use_basic_auth(username, password) - return cs - - @classmethod - def with_api_key_auth(cls, hosts: List[str], api_key: Optional[Dict] = None) -> 'CogStack': - """ - Create an instance of CogStack using API key authentication. - - Parameters - ---------- - hosts : List[str] - A list of Elasticsearch host URLs. - apiKey : Dict, optional - - API key object with "id" and "api_key" or "encoded" strings as fields. Generated in Elasticseach or Kibana - and provided by your CogStack administrator. - - If not provided, the user will be prompted to enter API key "encoded" value. + def __init__(self, hosts: List, username: Optional[str] = None, password: Optional[str] = None, + api: bool = False, timeout: Optional[int]=60, api_key: Optional[str] = None): + + if api_key and api: + self.elastic = elasticsearch.Elasticsearch(hosts=hosts, + api_key=api_key, + verify_certs=False, + request_timeout=timeout) + + + elif api: + api_username, api_password = self._check_auth_details(username, password) + self.elastic = elasticsearch.Elasticsearch(hosts=hosts, + api_key=(api_username, api_password), + verify_certs=False, + request_timeout=timeout) - Example: - .. code-block:: json - { - "id": "API_KEY_ID", - "api_key": "API_KEY", - "encoded": "API_KEY_ENCODED_STRING" - } - Returns - ------- - CogStack: An instance of the CogStack class. - """ - cs = cls(hosts) - cs.use_api_key_auth(api_key) - return cs + else: + username, password = self._check_auth_details(username, password) + self.elastic = elasticsearch.Elasticsearch(hosts=hosts, + basic_auth=(username, password), + verify_certs=False, + request_timeout=timeout) - def use_basic_auth(self, username: Optional[str] = None, password:Optional[str] = None) -> 'CogStack': - """ - Create an instance of CogStack using basic authentication. 
- If the `username` or `password` parameters are not provided, the user will be prompted to enter them. - Parameters - ---------- - username : str, optional - The username to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a username. - password : str, optional - The password to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a password. + def _check_auth_details(self, username=None, password=None) -> Tuple[str, str]: + """ + Prompt the user for a username and password if the values are not provided as function arguments. - Returns - ------- - CogStack: An instance of the CogStack class. + Args: + api_username (str, optional): The API username. If not provided, the user will be prompted to enter a username. + api_password (str, optional): The API password. If not provided, the user will be prompted to enter a password. + + Returns: + Tuple[str, str]: A tuple containing the API username and password. """ if username is None: username = input("Username: ") if password is None: password = getpass.getpass("Password: ") + return username, password - return self.__connect(basic_auth=(username, password) if username and password else None) - - def use_api_key_auth(self, api_key: Optional[Dict] = None) -> 'CogStack': - """ - Create an instance of CogStack using API key authentication. - - Parameters - ---------- - apiKey : Dict, optional - - API key object with "id" and "api_key" or "encoded" strings as fields. Generated in Elasticsearch or Kibana - and provided by your CogStack administrator. - - If not provided, the user will be prompted to enter API key "encoded" value. - - Example: - .. code-block:: json - { - "id": "API_KEY_ID", - "api_key": "API_KEY", - "encoded": "API_KEY_ENCODED_STRING" - } - - Returns - ------- - CogStack: An instance of the CogStack class. + def get_docs_generator(self, index: List, query: Dict, es_gen_size: int=800, request_timeout: Optional[int] = 300): """ - if not api_key: - api_key = {"encoded": input("Encoded API key: ")} + Retrieve a generator object that can be used to iterate through documents in an Elasticsearch index. - if api_key is not None: - if isinstance(api_key, str): - # If api_key is a string, it is assumed to be the encoded API key - encoded = api_key - hasEncodedValue = True - elif isinstance(api_key, Dict): - # If api_key is a dictionary, check for "encoded", "id" and "api_key" keys - if "id" in api_key.keys() and api_key["id"] != '' and "api_key" in api_key.keys() and api_key["api_key"] != '': - # If both "id" and "api_key" are present, use them - encoded = None - hasEncodedValue = False - else: - # If "encoded" is present, use it; otherwise prompt for it - encoded = api_key["encoded"] if "encoded" in api_key.keys() and api_key["encoded"] != '' else input("Encoded API key: ") - hasEncodedValue = encoded is not None and encoded != '' - - if(not hasEncodedValue): - api_Id = api_key["id"] if "id" in api_key.keys() and api_key["id"] != '' else input("API Id: ") - api_key = api_key["api_key"] if "api_key" in api_key.keys() and api_key["api_key"] != '' else getpass.getpass("API Key: ") - - return self.__connect(api_key=encoded if hasEncodedValue else (api_Id, api_key)) - - def __connect(self, basic_auth : Optional[Tuple[str,str]] = None, api_key: Optional[str | Tuple[str, str]] = None) -> 'CogStack': - """ Connect to Elasticsearch using the provided credentials. 
- Parameters - ---------- - basic_auth : Tuple[str, str], optional - A tuple containing the username and password for basic authentication. - api_key : str or Tuple[str, str], optional - The API key or a tuple containing the API key ID and API key for API key authentication. - Returns - ------- - CogStack: An instance of the CogStack class. - Raises - ------ - Exception: If the connection to Elasticsearch fails. - """ - self.elastic = elasticsearch.Elasticsearch(hosts=hosts, - api_key=api_key, - basic_auth=basic_auth, - verify_certs=False, - request_timeout=self.ES_TIMEOUT) - if not self.elastic.ping(): - raise Exception("CogStack connection failed. Please check your host list and credentials and try again.") - print("CogStack connection established successfully.") - return self - - def get_indices_and_aliases(self): - """ - Retrieve indices and their aliases + Args: + index (List[str]): A list of Elasticsearch index names to search. + query (Dict): A dictionary containing the search query parameters. + es_gen_size (int, optional): The number of documents to retrieve per batch. Defaults to 800. + request_timeout (int, optional): The time in seconds to wait for a response from Elasticsearch before timing out. Defaults to 300. Returns: - --------- - A table of indices and aliases to use in subsequent queries - """ - all_aliases = self.elastic.indices.get_alias().body - index_aliases_coll = [] - for index in all_aliases: - index_aliases = dict() - index_aliases['Index'] = index - aliases=[] - for alias in all_aliases[index]['aliases']: - aliases.append(alias) - index_aliases['Aliases'] = ', '.join(aliases) - index_aliases_coll.append(index_aliases) - with pd.option_context('display.max_colwidth', None): - return pd.DataFrame(index_aliases_coll, columns=['Index', 'Aliases']) - - def get_index_fields(self, index: str | Sequence[str]): - """ - Retrieve indices and their fields with data type - - Parameters - ---------- - index: str | Sequence[str] - Name(s) of indices or aliases for which the list of fields is retrieved - - Returns - ---------- - pandas.DataFrame - A DataFrame containing index names and their fields with data types - - Raises - ------ - Exception - If the operation fails for any reason. - """ - try: - all_mappings = self.elastic.indices.get_mapping(index=index, allow_no_indices=False).body - columns= ['Field', 'Type'] - if (isinstance(index, List)): - columns.insert(0, 'Index') - index_mappings_coll = [] - for index in all_mappings: - for property in all_mappings[index]['mappings']['properties']: - index_mapping = dict() - index_mapping['Index'] = index - index_mapping['Field'] = property - index_mapping['Type'] = all_mappings[index]['mappings']['properties'][property]['type'] if "type" in all_mappings[index]['mappings']['properties'][property].keys() else '?' - index_mappings_coll.append(index_mapping) - except Exception as err: - raise Exception(f"Unexpected {err=}, {type(err)=}") - with pd.option_context('display.max_rows', len(index_mappings_coll) + 1): - return display(pd.DataFrame(data= index_mappings_coll, columns=columns)) - - def count_search_results(self, index: str | Sequence[str], query: dict): - """ - Count number of documents returned by the query - - Parameters - ---------- - index : str or Sequence[str] - The name(s) of the Elasticsearch indices or their aliases to search. - - query : dict - A dictionary containing the search query parameters. - Query can start with `query` key and contain other query options which will be ignored - - .. 
code-block:: json - {"query": {"match": {"title": "python"}}}} - or only consist of content of `query` block - .. code-block:: json - {"match": {"title": "python"}}} - """ - query = self.__extract_query(query=query) - count = self.elastic.count(index=index, query=query)['count'] - return f"Number of documents: {format(count, ',')}" - - def read_data_with_scan(self, - index: str, - query: dict, - include_fields: list[str]=None, - size: int=1000, - request_timeout: int=ES_TIMEOUT, - show_progress: bool = True): - """ - Retrieve documents from an Elasticsearch index using search query and elasticsearch scan helper function. - The function converts search results to a Pandas DataFrame and does not return current scroll id if the process fails. - - Parameters - ---------- - index : str or Sequence[str] - The name(s) of the Elasticsearch indices or their aliases to search. - query : dict - A dictionary containing the search query parameters. - Query can start with `query` key and contain other query options which will be used in the search - - .. code-block:: json - {"query": {"match": {"title": "python"}}}} - or only consist of content of `query` block (preferred method to avoid clashing with other parameters) - .. code-block:: json - {"match": {"title": "python"}}} - - include_fields : list[str], optional - A list of fields to be included in search results and presented as columns in the DataFrame. - If not provided, only _index, _id and _score fields will be included. - Columns _index, _id, _score are present in all search results - size : int, optional, default = 1000 - The number of documents to be returned by the query or scroll API during each iteration. - MAX: 10,000. - request_timeout : int, optional, default=300 - The time in seconds to wait for a response from Elasticsearch before timing out. - show_progress : bool, optional, default=True - Whether to show the progress in console. - Returns - ------ - pandas.DataFrame - A DataFrame containing the retrieved documents. - - Raises - ------ - Exception - If the search fails or cancelled by the user. - """ - try: - self.__validate_size(size=size) - if "query" not in query.keys(): - temp_query = query.copy() - query.clear() - query["query"] = temp_query - - scan_results = es_helpers.scan(self.elastic, - index=index, - query=query, - size=size, - request_timeout=request_timeout, - source=False, - fields = include_fields) - all_mapped_results = [] - results = self.elastic.count(index=index, query=query["query"], request_timeout=request_timeout) - pr_bar = tqdm.tqdm(scan_results, total=results["count"], desc="CogStack retrieved...", disable=not show_progress, colour='green') - all_mapped_results = self.__map_search_results(hits=pr_bar) - except BaseException as err: - if(type(err) is KeyboardInterrupt): - pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", "\033[0;33m", "\033[0;33m") - pr_bar.set_description("CogStack read cancelled! Processed", refresh=True) - print("Request cancelled and current search_scroll_id deleted...") - else: - pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", "\033[0;31m", "\033[0;31m") - pr_bar.set_description("CogStack read failed! 
Processed", refresh=True) - print(Exception(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}")) - finally: - return self.__create_dataframe(all_mapped_results, include_fields) - - def read_data_with_scroll(self, - index: str | Sequence[str], - query: dict, - include_fields:Optional[list[str]]=None, - size: int=1000, - search_scroll_id: Optional[str] = None, - request_timeout: Optional[int]=ES_TIMEOUT, - show_progress: Optional[bool] = True): - - """ - Retrieves documents from an Elasticsearch index using search query and scroll API. - Default scroll timeout is set to 10 minutes. - The function converts search results to a Pandas DataFrame. - - Parameters - ---------- - index : str or Sequence[str] - The name(s) of the Elasticsearch indices or their aliases to search. - query : dict - A dictionary containing the search query parameters. - Query can start with `query` key and contain other query options which will be ignored - - .. code-block:: json - {"query": {"match": {"title": "python"}}}} - or only consist of content of `query` block - .. code-block:: json - {"match": {"title": "python"}}} - include_fields : list[str], optional - A list of fields to be included in search results and presented as columns in the DataFrame. - If not provided, only _index, _id and _score fields will be included. - Columns _index, _id, _score are present in all search results - size : int, optional, default = 1000 - The number of documents to be returned by the query or scroll API during each iteration. - MAX: 10,000. - search_scroll_id : str, optional - The value of the last scroll_id returned by scroll API and used to continue the search if the current search fails. - The value of scroll_id times out after 10 minutes. After which the search will have to be restarted. - Note: Absence of this parameter indicates a new search. - request_timeout : int, optional, default=300 - The time in seconds to wait for a response from Elasticsearch before timing out. - show_progress : bool, optional, default=True - Whether to show the progress in console. - IMPORTANT: The progress bar displays the total hits for the query even if continuing the search using `search_scroll_id`. - Returns - ------ - pandas.DataFrame - A DataFrame containing the retrieved documents. - - Raises - ------ - Exception - If the search fails or cancelled by the user. - If the search fails, error message includes the value of current `search_scroll_id` which can be used as a function parameter to continue the search. - IMPORTANT: If the function fails after `scroll` request, the subsequent request will skip results of the failed scroll by the value of `size` parameter. 
- """ - try: - self.__validate_size(size=size) - query = self.__extract_query(query=query) - result_count = size - all_mapped_results =[] - search_result=None - pr_bar = tqdm.tqdm(desc="CogStack retrieved...", disable=not show_progress, colour='green') - - if search_scroll_id is None: - search_result = self.elastic.search(index=index, - size=size, - query=query, - fields=include_fields, - source=False, - scroll="10m", - timeout=f"{request_timeout}s", - rest_total_hits_as_int=True) - - pr_bar.total = search_result.body['hits']['total'] - hits = search_result.body['hits']['hits'] - result_count = len(hits) - search_scroll_id = search_result.body['_scroll_id'] - all_mapped_results.extend(self.__map_search_results(hits=hits)) - pr_bar.update(len(hits)) - - while search_scroll_id and result_count == size: - # Perform ES scroll request - search_result = self.elastic.scroll(scroll_id=search_scroll_id, scroll="10m", rest_total_hits_as_int=True) - hits = search_result.body['hits']['hits'] - pr_bar.total = pr_bar.total if pr_bar.total else search_result.body['hits']['total'] - all_mapped_results.extend(self.__map_search_results(hits=hits)) - search_scroll_id = search_result.body['_scroll_id'] - result_count = len(hits) - pr_bar.update(result_count) - - self.elastic.clear_scroll(scroll_id = search_scroll_id) - except BaseException as err: - if(type(err) is KeyboardInterrupt): - pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", "\033[0;33m", "\033[0;33m") - pr_bar.set_description("CogStack read cancelled! Processed", refresh=True) - self.elastic.clear_scroll(scroll_id = search_scroll_id) - print("Request cancelled and current search_scroll_id deleted...") - else: - pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", "\033[0;31m", "\033[0;31m") - pr_bar.set_description("CogStack read failed! Processed", refresh=True) - print(Exception(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}"), f"{search_scroll_id=}", sep='\n') - finally: - return self.__create_dataframe(all_mapped_results, include_fields) + generator: A generator object that can be used to iterate through the documents in the specified Elasticsearch index. + """ + docs_generator = elasticsearch.helpers.scan(self.elastic, + query=query, + index=index, + size=es_gen_size, + request_timeout=request_timeout) + return docs_generator - def read_data_with_sorting(self, - index: str | Sequence[str], - query: dict, - include_fields: Optional[list[str]]=None, - size: Optional[int]=1000, - sort: Optional[dict|list[str]] = {"id": "asc"}, - search_after: Optional[list[str|int|float|Any|None]] = None, - request_timeout: Optional[int]=ES_TIMEOUT, - show_progress: Optional[bool] = True): + def cogstack2df(self, query: Dict, index: str, column_headers=None, es_gen_size: int=800, request_timeout: int=300, + show_progress: bool = True): """ - Retrieve documents from an Elasticsearch index using search query and convert them to a Pandas DataFrame. + Retrieve documents from an Elasticsearch index and convert them to a Pandas DataFrame. - Parameters - ---------- - index : str or Sequence[str] - The name(s) of the Elasticsearch indices or their aliases to search. - query : dict - A dictionary containing the search query parameters. - Query can start with `query` key and contain other query options which will be ignored + Args: + query (Dict): A dictionary containing the search query parameters. + index (str): The name of the Elasticsearch index to search. 
+ column_headers (List[str], optional): A list of column headers to use for the DataFrame. If not provided, the DataFrame will have default column names. + es_gen_size (int, optional): The number of documents to retrieve per batch. Defaults to 800. + request_timeout (int, optional): The time in seconds to wait for a response from Elasticsearch before timing out. Defaults to 300. + show_progress (bool, optional): Whether to show the progress in console. Defaults to true. - .. code-block:: json - {"query": {"match": {"title": "python"}}}} - or only consist of content of `query` block - .. code-block:: json - {"match": {"title": "python"}}} - include_fields : list[str], optional - A list of fields to be included in search results and presented as columns in the DataFrame. - If not provided, only _index, _id and _score fields will be included. - Columns _index, _id, _score are present in all search results - size : int, optional, default = 1000 - The number of documents to be returned by the query or scroll API during each iteration. - MAX: 10,000. - sort : dict|list[str], optional, default = {"id": "asc"} - Sort field name(s) and order (`asc` or `desc`) in dictionary format or list of field names without order. - `{"id":"asc"}` or `id` is added if not provided as a tiebreaker field. - Default sorting order is `asc` - >Example: - - `dict : {"filed_Name" : "desc", "id" : "asc"}` - - `list : ["filed_Name", "id"]` - search_after : list[str|int|float|Any|None], optional - The sort value of the last record in search results. - Can be provided if the a search fails and needs to be restarted from the last successful search. - Use the value of `search_after_value` from the error message - request_timeout : int, optional, default = 300 - The time in seconds to wait for a response from Elasticsearch before timing out. - show_progress : bool, optional - Whether to show the progress in console. Defaults to true. - - Returns - ------ - pandas.DataFrame - A DataFrame containing the retrieved documents. - - Raises - ------ - Exception - If the search fails or cancelled by the user. - Error message includes the value of current `search_after_value` which can be used as a function parameter to continue the search. + Returns: + pandas.DataFrame: A DataFrame containing the retrieved documents. 
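A minimal sketch of calling the restored legacy helper, assuming a connected `cs` instance; the index and field names (other than `patient_TrustNumber`, which appears in the search template) are hypothetical:

```python
# Hypothetical index and field names; cogstack2df expects the body wrapped in
# a top-level "query" key, since it reads query["query"] for the count call.
query = {
    "query": {
        "bool": {
            "filter": {"terms": {"patient_TrustNumber": ["P001", "P002"]}},
            "must": [{"query_string": {"query": "sepsis"}}],
        }
    }
}

df = cs.cogstack2df(query=query, index="epr_documents",
                    column_headers=["patient_TrustNumber", "body_analysed"])
print(df.shape)
```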
""" - try: - result_count = size - all_mapped_results =[] - search_after_value = search_after - - self.__validate_size(size=size) - query = self.__extract_query(query=query) - - if ((type(sort) is dict and 'id' not in sort.keys()) or (type(sort) is list and 'id' not in sort)): - if type(sort) is dict: - sort['id'] = 'asc' - else: - sort.append('id') - - pr_bar = tqdm.tqdm(desc="CogStack retrieved...", disable=not show_progress, colour='green') - - while result_count == size: - search_result = self.elastic.search(index=index, - size=size, - query=query, - fields=include_fields, - source=False, - sort=sort, - search_after=search_after_value, - timeout=f"{request_timeout}s", - track_scores=True, - allow_no_indices=False, - rest_total_hits_as_int=True) - - hits = search_result['hits']['hits'] - all_mapped_results.extend(self.__map_search_results(hits=hits)) - result_count = len(hits) - pr_bar.update(result_count) - search_after_value = hits[-1]['sort'] - pr_bar.total = pr_bar.total if pr_bar.total else search_result.body['hits']['total'] - except BaseException as err: - if(type(err) is KeyboardInterrupt): - pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", "\033[0;33m", "\033[0;33m") - pr_bar.set_description("CogStack read cancelled! Processed", refresh=True) - print("Request cancelled.") - else: - pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", "\033[0;31m", "\033[0;31m") - pr_bar.set_description("CogStack read failed! Processed", refresh=True) - print(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}") - print(f"The last {search_after_value=}") - finally: - return self.__create_dataframe(all_mapped_results, include_fields) - - def __extract_query(self, query: dict): - if "query" in query.keys(): - query = query['query'] - return query - - def __validate_size(self, size): - if size > 10000: - raise ValueError('Size must not be greater then 10000') - - def __map_search_results(self, hits: Iterable): - hit: dict - for hit in hits: + docs_generator = elasticsearch.helpers.scan(self.elastic, + query=query, + index=index, + size=es_gen_size, + request_timeout=request_timeout) + temp_results = [] + results = self.elastic.count(index=index, query=query['query'], request_timeout=300) # type: ignore + for hit in tqdm(docs_generator, total=results['count'], desc="CogStack retrieved...", disable=not show_progress): row = dict() row['_index'] = hit['_index'] row['_id'] = hit['_id'] row['_score'] = hit['_score'] - if 'fields' in hit.keys(): - row.update({k: ', '.join(map(str, v)) for k,v in dict(hit['fields']).items()}) - yield row - - def __create_dataframe(self, all_mapped_results, column_headers): - """ - Create a Pandas DataFrame from the search results. - - Parameters - ---------- - all_mapped_results : list - The list of mapped search results. - column_headers : list or None - The list of column headers to include in the DataFrame. - - Returns - ------- - pandas.DataFrame - A DataFrame containing the search results. 
- """ + row.update(hit['_source']) + temp_results.append(row) + if column_headers: df_headers = ['_index', '_id', '_score'] - if column_headers and "*" not in column_headers: - df_headers.extend(column_headers) - return pd.DataFrame(data=all_mapped_results, columns=df_headers) - return pd.DataFrame(data=all_mapped_results) - -def print_dataframe(df : pd.DataFrame, separator : str = '\\n'): + df_headers.extend(column_headers) + df = pd.DataFrame(temp_results, columns=df_headers) + else: + df = pd.DataFrame(temp_results) + return df + + def DataFrame(self, index: str, columns: Optional[List[str]] = None): + """ + Fast method to return a pandas dataframe from a CogStack search. + + Args: + index (str): A list of indices to search. + columns (List[str], optional): A list of column names to include in the DataFrame. If not provided, all columns will be included. + + Returns: + DataFrame: A pd.DataFrame like object containing the retrieved documents. """ - Replace separator string with HTML <br/> tag for printing in Notebook + return ed.DataFrame(es_client=self.elastic, es_index_pattern=index, columns=columns) - Parameters: - ----------- - df : DataFrame - Input DataFrame - separator : str - Separator to be replaced with HTML <br/> - """ - return display(HTML(df.to_html().replace(separator, '
'))) def list_chunker(user_list: List[Any], n: int) -> List[List[Any]]: """ Divide a list into sublists of a specified size. - Parameters: - ---------- - user_list : List[Any] - The list to be divided. - n : int - The size of the sublists. + Args: + user_list (List[Any]): The list to be divided. + n (int): The size of the sublists. Returns: - -------- List[List[Any]]: A list of sublists containing the elements of the input list. """ n=max(1, n) return [user_list[i:i+n] for i in range(0, len(user_list), n)] +def _no_progress_bar(iterable: Iterable, **kwargs): + return iterable diff --git a/cogstack2.py b/cogstack2.py new file mode 100644 index 0000000..24038c8 --- /dev/null +++ b/cogstack2.py @@ -0,0 +1,618 @@ +import getpass +import traceback +from typing import Dict, List, Any, Optional, Iterable, Sequence, Tuple +import elasticsearch +import elasticsearch.helpers as es_helpers +from IPython.display import display, HTML +import pandas as pd +import tqdm + +import warnings +warnings.filterwarnings("ignore") + +from credentials import * + +class CogStack(object): + """ + A class for interacting with Elasticsearch. + + Parameters + ------------ + hosts : List[str] + A list of Elasticsearch host URLs. + """ + ES_TIMEOUT = 300 + + def __init__(self, hosts: List[str]): + self.hosts = hosts + + @classmethod + def with_basic_auth(cls, hosts: List[str], username: Optional[str] = None, password: Optional[str] = None) -> 'CogStack': + """ + Create an instance of CogStack using basic authentication. + + Parameters + ---------- + hosts : List[str] + A list of Elasticsearch host URLs. + username : str, optional + The username to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a username. + password : str, optional + The password to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a password. + Returns + ------- + CogStack: An instance of the CogStack class. + """ + cs = cls(hosts) + cs.use_basic_auth(username, password) + return cs + + @classmethod + def with_api_key_auth(cls, hosts: List[str], api_key: Optional[Dict] = None) -> 'CogStack': + """ + Create an instance of CogStack using API key authentication. + + Parameters + ---------- + hosts : List[str] + A list of Elasticsearch host URLs. + apiKey : Dict, optional + + API key object with "id" and "api_key" or "encoded" strings as fields. Generated in Elasticseach or Kibana + and provided by your CogStack administrator. + + If not provided, the user will be prompted to enter API key "encoded" value. + + Example: + .. code-block:: json + { + "id": "API_KEY_ID", + "api_key": "API_KEY", + "encoded": "API_KEY_ENCODED_STRING" + } + Returns + ------- + CogStack: An instance of the CogStack class. + """ + cs = cls(hosts) + cs.use_api_key_auth(api_key) + return cs + + def use_basic_auth(self, username: Optional[str] = None, password:Optional[str] = None) -> 'CogStack': + """ + Create an instance of CogStack using basic authentication. + If the `username` or `password` parameters are not provided, the user will be prompted to enter them. + + Parameters + ---------- + username : str, optional + The username to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a username. + password : str, optional + The password to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a password. + + Returns + ------- + CogStack: An instance of the CogStack class. 
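A short sketch of the two construction paths the factory methods above provide; the host URL and key material are fake placeholders:

```python
from cogstack2 import CogStack

hosts = ["https://elastic.example.org:9200"]  # placeholder host URL

# Prompt-driven basic auth (asks for username/password if not supplied):
cs = CogStack.with_basic_auth(hosts)

# Or API-key auth with a pre-generated key object (key material here is fake):
cs = CogStack.with_api_key_auth(hosts, api_key={"encoded": "VGhpc0lzTm90QVJlYWxLZXk="})
```

Both paths end in the same private connect step, so either returns a pinged, ready-to-use instance or raises.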
+ """ + if username is None: + username = input("Username: ") + if password is None: + password = getpass.getpass("Password: ") + + return self.__connect(basic_auth=(username, password) if username and password else None) + + def use_api_key_auth(self, api_key: Optional[Dict] = None) -> 'CogStack': + """ + Create an instance of CogStack using API key authentication. + + Parameters + ---------- + apiKey : Dict, optional + + API key object with "id" and "api_key" or "encoded" strings as fields. Generated in Elasticsearch or Kibana + and provided by your CogStack administrator. + + If not provided, the user will be prompted to enter API key "encoded" value. + + Example: + .. code-block:: json + { + "id": "API_KEY_ID", + "api_key": "API_KEY", + "encoded": "API_KEY_ENCODED_STRING" + } + + Returns + ------- + CogStack: An instance of the CogStack class. + """ + if not api_key: + api_key = {"encoded": input("Encoded API key: ")} + + if api_key is not None: + if isinstance(api_key, str): + # If api_key is a string, it is assumed to be the encoded API key + encoded = api_key + hasEncodedValue = True + elif isinstance(api_key, Dict): + # If api_key is a dictionary, check for "encoded", "id" and "api_key" keys + if "id" in api_key.keys() and api_key["id"] != '' and "api_key" in api_key.keys() and api_key["api_key"] != '': + # If both "id" and "api_key" are present, use them + encoded = None + hasEncodedValue = False + else: + # If "encoded" is present, use it; otherwise prompt for it + encoded = api_key["encoded"] if "encoded" in api_key.keys() and api_key["encoded"] != '' else input("Encoded API key: ") + hasEncodedValue = encoded is not None and encoded != '' + + if(not hasEncodedValue): + api_Id = api_key["id"] if "id" in api_key.keys() and api_key["id"] != '' else input("API Id: ") + api_key = api_key["api_key"] if "api_key" in api_key.keys() and api_key["api_key"] != '' else getpass.getpass("API Key: ") + + return self.__connect(api_key=encoded if hasEncodedValue else (api_Id, api_key)) + + def __connect(self, basic_auth : Optional[Tuple[str,str]] = None, api_key: Optional[str | Tuple[str, str]] = None) -> 'CogStack': + """ Connect to Elasticsearch using the provided credentials. + Parameters + ---------- + basic_auth : Tuple[str, str], optional + A tuple containing the username and password for basic authentication. + api_key : str or Tuple[str, str], optional + The API key or a tuple containing the API key ID and API key for API key authentication. + Returns + ------- + CogStack: An instance of the CogStack class. + Raises + ------ + Exception: If the connection to Elasticsearch fails. + """ + self.elastic = elasticsearch.Elasticsearch(hosts=hosts, + api_key=api_key, + basic_auth=basic_auth, + verify_certs=False, + request_timeout=self.ES_TIMEOUT) + if not self.elastic.ping(): + raise Exception("CogStack connection failed. 
Please check your host list and credentials and try again.") + print("CogStack connection established successfully.") + return self + + def get_indices_and_aliases(self): + """ + Retrieve indices and their aliases + + Returns: + --------- + A table of indices and aliases to use in subsequent queries + """ + all_aliases = self.elastic.indices.get_alias().body + index_aliases_coll = [] + for index in all_aliases: + index_aliases = dict() + index_aliases['Index'] = index + aliases=[] + for alias in all_aliases[index]['aliases']: + aliases.append(alias) + index_aliases['Aliases'] = ', '.join(aliases) + index_aliases_coll.append(index_aliases) + with pd.option_context('display.max_colwidth', None): + return pd.DataFrame(index_aliases_coll, columns=['Index', 'Aliases']) + + def get_index_fields(self, index: str | Sequence[str]): + """ + Retrieve indices and their fields with data type + + Parameters + ---------- + index: str | Sequence[str] + Name(s) of indices or aliases for which the list of fields is retrieved + + Returns + ---------- + pandas.DataFrame + A DataFrame containing index names and their fields with data types + + Raises + ------ + Exception + If the operation fails for any reason. + """ + try: + all_mappings = self.elastic.indices.get_mapping(index=index, allow_no_indices=False).body + columns= ['Field', 'Type'] + if (isinstance(index, List)): + columns.insert(0, 'Index') + index_mappings_coll = [] + for index in all_mappings: + for property in all_mappings[index]['mappings']['properties']: + index_mapping = dict() + index_mapping['Index'] = index + index_mapping['Field'] = property + index_mapping['Type'] = all_mappings[index]['mappings']['properties'][property]['type'] if "type" in all_mappings[index]['mappings']['properties'][property].keys() else '?' + index_mappings_coll.append(index_mapping) + except Exception as err: + raise Exception(f"Unexpected {err=}, {type(err)=}") + with pd.option_context('display.max_rows', len(index_mappings_coll) + 1): + return display(pd.DataFrame(data= index_mappings_coll, columns=columns)) + + def count_search_results(self, index: str | Sequence[str], query: dict): + """ + Count number of documents returned by the query + + Parameters + ---------- + index : str or Sequence[str] + The name(s) of the Elasticsearch indices or their aliases to search. + + query : dict + A dictionary containing the search query parameters. + Query can start with `query` key and contain other query options which will be ignored + + .. code-block:: json + {"query": {"match": {"title": "python"}}}} + or only consist of content of `query` block + .. code-block:: json + {"match": {"title": "python"}}} + """ + query = self.__extract_query(query=query) + count = self.elastic.count(index=index, query=query)['count'] + return f"Number of documents: {format(count, ',')}" + + def read_data_with_scan(self, + index: str, + query: dict, + include_fields: list[str]=None, + size: int=1000, + request_timeout: int=ES_TIMEOUT, + show_progress: bool = True): + """ + Retrieve documents from an Elasticsearch index using search query and elasticsearch scan helper function. + The function converts search results to a Pandas DataFrame and does not return current scroll id if the process fails. + + Parameters + ---------- + index : str or Sequence[str] + The name(s) of the Elasticsearch indices or their aliases to search. + query : dict + A dictionary containing the search query parameters. 
+ Query can start with `query` key and contain other query options which will be used in the search + + .. code-block:: json + {"query": {"match": {"title": "python"}}}} + or only consist of content of `query` block (preferred method to avoid clashing with other parameters) + .. code-block:: json + {"match": {"title": "python"}}} + + include_fields : list[str], optional + A list of fields to be included in search results and presented as columns in the DataFrame. + If not provided, only _index, _id and _score fields will be included. + Columns _index, _id, _score are present in all search results + size : int, optional, default = 1000 + The number of documents to be returned by the query or scroll API during each iteration. + MAX: 10,000. + request_timeout : int, optional, default=300 + The time in seconds to wait for a response from Elasticsearch before timing out. + show_progress : bool, optional, default=True + Whether to show the progress in console. + Returns + ------ + pandas.DataFrame + A DataFrame containing the retrieved documents. + + Raises + ------ + Exception + If the search fails or cancelled by the user. + """ + try: + self.__validate_size(size=size) + if "query" not in query.keys(): + temp_query = query.copy() + query.clear() + query["query"] = temp_query + + scan_results = es_helpers.scan(self.elastic, + index=index, + query=query, + size=size, + request_timeout=request_timeout, + source=False, + fields = include_fields) + all_mapped_results = [] + results = self.elastic.count(index=index, query=query["query"], request_timeout=request_timeout) + pr_bar = tqdm.tqdm(scan_results, total=results["count"], desc="CogStack retrieved...", disable=not show_progress, colour='green') + all_mapped_results = self.__map_search_results(hits=pr_bar) + except BaseException as err: + if(type(err) is KeyboardInterrupt): + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", "\033[0;33m", "\033[0;33m") + pr_bar.set_description("CogStack read cancelled! Processed", refresh=True) + print("Request cancelled and current search_scroll_id deleted...") + else: + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", "\033[0;31m", "\033[0;31m") + pr_bar.set_description("CogStack read failed! Processed", refresh=True) + print(Exception(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}")) + finally: + return self.__create_dataframe(all_mapped_results, include_fields) + + def read_data_with_scroll(self, + index: str | Sequence[str], + query: dict, + include_fields:Optional[list[str]]=None, + size: int=1000, + search_scroll_id: Optional[str] = None, + request_timeout: Optional[int]=ES_TIMEOUT, + show_progress: Optional[bool] = True): + + """ + Retrieves documents from an Elasticsearch index using search query and scroll API. + Default scroll timeout is set to 10 minutes. + The function converts search results to a Pandas DataFrame. + + Parameters + ---------- + index : str or Sequence[str] + The name(s) of the Elasticsearch indices or their aliases to search. + query : dict + A dictionary containing the search query parameters. + Query can start with `query` key and contain other query options which will be ignored + + .. code-block:: json + {"query": {"match": {"title": "python"}}}} + or only consist of content of `query` block + .. code-block:: json + {"match": {"title": "python"}}} + include_fields : list[str], optional + A list of fields to be included in search results and presented as columns in the DataFrame. 
+ If not provided, only _index, _id and _score fields will be included. + Columns _index, _id, _score are present in all search results + size : int, optional, default = 1000 + The number of documents to be returned by the query or scroll API during each iteration. + MAX: 10,000. + search_scroll_id : str, optional + The value of the last scroll_id returned by scroll API and used to continue the search if the current search fails. + The value of scroll_id times out after 10 minutes. After which the search will have to be restarted. + Note: Absence of this parameter indicates a new search. + request_timeout : int, optional, default=300 + The time in seconds to wait for a response from Elasticsearch before timing out. + show_progress : bool, optional, default=True + Whether to show the progress in console. + IMPORTANT: The progress bar displays the total hits for the query even if continuing the search using `search_scroll_id`. + Returns + ------ + pandas.DataFrame + A DataFrame containing the retrieved documents. + + Raises + ------ + Exception + If the search fails or cancelled by the user. + If the search fails, error message includes the value of current `search_scroll_id` which can be used as a function parameter to continue the search. + IMPORTANT: If the function fails after `scroll` request, the subsequent request will skip results of the failed scroll by the value of `size` parameter. + """ + try: + self.__validate_size(size=size) + query = self.__extract_query(query=query) + result_count = size + all_mapped_results =[] + search_result=None + pr_bar = tqdm.tqdm(desc="CogStack retrieved...", disable=not show_progress, colour='green') + + if search_scroll_id is None: + search_result = self.elastic.search(index=index, + size=size, + query=query, + fields=include_fields, + source=False, + scroll="10m", + timeout=f"{request_timeout}s", + rest_total_hits_as_int=True) + + pr_bar.total = search_result.body['hits']['total'] + hits = search_result.body['hits']['hits'] + result_count = len(hits) + search_scroll_id = search_result.body['_scroll_id'] + all_mapped_results.extend(self.__map_search_results(hits=hits)) + pr_bar.update(len(hits)) + + while search_scroll_id and result_count == size: + # Perform ES scroll request + search_result = self.elastic.scroll(scroll_id=search_scroll_id, scroll="10m", rest_total_hits_as_int=True) + hits = search_result.body['hits']['hits'] + pr_bar.total = pr_bar.total if pr_bar.total else search_result.body['hits']['total'] + all_mapped_results.extend(self.__map_search_results(hits=hits)) + search_scroll_id = search_result.body['_scroll_id'] + result_count = len(hits) + pr_bar.update(result_count) + + self.elastic.clear_scroll(scroll_id = search_scroll_id) + except BaseException as err: + if(type(err) is KeyboardInterrupt): + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", "\033[0;33m", "\033[0;33m") + pr_bar.set_description("CogStack read cancelled! Processed", refresh=True) + self.elastic.clear_scroll(scroll_id = search_scroll_id) + print("Request cancelled and current search_scroll_id deleted...") + else: + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", "\033[0;31m", "\033[0;31m") + pr_bar.set_description("CogStack read failed! 
Processed", refresh=True) + print(Exception(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}"), f"{search_scroll_id=}", sep='\n') + finally: + return self.__create_dataframe(all_mapped_results, include_fields) + + def read_data_with_sorting(self, + index: str | Sequence[str], + query: dict, + include_fields: Optional[list[str]]=None, + size: Optional[int]=1000, + sort: Optional[dict|list[str]] = {"id": "asc"}, + search_after: Optional[list[str|int|float|Any|None]] = None, + request_timeout: Optional[int]=ES_TIMEOUT, + show_progress: Optional[bool] = True): + """ + Retrieve documents from an Elasticsearch index using search query and convert them to a Pandas DataFrame. + + Parameters + ---------- + index : str or Sequence[str] + The name(s) of the Elasticsearch indices or their aliases to search. + query : dict + A dictionary containing the search query parameters. + Query can start with `query` key and contain other query options which will be ignored + + .. code-block:: json + {"query": {"match": {"title": "python"}}}} + or only consist of content of `query` block + .. code-block:: json + {"match": {"title": "python"}}} + include_fields : list[str], optional + A list of fields to be included in search results and presented as columns in the DataFrame. + If not provided, only _index, _id and _score fields will be included. + Columns _index, _id, _score are present in all search results + size : int, optional, default = 1000 + The number of documents to be returned by the query or scroll API during each iteration. + MAX: 10,000. + sort : dict|list[str], optional, default = {"id": "asc"} + Sort field name(s) and order (`asc` or `desc`) in dictionary format or list of field names without order. + `{"id":"asc"}` or `id` is added if not provided as a tiebreaker field. + Default sorting order is `asc` + >Example: + - `dict : {"filed_Name" : "desc", "id" : "asc"}` + - `list : ["filed_Name", "id"]` + search_after : list[str|int|float|Any|None], optional + The sort value of the last record in search results. + Can be provided if the a search fails and needs to be restarted from the last successful search. + Use the value of `search_after_value` from the error message + request_timeout : int, optional, default = 300 + The time in seconds to wait for a response from Elasticsearch before timing out. + show_progress : bool, optional + Whether to show the progress in console. Defaults to true. + + Returns + ------ + pandas.DataFrame + A DataFrame containing the retrieved documents. + + Raises + ------ + Exception + If the search fails or cancelled by the user. + Error message includes the value of current `search_after_value` which can be used as a function parameter to continue the search. 
+ """ + try: + result_count = size + all_mapped_results =[] + search_after_value = search_after + + self.__validate_size(size=size) + query = self.__extract_query(query=query) + + if ((type(sort) is dict and 'id' not in sort.keys()) or (type(sort) is list and 'id' not in sort)): + if type(sort) is dict: + sort['id'] = 'asc' + else: + sort.append('id') + + pr_bar = tqdm.tqdm(desc="CogStack retrieved...", disable=not show_progress, colour='green') + + while result_count == size: + search_result = self.elastic.search(index=index, + size=size, + query=query, + fields=include_fields, + source=False, + sort=sort, + search_after=search_after_value, + timeout=f"{request_timeout}s", + track_scores=True, + allow_no_indices=False, + rest_total_hits_as_int=True) + + hits = search_result['hits']['hits'] + all_mapped_results.extend(self.__map_search_results(hits=hits)) + result_count = len(hits) + pr_bar.update(result_count) + search_after_value = hits[-1]['sort'] + pr_bar.total = pr_bar.total if pr_bar.total else search_result.body['hits']['total'] + except BaseException as err: + if(type(err) is KeyboardInterrupt): + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", "\033[0;33m", "\033[0;33m") + pr_bar.set_description("CogStack read cancelled! Processed", refresh=True) + print("Request cancelled.") + else: + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", "\033[0;31m", "\033[0;31m") + pr_bar.set_description("CogStack read failed! Processed", refresh=True) + print(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}") + print(f"The last {search_after_value=}") + finally: + return self.__create_dataframe(all_mapped_results, include_fields) + + def __extract_query(self, query: dict): + if "query" in query.keys(): + query = query['query'] + return query + + def __validate_size(self, size): + if size > 10000: + raise ValueError('Size must not be greater then 10000') + + def __map_search_results(self, hits: Iterable): + hit: dict + for hit in hits: + row = dict() + row['_index'] = hit['_index'] + row['_id'] = hit['_id'] + row['_score'] = hit['_score'] + if 'fields' in hit.keys(): + row.update({k: ', '.join(map(str, v)) for k,v in dict(hit['fields']).items()}) + yield row + + def __create_dataframe(self, all_mapped_results, column_headers): + """ + Create a Pandas DataFrame from the search results. + + Parameters + ---------- + all_mapped_results : list + The list of mapped search results. + column_headers : list or None + The list of column headers to include in the DataFrame. + + Returns + ------- + pandas.DataFrame + A DataFrame containing the search results. + """ + df_headers = ['_index', '_id', '_score'] + if column_headers and "*" not in column_headers: + df_headers.extend(column_headers) + return pd.DataFrame(data=all_mapped_results, columns=df_headers) + return pd.DataFrame(data=all_mapped_results) + +def print_dataframe(df : pd.DataFrame, separator : str = '\\n'): + """ + Replace separator string with HTML <br/> tag for printing in Notebook + + Parameters: + ----------- + df : DataFrame + Input DataFrame + separator : str + Separator to be replaced with HTML <br/> + """ + return display(HTML(df.to_html().replace(separator, '
'))) + +def list_chunker(user_list: List[Any], n: int) -> List[List[Any]]: + """ + Divide a list into sublists of a specified size. + + Parameters: + ---------- + user_list : List[Any] + The list to be divided. + n : int + The size of the sublists. + + Returns: + -------- + List[List[Any]]: A list of sublists containing the elements of the input list. + """ + n=max(1, n) + return [user_list[i:i+n] for i in range(0, len(user_list), n)] diff --git a/search/search_template.ipynb b/search/search_template.ipynb index e6122a5..07ff188 100644 --- a/search/search_template.ipynb +++ b/search/search_template.ipynb @@ -17,9 +17,8 @@ "source": [ "import sys\n", "sys.path.append('..')\n", - "import pandas as pd\n", "from credentials import *\n", - "from cogstack import CogStack, print_dataframe" + "from cogstack import CogStack" ] }, { @@ -35,11 +34,7 @@ "metadata": {}, "outputs": [], "source": [ - "cs = CogStack.with_api_key_auth(hosts=hosts, api_key=api_key)\n", - "#cs = CogStack.with_basic_auth(hosts=hosts, username=username, password=password)\n", - "#cs = CogStack(hosts).use_api_key_auth(api_key=api_key)\n", - "#cs = CogStack(hosts).use_basic_auth(username=username, password=password)\n", - "#cs = CogStack(hosts).use_api_key_auth(\"\")\n" + "cs = CogStack(hosts, username=username, password=password, api=True)" ] }, { @@ -49,27 +44,14 @@ "# Check the list of Indices and columns" ] }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "View all indices and their aliases available to this user. Either index names or their aliases can be used to extract data" - ] - }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ - "print_dataframe(cs.get_indices_and_aliases(), ', ')" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "View fields/columns and their data types for provided index names or aliases" + "for i in cs.elastic.indices.get_mapping().keys():\n", + " print(i)" ] }, { @@ -78,14 +60,17 @@ "metadata": {}, "outputs": [], "source": [ - "cs.get_index_fields([])" + "# Check the list of columns in that index\n", + "index = ''\n", + "for col in cs.elastic.indices.get_mapping(index=index)[index]['mappings']['properties'].keys():\n", + " print(col)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "# Set search query parameters" + "# Set parameters" ] }, { @@ -94,7 +79,7 @@ "metadata": {}, "outputs": [], "source": [ - "pt_list = [ ] # example list of patients' patient_TrustNumber here" + "pt_list = [] # example list of patients' patient_TrustNumber here" ] }, { @@ -132,34 +117,24 @@ "metadata": {}, "outputs": [], "source": [ - "search_query = {\n", - " \"bool\": {\n", - " #\"filter\": {\n", - " # \"terms\": {\n", - " # \"patient_TrustNumber\": pt_list\n", - " # }\n", - " #},\n", - " \"must\": [\n", - " {\n", - " \"query_string\": {\n", - " \"query\": \"\",\n", - " \"default_field\":\"\"\n", - " }\n", - " }\n", - " ]\n", + "# Example query structure\n", + "query = {\n", + " \"from\": 0,\n", + " \"size\": 10000,\n", + " \"query\": {\n", + " \"bool\": {\n", + " \"filter\": {\n", + " \"terms\": {\"patient_TrustNumber\": pt_list}\n", + " },\n", + " \"must\": [\n", + " {\"query_string\": {\n", + " \"query\": \"***YOUR LUCENE QUERY HERE***\"}\n", " }\n", - "}\n", - " " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Count the number of documents matching the search query\n", - "cs.count_search_results(index=[], query=search_query)\n" + " ]\n", + " }\n", 
+ " },\n", + " \"_source\": columns # This is a search column filter. remove if all columns are to be retrieved\n", + "}" ] }, { @@ -168,30 +143,7 @@ "tags": [] }, "source": [ - "# Search, Process, and Save\n", - "Use either of the functions to extract search results" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Read data using scan helper\n", - "df = cs.read_data_with_scan(index=[], query=search_query, include_fields=columns)\n", - "print(df)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Read data with scroll API and get scroll id if search fails midway.\n", - "df = cs.read_data_with_scroll(index=[], query=search_query, include_fields=columns)\n", - "print(df)" + "# Search, Process, and Save" ] }, { @@ -200,11 +152,7 @@ "metadata": {}, "outputs": [], "source": [ - "# Read data with sorting and get search_after value if search fails midway.\n", - "# Note: Sorting requires a field to sort by, which should be present in the index. Default sorting is by _id.\n", - "df = cs.read_data_with_sorting(index=[], query=search_query, \n", - " include_fields=columns)\n", - "print(df)" + "df = cs.cogstack2df(query=query, index=index, column_headers=columns)" ] }, { @@ -221,7 +169,6 @@ "outputs": [], "source": [ "# Whatever you want here\n", - "# For example, display the first few rows of the DataFrame\n", "df.head()" ] }, @@ -238,7 +185,6 @@ "metadata": {}, "outputs": [], "source": [ - "# Save the DataFrame to a CSV file\n", "path_to_results = \"../data/cogstack_search_results\"\n", "file_name = \"file_name.csv\"" ] @@ -249,7 +195,7 @@ "metadata": {}, "outputs": [], "source": [ - "df.to_csv(path_to_results + '\\\\' + file_name, index=False)" + "df.to_csv(path_to_results + '/' +file_name, index=False)" ] } ], diff --git a/search/search_template2.ipynb b/search/search_template2.ipynb new file mode 100644 index 0000000..a8053a8 --- /dev/null +++ b/search/search_template2.ipynb @@ -0,0 +1,282 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Seaching CogStack\n", + "\n", + "This script is designed to be a template for cogstack searches" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import sys\n", + "sys.path.append('..')\n", + "import pandas as pd\n", + "from credentials import *\n", + "from cogstack2 import CogStack, print_dataframe" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Login and Initialise" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "cs = CogStack.with_api_key_auth(hosts=hosts, api_key=api_key)\n", + "#cs = CogStack.with_basic_auth(hosts=hosts, username=username, password=password)\n", + "#cs = CogStack(hosts).use_api_key_auth(api_key=api_key)\n", + "#cs = CogStack(hosts).use_basic_auth(username=username, password=password)\n", + "#cs = CogStack(hosts).use_api_key_auth(\"\")\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Check the list of Indices and columns" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "View all indices and their aliases available to this user. 
Either index names or their aliases can be used to extract data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print_dataframe(cs.get_indices_and_aliases(), ', ')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "View fields/columns and their data types for provided index names or aliases" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "cs.get_index_fields([])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Set search query parameters" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pt_list = [ ] # example list of patients' patient_TrustNumber here" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Columns of interest\n", + "\n", + "Select your fields and list in order of output columns" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "columns = []" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Build query\n", + "\n", + "For further information on [how to build a query can be found here](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html)\n", + "\n", + "Further information on [free text string queries can be found here](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-simple-query-string-query.html)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "search_query = {\n", + " \"bool\": {\n", + " #\"filter\": {\n", + " # \"terms\": {\n", + " # \"patient_TrustNumber\": pt_list\n", + " # }\n", + " #},\n", + " \"must\": [\n", + " {\n", + " \"query_string\": {\n", + " \"query\": \"\",\n", + " \"default_field\":\"\"\n", + " }\n", + " }\n", + " ]\n", + " }\n", + "}\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Count the number of documents matching the search query\n", + "cs.count_search_results(index=[], query=search_query)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "tags": [] + }, + "source": [ + "# Search, Process, and Save\n", + "Use either of the functions to extract search results" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Read data using scan helper\n", + "df = cs.read_data_with_scan(index=[], query=search_query, include_fields=columns)\n", + "print(df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Read data with scroll API and get scroll id if search fails midway.\n", + "df = cs.read_data_with_scroll(index=[], query=search_query, include_fields=columns)\n", + "print(df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Read data with sorting and get search_after value if search fails midway.\n", + "# Note: Sorting requires a field to sort by, which should be present in the index. 
Default sorting is by _id.\n", + "df = cs.read_data_with_sorting(index=[], query=search_query, \n", + " include_fields=columns)\n", + "print(df)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Process" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Whatever you want here\n", + "# For example, display the first few rows of the DataFrame\n", + "df.head()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Save" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Save the DataFrame to a CSV file\n", + "path_to_results = \"../data/cogstack_search_results\"\n", + "file_name = \"file_name.csv\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df.to_csv(path_to_results + '\\\\' + file_name, index=False)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.5" + }, + "vscode": { + "interpreter": { + "hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6" + } + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} From eda2b5231410795b108c0ed482fd3817363af8a4 Mon Sep 17 00:00:00 2001 From: vitaliyok Date: Tue, 12 Aug 2025 11:34:27 +0100 Subject: [PATCH 07/10] Added some more comments to explain a difference between search functions --- search/search_template2.ipynb | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/search/search_template2.ipynb b/search/search_template2.ipynb index a8053a8..c442f0a 100644 --- a/search/search_template2.ipynb +++ b/search/search_template2.ipynb @@ -178,7 +178,8 @@ "metadata": {}, "outputs": [], "source": [ - "# Read data using scan helper\n", + "# Read data using scan helper function.\n", + "# Does not provide a scroll id, so cannot be resumed if search fails midway.\n", "df = cs.read_data_with_scan(index=[], query=search_query, include_fields=columns)\n", "print(df)" ] @@ -189,7 +190,8 @@ "metadata": {}, "outputs": [], "source": [ - "# Read data with scroll API and get scroll id if search fails midway.\n", + "# Read data with scroll API and get scroll id if search fails midway. \n", + "# Can be used to resume the search from the failed scroll id.\n", "df = cs.read_data_with_scroll(index=[], query=search_query, include_fields=columns)\n", "print(df)" ] @@ -201,6 +203,7 @@ "outputs": [], "source": [ "# Read data with sorting and get search_after value if search fails midway.\n", + "# Can be used to resume the search from the failed search_after value but can be slower than scan or scroll methods for large datasets.\n", "# Note: Sorting requires a field to sort by, which should be present in the index. 
Default sorting is by _id.\n", "df = cs.read_data_with_sorting(index=[], query=search_query, \n", " include_fields=columns)\n", From b74cdcf8e7aa9582a38aec7548540798b69d0dd3 Mon Sep 17 00:00:00 2001 From: vitaliyok Date: Wed, 13 Aug 2025 17:23:15 +0100 Subject: [PATCH 08/10] Fixed compatibility issues with from Python3.9 and some refactoring --- cogstack.py | 2 +- cogstack2.py | 374 ++++++++++++++++++++++++++++++--------------------- 2 files changed, 225 insertions(+), 151 deletions(-) diff --git a/cogstack.py b/cogstack.py index cd7fc21..cf545fa 100644 --- a/cogstack.py +++ b/cogstack.py @@ -119,7 +119,7 @@ def cogstack2df(self, query: Dict, index: str, column_headers=None, es_gen_size: size=es_gen_size, request_timeout=request_timeout) temp_results = [] - results = self.elastic.count(index=index, query=query['query'], request_timeout=300) # type: ignore + results = self.elastic.count(index=index, query=query['query']) # type: ignore for hit in tqdm(docs_generator, total=results['count'], desc="CogStack retrieved...", disable=not show_progress): row = dict() row['_index'] = hit['_index'] diff --git a/cogstack2.py b/cogstack2.py index 24038c8..6174c54 100644 --- a/cogstack2.py +++ b/cogstack2.py @@ -1,18 +1,17 @@ +from collections.abc import Mapping import getpass import traceback -from typing import Dict, List, Any, Optional, Iterable, Sequence, Tuple +from typing import Dict, List, Any, Optional, Iterable, Sequence, Union +import warnings import elasticsearch import elasticsearch.helpers as es_helpers from IPython.display import display, HTML import pandas as pd import tqdm -import warnings warnings.filterwarnings("ignore") -from credentials import * - -class CogStack(object): +class CogStack(): """ A class for interacting with Elasticsearch. @@ -25,9 +24,13 @@ class CogStack(object): def __init__(self, hosts: List[str]): self.hosts = hosts + self.elastic = None @classmethod - def with_basic_auth(cls, hosts: List[str], username: Optional[str] = None, password: Optional[str] = None) -> 'CogStack': + def with_basic_auth(cls, + hosts: List[str], + username: Optional[str] = None, + password: Optional[str] = None) -> 'CogStack': """ Create an instance of CogStack using basic authentication. @@ -36,9 +39,11 @@ def with_basic_auth(cls, hosts: List[str], username: Optional[str] = None, passw hosts : List[str] A list of Elasticsearch host URLs. username : str, optional - The username to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a username. + The username to use when connecting to Elasticsearch. + If not provided, the user will be prompted to enter a username. password : str, optional - The password to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a password. + The password to use when connecting to Elasticsearch. + If not provided, the user will be prompted to enter a password. Returns ------- CogStack: An instance of the CogStack class. @@ -46,9 +51,11 @@ def with_basic_auth(cls, hosts: List[str], username: Optional[str] = None, passw cs = cls(hosts) cs.use_basic_auth(username, password) return cs - + @classmethod - def with_api_key_auth(cls, hosts: List[str], api_key: Optional[Dict] = None) -> 'CogStack': + def with_api_key_auth(cls, + hosts: List[str], + api_key: Optional[Dict] = None) -> 'CogStack': """ Create an instance of CogStack using API key authentication. @@ -58,8 +65,8 @@ def with_api_key_auth(cls, hosts: List[str], api_key: Optional[Dict] = None) -> A list of Elasticsearch host URLs. 
api_key : Dict, optional
-            API key object with "id" and "api_key" or "encoded" strings as fields. Generated in Elasticseach or Kibana
-            and provided by your CogStack administrator.
+            API key object with "id" and "api_key" or "encoded" strings as fields.
+            Generated in Elasticsearch or Kibana and provided by your CogStack administrator.
 
             If not provided, the user will be prompted to enter the API key "encoded" value.
 
@@ -78,17 +85,22 @@ def with_api_key_auth(cls, hosts: List[str], api_key: Optional[Dict] = None) ->
         cs.use_api_key_auth(api_key)
         return cs
 
-    def use_basic_auth(self, username: Optional[str] = None, password:Optional[str] = None) -> 'CogStack':
+    def use_basic_auth(self,
+                       username: Optional[str] = None,
+                       password: Optional[str] = None) -> 'CogStack':
         """
         Configure basic authentication on this CogStack instance.
-        If the `username` or `password` parameters are not provided, the user will be prompted to enter them.
+        If the `username` or `password` parameters are not provided,
+        the user will be prompted to enter them.
 
         Parameters
         ----------
         username : str, optional
-            The username to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a username.
+            The username to use when connecting to Elasticsearch.
+            If not provided, the user will be prompted to enter a username.
         password : str, optional
-            The password to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a password.
+            The password to use when connecting to Elasticsearch.
+            If not provided, the user will be prompted to enter a password.
 
         Returns
         -------
@@ -101,7 +113,7 @@ def use_basic_auth(self, username: Optional[str] = None, password:Optional[str]
 
         return self.__connect(basic_auth=(username, password) if username and password else None)
 
-    def use_api_key_auth(self, api_key: Optional[Dict] = None) -> 'CogStack':
+    def use_api_key_auth(self, api_key: Optional[Dict] = None) -> 'CogStack':
         """
         Configure API key authentication on this CogStack instance.
 
@@ -109,8 +121,8 @@ def use_api_key_auth(self, api_key: Optional[Dict] = None) -> 'CogStack':
         ----------
 
         api_key : Dict, optional
-            API key object with "id" and "api_key" or "encoded" strings as fields. Generated in Elasticsearch or Kibana
-            and provided by your CogStack administrator.
+            API key object with "id" and "api_key" or "encoded" strings as fields.
+            Generated in Elasticsearch or Kibana and provided by your CogStack administrator.
 
             If not provided, the user will be prompted to enter the API key "encoded" value.
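
As an illustrative aside (not part of the patch itself): a minimal sketch of how these authentication factories might be called, assuming the class is imported from cogstack2.py. The host URL, username, and key values below are placeholders, not values from this patch.

.. code-block:: python

    from cogstack2 import CogStack

    hosts = ["https://cogstack-es.example.org:9200"]  # placeholder host URL

    # Full API key object, as generated in Elasticsearch or Kibana.
    cs = CogStack.with_api_key_auth(hosts, api_key={"id": "API_KEY_ID",
                                                    "api_key": "API_KEY"})

    # Or only the encoded form; any missing fields are prompted for interactively.
    cs = CogStack.with_api_key_auth(hosts, api_key={"encoded": "API_KEY_ENCODED_STRING"})

    # Basic authentication prompts for whichever credentials are not supplied.
    cs = CogStack.with_basic_auth(hosts, username="jdoe")  # placeholder username
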
@@ -128,37 +140,47 @@ def use_api_key_auth(self, api_key: Optional[Dict] = None) -> 'CogStack': """ if not api_key: api_key = {"encoded": input("Encoded API key: ")} - + has_encoded_value = False + api_id_value, api_key_value = None, None if api_key is not None: if isinstance(api_key, str): # If api_key is a string, it is assumed to be the encoded API key encoded = api_key - hasEncodedValue = True + has_encoded_value = True elif isinstance(api_key, Dict): # If api_key is a dictionary, check for "encoded", "id" and "api_key" keys - if "id" in api_key.keys() and api_key["id"] != '' and "api_key" in api_key.keys() and api_key["api_key"] != '': + if "id" in api_key.keys() and api_key["id"] != '' and "api_key" in api_key.keys() \ + and api_key["api_key"] != '': # If both "id" and "api_key" are present, use them encoded = None - hasEncodedValue = False else: # If "encoded" is present, use it; otherwise prompt for it - encoded = api_key["encoded"] if "encoded" in api_key.keys() and api_key["encoded"] != '' else input("Encoded API key: ") - hasEncodedValue = encoded is not None and encoded != '' - - if(not hasEncodedValue): - api_Id = api_key["id"] if "id" in api_key.keys() and api_key["id"] != '' else input("API Id: ") - api_key = api_key["api_key"] if "api_key" in api_key.keys() and api_key["api_key"] != '' else getpass.getpass("API Key: ") - - return self.__connect(api_key=encoded if hasEncodedValue else (api_Id, api_key)) + encoded = api_key["encoded"] \ + if "encoded" in api_key.keys() and api_key["encoded"] != '' \ + else input("Encoded API key: ") + has_encoded_value = encoded is not None and encoded != '' + + if(not has_encoded_value): + api_id_value = api_key["id"] \ + if "id" in api_key.keys() and api_key["id"] != '' \ + else input("API Id: ") + api_key_value = api_key["api_key"] \ + if "api_key" in api_key.keys() and api_key["api_key"] != '' \ + else getpass.getpass("API Key: ") + + return self.__connect(api_key=encoded if has_encoded_value else (api_id_value, api_key_value)) - def __connect(self, basic_auth : Optional[Tuple[str,str]] = None, api_key: Optional[str | Tuple[str, str]] = None) -> 'CogStack': + def __connect(self, + basic_auth : Optional[tuple[str,str]] = None, + api_key: Optional[Union[str, tuple[str, str], None]] = None) -> 'CogStack': """ Connect to Elasticsearch using the provided credentials. Parameters ---------- basic_auth : Tuple[str, str], optional A tuple containing the username and password for basic authentication. api_key : str or Tuple[str, str], optional - The API key or a tuple containing the API key ID and API key for API key authentication. + The API key or a tuple containing the API key ID and API key + for API key authentication. Returns ------- CogStack: An instance of the CogStack class. @@ -166,13 +188,14 @@ def __connect(self, basic_auth : Optional[Tuple[str,str]] = None, api_key: Optio ------ Exception: If the connection to Elasticsearch fails. """ - self.elastic = elasticsearch.Elasticsearch(hosts=hosts, + self.elastic = elasticsearch.Elasticsearch(hosts=self.hosts, api_key=api_key, basic_auth=basic_auth, verify_certs=False, request_timeout=self.ES_TIMEOUT) if not self.elastic.ping(): - raise Exception("CogStack connection failed. Please check your host list and credentials and try again.") + raise ConnectionError("CogStack connection failed. 
" \ + "Please check your host list and credentials and try again.") print("CogStack connection established successfully.") return self @@ -187,8 +210,8 @@ def get_indices_and_aliases(self): all_aliases = self.elastic.indices.get_alias().body index_aliases_coll = [] for index in all_aliases: - index_aliases = dict() - index_aliases['Index'] = index + index_aliases = {} + index_aliases['Index'] = index aliases=[] for alias in all_aliases[index]['aliases']: aliases.append(alias) @@ -197,7 +220,7 @@ def get_indices_and_aliases(self): with pd.option_context('display.max_colwidth', None): return pd.DataFrame(index_aliases_coll, columns=['Index', 'Aliases']) - def get_index_fields(self, index: str | Sequence[str]): + def get_index_fields(self, index: Union[str, Sequence[str]]): """ Retrieve indices and their fields with data type @@ -217,56 +240,64 @@ def get_index_fields(self, index: str | Sequence[str]): If the operation fails for any reason. """ try: - all_mappings = self.elastic.indices.get_mapping(index=index, allow_no_indices=False).body + all_mappings = self.elastic.indices\ + .get_mapping(index=index, allow_no_indices=False).body columns= ['Field', 'Type'] - if (isinstance(index, List)): + if isinstance(index, List): columns.insert(0, 'Index') index_mappings_coll = [] - for index in all_mappings: - for property in all_mappings[index]['mappings']['properties']: - index_mapping = dict() - index_mapping['Index'] = index - index_mapping['Field'] = property - index_mapping['Type'] = all_mappings[index]['mappings']['properties'][property]['type'] if "type" in all_mappings[index]['mappings']['properties'][property].keys() else '?' + for index_name in all_mappings: + for property_name in all_mappings[index_name]['mappings']['properties']: + index_mapping = {} + index_mapping['Index'] = index_name + index_mapping['Field'] = property_name + index_mapping['Type'] = \ + all_mappings[index_name]['mappings']['properties'][property_name]['type'] \ + if "type" in all_mappings[index_name]['mappings']\ + ['properties'][property_name].keys() \ + else '?' index_mappings_coll.append(index_mapping) except Exception as err: raise Exception(f"Unexpected {err=}, {type(err)=}") with pd.option_context('display.max_rows', len(index_mappings_coll) + 1): return display(pd.DataFrame(data= index_mappings_coll, columns=columns)) - def count_search_results(self, index: str | Sequence[str], query: dict): - """ - Count number of documents returned by the query - - Parameters - ---------- - index : str or Sequence[str] - The name(s) of the Elasticsearch indices or their aliases to search. - - query : dict - A dictionary containing the search query parameters. - Query can start with `query` key and contain other query options which will be ignored - - .. code-block:: json - {"query": {"match": {"title": "python"}}}} - or only consist of content of `query` block - .. code-block:: json - {"match": {"title": "python"}}} - """ - query = self.__extract_query(query=query) - count = self.elastic.count(index=index, query=query)['count'] - return f"Number of documents: {format(count, ',')}" + def count_search_results(self, index: Union[str, Sequence[str]], query: dict): + """ + Count number of documents returned by the query + + Parameters + ---------- + index : str or Sequence[str] + The name(s) of the Elasticsearch indices or their aliases to search. + + query : dict + A dictionary containing the search query parameters. + Query can start with `query` key and contain other + query options which will be ignored + + .. 
code-block:: json
+                {"query": {"match": {"title": "python"}}}
+            or only consist of the content of the `query` block
+            .. code-block:: json
+                {"match": {"title": "python"}}
+        """
+        query = self.__extract_query(query=query)
+        count = self.elastic.count(index=index, query=query)['count']
+        return f"Number of documents: {format(count, ',')}"
 
     def read_data_with_scan(self,
-                            index: str,
+                            index: Union[str, Sequence[str]],
                             query: dict,
-                            include_fields: list[str]=None,
+                            include_fields: Optional[list[str]]=None,
                             size: int=1000,
                             request_timeout: int=ES_TIMEOUT,
                             show_progress: bool = True):
         """
-        Retrieve documents from an Elasticsearch index using search query and elasticsearch scan helper function.
-        The function converts search results to a Pandas DataFrame and does not return current scroll id if the process fails.
+        Retrieve documents from an Elasticsearch index or
+        indices using a search query and the elasticsearch scan helper function.
+        The function converts search results to a Pandas DataFrame and does
+        not return the current scroll id if the process fails.
 
         Parameters
         ----------
@@ -274,23 +305,28 @@ def read_data_with_scan(self,
             The name(s) of the Elasticsearch indices or their aliases to search.
         query : dict
             A dictionary containing the search query parameters.
-            Query can start with `query` key and contain other query options which will be used in the search
+            Query can start with a `query` key and contain other
+            query options which will be used in the search
 
             .. code-block:: json
                 {"query": {"match": {"title": "python"}}}
-            or only consist of content of `query` block (preferred method to avoid clashing with other parameters)
+            or only consist of the content of the `query` block
+            (preferred method to avoid clashing with other parameters)
+
             .. code-block:: json
                 {"match": {"title": "python"}}
         include_fields : list[str], optional
-            A list of fields to be included in search results and presented as columns in the DataFrame.
+            A list of fields to be included in search results
+            and presented as columns in the DataFrame.
             If not provided, only _index, _id and _score fields will be included.
             Columns _index, _id, _score are present in all search results.
         size : int, optional, default = 1000
-            The number of documents to be returned by the query or scroll API during each iteration.
-            MAX: 10,000.
+            The number of documents to be returned by the query or scroll
+            API during each iteration. MAX: 10,000.
         request_timeout : int, optional, default=300
-            The time in seconds to wait for a response from Elasticsearch before timing out.
+            The time in seconds to wait for a response
+            from Elasticsearch before timing out.
         show_progress : bool, optional, default=True
             Whether to show the progress in the console.
Returns @@ -309,6 +345,7 @@ def read_data_with_scan(self, temp_query = query.copy() query.clear() query["query"] = temp_query + pr_bar = None scan_results = es_helpers.scan(self.elastic, index=index, @@ -318,30 +355,35 @@ def read_data_with_scan(self, source=False, fields = include_fields) all_mapped_results = [] - results = self.elastic.count(index=index, query=query["query"], request_timeout=request_timeout) - pr_bar = tqdm.tqdm(scan_results, total=results["count"], desc="CogStack retrieved...", disable=not show_progress, colour='green') + results = self.elastic.count(index=index, query=query["query"]) + pr_bar = tqdm.tqdm(scan_results, total=results["count"], + desc="CogStack retrieved...", + disable=not show_progress, colour='green') all_mapped_results = self.__map_search_results(hits=pr_bar) except BaseException as err: - if(type(err) is KeyboardInterrupt): - pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", "\033[0;33m", "\033[0;33m") + if isinstance(err, KeyboardInterrupt): + pr_bar.bar_format ="%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", + "\033[0;33m", + "\033[0;33m") pr_bar.set_description("CogStack read cancelled! Processed", refresh=True) print("Request cancelled and current search_scroll_id deleted...") else: - pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", "\033[0;31m", "\033[0;31m") - pr_bar.set_description("CogStack read failed! Processed", refresh=True) + if pr_bar is not None: + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", + "\033[0;31m", + "\033[0;31m") + pr_bar.set_description("CogStack read failed! Processed", refresh=True) print(Exception(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}")) - finally: - return self.__create_dataframe(all_mapped_results, include_fields) + return self.__create_dataframe(all_mapped_results, include_fields) - def read_data_with_scroll(self, - index: str | Sequence[str], - query: dict, - include_fields:Optional[list[str]]=None, - size: int=1000, - search_scroll_id: Optional[str] = None, - request_timeout: Optional[int]=ES_TIMEOUT, - show_progress: Optional[bool] = True): - + def read_data_with_scroll(self, + index: Union[str, Sequence[str]], + query: dict, + include_fields:Optional[list[str]]=None, + size: int=1000, + search_scroll_id: Optional[str] = None, + request_timeout: Optional[int]=ES_TIMEOUT, + show_progress: Optional[bool] = True): """ Retrieves documents from an Elasticsearch index using search query and scroll API. Default scroll timeout is set to 10 minutes. @@ -353,29 +395,38 @@ def read_data_with_scroll(self, The name(s) of the Elasticsearch indices or their aliases to search. query : dict A dictionary containing the search query parameters. - Query can start with `query` key and contain other query options which will be ignored + Query can start with `query` key + and contain other query options which will be ignored .. code-block:: json {"query": {"match": {"title": "python"}}}} or only consist of content of `query` block .. code-block:: json {"match": {"title": "python"}}} + include_fields : list[str], optional - A list of fields to be included in search results and presented as columns in the DataFrame. + A list of fields to be included in search results + and presented as columns in the DataFrame. If not provided, only _index, _id and _score fields will be included. Columns _index, _id, _score are present in all search results size : int, optional, default = 1000 - The number of documents to be returned by the query or scroll API during each iteration. 
+            The number of documents to be returned by the query
+            or scroll API during each iteration.
             MAX: 10,000.
         search_scroll_id : str, optional
-            The value of the last scroll_id returned by scroll API and used to continue the search if the current search fails.
-            The value of scroll_id times out after 10 minutes. After which the search will have to be restarted.
+            The value of the last scroll_id returned by the scroll API,
+            used to continue the search if the current search fails.
+            The scroll_id times out after 10 minutes, after which
+            the search will have to be restarted.
             Note: Absence of this parameter indicates a new search.
         request_timeout : int, optional, default=300
             The time in seconds to wait for a response from Elasticsearch before timing out.
         show_progress : bool, optional, default=True
             Whether to show the progress in the console.
-            IMPORTANT: The progress bar displays the total hits for the query even if continuing the search using `search_scroll_id`.
+            IMPORTANT: The progress bar displays the total hits
+            for the query even when continuing the search using `search_scroll_id`.
         Returns
         ------
         pandas.DataFrame
@@ -385,8 +436,11 @@ def read_data_with_scroll(self,
         ------
         Exception
             If the search fails or is cancelled by the user.
-            If the search fails, error message includes the value of current `search_scroll_id` which can be used as a function parameter to continue the search.
-            IMPORTANT: If the function fails after `scroll` request, the subsequent request will skip results of the failed scroll by the value of `size` parameter.
+            If the search fails, the error message includes the value of the current
+            `search_scroll_id`, which can be passed back in to continue the search.
+            IMPORTANT: If the function fails after a `scroll` request, the subsequent
+            request will skip the results of the failed scroll batch
+            (up to `size` documents).
""" try: self.__validate_size(size=size) @@ -394,13 +448,17 @@ def read_data_with_scroll(self, result_count = size all_mapped_results =[] search_result=None - pr_bar = tqdm.tqdm(desc="CogStack retrieved...", disable=not show_progress, colour='green') + include_fields_map: Sequence[Mapping[str, Any]] = include_fields \ + if include_fields is not None else None + + pr_bar = tqdm.tqdm(desc="CogStack retrieved...", + disable=not show_progress, colour='green') if search_scroll_id is None: search_result = self.elastic.search(index=index, size=size, query=query, - fields=include_fields, + fields=include_fields_map, source=False, scroll="10m", timeout=f"{request_timeout}s", @@ -415,35 +473,38 @@ def read_data_with_scroll(self, while search_scroll_id and result_count == size: # Perform ES scroll request - search_result = self.elastic.scroll(scroll_id=search_scroll_id, scroll="10m", rest_total_hits_as_int=True) + search_result = self.elastic.scroll(scroll_id=search_scroll_id, scroll="10m", + rest_total_hits_as_int=True) hits = search_result.body['hits']['hits'] pr_bar.total = pr_bar.total if pr_bar.total else search_result.body['hits']['total'] all_mapped_results.extend(self.__map_search_results(hits=hits)) search_scroll_id = search_result.body['_scroll_id'] result_count = len(hits) pr_bar.update(result_count) - self.elastic.clear_scroll(scroll_id = search_scroll_id) - except BaseException as err: - if(type(err) is KeyboardInterrupt): - pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", "\033[0;33m", "\033[0;33m") + except BaseException as err: + if isinstance(err, KeyboardInterrupt): + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", + "\033[0;33m", + "\033[0;33m") pr_bar.set_description("CogStack read cancelled! Processed", refresh=True) self.elastic.clear_scroll(scroll_id = search_scroll_id) print("Request cancelled and current search_scroll_id deleted...") else: - pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", "\033[0;31m", "\033[0;31m") - pr_bar.set_description("CogStack read failed! Processed", refresh=True) + if pr_bar is not None: + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", "\033[0;31m", "\033[0;31m") + pr_bar.set_description("CogStack read failed! Processed", refresh=True) print(Exception(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}"), f"{search_scroll_id=}", sep='\n') - finally: - return self.__create_dataframe(all_mapped_results, include_fields) + + return self.__create_dataframe(all_mapped_results, include_fields) def read_data_with_sorting(self, - index: str | Sequence[str], + index: Union[str, Sequence[str]], query: dict, include_fields: Optional[list[str]]=None, size: Optional[int]=1000, - sort: Optional[dict|list[str]] = {"id": "asc"}, - search_after: Optional[list[str|int|float|Any|None]] = None, + sort: Optional[Union[dict,list[str]]] = None, + search_after: Optional[list[Union[str,int,float,Any,None]]] = None, request_timeout: Optional[int]=ES_TIMEOUT, show_progress: Optional[bool] = True): """ @@ -494,37 +555,44 @@ def read_data_with_sorting(self, ------ Exception If the search fails or cancelled by the user. - Error message includes the value of current `search_after_value` which can be used as a function parameter to continue the search. + Error message includes the value of current `search_after_value` + which can be used as a function parameter to continue the search. 
""" try: result_count = size all_mapped_results =[] + if sort is None: + sort = {'id': 'asc'} search_after_value = search_after + include_fields_map: Sequence[Mapping[str, Any]] = include_fields \ + if include_fields is not None \ + else None self.__validate_size(size=size) query = self.__extract_query(query=query) - if ((type(sort) is dict and 'id' not in sort.keys()) or (type(sort) is list and 'id' not in sort)): - if type(sort) is dict: - sort['id'] = 'asc' + if ((isinstance(sort, dict) and 'id' not in sort.keys()) + or (isinstance(sort, list) and 'id' not in sort)): + if isinstance(sort, dict): + sort['id'] = 'asc' else: - sort.append('id') - - pr_bar = tqdm.tqdm(desc="CogStack retrieved...", disable=not show_progress, colour='green') + sort.append('id') + pr_bar = tqdm.tqdm(desc="CogStack retrieved...", + disable=not show_progress, + colour='green') while result_count == size: search_result = self.elastic.search(index=index, size=size, query=query, - fields=include_fields, + fields=include_fields_map, source=False, sort=sort, search_after=search_after_value, timeout=f"{request_timeout}s", track_scores=True, allow_no_indices=False, - rest_total_hits_as_int=True) - + rest_total_hits_as_int=True) hits = search_result['hits']['hits'] all_mapped_results.extend(self.__map_search_results(hits=hits)) result_count = len(hits) @@ -532,17 +600,22 @@ def read_data_with_sorting(self, search_after_value = hits[-1]['sort'] pr_bar.total = pr_bar.total if pr_bar.total else search_result.body['hits']['total'] except BaseException as err: - if(type(err) is KeyboardInterrupt): - pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", "\033[0;33m", "\033[0;33m") + if isinstance(err, KeyboardInterrupt): + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", + "\033[0;33m", + "\033[0;33m") pr_bar.set_description("CogStack read cancelled! Processed", refresh=True) print("Request cancelled.") else: - pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", "\033[0;31m", "\033[0;31m") - pr_bar.set_description("CogStack read failed! Processed", refresh=True) + if pr_bar is not None: + pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;31m", + "\033[0;31m", + "\033[0;31m") + pr_bar.set_description("CogStack read failed! Processed", refresh=True) print(f"Unexpected {err=},\n {traceback.format_exc()}, {type(err)=}") print(f"The last {search_after_value=}") - finally: - return self.__create_dataframe(all_mapped_results, include_fields) + + return self.__create_dataframe(all_mapped_results, include_fields) def __extract_query(self, query: dict): if "query" in query.keys(): @@ -565,33 +638,34 @@ def __map_search_results(self, hits: Iterable): yield row def __create_dataframe(self, all_mapped_results, column_headers): - """ - Create a Pandas DataFrame from the search results. + """ + Create a Pandas DataFrame from the search results. - Parameters - ---------- + Parameters + ---------- all_mapped_results : list The list of mapped search results. column_headers : list or None The list of column headers to include in the DataFrame. - Returns - ------- - pandas.DataFrame - A DataFrame containing the search results. - """ - df_headers = ['_index', '_id', '_score'] - if column_headers and "*" not in column_headers: - df_headers.extend(column_headers) - return pd.DataFrame(data=all_mapped_results, columns=df_headers) - return pd.DataFrame(data=all_mapped_results) + Returns + ------- + pandas.DataFrame + A DataFrame containing the search results. 
+ """ + df_headers = ['_index', '_id', '_score'] + if column_headers and "*" not in column_headers: + df_headers.extend(column_headers) + return pd.DataFrame(data=all_mapped_results, columns=df_headers) + return pd.DataFrame(data=all_mapped_results) def print_dataframe(df : pd.DataFrame, separator : str = '\\n'): """ - Replace separator string with HTML <br/> tag for printing in Notebook + Replace separator string with HTML <br/> + tag for printing in Notebook - Parameters: - ----------- + Parameters: + ----------- df : DataFrame Input DataFrame separator : str From cd05f54d45eb45eabf666e247764112223c32241 Mon Sep 17 00:00:00 2001 From: vitaliyok Date: Thu, 14 Aug 2025 17:09:56 +0100 Subject: [PATCH 09/10] Fixed type matching issues flagged during build --- cogstack2.py | 64 +++++++++++++++++++++++++++++++++------------------- 1 file changed, 41 insertions(+), 23 deletions(-) diff --git a/cogstack2.py b/cogstack2.py index 6174c54..2cbaaf8 100644 --- a/cogstack2.py +++ b/cogstack2.py @@ -24,7 +24,7 @@ class CogStack(): def __init__(self, hosts: List[str]): self.hosts = hosts - self.elastic = None + self.elastic: elasticsearch.Elasticsearch @classmethod def with_basic_auth(cls, @@ -138,11 +138,13 @@ def use_api_key_auth(self, api_key: Optional[Dict] = None) -> 'CogStack': ------- CogStack: An instance of the CogStack class. """ + has_encoded_value = False + api_id_value:str + api_key_value:str + if not api_key: api_key = {"encoded": input("Encoded API key: ")} - has_encoded_value = False - api_id_value, api_key_value = None, None - if api_key is not None: + else: if isinstance(api_key, str): # If api_key is a string, it is assumed to be the encoded API key encoded = api_key @@ -161,18 +163,18 @@ def use_api_key_auth(self, api_key: Optional[Dict] = None) -> 'CogStack': has_encoded_value = encoded is not None and encoded != '' if(not has_encoded_value): - api_id_value = api_key["id"] \ + api_id_value = str(api_key["id"] \ if "id" in api_key.keys() and api_key["id"] != '' \ - else input("API Id: ") - api_key_value = api_key["api_key"] \ + else input("API Id: ")) + api_key_value = str(api_key["api_key"] \ if "api_key" in api_key.keys() and api_key["api_key"] != '' \ - else getpass.getpass("API Key: ") + else getpass.getpass("API Key: ")) return self.__connect(api_key=encoded if has_encoded_value else (api_id_value, api_key_value)) def __connect(self, basic_auth : Optional[tuple[str,str]] = None, - api_key: Optional[Union[str, tuple[str, str], None]] = None) -> 'CogStack': + api_key: Optional[Union[str, tuple[str, str]]] = None) -> 'CogStack': """ Connect to Elasticsearch using the provided credentials. Parameters ---------- @@ -189,10 +191,10 @@ def __connect(self, Exception: If the connection to Elasticsearch fails. """ self.elastic = elasticsearch.Elasticsearch(hosts=self.hosts, - api_key=api_key, - basic_auth=basic_auth, - verify_certs=False, - request_timeout=self.ES_TIMEOUT) + api_key=api_key, + basic_auth=basic_auth, + verify_certs=False, + request_timeout=self.ES_TIMEOUT) if not self.elastic.ping(): raise ConnectionError("CogStack connection failed. " \ "Please check your host list and credentials and try again.") @@ -240,6 +242,8 @@ def get_index_fields(self, index: Union[str, Sequence[str]]): If the operation fails for any reason. 
""" try: + if len(index) == 0: + raise ValueError('Provide at least one index or index alias name') all_mappings = self.elastic.indices\ .get_mapping(index=index, allow_no_indices=False).body columns= ['Field', 'Type'] @@ -282,8 +286,10 @@ def count_search_results(self, index: Union[str, Sequence[str]], query: dict): .. code-block:: json {"match": {"title": "python"}}} """ + if len(index) == 0: + raise ValueError('Provide at least one index or index alias name') query = self.__extract_query(query=query) - count = self.elastic.count(index=index, query=query)['count'] + count = self.elastic.count(index=index, query=query, allow_no_indices=False)['count'] return f"Number of documents: {format(count, ',')}" def read_data_with_scan(self, @@ -340,12 +346,14 @@ def read_data_with_scan(self, If the search fails or cancelled by the user. """ try: + if len(index) == 0: + raise ValueError('Provide at least one index or index alias name') self.__validate_size(size=size) if "query" not in query.keys(): temp_query = query.copy() query.clear() query["query"] = temp_query - pr_bar = None + pr_bar: tqdm.tqdm = None scan_results = es_helpers.scan(self.elastic, index=index, @@ -353,7 +361,8 @@ def read_data_with_scan(self, size=size, request_timeout=request_timeout, source=False, - fields = include_fields) + fields = include_fields, + allow_no_indices=False,) all_mapped_results = [] results = self.elastic.count(index=index, query=query["query"]) pr_bar = tqdm.tqdm(scan_results, total=results["count"], @@ -443,13 +452,15 @@ def read_data_with_scroll(self, value of `size` parameter. """ try: + if len(index) == 0: + raise ValueError('Provide at least one index or index alias name') self.__validate_size(size=size) query = self.__extract_query(query=query) result_count = size all_mapped_results =[] search_result=None - include_fields_map: Sequence[Mapping[str, Any]] = include_fields \ - if include_fields is not None else None + include_fields_map: Union[Sequence[Mapping[str, Any]], None] = \ + [{"field": field} for field in include_fields] if include_fields is not None else None pr_bar = tqdm.tqdm(desc="CogStack retrieved...", disable=not show_progress, colour='green') @@ -462,6 +473,7 @@ def read_data_with_scroll(self, source=False, scroll="10m", timeout=f"{request_timeout}s", + allow_no_indices=False, rest_total_hits_as_int=True) pr_bar.total = search_result.body['hits']['total'] @@ -470,6 +482,8 @@ def read_data_with_scroll(self, search_scroll_id = search_result.body['_scroll_id'] all_mapped_results.extend(self.__map_search_results(hits=hits)) pr_bar.update(len(hits)) + if search_result["_shards"]["failed"] > 0: + raise LookupError(search_result["_shards"]["failures"]) while search_scroll_id and result_count == size: # Perform ES scroll request @@ -559,14 +573,15 @@ def read_data_with_sorting(self, which can be used as a function parameter to continue the search. 
""" try: + if len(index) == 0: + raise ValueError('Provide at least one index or index alias name') result_count = size all_mapped_results =[] if sort is None: sort = {'id': 'asc'} search_after_value = search_after - include_fields_map: Sequence[Mapping[str, Any]] = include_fields \ - if include_fields is not None \ - else None + include_fields_map: Union[Sequence[Mapping[str, Any]], None] = \ + [{"field": field} for field in include_fields] if include_fields is not None else None self.__validate_size(size=size) query = self.__extract_query(query=query) @@ -591,6 +606,7 @@ def read_data_with_sorting(self, search_after=search_after_value, timeout=f"{request_timeout}s", track_scores=True, + track_total_hits=True, allow_no_indices=False, rest_total_hits_as_int=True) hits = search_result['hits']['hits'] @@ -599,6 +615,8 @@ def read_data_with_sorting(self, pr_bar.update(result_count) search_after_value = hits[-1]['sort'] pr_bar.total = pr_bar.total if pr_bar.total else search_result.body['hits']['total'] + if search_result["_shards"]["failed"] > 0: + raise LookupError(search_result["_shards"]["failures"]) except BaseException as err: if isinstance(err, KeyboardInterrupt): pr_bar.bar_format = "%s{l_bar}%s{bar}%s{r_bar}" % ("\033[0;33m", @@ -619,12 +637,12 @@ def read_data_with_sorting(self, def __extract_query(self, query: dict): if "query" in query.keys(): - query = query['query'] + return query['query'] return query def __validate_size(self, size): if size > 10000: - raise ValueError('Size must not be greater then 10000') + raise ValueError('Size must not be greater than 10000') def __map_search_results(self, hits: Iterable): hit: dict From 18995083beec91005cdedf41fc4b8e9feacca800 Mon Sep 17 00:00:00 2001 From: vitaliyok Date: Fri, 15 Aug 2025 10:36:37 +0100 Subject: [PATCH 10/10] Fixed yet another Incompatible type issue --- cogstack2.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/cogstack2.py b/cogstack2.py index 2cbaaf8..151986f 100644 --- a/cogstack2.py +++ b/cogstack2.py @@ -353,7 +353,8 @@ def read_data_with_scan(self, temp_query = query.copy() query.clear() query["query"] = temp_query - pr_bar: tqdm.tqdm = None + pr_bar = tqdm.tqdm(desc="CogStack retrieved...", + disable=not show_progress, colour='green') scan_results = es_helpers.scan(self.elastic, index=index, @@ -364,10 +365,8 @@ def read_data_with_scan(self, fields = include_fields, allow_no_indices=False,) all_mapped_results = [] - results = self.elastic.count(index=index, query=query["query"]) - pr_bar = tqdm.tqdm(scan_results, total=results["count"], - desc="CogStack retrieved...", - disable=not show_progress, colour='green') + pr_bar.iterable = scan_results + pr_bar.total = self.elastic.count(index=index, query=query["query"])["count"] all_mapped_results = self.__map_search_results(hits=pr_bar) except BaseException as err: if isinstance(err, KeyboardInterrupt):