From 9e7ca110f53d5e1dbf81680b2d24916df93d93f1 Mon Sep 17 00:00:00 2001
From: igeni
Date: Sat, 23 Mar 2024 19:46:46 +0300
Subject: [PATCH] Changed string concatenation to f-strings to avoid potential
 type mismatches, simplify the code, and unify it with other parts of the code

---
 datastore/providers/milvus_datastore.py | 80 ++++++++++-----------------
 1 file changed, 26 insertions(+), 54 deletions(-)

diff --git a/datastore/providers/milvus_datastore.py b/datastore/providers/milvus_datastore.py
index dfd404d28..f9384e2f9 100644
--- a/datastore/providers/milvus_datastore.py
+++ b/datastore/providers/milvus_datastore.py
@@ -138,14 +138,10 @@ def _create_connection(self):
                 if (
                     x[1]
                     and ("address" in addr)
-                    and (addr["address"] == "{}:{}".format(MILVUS_HOST, MILVUS_PORT))
+                    and (addr["address"] == f"{MILVUS_HOST}:{MILVUS_PORT}")
                 ):
                     self.alias = x[0]
-                    logger.info(
-                        "Reuse connection to Milvus server '{}:{}' with alias '{:s}'".format(
-                            MILVUS_HOST, MILVUS_PORT, self.alias
-                        )
-                    )
+                    logger.info(f"Reuse connection to Milvus server '{MILVUS_HOST}:{MILVUS_PORT}' with alias '{self.alias}'")
                     break
 
         # Connect to the Milvus instance using the passed in Environment variables
@@ -159,17 +155,9 @@ def _create_connection(self):
                 password=MILVUS_PASSWORD,  # type: ignore
                 secure=MILVUS_USE_SECURITY,
             )
-            logger.info(
-                "Create connection to Milvus server '{}:{}' with alias '{:s}'".format(
-                    MILVUS_HOST, MILVUS_PORT, self.alias
-                )
-            )
+            logger.info(f"Create connection to Milvus server '{MILVUS_HOST}:{MILVUS_PORT}' with alias '{self.alias}'")
         except Exception as e:
-            logger.error(
-                "Failed to create connection to Milvus server '{}:{}', error: {}".format(
-                    MILVUS_HOST, MILVUS_PORT, e
-                )
-            )
+            logger.error(f"Failed to create connection to Milvus server '{MILVUS_HOST}:{MILVUS_PORT}', error: {e}")
 
     def _create_collection(self, collection_name, create_new: bool) -> None:
         """Create a collection based on environment and passed in variables.
@@ -197,9 +185,7 @@ def _create_collection(self, collection_name, create_new: bool) -> None:
                 )
                 self._schema_ver = "V2"
                 logger.info(
-                    "Create Milvus collection '{}' with schema {} and consistency level {}".format(
-                        collection_name, self._schema_ver, self._consistency_level
-                    )
+                    f"Create Milvus collection '{collection_name}' with schema {self._schema_ver} and consistency level {self._consistency_level}"
                 )
             else:
                 # If the collection exists, point to it
@@ -209,15 +195,9 @@ def _create_collection(self, collection_name, create_new: bool) -> None:
                     if field.name == "id" and field.is_primary:
                         self._schema_ver = "V2"
                         break
-            logger.info(
-                "Milvus collection '{}' already exists with schema {}".format(
-                    collection_name, self._schema_ver
-                )
-            )
+            logger.info(f"Milvus collection '{collection_name}' already exists with schema {self._schema_ver}")
         except Exception as e:
-            logger.error(
-                "Failed to create collection '{}', error: {}".format(collection_name, e)
-            )
+            logger.error(f"Failed to create collection '{collection_name}', error: {e}")
 
     def _create_index(self):
         # TODO: verify index/search params passed by os.environ
@@ -229,7 +209,7 @@ def _create_index(self):
                 if self.index_params is not None:
                     # Convert the string format to JSON format parameters passed by MILVUS_INDEX_PARAMS
                     self.index_params = json.loads(self.index_params)
-                    logger.info("Create Milvus index: {}".format(self.index_params))
+                    logger.info(f"Create Milvus index: {self.index_params}")
                     # Create an index on the 'embedding' field with the index params found in init
                     self.col.create_index(
                         EMBEDDING_FIELD, index_params=self.index_params
@@ -242,17 +222,9 @@ def _create_index(self):
                             "index_type": "HNSW",
                             "params": {"M": 8, "efConstruction": 64},
                         }
-                        logger.info(
-                            "Attempting creation of Milvus '{}' index".format(
-                                i_p["index_type"]
-                            )
-                        )
+                        logger.info(f"Attempting creation of Milvus '{i_p['index_type']}' index")
                         self.col.create_index(EMBEDDING_FIELD, index_params=i_p)
                         self.index_params = i_p
-                        logger.info(
-                            "Creation of Milvus '{}' index successful".format(
-                                i_p["index_type"]
-                            )
-                        )
+                        logger.info(f"Creation of Milvus '{i_p['index_type']}' index successful")
                     # If create fails, most likely due to being Zilliz Cloud instance, try to create an AutoIndex
                     except MilvusException:
@@ -271,7 +243,7 @@ def _create_index(self):
                 for index in self.col.indexes:
                     idx = index.to_dict()
                     if idx["field"] == EMBEDDING_FIELD:
-                        logger.info("Index already exists: {}".format(idx))
+                        logger.info(f"Index already exists: {idx}")
                         self.index_params = idx["index_param"]
                         break
 
@@ -304,9 +276,9 @@ def _create_index(self):
                 self.search_params = default_search_params[
                     self.index_params["index_type"]
                 ]
-            logger.info("Milvus search parameters: {}".format(self.search_params))
+            logger.info(f"Milvus search parameters: {self.search_params}")
         except Exception as e:
-            logger.error("Failed to create index, error: {}".format(e))
+            logger.error(f"Failed to create index, error: {e}")
 
     async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]:
         """Upsert chunks into the datastore.
@@ -362,7 +334,7 @@ async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]:
            # self.col.flush()
            return doc_ids
        except Exception as e:
-            logger.error("Failed to insert records, error: {}".format(e))
+            logger.error(f"Failed to insert records, error: {e}")
            return []
 
    def _get_values(self, chunk: DocumentChunk) -> List[any] | None:  # type: ignore
@@ -396,7 +368,7 @@ def _get_values(self, chunk: DocumentChunk) -> List[any] | None:  # type: ignore
            x = values.get(key) or default
            # If one of our required fields is missing, ignore the entire entry
            if x is Required:
-                logger.info("Chunk " + values["id"] + " missing " + key + " skipping")
+                logger.info(f"Chunk {values['id']} missing {key} skipping")
                return None
            # Add the corresponding value if it passes the tests
            ret.append(x)
@@ -468,7 +440,7 @@ async def _single_query(query: QueryWithEmbedding) -> QueryResult:
 
                return QueryResult(query=query.query, results=results)
            except Exception as e:
-                logger.error("Failed to query, error: {}".format(e))
+                logger.error(f"Failed to query, error: {e}")
                return QueryResult(query=query.query, results=[])
 
        results: List[QueryResult] = await asyncio.gather(
@@ -493,7 +465,7 @@ async def delete(
        if delete_all:
            coll_name = self.col.name
            logger.info(
-                "Delete the entire collection {} and create new one".format(coll_name)
+                f"Delete the entire collection {coll_name} and create new one"
            )
            # Release the collection from memory
            self.col.release()
@@ -514,14 +486,14 @@ async def delete(
        # in future version we can delete by expression
        if (ids is not None) and len(ids) > 0:
            # Add quotation marks around the string format id
-            ids = ['"' + str(id) + '"' for id in ids]
+            ids = [f'"{id}"' for id in ids]
            # Query for the pk's of entries that match id's
            ids = self.col.query(f"document_id in [{','.join(ids)}]")
            # Convert to list of pks
            pks = [str(entry[pk_name]) for entry in ids]  # type: ignore
            # for schema V2, the "id" is varchar, rewrite the expression
            if self._schema_ver != "V1":
-                pks = ['"' + pk + '"' for pk in pks]
+                pks = [f'"{pk}"' for pk in pks]
 
            # Delete by ids batch by batch(avoid too long expression)
            logger.info(
@@ -537,7 +509,7 @@ async def delete(
                # Increment our deleted count
                delete_count += int(res.delete_count)  # type: ignore
        except Exception as e:
-            logger.error("Failed to delete by ids, error: {}".format(e))
+            logger.error(f"Failed to delete by ids, error: {e}")
 
        try:
            # Check if empty filter
@@ -552,7 +524,7 @@ async def delete(
                    pks = [str(entry[pk_name]) for entry in res]  # type: ignore
                    # for schema V2, the "id" is varchar, rewrite the expression
                    if self._schema_ver != "V1":
-                        pks = ['"' + pk + '"' for pk in pks]
+                        pks = [f'"{pk}"' for pk in pks]
                    # Check to see if there are valid pk's to delete, delete batch by batch(avoid too long expression)
                    while len(pks) > 0:  # type: ignore
                        batch_pks = pks[:batch_size]
@@ -562,7 +534,7 @@ async def delete(
                        # Increment our delete count
                        delete_count += int(res.delete_count)  # type: ignore
        except Exception as e:
-            logger.error("Failed to delete by filter, error: {}".format(e))
+            logger.error(f"Failed to delete by filter, error: {e}")
 
        logger.info("{:d} records deleted".format(delete_count))
 
@@ -588,18 +560,18 @@ def _get_filter(self, filter: DocumentMetadataFilter) -> Optional[str]:
                # Convert start_date to int and add greater than or equal logic
                if field == "start_date":
                    filters.append(
-                        "(created_at >= " + str(to_unix_timestamp(value)) + ")"
+                        f"(created_at >= {to_unix_timestamp(value)})"
                    )
                # Convert end_date to int and add less than or equal logic
                elif field == "end_date":
                    filters.append(
-                        "(created_at <= " + str(to_unix_timestamp(value)) + ")"
+                        f"(created_at <= {to_unix_timestamp(value)})"
                    )
                # Convert Source to its string value and check equivalency
                elif field == "source":
-                    filters.append("(" + field + ' == "' + str(value.value) + '")')
+                    filters.append(f'({field} == "{value.value}")')
                # Check equivalency of rest of string fields
                else:
-                    filters.append("(" + field + ' == "' + str(value) + '")')
+                    filters.append(f'({field} == "{value}")')
        # Join all our expressions with `and``
        return " and ".join(filters)