diff --git a/README.md b/README.md index 97ea6d1..310f06b 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,8 @@ source: meilisearch: api_url: http://192.168.123.205:7700 api_key: + insert_size: 1000 + insert_interval: 10 sync: - table: collection index: beauty-collections @@ -148,6 +150,13 @@ MeiliSearch configuration. - `api_url`: the MeiliSearch API URL. - `api_key`: the MeiliSearch API key. +- `insert_size`: insert after collecting this many documents, optional. +- `insert_interval`: insert after this many seconds have passed, optional. + +If nether `insert_size` nor `insert_interval` is set, it will insert each document immediately. + +If you prefer performance, just set and increase `insert_size` and `insert_interval`. The insert will be made as long as +one of the conditions is met. ### sync diff --git a/config.yml b/config.yml index 8f5ea83..a4b7e06 100644 --- a/config.yml +++ b/config.yml @@ -13,6 +13,8 @@ source: meilisearch: api_url: http://192.168.123.205:7700 api_key: + insert_size: 1000 + insert_interval: 10 sync: - table: test index: test diff --git a/meilisync/event.py b/meilisync/event.py new file mode 100644 index 0000000..20e07b1 --- /dev/null +++ b/meilisync/event.py @@ -0,0 +1,38 @@ +from meilisync.enums import EventType +from meilisync.schemas import Event +from meilisync.settings import Sync + + +class EventCollection: + def __init__(self): + self._size = 0 + self._events = {} + + def add_event(self, sync: Sync, event: Event): + pk = event.data[sync.pk] + self._events.setdefault(sync, {}) + self._events[sync][pk] = event + + @property + def size(self): + return self._size + + @property + def pop_events(self): + updated_events = {} + created_events = {} + deleted_events = {} + for sync, events in self._events.items(): + updated_events[sync] = [] + created_events[sync] = [] + deleted_events[sync] = [] + for event in events.values(): + if event.type == EventType.create: + created_events[sync].append(event) + elif event.type == EventType.update: + updated_events[sync].append(event) + elif event.type == EventType.delete: + deleted_events[sync].append(event) + self._events = {} + self._size = 0 + return created_events, updated_events, deleted_events diff --git a/meilisync/main.py b/meilisync/main.py index 573e4b6..d6324d8 100644 --- a/meilisync/main.py +++ b/meilisync/main.py @@ -6,6 +6,7 @@ from loguru import logger from meilisync.discover import get_progress, get_source +from meilisync.event import EventCollection from meilisync.meili import Meili from meilisync.schemas import Event from meilisync.settings import Settings @@ -47,9 +48,7 @@ async def _(): **settings.source.dict(exclude={"type"}), ) meilisearch = settings.meilisearch - meili = Meili( - settings.debug, meilisearch.api_url, meilisearch.api_key, settings.plugins_cls() - ) + meili = Meili(meilisearch.api_url, meilisearch.api_key, settings.plugins_cls()) context.obj["current_progress"] = current_progress context.obj["source"] = source context.obj["meili"] = meili @@ -63,12 +62,17 @@ async def _(): def start( context: typer.Context, ): + current_progress = context.obj["current_progress"] + source = context.obj["source"] + meili = context.obj["meili"] + settings = context.obj["settings"] + progress = context.obj["progress"] + meili_settings = settings.meilisearch + collection = EventCollection() + lock = None + async def _(): - current_progress = context.obj["current_progress"] - source = context.obj["source"] - meili = context.obj["meili"] - settings = context.obj["settings"] - progress = context.obj["progress"] + nonlocal current_progress if not current_progress: for sync in settings.sync: if sync.full: @@ -85,13 +89,44 @@ async def _(): ) logger.info(f'Start increment sync data from "{settings.source.type}" to MeiliSearch...') async for event in source: + if settings.debug: + logger.debug(event) + current_progress = event.progress if isinstance(event, Event): sync = settings.get_sync(event.table) - if sync: + if not sync: + continue + if not meili_settings.insert_size and not meili_settings.insert_interval: await meili.handle_event(event, sync) - await progress.set(**event.progress) - - asyncio.run(_()) + await progress.set(**current_progress) + else: + collection.add_event(sync, event) + if collection.size >= meili_settings.insert_size: + async with lock: + await meili.handle_events(collection) + await progress.set(**current_progress) + else: + await progress.set(**current_progress) + + async def interval(): + if not settings.meilisearch.insert_interval: + return + while True: + await asyncio.sleep(settings.meilisearch.insert_interval) + try: + async with lock: + await meili.handle_events(collection) + await progress.set(**current_progress) + except Exception as e: + logger.exception(e) + logger.error(f"Error when insert data to MeiliSearch: {e}") + + async def run(): + nonlocal lock + lock = asyncio.Lock() + await asyncio.gather(_(), interval()) + + asyncio.run(run()) @app.command(help="Delete all data in MeiliSearch and full sync") diff --git a/meilisync/meili.py b/meilisync/meili.py index d6e9b25..2078317 100644 --- a/meilisync/meili.py +++ b/meilisync/meili.py @@ -1,9 +1,9 @@ from typing import List, Optional, Type, Union -from loguru import logger from meilisearch_python_async import Client from meilisync.enums import EventType +from meilisync.event import EventCollection from meilisync.plugin import Plugin from meilisync.schemas import Event from meilisync.settings import Sync @@ -12,7 +12,6 @@ class Meili: def __init__( self, - debug: bool, api_url: str, api_key: str, plugins: Optional[List[Union[Type[Plugin], Plugin]]] = None, @@ -21,7 +20,6 @@ def __init__( api_url, api_key, ) - self.debug = debug self.plugins = plugins or [] async def add_full_data(self, index: str, pk: str, data: list): @@ -37,9 +35,16 @@ async def get_count(self, index: str): stats = await self.client.index(index).get_stats() return stats.number_of_documents - async def handle_event(self, event: Event, sync: Sync): - if self.debug: - logger.debug(event) + async def handle_events(self, collection: EventCollection): + created_events, updated_events, deleted_events = collection.pop_events + for sync, events in created_events.items(): + await self.handle_events_by_type(sync, events, EventType.create) + for sync, events in updated_events.items(): + await self.handle_events_by_type(sync, events, EventType.update) + for sync, events in deleted_events.items(): + await self.handle_events_by_type(sync, events, EventType.delete) + + async def handle_plugins_pre(self, sync: Sync, event: Event): for plugin in self.plugins: if isinstance(plugin, Plugin): event = await plugin.pre_event(event) @@ -50,13 +55,9 @@ async def handle_event(self, event: Event, sync: Sync): event = await plugin.pre_event(event) else: event = await plugin().pre_event(event) - index = self.client.index(sync.index_name) - if event.type == EventType.create: - await index.add_documents([event.mapping_data(sync.fields)], primary_key=sync.pk) - elif event.type == EventType.update: - await index.update_documents([event.mapping_data(sync.fields)], primary_key=sync.pk) - elif event.type == EventType.delete: - await index.delete_documents([str(event.data[sync.pk])]) + return event + + async def handle_plugins_post(self, sync: Sync, event: Event): for plugin in self.plugins: if isinstance(plugin, Plugin): event = await plugin.post_event(event) @@ -67,3 +68,34 @@ async def handle_event(self, event: Event, sync: Sync): event = await plugin.post_event(event) else: event = await plugin().post_event(event) + return event + + async def handle_events_by_type(self, sync: Sync, events: List[Event], event_type: EventType): + if not events: + return + index = self.client.index(sync.index_name) + for event in events: + await self.handle_plugins_pre(sync, event) + if event_type == EventType.create: + await index.add_documents( + [event.mapping_data(sync.fields) for event in events], primary_key=sync.pk + ) + elif event_type == EventType.update: + await index.update_documents( + [event.mapping_data(sync.fields) for event in events], primary_key=sync.pk + ) + elif event_type == EventType.delete: + await index.delete_documents([str(event.data[sync.pk]) for event in events]) + for event in events: + await self.handle_plugins_post(sync, event) + + async def handle_event(self, event: Event, sync: Sync): + event = await self.handle_plugins_pre(sync, event) + index = self.client.index(sync.index_name) + if event.type == EventType.create: + await index.add_documents([event.mapping_data(sync.fields)], primary_key=sync.pk) + elif event.type == EventType.update: + await index.update_documents([event.mapping_data(sync.fields)], primary_key=sync.pk) + elif event.type == EventType.delete: + await index.delete_documents([str(event.data[sync.pk])]) + await self.handle_plugins_post(sync, event) diff --git a/meilisync/settings.py b/meilisync/settings.py index 91cce3c..889da0a 100644 --- a/meilisync/settings.py +++ b/meilisync/settings.py @@ -17,6 +17,8 @@ class Config: class MeiliSearch(BaseModel): api_url: str api_key: Optional[str] + insert_size: Optional[int] + insert_interval: Optional[int] class BasePlugin(BaseModel): @@ -44,6 +46,9 @@ class Sync(BasePlugin): def index_name(self): return self.index or self.table + def __hash__(self): + return hash(self.table) + class Progress(BaseModel): type: ProgressType