Skip to content

Commit

Permalink
fix: file progress reset
Browse files Browse the repository at this point in the history
  • Loading branch information
long2ice committed May 31, 2023
1 parent 395cf36 commit f8fc92f
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 16 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,4 @@ dmypy.json
.pyre/
.idea
*.json
.DS_Store
22 changes: 9 additions & 13 deletions meilisync/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ async def _():
if sync.full and not await meili.index_exists(sync.index_name):
count = 0
await progress.reset()
async for item in source.get_full_data(sync, meili_settings.insert_size or 10000):
count += len(item)
await meili.add_full_data(sync.index_name, sync.pk, item)
async for items in source.get_full_data(sync, meili_settings.insert_size or 10000):
count += len(items)
await meili.add_full_data(sync.index_name, sync.pk, items)
if count:
logger.info(
f'Full data sync for table "{settings.source.database}.{sync.table}" '
Expand Down Expand Up @@ -152,21 +152,17 @@ def refresh(
async def _():
settings = context.obj["settings"]
source = context.obj["source"]
meili = context.obj["meili"]
meili = context.obj["meili"] # type: Meili
progress = context.obj["progress"]
for sync in settings.sync:
if not table or sync.table in table:
index_name = sync.index_name
data = await source.get_full_data(sync, size)
count = 0
await progress.reset()
for item in data:
count += len(item)
await meili.refresh_data(
index_name,
sync.pk,
item,
)
count = await meili.refresh_data(
index_name,
sync.pk,
source.get_full_data(sync, size),
)
if count:
logger.info(
f'Full data sync for table "{settings.source.database}.{sync.table}" '
Expand Down
9 changes: 6 additions & 3 deletions meilisync/meili.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
from typing import Iterable, List, Optional, Type, Union
from typing import AsyncGenerator, Iterable, List, Optional, Type, Union

from loguru import logger
from meilisearch_python_async import Client
Expand Down Expand Up @@ -33,7 +33,7 @@ async def add_full_data(self, index: str, pk: str, data: list):
data, batch_size=1000, primary_key=pk
)

async def refresh_data(self, index: str, pk: str, data: Iterable[list]):
async def refresh_data(self, index: str, pk: str, data: AsyncGenerator):
index_name_tmp = f"{index}_tmp"
settings = await self.client.index(index).get_settings()
index_tmp = await self.client.create_index(index_name_tmp, primary_key=pk)
Expand All @@ -43,7 +43,9 @@ async def refresh_data(self, index: str, pk: str, data: Iterable[list]):
client=self.client, task_id=task.task_uid, timeout_in_ms=self.wait_for_task_timeout
)
tasks = []
for items in data:
count = 0
async for items in data:
count += len(items)
tasks.extend(await self.add_full_data(index_name_tmp, pk, items))
wait_tasks = [
wait_for_task(
Expand All @@ -60,6 +62,7 @@ async def refresh_data(self, index: str, pk: str, data: Iterable[list]):
)
await self.client.index(index_name_tmp).delete()
logger.success(f"Swap index {index} complete")
return count

async def get_count(self, index: str):
stats = await self.client.index(index).get_stats()
Expand Down
1 change: 1 addition & 0 deletions meilisync/progress/file.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json

import aiofiles
import aiofiles.os

from meilisync.enums import ProgressType
from meilisync.progress import Progress
Expand Down

0 comments on commit f8fc92f

Please sign in to comment.