Skip to content

Support timeout-based flushing in async_streaming_bulk #3051

@polerjn1

Description

@polerjn1

Currently, async_streaming_bulk only flushes requests based on chunk_size and max_chunk_bytes. This works well for high-throughput use cases, but in scenarios with low or bursty traffic, requests may remain unflushed indefinitely until a batch fills.

It would be helpful to support a timeout fallback mechanism that flushes after a configurable duration, ensuring progress is made even when throughput is low.

Proposed approach / discussion points:

  • Introduce a configurable timeout parameter for async_streaming_bulk.
  • Modify ActionChunker to operate as an async iterator, allowing explicit calls to next().
  • Use asyncio.wait_for (or anyio.fail_after) to implement time-based flushing.

Initial design approach is from discussing with @pquentin in slack

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions