3535
3636import sniffio
3737
38+ from ..compat import safe_task
3839from ..exceptions import ApiError , NotFoundError , TransportError
3940from ..helpers .actions import (
4041 _TYPE_BULK_ACTION ,
4142 _TYPE_BULK_ACTION_BODY ,
4243 _TYPE_BULK_ACTION_HEADER ,
4344 _TYPE_BULK_ACTION_HEADER_AND_BODY ,
45+ _TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY ,
46+ _TYPE_BULK_ACTION_WITH_META ,
47+ BulkMeta ,
4448 _ActionChunker ,
4549 _process_bulk_chunk_error ,
4650 _process_bulk_chunk_success ,
@@ -65,9 +69,10 @@ async def _sleep(seconds: float) -> None:
6569
6670
6771async def _chunk_actions (
68- actions : AsyncIterable [_TYPE_BULK_ACTION_HEADER_AND_BODY ],
72+ actions : AsyncIterable [_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY ],
6973 chunk_size : int ,
7074 max_chunk_bytes : int ,
75+ flush_after_seconds : Optional [float ],
7176 serializer : Serializer ,
7277) -> AsyncIterable [
7378 Tuple [
@@ -87,10 +92,42 @@ async def _chunk_actions(
8792 chunker = _ActionChunker (
8893 chunk_size = chunk_size , max_chunk_bytes = max_chunk_bytes , serializer = serializer
8994 )
90- async for action , data in actions :
91- ret = chunker .feed (action , data )
92- if ret :
93- yield ret
95+
96+ if not flush_after_seconds :
97+ async for action , data in actions :
98+ ret = chunker .feed (action , data )
99+ if ret :
100+ yield ret
101+ else :
102+ item_queue : asyncio .Queue [_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY ] = (
103+ asyncio .Queue ()
104+ )
105+
106+ async def get_items () -> None :
107+ try :
108+ async for item in actions :
109+ await item_queue .put (item )
110+ finally :
111+ await item_queue .put ((BulkMeta .done , None ))
112+
113+ async with safe_task (get_items ()):
114+ timeout : Optional [float ] = flush_after_seconds
115+ while True :
116+ try :
117+ action , data = await asyncio .wait_for (
118+ item_queue .get (), timeout = timeout
119+ )
120+ timeout = flush_after_seconds
121+ except asyncio .TimeoutError :
122+ action , data = BulkMeta .flush , None
123+ timeout = None
124+
125+ if action is BulkMeta .done :
126+ break
127+ ret = chunker .feed (action , data )
128+ if ret :
129+ yield ret
130+
94131 ret = chunker .flush ()
95132 if ret :
96133 yield ret
@@ -170,9 +207,13 @@ async def azip(
170207
171208async def async_streaming_bulk (
172209 client : AsyncElasticsearch ,
173- actions : Union [Iterable [_TYPE_BULK_ACTION ], AsyncIterable [_TYPE_BULK_ACTION ]],
210+ actions : Union [
211+ Iterable [_TYPE_BULK_ACTION_WITH_META ],
212+ AsyncIterable [_TYPE_BULK_ACTION_WITH_META ],
213+ ],
174214 chunk_size : int = 500 ,
175215 max_chunk_bytes : int = 100 * 1024 * 1024 ,
216+ flush_after_seconds : Optional [float ] = None ,
176217 raise_on_error : bool = True ,
177218 expand_action_callback : Callable [
178219 [_TYPE_BULK_ACTION ], _TYPE_BULK_ACTION_HEADER_AND_BODY
@@ -205,6 +246,9 @@ async def async_streaming_bulk(
205246 :arg actions: iterable or async iterable containing the actions to be executed
206247 :arg chunk_size: number of docs in one chunk sent to es (default: 500)
207248 :arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB)
249+ :arg flush_after_seconds: time in seconds after which a chunk is written even
250+ if hasn't reached `chunk_size` or `max_chunk_bytes`. Set to 0 to not use a
251+ timeout-based flush. (default: 0)
208252 :arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`)
209253 from the execution of the last chunk when some occur. By default we raise.
210254 :arg raise_on_exception: if ``False`` then don't propagate exceptions from
@@ -231,9 +275,14 @@ async def async_streaming_bulk(
231275 if isinstance (retry_on_status , int ):
232276 retry_on_status = (retry_on_status ,)
233277
234- async def map_actions () -> AsyncIterable [_TYPE_BULK_ACTION_HEADER_AND_BODY ]:
278+ async def map_actions () -> (
279+ AsyncIterable [_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY ]
280+ ):
235281 async for item in aiter (actions ):
236- yield expand_action_callback (item )
282+ if isinstance (item , BulkMeta ):
283+ yield item , None
284+ else :
285+ yield expand_action_callback (item )
237286
238287 serializer = client .transport .serializers .get_serializer ("application/json" )
239288
@@ -245,7 +294,7 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
245294 ]
246295 bulk_actions : List [bytes ]
247296 async for bulk_data , bulk_actions in _chunk_actions (
248- map_actions (), chunk_size , max_chunk_bytes , serializer
297+ map_actions (), chunk_size , max_chunk_bytes , flush_after_seconds , serializer
249298 ):
250299 for attempt in range (max_retries + 1 ):
251300 to_retry : List [bytes ] = []
0 commit comments