34 commits
dc9798d  chore: Add sift-stream-bindings as a dep (Jun 26, 2025)
b0a3e23  feat: Wrap grpc.Channel with a class that includes config (Jun 27, 2025)
0bfaa9b  feat: Initialize builder in IngestionService (Jun 28, 2025)
3115424  feat: Added sift stream helpers (Jul 18, 2025)
9230bdd  fix: Bug fixes & async (Jul 18, 2025)
000733f  feat: Pipe client-key for runs to attach_run methods (Jul 21, 2025)
5540cd6  tmp: experiments (Jul 22, 2025)
bc9969c  placeholder for messy testing (Jul 23, 2025)
d81e636  placeholder (Jul 23, 2025)
d8c39ce  fix: Fix runs, bitfields, clean up (Jul 24, 2025)
9fa1350  just debugging (Jul 25, 2025)
7fb0833  fix: Insecure channel return type (Jul 29, 2025)
57c845a  chore: buffered ingestion can call regular ingest now (Jul 29, 2025)
5fd8436  FD-83: Use longer lived threads for ingestion. (ian-sift, Aug 7, 2025)
563a3ef  Build builder after attaching run so data goes to run and don't creat… (ian-sift, Aug 7, 2025)
e8fdaba  TODO -> ticket (ian-sift, Aug 7, 2025)
1c72294  lint (ian-sift, Aug 7, 2025)
5683a74  PR fb. (ian-sift, Aug 13, 2025)
eade3c3  lint (ian-sift, Aug 13, 2025)
324185d  Rename SiftChannelWithConfig instead of type alias. (ian-sift, Aug 14, 2025)
db38aa4  Ensure thread is alive before joining (ian-sift, Aug 14, 2025)
63d53e2  add bytes type. (ian-sift, Aug 14, 2025)
27d4f9c  mypy (ian-sift, Aug 14, 2025)
e4437ef  Fix tests. (ian-sift, Aug 14, 2025)
ba8c06c  Deconflict _stop event w/ Threading internal _stop function. (ian-sift, Aug 14, 2025)
63263fa  re-checkout examples from main. use uuid for client key generation. (ian-sift, Aug 19, 2025)
e835275  Release candidate version (ian-sift, Aug 19, 2025)
37bbe09  Release candidate version correction (ian-sift, Aug 19, 2025)
6227fbe  Release candidate version correction (ian-sift, Aug 19, 2025)
0814bd6  Dead threads must be reassigned. (ian-sift, Aug 21, 2025)
25d6127  toml rev (ian-sift, Aug 21, 2025)
872df8e  Check for existing asyncio loop. (ian-sift, Aug 23, 2025)
8dcb1f1  asdf (ian-sift, Aug 23, 2025)
9a75eea  Rev version. (ian-sift, Sep 9, 2025)

9 changes: 9 additions & 0 deletions python/CHANGELOG.md
@@ -3,6 +3,15 @@ All notable changes to this project will be documented in this file.

This project adheres to [Semantic Versioning](http://semver.org/).

## [v0.8.6-rc.1] - September 9, 2025
### What's New
Update ingestion to use a compiled Rust binary under the hood for performance improvements.

## [v0.8.5] - August 31, 2025
### What's New
#### Bytes support
Add plumbing to allow specifying bytes-type data for ingestion.

## [v0.8.3] - August 11, 2025
- [Fix windows utf-8 encoding bug with Hdf5UploadService](https://github.com/sift-stack/sift/pull/289)

4 changes: 3 additions & 1 deletion python/examples/ingestion_with_threading/main.py
@@ -16,7 +16,9 @@ def ingestion_thread(data_queue: Queue):
it to Sift.
"""
# Can tune ingestion performance with buffer_size and flush_interval_sec
with ingestion_service.buffered_ingestion() as buffered_ingestion:
with ingestion_service.buffered_ingestion(
buffer_size=200, flush_interval_sec=1
) as buffered_ingestion:
while True:
try:
item = data_queue.get(timeout=1)
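For reference, a minimal sketch of the tuned pattern the example now uses. This assumes an already-configured `ingestion_service` and a queue of flow payloads as set up elsewhere in the example; the buffer values are illustrative, not prescriptive:

```python
# Minimal sketch (assumes `ingestion_service` and a Queue of flow dicts
# exist, as in the surrounding example).
from queue import Empty, Queue

def drain_queue(data_queue: Queue):
    # buffer_size batches this many requests before flushing;
    # flush_interval_sec bounds how long a partially full buffer may
    # sit before being sent.
    with ingestion_service.buffered_ingestion(
        buffer_size=200, flush_interval_sec=1
    ) as buffered:
        while True:
            try:
                buffered.try_ingest_flows(data_queue.get(timeout=1))
            except Empty:
                break  # producer finished; exiting the context flushes the rest
```
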
356 changes: 356 additions & 0 deletions python/examples/ingestion_with_threading/sample_data/sample_logs.txt

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion python/examples/ingestion_with_threading/simulator.py
@@ -39,7 +39,8 @@ def __init__(self, data_queue: Queue, asset_name: str, run_id: Optional[str]):
sample_bit_field_values = ["00001001", "00100011", "00001101", "11000001"]
self.sample_bit_field_values = [bytes([int(byte, 2)]) for byte in sample_bit_field_values]

sample_logs = Path().joinpath("sample_data").joinpath("sample_logs.txt")
dir_path = Path(__file__).parent
sample_logs = dir_path.joinpath("sample_data").joinpath("sample_logs.txt")

with open(sample_logs, "r") as file:
self.sample_logs = file.readlines()
4 changes: 4 additions & 0 deletions python/lib/sift_py/_internal/test_util/channel.py
@@ -6,6 +6,8 @@
from grpc.aio import Channel as AsyncChannel
from grpc_testing import Channel

from sift_py.grpc.transport import SiftChannelConfig

SerializingFunction = Callable[[Any], bytes]
DeserializingFunction = Callable[[bytes], Any]
DoneCallbackType = Callable[[Any], None]
@@ -18,6 +20,8 @@ class MockChannel(Channel):
Used as a mock gRPC channel
"""

config = SiftChannelConfig(uri="localhost:50051", apikey="fake-api-key", use_ssl=False)

def take_unary_unary(self, method_descriptor):
pass

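A short sketch of how a test might consume the new class-level config; the assertion values mirror the defaults above, and the test itself is hypothetical:

```python
# Hypothetical test: MockChannel now exposes the same dict-style config
# that SiftChannel carries, so code that inspects channel.config works
# against the mock unchanged.
from sift_py._internal.test_util.channel import MockChannel

def test_mock_channel_has_config():
    assert MockChannel.config["uri"] == "localhost:50051"
    assert MockChannel.config["use_ssl"] is False
```
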
4 changes: 2 additions & 2 deletions python/lib/sift_py/asset/_service_test.py
@@ -2,6 +2,7 @@
from unittest import TestCase
from unittest.mock import MagicMock

import grpc
from sift.assets.v1.assets_pb2 import (
Asset,
GetAssetResponse,
@@ -12,7 +13,6 @@
from sift_py._internal.metadata import metadata_dict_to_pb
from sift_py.asset.config import AssetConfig
from sift_py.asset.service import AssetService
from sift_py.grpc.transport import SiftChannel


class TestAssetService(TestCase):
@@ -23,7 +23,7 @@ class TestAssetService(TestCase):
"""

def setUp(self):
self.channel = MagicMock(spec=SiftChannel)
self.channel = MagicMock(spec=grpc.Channel)
self.service = AssetService(self.channel)
self.asset_service_stub = self.service._asset_service_stub

37 changes: 32 additions & 5 deletions python/lib/sift_py/grpc/transport.py
@@ -21,7 +21,29 @@
from sift_py.grpc._retry import RetryPolicy
from sift_py.grpc.keepalive import DEFAULT_KEEPALIVE_CONFIG, KeepaliveConfig

SiftChannel: TypeAlias = grpc.Channel

class SiftChannel:
"""
A wrapper around grpc.Channel that includes the configuration used to create it.
This allows access to the original config for debugging or other purposes.
"""

def __init__(self, config: SiftChannelConfig, channel: grpc.Channel):
self._channel = channel
self.config = config

def __getattr__(self, name):
# Delegate all other attributes to the underlying channel
return getattr(self._channel, name)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
# Close the underlying channel
self._channel.close()


SiftAsyncChannel: TypeAlias = grpc_aio.Channel


@@ -67,14 +89,16 @@ def use_sift_channel(
cert_via_openssl = config.get("cert_via_openssl", False)

if not use_ssl:
return _use_insecure_sift_channel(config, metadata)
channel = _use_insecure_sift_channel(config, metadata)
return SiftChannel(config, channel)

credentials = get_ssl_credentials(cert_via_openssl)
options = _compute_channel_options(config)
api_uri = _clean_uri(config["uri"], use_ssl)
channel = grpc.secure_channel(api_uri, credentials, options)
interceptors = _compute_sift_interceptors(config, metadata)
return grpc.intercept_channel(channel, *interceptors)
intercepted_channel = grpc.intercept_channel(channel, *interceptors)
return SiftChannel(config, intercepted_channel)


def use_sift_async_channel(
@@ -100,7 +124,7 @@ def use_sift_async_channel(

def _use_insecure_sift_channel(
config: SiftChannelConfig, metadata: Optional[Dict[str, Any]] = None
) -> SiftChannel:
) -> grpc.Channel:
"""
FOR DEVELOPMENT PURPOSES ONLY
"""
@@ -225,7 +249,10 @@ def _compute_keep_alive_channel_opts(config: KeepaliveConfig) -> List[Tuple[str,
("grpc.keepalive_time_ms", config["keepalive_time_ms"]),
("grpc.keepalive_timeout_ms", config["keepalive_timeout_ms"]),
("grpc.http2.max_pings_without_data", config["max_pings_without_data"]),
("grpc.keepalive_permit_without_calls", config["keepalive_permit_without_calls"]),
(
"grpc.keepalive_permit_without_calls",
config["keepalive_permit_without_calls"],
),
]


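A minimal usage sketch of the new wrapper, assuming a typical `SiftChannelConfig`; the URI and API key are placeholders:

```python
# Sketch: SiftChannel keeps its originating config while delegating all
# other attribute access to the wrapped grpc.Channel.
from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel

config = SiftChannelConfig(uri="sift.example.com:443", apikey="my-api-key")
with use_sift_channel(config) as channel:
    # New: the config used to build the channel is retained for inspection.
    assert channel.config["uri"] == "sift.example.com:443"
    # Everything else falls through __getattr__ to the underlying
    # grpc.Channel, so generated stubs keep working as before.
```
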
62 changes: 59 additions & 3 deletions python/lib/sift_py/ingestion/_internal/ingest.py
@@ -1,8 +1,10 @@
from __future__ import annotations

import atexit
import logging
from collections.abc import Callable
from datetime import datetime
from queue import Queue
from typing import Any, Dict, List, Optional, Union, cast

from google.protobuf.timestamp_pb2 import Timestamp
@@ -23,6 +25,11 @@
get_ingestion_config_flows,
)
from sift_py.ingestion._internal.run import create_run, get_run_id_by_name
from sift_py.ingestion._internal.stream import (
IngestionThread,
get_builder,
stream_requests,
)
from sift_py.ingestion.channel import (
ChannelConfig,
ChannelValue,
@@ -51,6 +58,8 @@ class _IngestionServiceImpl:

ingest_service_stub: IngestServiceStub
rule_service: RuleService
_request_queue: Queue
_ingestion_thread: IngestionThread

def __init__(
self,
@@ -81,6 +90,7 @@ def __init__(
rule.asset_names.append(config.asset_name)
self.rule_service.create_or_update_rules(config.rules)

self.builder = get_builder(channel, config)
self.rules = config.rules
self.asset_name = config.asset_name
self.transport_channel = channel
@@ -90,11 +100,48 @@ def __init__(
self.ingest_service_stub = IngestServiceStub(channel)
self.config = config

# Thread tracking for async ingestion
self._request_queue = Queue()
# Don't start thread here since user may attach a run after creating the ingestion service
self._ingestion_thread = IngestionThread(self.builder, self._request_queue)
atexit.register(self.wait_for_async_ingestion, timeout=0.1)

def ingest(self, *requests: IngestWithConfigDataStreamRequest):
"""
Perform data ingestion.
"""
self.ingest_service_stub.IngestWithConfigDataStream(iter(requests))
self.ingest_async(*requests)

def ingest_async(self, *requests: IngestWithConfigDataStreamRequest):
"""
Perform data ingestion asynchronously in a background thread.
This allows multiple ingest calls to run in parallel.
"""
# FD-179: Create a thread pool and add to whichever queue is smallest
# Start thread on first ingest on the assumption all modifications to the ingestion config have concluded.
if not self._ingestion_thread.is_alive():
self._ingestion_thread = IngestionThread(self.builder, self._request_queue)
self._ingestion_thread.start()
stream_requests(self._request_queue, *requests)

def wait_for_async_ingestion(self, timeout: Optional[float] = None) -> bool:
"""
Wait for all async ingestion threads to complete.

Args:
timeout: Maximum time to wait in seconds. If None, wait indefinitely.

Returns:
bool: True if all threads completed within timeout, False otherwise.
"""
self._request_queue.put(None)
if self._ingestion_thread.is_alive():
self._ingestion_thread.join(timeout=timeout)
if self._ingestion_thread.is_alive():
logger.error(f"Ingestion thread did not finish after {timeout} seconds. Forcing stop.")
self._ingestion_thread.stop()
return False
return True

def ingest_flows(self, *flows: FlowOrderedChannelValues):
"""
@@ -110,7 +157,7 @@ def ingest_flows(self, *flows: FlowOrderedChannelValues):
req = self.create_ingestion_request(flow_name, timestamp, channel_values)
requests.append(req)

self.ingest_service_stub.IngestWithConfigDataStream(iter(requests))
self.ingest_async(*requests)

def try_ingest_flows(self, *flows: Flow):
"""
@@ -126,7 +173,7 @@ def try_ingest_flows(self, *flows: Flow):
req = self.try_create_ingestion_request(flow_name, timestamp, channel_values)
requests.append(req)

self.ingest_service_stub.IngestWithConfigDataStream(iter(requests))
self.ingest_async(*requests)

def attach_run(
self,
@@ -137,12 +184,18 @@
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Union[str, float, bool]]] = None,
force_new: bool = False,
client_key: Optional[str] = None,
):
"""
Retrieve an existing run or create one to use during this period of ingestion.

Include `force_new=True` to force the creation of a new run, which allows a new run to be created under an existing name.
"""
if self._ingestion_thread.is_alive():
raise IngestionValidationError(
"Cannot attach run while ingestion thread is running. Invoke before ingesting."
)

if not force_new:
run_id = get_run_id_by_name(channel, run_name)

@@ -153,18 +206,21 @@
self.run_id = create_run(
channel=channel,
run_name=run_name,
run_client_key=client_key,
description=description or "",
organization_id=organization_id or "",
tags=tags or [],
metadata=metadata,
)
self.builder.run_id = self.run_id

def detach_run(self):
"""
Detach run from this period of ingestion. Subsequent data ingested won't be associated with
the run being detached.
"""
self.run_id = None
self.builder.run = None

def try_create_ingestion_request_ordered_values(
self,
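Taken together, these changes imply a lifecycle like the sketch below: attach the run before the first ingest, since the background thread starts lazily on first ingest and `attach_run` raises once it is alive, then flush at shutdown with `wait_for_async_ingestion`. The run name and flows are placeholders:

```python
# Sketch of the async ingestion lifecycle introduced in this diff
# (`ingestion_service`, `channel`, and `flows` are assumed from context).
import uuid

# 1. Attach the run first; attach_run raises IngestionValidationError once
#    the background IngestionThread is alive.
ingestion_service.attach_run(
    channel,
    "my-run",                      # placeholder run name
    client_key=str(uuid.uuid4()),  # new: client key piped through to create_run
)

# 2. Ingest as usual; ingest()/ingest_flows()/try_ingest_flows() now enqueue
#    requests, and a long-lived IngestionThread streams them in the background.
ingestion_service.try_ingest_flows(*flows)

# 3. At shutdown, drain the queue; a None sentinel stops the thread, and
#    False means the timeout elapsed and the thread was force-stopped.
finished = ingestion_service.wait_for_async_ingestion(timeout=30.0)
```
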
19 changes: 12 additions & 7 deletions python/lib/sift_py/ingestion/_internal/run.py
@@ -36,17 +36,22 @@ def create_run(
organization_id: str,
tags: List[str],
metadata: Optional[Dict[str, Union[str, float, bool]]] = None,
run_client_key: Optional[str] = None,
) -> str:
svc = RunServiceStub(channel)

_metadata = metadata_dict_to_pb(metadata) if metadata else None

req = CreateRunRequest(
name=run_name,
description=description,
organization_id=organization_id,
tags=tags,
metadata=_metadata,
)
kwargs = {
"name": run_name,
"description": description,
"organization_id": organization_id,
"tags": tags,
"metadata": _metadata,
}
if run_client_key:
kwargs["client_key"] = run_client_key

req = CreateRunRequest(**kwargs) # type: ignore
res = cast(CreateRunResponse, svc.CreateRun(req))
return res.run.run_id
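
Building the request kwargs conditionally keeps `client_key` out of the protobuf when the caller did not supply one. A hedged sketch of a call that does supply it; all values are placeholders:

```python
# Sketch: supplying run_client_key includes client_key in CreateRunRequest;
# leaving it None omits the field entirely rather than sending "".
import uuid

run_id = create_run(
    channel=channel,                   # an open SiftChannel, assumed from context
    run_name="engine-test",            # placeholder
    description="",
    organization_id="",
    tags=[],
    metadata=None,
    run_client_key=str(uuid.uuid4()),  # stable client-side identifier for the run
)
```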