Skip to content

Commit

Permalink
feat: add insert size and insert interval
Browse files Browse the repository at this point in the history
  • Loading branch information
long2ice committed Mar 8, 2023
1 parent 3448e40 commit cc1a744
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 25 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ source:
meilisearch:
api_url: http://192.168.123.205:7700
api_key:
insert_size: 1000
insert_interval: 10
sync:
- table: collection
index: beauty-collections
Expand Down Expand Up @@ -148,6 +150,13 @@ MeiliSearch configuration.

- `api_url`: the MeiliSearch API URL.
- `api_key`: the MeiliSearch API key.
- `insert_size`: insert after collecting this many documents, optional.
- `insert_interval`: insert after this many seconds have passed, optional.

If neither `insert_size` nor `insert_interval` is set, each document is inserted into MeiliSearch immediately.

For better throughput, set `insert_size` and `insert_interval` and tune them upward. A batch insert is performed as soon as
either condition is met.

### sync

Expand Down
2 changes: 2 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ source:
meilisearch:
api_url: http://192.168.123.205:7700
api_key:
insert_size: 1000
insert_interval: 10
sync:
- table: test
index: test
Expand Down
38 changes: 38 additions & 0 deletions meilisync/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from meilisync.enums import EventType
from meilisync.schemas import Event
from meilisync.settings import Sync


class EventCollection:
    """In-memory buffer of change events, grouped per Sync config and
    deduplicated by primary key, flushed in bulk to MeiliSearch."""

    def __init__(self):
        # Count of distinct buffered rows; compared against `insert_size`
        # by callers to decide when to flush.
        self._size = 0
        # Mapping: {sync: {pk: latest event for that row}}. Keeping only the
        # latest event per pk means a flush writes each row at most once.
        self._events = {}

    def add_event(self, sync: Sync, event: Event):
        """Buffer *event*, replacing any earlier buffered event for the same row."""
        pk = event.data[sync.pk]
        events = self._events.setdefault(sync, {})
        # BUG FIX: _size was never incremented, so size-based flushing
        # (`collection.size >= insert_size`) could never trigger. Count only
        # distinct primary keys — a newer event for the same pk replaces the
        # old one instead of growing the buffer.
        if pk not in events:
            self._size += 1
        events[pk] = event

    @property
    def size(self):
        # Number of distinct rows currently buffered.
        return self._size

    @property
    def pop_events(self):
        """Drain the buffer and return (created, updated, deleted) events,
        each a dict keyed by Sync config. Resets the buffer to empty."""
        updated_events = {}
        created_events = {}
        deleted_events = {}
        for sync, events in self._events.items():
            updated_events[sync] = []
            created_events[sync] = []
            deleted_events[sync] = []
            for event in events.values():
                if event.type == EventType.create:
                    created_events[sync].append(event)
                elif event.type == EventType.update:
                    updated_events[sync].append(event)
                elif event.type == EventType.delete:
                    deleted_events[sync].append(event)
        self._events = {}
        self._size = 0
        return created_events, updated_events, deleted_events
59 changes: 47 additions & 12 deletions meilisync/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from loguru import logger

from meilisync.discover import get_progress, get_source
from meilisync.event import EventCollection
from meilisync.meili import Meili
from meilisync.schemas import Event
from meilisync.settings import Settings
Expand Down Expand Up @@ -47,9 +48,7 @@ async def _():
**settings.source.dict(exclude={"type"}),
)
meilisearch = settings.meilisearch
meili = Meili(
settings.debug, meilisearch.api_url, meilisearch.api_key, settings.plugins_cls()
)
meili = Meili(meilisearch.api_url, meilisearch.api_key, settings.plugins_cls())
context.obj["current_progress"] = current_progress
context.obj["source"] = source
context.obj["meili"] = meili
Expand All @@ -63,12 +62,17 @@ async def _():
def start(
context: typer.Context,
):
current_progress = context.obj["current_progress"]
source = context.obj["source"]
meili = context.obj["meili"]
settings = context.obj["settings"]
progress = context.obj["progress"]
meili_settings = settings.meilisearch
collection = EventCollection()
lock = None

async def _():
current_progress = context.obj["current_progress"]
source = context.obj["source"]
meili = context.obj["meili"]
settings = context.obj["settings"]
progress = context.obj["progress"]
nonlocal current_progress
if not current_progress:
for sync in settings.sync:
if sync.full:
Expand All @@ -85,13 +89,44 @@ async def _():
)
logger.info(f'Start increment sync data from "{settings.source.type}" to MeiliSearch...')
async for event in source:
if settings.debug:
logger.debug(event)
current_progress = event.progress
if isinstance(event, Event):
sync = settings.get_sync(event.table)
if sync:
if not sync:
continue
if not meili_settings.insert_size and not meili_settings.insert_interval:
await meili.handle_event(event, sync)
await progress.set(**event.progress)

asyncio.run(_())
await progress.set(**current_progress)
else:
collection.add_event(sync, event)
if collection.size >= meili_settings.insert_size:
async with lock:
await meili.handle_events(collection)
await progress.set(**current_progress)
else:
await progress.set(**current_progress)

async def interval():
if not settings.meilisearch.insert_interval:
return
while True:
await asyncio.sleep(settings.meilisearch.insert_interval)
try:
async with lock:
await meili.handle_events(collection)
await progress.set(**current_progress)
except Exception as e:
logger.exception(e)
logger.error(f"Error when insert data to MeiliSearch: {e}")

async def run():
nonlocal lock
lock = asyncio.Lock()
await asyncio.gather(_(), interval())

asyncio.run(run())


@app.command(help="Delete all data in MeiliSearch and full sync")
Expand Down
58 changes: 45 additions & 13 deletions meilisync/meili.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from typing import List, Optional, Type, Union

from loguru import logger
from meilisearch_python_async import Client

from meilisync.enums import EventType
from meilisync.event import EventCollection
from meilisync.plugin import Plugin
from meilisync.schemas import Event
from meilisync.settings import Sync
Expand All @@ -12,7 +12,6 @@
class Meili:
def __init__(
self,
debug: bool,
api_url: str,
api_key: str,
plugins: Optional[List[Union[Type[Plugin], Plugin]]] = None,
Expand All @@ -21,7 +20,6 @@ def __init__(
api_url,
api_key,
)
self.debug = debug
self.plugins = plugins or []

async def add_full_data(self, index: str, pk: str, data: list):
Expand All @@ -37,9 +35,16 @@ async def get_count(self, index: str):
stats = await self.client.index(index).get_stats()
return stats.number_of_documents

async def handle_event(self, event: Event, sync: Sync):
if self.debug:
logger.debug(event)
async def handle_events(self, collection: EventCollection):
    """Drain *collection* and bulk-write its events to MeiliSearch,
    one batch per (sync, event type): creates first, then updates, then deletes."""
    created, updated, deleted = collection.pop_events
    batches = (
        (created, EventType.create),
        (updated, EventType.update),
        (deleted, EventType.delete),
    )
    for grouped, event_type in batches:
        for sync, events in grouped.items():
            await self.handle_events_by_type(sync, events, event_type)

async def handle_plugins_pre(self, sync: Sync, event: Event):
for plugin in self.plugins:
if isinstance(plugin, Plugin):
event = await plugin.pre_event(event)
Expand All @@ -50,13 +55,9 @@ async def handle_event(self, event: Event, sync: Sync):
event = await plugin.pre_event(event)
else:
event = await plugin().pre_event(event)
index = self.client.index(sync.index_name)
if event.type == EventType.create:
await index.add_documents([event.mapping_data(sync.fields)], primary_key=sync.pk)
elif event.type == EventType.update:
await index.update_documents([event.mapping_data(sync.fields)], primary_key=sync.pk)
elif event.type == EventType.delete:
await index.delete_documents([str(event.data[sync.pk])])
return event

async def handle_plugins_post(self, sync: Sync, event: Event):
for plugin in self.plugins:
if isinstance(plugin, Plugin):
event = await plugin.post_event(event)
Expand All @@ -67,3 +68,34 @@ async def handle_event(self, event: Event, sync: Sync):
event = await plugin.post_event(event)
else:
event = await plugin().post_event(event)
return event

async def handle_events_by_type(self, sync: Sync, events: List[Event], event_type: EventType):
    """Write one batch of same-type events for a single sync config.

    Runs pre-event plugins, performs a single bulk index operation, then
    runs post-event plugins. No-op for an empty batch.
    """
    if not events:
        return
    # BUG FIX: the pre-plugin return value was discarded, silently dropping
    # any plugin transformation. handle_event assigns the result back
    # (`event = await self.handle_plugins_pre(...)`); do the same here so
    # the batch path and the single-event path behave consistently.
    events = [await self.handle_plugins_pre(sync, event) for event in events]
    index = self.client.index(sync.index_name)
    if event_type == EventType.create:
        await index.add_documents(
            [event.mapping_data(sync.fields) for event in events], primary_key=sync.pk
        )
    elif event_type == EventType.update:
        await index.update_documents(
            [event.mapping_data(sync.fields) for event in events], primary_key=sync.pk
        )
    elif event_type == EventType.delete:
        await index.delete_documents([str(event.data[sync.pk]) for event in events])
    for event in events:
        await self.handle_plugins_post(sync, event)

async def handle_event(self, event: Event, sync: Sync):
    """Apply plugins and write a single event to MeiliSearch immediately."""
    event = await self.handle_plugins_pre(sync, event)
    index = self.client.index(sync.index_name)
    if event.type == EventType.delete:
        await index.delete_documents([str(event.data[sync.pk])])
    else:
        documents = [event.mapping_data(sync.fields)]
        if event.type == EventType.create:
            await index.add_documents(documents, primary_key=sync.pk)
        elif event.type == EventType.update:
            await index.update_documents(documents, primary_key=sync.pk)
    await self.handle_plugins_post(sync, event)
5 changes: 5 additions & 0 deletions meilisync/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class Config:
class MeiliSearch(BaseModel):
    # Connection and batching settings for the MeiliSearch target.
    api_url: str
    api_key: Optional[str]
    # Flush buffered events once this many documents have accumulated;
    # unset disables size-based flushing.
    insert_size: Optional[int]
    # Flush buffered events every this many seconds;
    # unset disables interval-based flushing.
    insert_interval: Optional[int]


class BasePlugin(BaseModel):
Expand Down Expand Up @@ -44,6 +46,9 @@ class Sync(BasePlugin):
def index_name(self):
return self.index or self.table

def __hash__(self):
    # Hash by table name so Sync instances can be used as dict keys
    # (EventCollection buffers events keyed by Sync). Assumes table names
    # are unique across sync configs — TODO confirm.
    return hash(self.table)


class Progress(BaseModel):
type: ProgressType
Expand Down

0 comments on commit cc1a744

Please sign in to comment.