diff --git a/python/CHANGELOG.md b/python/CHANGELOG.md index f0dff305e..e0445e7dd 100644 --- a/python/CHANGELOG.md +++ b/python/CHANGELOG.md @@ -3,6 +3,15 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). +## [v0.8.6-rc.1] - September 9, 2025 +### What's New +Update ingestion to use compiled Rust binary under the hood for performance improvements. + +## [v0.8.5] - August 31, 2025 +### What's New +#### Bytes support +Add plumbing to allow specifying bytes type data for ingestion. + ## [v0.8.3] - August 11, 2025 - [Fix windows utf-8 encoding bug with Hdf5UploadService](https://github.com/sift-stack/sift/pull/289) diff --git a/python/examples/ingestion_with_threading/main.py b/python/examples/ingestion_with_threading/main.py index 3fc808dbf..87a39ab98 100644 --- a/python/examples/ingestion_with_threading/main.py +++ b/python/examples/ingestion_with_threading/main.py @@ -16,7 +16,9 @@ def ingestion_thread(data_queue: Queue): it to Sift. 
""" # Can tune ingestion performance with buffer_size and flush_interval_sec - with ingestion_service.buffered_ingestion() as buffered_ingestion: + with ingestion_service.buffered_ingestion( + buffer_size=200, flush_interval_sec=1 + ) as buffered_ingestion: while True: try: item = data_queue.get(timeout=1) diff --git a/python/examples/ingestion_with_threading/sample_data/sample_logs.txt b/python/examples/ingestion_with_threading/sample_data/sample_logs.txt new file mode 100644 index 000000000..1475b0961 --- /dev/null +++ b/python/examples/ingestion_with_threading/sample_data/sample_logs.txt @@ -0,0 +1,356 @@ +[sshd(pam_unix)[14281]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=217.60.212.66 user=guest +kernel:\\n audit(1122475266.4294965305:0): initialized +su(pam_unix)[2605]: session closed for user cyrus +syslog: klogd startup succeeded +kernel:\\n audit(1122475266.4294965305:0): initialized +sshd(pam_unix)[10035]: check pass; user unknown +bluetooth: sdpd startup succeeded +rpc.statd[1618]: Version 1.0.6 Starting +kernel:\\n There is already a security framework initialized + register_security failed. +gdm(pam_unix)[2803]: authentication failure; logname= uid=0 euid=0 tty=:0 ruser= rhost= +sshd(pam_unix)[28975]: check pass; user unknown +kernel:\\n usbcore: registered new driver hub +rpc.statd[1618]: Version 1.0.6 Starting +xinetd[26482]: warning: can't get client address: Connection reset by peer +kernel:\\n Intel machine check architecture supported. 
+kernel:\\n CPU: Intel Pentium III (Coppermine) stepping 06 +ftpd[24534]: connection from 217.187.83.139 () at Sun Jul 10 03:55:15 2005 +kernel:\\n CPU: Intel Pentium III (Coppermine) stepping 06 +kernel:\\n audit: initializing netlink socket (disabled) +kernel:\\n Linux version 2.6.5-1.358 (bhcompile@bugs.build.redhat.com) (gcc version 3.3.3 20040412 (Red Hat Linux 3.3.3-7)) #1 Sat May 8 09:04:50 EDT 2004 +su(pam_unix)[1595]: session closed for user news +kernel:\\n audit(1122475266.4294965305:0): initialized +sshd(pam_unix)[24030]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=61-220-159-99.hinet-ip.hinet.net user=root +su(pam_unix)[2605]: session closed for user cyrus +ftpd[24487]: connection from 203.101.45.59 (dsl-Chn-static-059.45.101.203.touchtelindia.net) at Sun Jul 17 15:09:17 2005 +gdm(pam_unix)[2803]: authentication failure; logname= uid=0 euid=0 tty=:0 ruser= rhost= +hcid[1690]: HCI daemon ver 2.4 started +kernel:\\n You can enable it with acpi=force +bluetooth: sdpd startup succeeded +kernel:\\n BIOS-provided physical RAM map: +kernel:\\n Memory: 125312k/129720k available (1540k kernel code +3860k reserved +599k data +144k init +0k highmem +kernel:\\n SELinux: Registering netfilter hooks +kernel:\\n PCI: Probing PCI hardware (bus 00) +kernel:\\n PCI: Invalid ACPI-PCI IRQ routing table +kernel:\\n Console: colour VGA+ 80x25 +kernel:\\n Calibrating delay loop... 1445.88 BogoMIPS +ftpd[16781]: ANONYMOUS FTP LOGIN FROM 84.102.20.2 + (anonymous) +cups: cupsd shutdown succeeded +ftpd[24378]: connection from 207.30.238.8 (host8.topspot.net) at Sun Jul 17 14:03:05 2005 +ftpd[16782]: ANONYMOUS FTP LOGIN FROM 84.102.20.2 + (anonymous) +kernel:\\n Enabling unmasked SIMD FPU exception support... done. +kernel:\\n Initializing Cryptographic API +kernel:\\n Capability LSM initialized +kernel:\\n Enabling fast FPU save and restore... done. 
+kernel:\\n You can enable it with acpi=force +sshd(pam_unix)[5586]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=193.110.106.11 user=root +kernel:\\n Failure registering capabilities with the kernel +ftpd[24091]: connection from 206.196.21.129 (host129.206.196.21.maximumasp.com) at Sat Jul 9 22:53:19 2005 +xinetd[26482]: warning: can't get client address: Connection reset by peer +kernel:\\n BIOS-provided physical RAM map: +kernel:\\n Linux Plug and Play Support v0.97 (c) Adam Belay +sshd(pam_unix)[31848]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=82.77.200.128 user=root +kernel:\\n Intel machine check architecture supported. +kernel:\\n Security Scaffold v1.0.0 initialized +kernel:\\n DMI 2.3 present. +gdm(pam_unix)[2803]: authentication failure; logname= uid=0 euid=0 tty=:0 ruser= rhost= +network: Setting network parameters: succeeded +kernel:\\n PCI: Invalid ACPI-PCI IRQ routing table +kernel:\\n Checking 'hlt' instruction... OK. 
+ftpd[25239]: connection from 82.68.222.195 (82-68-222-195.dsl.in-addr.zen.co.uk) at Sun Jul 17 23:21:54 2005 +kernel:\\n apm: BIOS version 1.2 Flags 0x03 (Driver version 1.16ac) +kernel:\\n Transparent bridge - 0000:00:1e.0 +kernel:\\n PCI: Using IRQ router PIIX/ICH [8086/2410] at 0000:00:1f.0 +ftpd[16782]: connection from 84.102.20.2 () at Sun Jul 24 02:38:22 2005 +ftpd[16781]: ANONYMOUS FTP LOGIN FROM 84.102.20.2 + (anonymous) +kernel:\\n BIOS-e820: 00000000000f0000 - 0000000000100000 (reserved) +ftpd[12299]: connection from 211.42.188.206 () at Fri Jul 22 09:27:24 2005 +kernel:\\n audit(1122475266.4294965305:0): initialized +kernel:\\n POSIX conformance testing by UNIFIX +kernel:\\n Initializing CPU#0 +kernel:\\n Kernel command line: ro root=LABEL=/ rhgb quiet +xinetd[26482]: warning: can't get client address: Connection reset by peer +ftpd[16782]: connection from 84.102.20.2 () at Sun Jul 24 02:38:22 2005 +ftpd[12299]: connection from 211.42.188.206 () at Fri Jul 22 09:27:24 2005 +kernel:\\n Real Time Clock Driver v1.12 +ftpd[24378]: connection from 207.30.238.8 (host8.topspot.net) at Sun Jul 17 14:03:05 2005 +ftpd[26466]: getpeername (ftpd): Transport endpoint is not connected +sshd(pam_unix)[10035]: check pass; user unknown +kernel:\\n Console: colour VGA+ 80x25 +kernel:\\n POSIX conformance testing by UNIFIX +kernel:\\n ACPI disabled because your bios is from 2000 and too old +ftpd[26466]: getpeername (ftpd): Transport endpoint is not connected +kernel:\\n Initializing CPU#0 +kernel:\\n SELinux: Starting in permissive mode +kernel:\\n CPU: Intel Pentium III (Coppermine) stepping 06 +ftpd[16782]: connection from 84.102.20.2 () at Sun Jul 24 02:38:22 2005 +kernel:\\n VFS: Disk quotas dquot_6.5.1 +ftpd[17689]: connection from 212.65.68.82 () at Sat Jul 16 08:14:07 2005 +sshd(pam_unix)[23798]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=p15105218.pureserver.info user=root +ftpd[12299]: connection from 211.42.188.206 () at Fri Jul 22 
09:27:24 2005 +sshd(pam_unix)[5586]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=193.110.106.11 user=root +kernel:\\n NET: Registered protocol family 16 +su(pam_unix)[16058]: session opened for user cyrus by (uid=0) +gdm-binary[2803]: Couldn't authenticate user +ftpd[24978]: connection from 206.47.209.10 () at Mon Jul 25 06:39:18 2005 +kernel:\\n HighMem zone: 0 pages + LIFO batch:1 +kernel:\\n Capability LSM initialized +kernel:\\n ACPI: ACPI tables contain no PCI IRQ routing entries +kernel:\\n audit(1122475266.4294965305:0): initialized +ftpd[16782]: connection from 84.102.20.2 () at Sun Jul 24 02:38:22 2005 +logrotate: ALERT exited abnormally with [1] +kernel:\\n DMI 2.3 present. +kernel:\\n zapping low mappings. +sdpd[1696]: sdpd v1.5 started +kernel:\\n CPU: Intel Pentium III (Coppermine) stepping 06 +ftpd[25648]: connection from 211.72.151.162 () at Mon Jul 18 03:26:49 2005 +kernel:\\n SELinux: Starting in permissive mode +syslogd 1.4.1: restart. +ftpd[24534]: connection from 217.187.83.139 () at Sun Jul 10 03:55:15 2005 +bluetooth: sdpd startup succeeded +gdm-binary[2803]: Couldn't authenticate user +kernel:\\n CPU 0 irqstacks + hard=02345000 soft=02344000 +ftpd[24978]: connection from 206.47.209.10 () at Mon Jul 25 06:39:18 2005 +kernel:\\n Transparent bridge - 0000:00:1e.0 +ftpd[16781]: ANONYMOUS FTP LOGIN FROM 84.102.20.2 + (anonymous) +sshd(pam_unix)[31207]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=adsl-70-242-75-179.dsl.ksc2mo.swbell.net +kernel:\\n Using tsc for high-res timesource +network: Bringing up loopback interface: succeeded +kernel:\\n klogd 1.4.1 + log source = /proc/kmsg started. +kernel:\\n Enabling unmasked SIMD FPU exception support... done. 
+kernel:\\n SELinux: Registering netfilter hooks +network: Bringing up loopback interface: succeeded +kernel:\\n ACPI: Subsystem revision 20040326 +sdpd[1696]: sdpd v1.5 started +kernel:\\n CPU: L2 cache: 256K +sdpd[1696]: sdpd v1.5 started +sshd(pam_unix)[10035]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=65.166.159.14 +kernel:\\n Linux version 2.6.5-1.358 (bhcompile@bugs.build.redhat.com) (gcc version 3.3.3 20040412 (Red Hat Linux 3.3.3-7)) #1 Sat May 8 09:04:50 EDT 2004 +su(pam_unix)[10583]: session closed for user news +ftpd[16781]: ANONYMOUS FTP LOGIN FROM 84.102.20.2 + (anonymous) +ftpd[16781]: ANONYMOUS FTP LOGIN FROM 84.102.20.2 + (anonymous) +kernel:\\n BIOS-e820: 00000000ffb00000 - 0000000100000000 (reserved) +ftpd[16781]: ANONYMOUS FTP LOGIN FROM 84.102.20.2 + (anonymous) +kernel:\\n DMA zone: 4096 pages + LIFO batch:1 +sshd(pam_unix)[8113]: session opened for user test by (uid=509) +kernel:\\n Inode-cache hash table entries: 8192 (order: 3 + 32768 bytes) +kernel:\\n audit(1122475266.4294965305:0): initialized +kernel:\\n Checking 'hlt' instruction... OK. +kernel:\\n Enabling fast FPU save and restore... done. +kernel:\\n PCI: Using IRQ router PIIX/ICH [8086/2410] at 0000:00:1f.0 +kernel:\\n Checking 'hlt' instruction... OK. 
+syslog: klogd startup succeeded +kernel:\\n BIOS-e820: 0000000007eae000 - 0000000008000000 (reserved) +ftpd[24487]: connection from 203.101.45.59 (dsl-Chn-static-059.45.101.203.touchtelindia.net) at Sun Jul 17 15:09:17 2005 +kernel:\\n Memory: 125312k/129720k available (1540k kernel code + 3860k reserved + 599k data + 144k init + 0k highmem) +named[2306]: notify question section contains no SOA +kernel:\\n usbcore: registered new driver usbfs +kernel:\\n Inode-cache hash table entries: 8192 (order: 3 + 32768 bytes) +kernel:\\n PID hash table entries: 512 (order 9: 4096 bytes) +kernel:\\n audit(1122475266.4294965305:0): initialized +kernel:\\n Mount-cache hash table entries: 512 (order: 0 + 4096 bytes) +kernel:\\n ACPI: Subsystem revision 20040326 +nfslock: rpc.statd startup succeeded +kernel:\\n ACPI: Subsystem revision 20040326 +logrotate: ALERT exited abnormally with [1] +kernel:\\n BIOS-e820: 00000000000f0000 - 0000000000100000 (reserved) +kernel:\\n pci_hotplug: PCI Hot Plug PCI Core version: 0.5 +sshd(pam_unix)[8113]: session closed for user test +gdm(pam_unix)[2803]: check pass; user unknown +ftpd[24091]: connection from 206.196.21.129 (host129.206.196.21.maximumasp.com) at Sat Jul 9 22:53:19 2005 +sshd(pam_unix)[14281]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=217.60.212.66 user=guest +ftpd[26463]: getpeername (ftpd): Transport endpoint is not connected +kernel:\\n PCI: Invalid ACPI-PCI IRQ routing table +kernel:\\n Kernel command line: ro root=LABEL=/ rhgb quiet +ftpd[24487]: connection from 203.101.45.59 (dsl-Chn-static-059.45.101.203.touchtelindia.net) at Sun Jul 17 15:09:17 2005 +kernel:\\n Normal zone: 28334 pages + LIFO batch:6 +ftpd[24487]: connection from 203.101.45.59 (dsl-Chn-static-059.45.101.203.touchtelindia.net) at Sun Jul 17 15:09:17 2005 +kernel:\\n Capability LSM initialized +kernel:\\n pci_hotplug: PCI Hot Plug PCI Core version: 0.5 +kernel:\\n BIOS-e820: 00000000000f0000 - 0000000000100000 (reserved) 
+su(pam_unix)[10583]: session closed for user news +kernel:\\n Built 1 zonelists +ftpd[17689]: connection from 212.65.68.82 () at Sat Jul 16 08:14:07 2005 +kernel:\\n CPU: L2 cache: 256K +syslogd 1.4.1: restart. +ftpd[13162]: connection from 67.95.49.172 () at Fri Jul 22 19:29:10 2005 +kernel:\\n Enabling unmasked SIMD FPU exception support... done. +kernel:\\n Capability LSM initialized +bluetooth: sdpd startup succeeded +kernel:\\n Real Time Clock Driver v1.12 +sshd(pam_unix)[8113]: session opened for user test by (uid=509) +kernel:\\n BIOS-e820: 0000000000000000 - 00000000000a0000 (usable) +kernel:\\n pci_hotplug: PCI Hot Plug PCI Core version: 0.5 +rpc.statd[1618]: Version 1.0.6 Starting +su(pam_unix)[2605]: session opened for user cyrus by (uid=0) +ftpd[26463]: getpeername (ftpd): Transport endpoint is not connected +su(pam_unix)[10583]: session opened for user news by (uid=0) +kernel:\\n Initializing CPU#0 +kernel:\\n PCI: Using configuration type 1 +kernel:\\n isapnp: No Plug & Play device found +sshd(pam_unix)[8113]: session opened for user test by (uid=509) +irqbalance: irqbalance startup succeeded +kernel:\\n SELinux: Registering netfilter hooks +kernel:\\n Security Scaffold v1.0.0 initialized +ftpd[25239]: connection from 82.68.222.195 (82-68-222-195.dsl.in-addr.zen.co.uk) at Sun Jul 17 23:21:54 2005 +kernel:\\n PCI: Using configuration type 1 +kernel:\\n audit: initializing netlink socket (disabled) +ftpd[24378]: connection from 207.30.238.8 (host8.topspot.net) at Sun Jul 17 14:03:05 2005 +sshd(pam_unix)[8113]: session closed for user test +kernel:\\n PCI: Invalid ACPI-PCI IRQ routing table +kernel:\\n SELinux: Starting in permissive mode +kernel:\\n Dentry cache hash table entries: 16384 (order: 4 + 65536 bytes) +ftpd[15342]: connection from 211.107.232.1 () at Fri Jul 15 23:42:44 2005 +kernel:\\n zapping low mappings. 
+ftpd[25239]: connection from 82.68.222.195 (82-68-222-195.dsl.in-addr.zen.co.uk) at Sun Jul 17 23:21:54 2005 +kernel:\\n CPU: L1 I cache: 16K + L1 D cache: 16K +kernel:\\n CPU: L1 I cache: 16K + L1 D cache: 16K +gdm(pam_unix)[2803]: authentication failure; logname= uid=0 euid=0 tty=:0 ruser= rhost= +network: Bringing up loopback interface: succeeded +kernel:\\n Enabling unmasked SIMD FPU exception support... done. +kernel:\\n Linux Plug and Play Support v0.97 (c) Adam Belay +sshd(pam_unix)[14281]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=217.60.212.66 user=guest +kernel:\\n CPU: L2 cache: 256K +kernel:\\n PCI: Using IRQ router PIIX/ICH [8086/2410] at 0000:00:1f.0 +kernel:\\n ACPI disabled because your bios is from 2000 and too old +kernel:\\n Console: colour VGA+ 80x25 +kernel:\\n HighMem zone: 0 pages + LIFO batch:1 +kernel:\\n Intel machine check reporting enabled on CPU#0. +rpcidmapd: rpc.idmapd startup succeeded +su(pam_unix)[1595]: session closed for user news +kernel:\\n Intel machine check reporting enabled on CPU#0. +kernel:\\n BIOS-e820: 0000000007eae000 - 0000000008000000 (reserved) +kernel:\\n PCI: Using IRQ router PIIX/ICH [8086/2410] at 0000:00:1f.0 +kernel:\\n Mount-cache hash table entries: 512 (order: 0 + 4096 bytes) +kernel:\\n ACPI: Interpreter disabled. +kernel:\\n BIOS-e820: 0000000000100000 - 0000000007eae000 (usable) +kernel:\\n ACPI: ACPI tables contain no PCI IRQ routing entries +kernel:\\n There is already a security framework initialized + register_security failed. +kernel:\\n Normal zone: 28334 pages + LIFO batch:6 +bluetooth: hcid startup succeeded +su(pam_unix)[16058]: session opened for user cyrus by (uid=0) +kernel:\\n Checking 'hlt' instruction... OK. +kernel:\\n Enabling fast FPU save and restore... done. 
+sshd(pam_unix)[28975]: check pass; user unknown +sshd(pam_unix)[14281]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=217.60.212.66 user=guest +kernel:\\n usbcore: registered new driver usbfs +kernel:\\n On node 0 totalpages: 32430 +kernel:\\n BIOS-e820: 00000000000f0000 - 0000000000100000 (reserved) +kernel:\\n PCI: Probing PCI hardware (bus 00) +kernel:\\n You can enable it with acpi=force +kernel:\\n CPU: L2 cache: 256K +kernel:\\n Linux agpgart interface v0.100 (c) Dave Jones +ftpd[24487]: connection from 203.101.45.59 (dsl-Chn-static-059.45.101.203.touchtelindia.net) at Sun Jul 17 15:09:17 2005 +logrotate: ALERT exited abnormally with [1] +ftpd[26466]: getpeername (ftpd): Transport endpoint is not connected +ftpd[24487]: connection from 203.101.45.59 (dsl-Chn-static-059.45.101.203.touchtelindia.net) at Sun Jul 17 15:09:17 2005 +ftpd[24978]: connection from 206.47.209.10 () at Mon Jul 25 06:39:18 2005 +kernel:\\n PID hash table entries: 512 (order 9: 4096 bytes) +kernel:\\n Detected 731.219 MHz processor. +sshd(pam_unix)[23798]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=p15105218.pureserver.info user=root +kernel:\\n Intel machine check reporting enabled on CPU#0. +ftpd[17689]: connection from 212.65.68.82 () at Sat Jul 16 08:14:07 2005 +sshd(pam_unix)[5586]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=193.110.106.11 user=root +ftpd[13162]: connection from 67.95.49.172 () at Fri Jul 22 19:29:10 2005 +kernel:\\n apm: BIOS version 1.2 Flags 0x03 (Driver version 1.16ac) +kernel:\\n Intel machine check reporting enabled on CPU#0. +kernel:\\n Linux Plug and Play Support v0.97 (c) Adam Belay +su(pam_unix)[10583]: session opened for user news by (uid=0) +kernel:\\n Calibrating delay loop... 1445.88 BogoMIPS +su(pam_unix)[10583]: session opened for user news by (uid=0) +kernel:\\n Intel machine check architecture supported. 
+sshd(pam_unix)[30631]: session closed for user test +kernel:\\n PID hash table entries: 512 (order 9: 4096 bytes) +portmap: portmap startup succeeded +kernel:\\n Using tsc for high-res timesource +ftpd[26466]: getpeername (ftpd): Transport endpoint is not connected +kernel:\\n usbcore: registered new driver hub +ftpd[25239]: connection from 82.68.222.195 (82-68-222-195.dsl.in-addr.zen.co.uk) at Sun Jul 17 23:21:54 2005 +kernel:\\n Memory: 125312k/129720k available (1540k kernel code + 3860k reserved + 599k data + 144k init + 0k highmem) +ftpd[16781]: ANONYMOUS FTP LOGIN FROM 84.102.20.2 + (anonymous) +kernel:\\n Failure registering capabilities with the kernel +kernel:\\n NET: Registered protocol family 16 +ftpd[25648]: connection from 211.72.151.162 () at Mon Jul 18 03:26:49 2005 +bluetooth: sdpd startup succeeded +sshd(pam_unix)[14281]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=217.60.212.66 user=guest +kernel:\\n Intel machine check architecture supported. +hcid[1690]: HCI daemon ver 2.4 started +kernel:\\n BIOS-e820: 0000000000100000 - 0000000007eae000 (usable) +sshd(pam_unix)[31848]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=82.77.200.128 user=root +kernel:\\n ACPI: Interpreter disabled. +kernel:\\n 126MB LOWMEM available. +kernel:\\n ACPI: ACPI tables contain no PCI IRQ routing entries +sshd(pam_unix)[30632]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=150.183.249.110 user=root +sshd(pam_unix)[30632]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=150.183.249.110 user=root +named[2306]: notify question section contains no SOA +kernel:\\n DMI 2.3 present. 
+rpc.statd[1618]: Version 1.0.6 Starting +ftpd[16781]: connection from 84.102.20.2 () at Sun Jul 24 02:38:22 2005 +kernel:\\n PCI: Using IRQ router PIIX/ICH [8086/2410] at 0000:00:1f.0 +kernel:\\n Real Time Clock Driver v1.12 +sshd(pam_unix)[31848]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=82.77.200.128 user=root +sshd(pam_unix)[31848]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=82.77.200.128 user=root +syslog: syslogd startup succeeded +kernel:\\n PCI: Probing PCI hardware (bus 00) +su(pam_unix)[2605]: session closed for user cyrus +rc: Starting pcmcia: succeeded +logrotate: ALERT exited abnormally with [1] +kernel:\\n Initializing CPU#0 +kernel:\\n isapnp: Scanning for PnP cards... +sshd(pam_unix)[8113]: session opened for user test by (uid=509) +kernel:\\n BIOS-e820: 00000000ffb00000 - 0000000100000000 (reserved) +kernel:\\n 0MB HIGHMEM available. +sysctl: kernel.core_uses_pid = 1 +kernel:\\n PCI: Using IRQ router PIIX/ICH [8086/2410] at 0000:00:1f.0 +sshd(pam_unix)[30631]: session closed for user test +kernel:\\n Linux Plug and Play Support v0.97 (c) Adam Belay +sshd(pam_unix)[24030]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=61-220-159-99.hinet-ip.hinet.net user=root +kernel:\\n CPU: L2 cache: 256K +kernel:\\n zapping low mappings. +kernel:\\n Calibrating delay loop... 1445.88 BogoMIPS +kernel:\\n There is already a security framework initialized +register_security failed. 
+su(pam_unix)[10583]: session closed for user news +kernel:\\n Memory: 125312k/129720k available (1540k kernel code +3860k reserved +599k data +144k init +0k highmem) +ftpd[15342]: connection from 211.107.232.1 () at Fri Jul 15 23:42:44 2005 +random: Initializing random number generator: succeeded] diff --git a/python/examples/ingestion_with_threading/simulator.py b/python/examples/ingestion_with_threading/simulator.py index 92f73404e..52cf3168d 100644 --- a/python/examples/ingestion_with_threading/simulator.py +++ b/python/examples/ingestion_with_threading/simulator.py @@ -39,7 +39,8 @@ def __init__(self, data_queue: Queue, asset_name: str, run_id: Optional[str]): sample_bit_field_values = ["00001001", "00100011", "00001101", "11000001"] self.sample_bit_field_values = [bytes([int(byte, 2)]) for byte in sample_bit_field_values] - sample_logs = Path().joinpath("sample_data").joinpath("sample_logs.txt") + dir_path = Path(__file__).parent + sample_logs = dir_path.joinpath("sample_data").joinpath("sample_logs.txt") with open(sample_logs, "r") as file: self.sample_logs = file.readlines() diff --git a/python/lib/sift_py/_internal/test_util/channel.py b/python/lib/sift_py/_internal/test_util/channel.py index a549c28c7..67935514f 100644 --- a/python/lib/sift_py/_internal/test_util/channel.py +++ b/python/lib/sift_py/_internal/test_util/channel.py @@ -6,6 +6,8 @@ from grpc.aio import Channel as AsyncChannel from grpc_testing import Channel +from sift_py.grpc.transport import SiftChannelConfig + SerializingFunction = Callable[[Any], bytes] DeserializingFunction = Callable[[bytes], Any] DoneCallbackType = Callable[[Any], None] @@ -18,6 +20,8 @@ class MockChannel(Channel): Used as a mock gRPC channel """ + config = SiftChannelConfig(uri="localhost:50051", apikey="fake-api-key", use_ssl=False) + def take_unary_unary(self, method_descriptor): pass diff --git a/python/lib/sift_py/asset/_service_test.py b/python/lib/sift_py/asset/_service_test.py index ecf03970e..c113db25a 100644 
class SiftChannel:
    """
    A wrapper around ``grpc.Channel`` that also carries the ``SiftChannelConfig``
    used to create it, so callers can inspect the original config (e.g. for
    debugging or rebuilding an equivalent channel).

    All channel behavior is delegated to the wrapped channel; this class only
    adds the ``config`` attribute and context-manager support.
    """

    def __init__(self, config: "SiftChannelConfig", channel: "grpc.Channel"):
        # Set the delegate first so attribute delegation works immediately.
        self._channel = channel
        self.config = config

    def __getattr__(self, name):
        # __getattr__ is only invoked for attributes NOT found on the wrapper
        # itself, so everything not defined here is transparently delegated to
        # the underlying channel. Guard against infinite recursion when
        # `_channel` itself is absent (e.g. during copy/unpickle, which can
        # touch attributes before __init__ has run).
        if name == "_channel":
            raise AttributeError(name)
        return getattr(self._channel, name)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close the underlying channel. Returning None (falsy) lets any
        # in-flight exception propagate to the caller.
        self._channel.close()
    def ingest(self, *requests: IngestWithConfigDataStreamRequest):
        """
        Perform data ingestion.

        Requests are handed off to `ingest_async`, which enqueues them for the
        background ingestion thread instead of streaming them synchronously on
        the calling thread. Use `wait_for_async_ingestion` to block until the
        queued requests have been sent.
        """
        self.ingest_async(*requests)
+ This allows multiple ingest calls to run in parallel. + """ + # FD-179: Create a thread pool and add to whichever queue is smallest + # Start thread on first ingest on the assumption all modifications to the ingestion config have concluded. + if not self._ingestion_thread.is_alive(): + self._ingestion_thread = IngestionThread(self.builder, self._request_queue) + self._ingestion_thread.start() + stream_requests(self._request_queue, *requests) + + def wait_for_async_ingestion(self, timeout: Optional[float] = None) -> bool: + """ + Wait for all async ingestion threads to complete. + + Args: + timeout: Maximum time to wait in seconds. If None, wait indefinitely. + + Returns: + bool: True if all threads completed within timeout, False otherwise. + """ + self._request_queue.put(None) + if self._ingestion_thread.is_alive(): + self._ingestion_thread.join(timeout=timeout) + if self._ingestion_thread.is_alive(): + logger.error(f"Ingestion thread did not finish after {timeout} seconds. Forcing stop.") + self._ingestion_thread.stop() + return False + return True def ingest_flows(self, *flows: FlowOrderedChannelValues): """ @@ -110,7 +157,7 @@ def ingest_flows(self, *flows: FlowOrderedChannelValues): req = self.create_ingestion_request(flow_name, timestamp, channel_values) requests.append(req) - self.ingest_service_stub.IngestWithConfigDataStream(iter(requests)) + self.ingest_async(*requests) def try_ingest_flows(self, *flows: Flow): """ @@ -126,7 +173,7 @@ def try_ingest_flows(self, *flows: Flow): req = self.try_create_ingestion_request(flow_name, timestamp, channel_values) requests.append(req) - self.ingest_service_stub.IngestWithConfigDataStream(iter(requests)) + self.ingest_async(*requests) def attach_run( self, @@ -137,12 +184,18 @@ def attach_run( tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Union[str, float, bool]]] = None, force_new: bool = False, + client_key: Optional[str] = None, ): """ Retrieve an existing run or create one to use during this 
period of ingestion. Include `force_new=True` to force the creation of a new run, which will allow creation of a new run using an existing name. """ + if self._ingestion_thread.is_alive(): + raise IngestionValidationError( + "Cannot attach run while ingestion thread is running. Invoke before ingesting." + ) + if not force_new: run_id = get_run_id_by_name(channel, run_name) @@ -153,11 +206,13 @@ def attach_run( self.run_id = create_run( channel=channel, run_name=run_name, + run_client_key=client_key, description=description or "", organization_id=organization_id or "", tags=tags or [], metadata=metadata, ) + self.builder.run_id = self.run_id def detach_run(self): """ @@ -165,6 +220,7 @@ def detach_run(self): the run being detached. """ self.run_id = None + self.builder.run = None def try_create_ingestion_request_ordered_values( self, diff --git a/python/lib/sift_py/ingestion/_internal/run.py b/python/lib/sift_py/ingestion/_internal/run.py index 4ea6efb80..46e20fbd1 100644 --- a/python/lib/sift_py/ingestion/_internal/run.py +++ b/python/lib/sift_py/ingestion/_internal/run.py @@ -36,17 +36,22 @@ def create_run( organization_id: str, tags: List[str], metadata: Optional[Dict[str, Union[str, float, bool]]] = None, + run_client_key: Optional[str] = None, ) -> str: svc = RunServiceStub(channel) _metadata = metadata_dict_to_pb(metadata) if metadata else None - req = CreateRunRequest( - name=run_name, - description=description, - organization_id=organization_id, - tags=tags, - metadata=_metadata, - ) + kwargs = { + "name": run_name, + "description": description, + "organization_id": organization_id, + "tags": tags, + "metadata": _metadata, + } + if run_client_key: + kwargs["client_key"] = run_client_key + + req = CreateRunRequest(**kwargs) # type: ignore res = cast(CreateRunResponse, svc.CreateRun(req)) return res.run.run_id diff --git a/python/lib/sift_py/ingestion/_internal/stream.py b/python/lib/sift_py/ingestion/_internal/stream.py new file mode 100644 index 
000000000..4b74fa541 --- /dev/null +++ b/python/lib/sift_py/ingestion/_internal/stream.py @@ -0,0 +1,403 @@ +import asyncio +import logging +import threading +import time +import uuid +from queue import Queue +from typing import List, Optional + +from sift.ingest.v1.ingest_pb2 import ( + IngestWithConfigDataChannelValue, + IngestWithConfigDataStreamRequest, +) +from sift_stream_bindings import ( + ChannelBitFieldElementPy, + ChannelConfigPy, + ChannelDataTypePy, + ChannelEnumTypePy, + FlowConfigPy, + IngestionConfigFormPy, + IngestWithConfigDataChannelValuePy, + IngestWithConfigDataStreamRequestPy, + RecoveryStrategyPy, + RetryPolicyPy, + RunFormPy, + SiftStreamBuilderPy, + TimeValuePy, +) + +from sift_py.grpc.transport import SiftChannel +from sift_py.ingestion.config.telemetry import TelemetryConfig + +logger = logging.getLogger(__name__) + + +class IngestionThread(threading.Thread): + """ + Manages ingestion for a single ingestion config. + """ + + IDLE_LOOP_PERIOD = 0.1 # Time of intervals loop will sleep while waiting for data. + SIFT_STREAM_FINISH_TIMEOUT = 0.06 # Measured ~0.05s to finish stream. + CLEANUP_TIMEOUT = IDLE_LOOP_PERIOD + SIFT_STREAM_FINISH_TIMEOUT + + def __init__( + self, + sift_stream_builder: SiftStreamBuilderPy, + data_queue: Queue, + metric_interval: float = 0.5, + ): + """ + Initialize the IngestionThread. + + Args: + sift_stream_builder: The sift stream builder to build a new stream. + data_queue: The queue to put IngestWithConfigDataStreamRequestPy requests into for ingestion. + ingestion_config: The ingestion config to use for ingestion. + metric_interval: Time (seconds) to wait between logging metrics. + """ + super().__init__(daemon=True) + self.data_queue = data_queue + self._stop_event = threading.Event() + self.sift_stream_builder = sift_stream_builder + self.metric_interval = metric_interval + + def stop(self): + self._stop_event.set() + # Give a brief chance to finish the stream (should take < 50ms). 
+ time.sleep(self.CLEANUP_TIMEOUT) + self.task.cancel() + + async def main(self): + logger.debug("Ingestion thread started") + sift_stream = await self.sift_stream_builder.build() + time_since_last_metric = time.time() - 1 + count = 0 + try: + while True: + while not self.data_queue.empty(): + if self._stop_event.is_set(): + # Being forced to stop. Try to finish the stream. + logger.info( + f"Ingestion thread received stop signal. Exiting. Sent {count} requests. {self.data_queue.qsize()} requests remaining." + ) + await sift_stream.finish() + return + item = self.data_queue.get() + if item is None: + self._stop_event.set() + continue + sift_stream = await sift_stream.send_requests(item) + count += 1 + if time.time() - time_since_last_metric > self.metric_interval: + logger.debug( + f"Ingestion thread sent {count} requests, remaining: {self.data_queue.qsize()}" + ) + time_since_last_metric = time.time() + + if self._stop_event.is_set(): + logger.debug( + f"No more requests. Stopping. Sent {count} requests. {self.data_queue.qsize()} requests remaining." + ) + await sift_stream.finish() + return + else: + await asyncio.sleep(self.IDLE_LOOP_PERIOD) + + except asyncio.CancelledError: + # It's possible the thread was joined while sleeping waiting for data. Only note error if we have data left. + if self.data_queue.qsize() > 0: + logger.error( + f"Ingestion thread cancelled without finishing stream. {self.data_queue.qsize()} requests were not sent." + ) + + async def _run(self): + self.task = asyncio.create_task(self.main()) + await self.task + + def run(self): + """This thread will handle sending data to Sift.""" + # Even thought this is a thread, we need to run this async task to await send_requests otherwise we get sift_stream consumed errors. + asyncio.run(self._run()) + + +def get_builder(channel: SiftChannel, ingestion_config: TelemetryConfig) -> SiftStreamBuilderPy: + """ + Get a builder for a stream. 
+ + Args: + channel: The channel to get a builder for + ingestion_config: The ingestion config to use for the builder + + Returns: + SiftStreamBuilderPy: The builder for the channel + """ + uri = channel.config.get("uri") + apikey = channel.config.get("apikey") + + if not uri or not apikey: + raise ValueError(f"Channel config is missing uri or apikey: {channel.config}") + + # SiftStreamBuilder needs URI to start with http or https + if not uri.startswith("http"): + if "localhost" in uri: + uri = f"http://{uri}" + else: + uri = f"https://{uri}" + + builder = SiftStreamBuilderPy(uri, apikey) + builder.ingestion_config = telemetry_config_to_ingestion_config_py(ingestion_config) + builder.enable_tls = channel.config.get("use_ssl", True) + # FD-177: Expose configuration for recovery strategy. + builder.recovery_strategy = RecoveryStrategyPy.retry_only(RetryPolicyPy.default()) + + return builder + + +async def stream_requests_async(data_queue: Queue, *requests: IngestWithConfigDataStreamRequest): + """ + Non-blocking: Convert requests for rust bindings and put them into a queue. + + Args: + data_queue: The queue to put IngestWithConfigDataStreamRequestPy requests into for ingestion. + requests: List of IngestWithConfigDataStreamRequest protobuf objects + """ + + # Put each request individually into the queue, filtering out None values + processed_requests = [] + for request in requests: + if not isinstance(request, IngestWithConfigDataStreamRequest): + raise ValueError(f"Received unexpected request: {request} of type {type(request)}") + processed_requests.append(ingest_request_to_ingest_request_py(request)) + data_queue.put(processed_requests) + + +def stream_requests( + data_queue: Queue, + *requests: IngestWithConfigDataStreamRequest, +) -> None: + """ + Blocking: Convert requests for rust bindings and put them into a queue. + + Args: + data_queue: The queue to put IngestWithConfigDataStreamRequestPy requests into for ingestion. 
+ requests: List of IngestWithConfigDataStreamRequest protobuf objects + """ + try: + loop = asyncio.get_running_loop() + loop.run_until_complete(stream_requests_async(data_queue, *requests)) + except RuntimeError: + # No running loop, start new loop + asyncio.run(stream_requests_async(data_queue, *requests)) + + +def telemetry_config_to_ingestion_config_py( + telemetry_config: TelemetryConfig, +) -> IngestionConfigFormPy: + """ + Convert a TelemetryConfig to an IngestionConfigFormPy. + + Args: + telemetry_config: The TelemetryConfig to convert + + Returns: + IngestionConfigFormPy: The converted ingestion config + """ + # Convert flows + flow_configs_py = [] + + for flow_config in telemetry_config.flows: + # Convert channels in this flow + channel_configs_py = [] + + for channel_config in flow_config.channels: + # Convert enum types + enum_types_py = [] + for enum_type in channel_config.enum_types: + enum_types_py.append( + ChannelEnumTypePy( + name=enum_type.name, + key=enum_type.key, + ) + ) + + # Convert bit field elements + bit_field_elements_py = [] + for bit_field_element in channel_config.bit_field_elements: + bit_field_elements_py.append( + ChannelBitFieldElementPy( + name=bit_field_element.name, + index=bit_field_element.index, + bit_count=bit_field_element.bit_count, + ) + ) + + # Convert data type + data_type_py = convert_channel_data_type(channel_config.data_type) + + # Create channel config + channel_config_py = ChannelConfigPy( + name=channel_config.name, + data_type=data_type_py, + unit=channel_config.unit or "", + description=channel_config.description or "", + enum_types=enum_types_py, + bit_field_elements=bit_field_elements_py, + ) + + channel_configs_py.append(channel_config_py) + + # Create flow config + flow_config_py = FlowConfigPy( + name=flow_config.name, + channels=channel_configs_py, + ) + + flow_configs_py.append(flow_config_py) + # Create ingestion config + ingestion_config_py = IngestionConfigFormPy( + 
asset_name=telemetry_config.asset_name, + client_key=telemetry_config.ingestion_client_key, + flows=flow_configs_py, + ) + + return ingestion_config_py + + +def convert_channel_data_type(data_type) -> ChannelDataTypePy: + """ + Convert a ChannelDataType to ChannelDataTypePy. + + Args: + data_type: The ChannelDataType to convert + + Returns: + ChannelDataTypePy: The converted data type + """ + # Import here to avoid circular imports + from sift_py.ingestion.channel import ChannelDataType + + if data_type == ChannelDataType.DOUBLE: + return ChannelDataTypePy.Double + elif data_type == ChannelDataType.STRING: + return ChannelDataTypePy.String + elif data_type == ChannelDataType.ENUM: + return ChannelDataTypePy.Enum + elif data_type == ChannelDataType.BIT_FIELD: + return ChannelDataTypePy.BitField + elif data_type == ChannelDataType.BOOL: + return ChannelDataTypePy.Bool + elif data_type == ChannelDataType.FLOAT: + return ChannelDataTypePy.Float + elif data_type == ChannelDataType.INT_32: + return ChannelDataTypePy.Int32 + elif data_type == ChannelDataType.UINT_32: + return ChannelDataTypePy.Uint32 + elif data_type == ChannelDataType.INT_64: + return ChannelDataTypePy.Int64 + elif data_type == ChannelDataType.UINT_64: + return ChannelDataTypePy.Uint64 + elif data_type == ChannelDataType.BYTES: + return ChannelDataTypePy.Bytes + else: + return ChannelDataTypePy.Unspecified + + +def get_run_form( + run_name: str, run_description: str, client_key: Optional[str] = None, run_tags: List[str] = [] +) -> RunFormPy: + """ + Get a run form. 
+ + Args: + run_name: The name of the run + run_description: The description of the run + client_key: The client key to use (if empty, run_name will be used and validated) + run_tags: The tags of the run + + Returns: + RunFormPy: The run form + """ + return RunFormPy( + name=run_name, + description=run_description, + client_key=client_key or str(uuid.uuid4()), + tags=run_tags, + ) + + +def ingest_request_to_ingest_request_py( + request: IngestWithConfigDataStreamRequest, +) -> IngestWithConfigDataStreamRequestPy: + """ + Convert an IngestWithConfigDataStreamRequest to IngestWithConfigDataStreamRequestPy. + + Args: + request: The IngestWithConfigDataStreamRequest to convert + run_id: The run ID to use + + Returns: + IngestWithConfigDataStreamRequestPy: The converted request + """ + timestamp_py = None + if request.HasField("timestamp"): + timestamp_py = TimeValuePy.from_timestamp( + request.timestamp.seconds, request.timestamp.nanos + ) + + channel_values_py = [ + convert_channel_value_to_channel_value_py(channel_value) + for channel_value in request.channel_values + ] + + return IngestWithConfigDataStreamRequestPy( + ingestion_config_id=request.ingestion_config_id, + flow=request.flow, + timestamp=timestamp_py, + channel_values=channel_values_py, + run_id=request.run_id or "", + end_stream_on_validation_error=request.end_stream_on_validation_error, + organization_id=request.organization_id, + ) + + +def convert_channel_value_to_channel_value_py( + channel_value: IngestWithConfigDataChannelValue, +) -> IngestWithConfigDataChannelValuePy: + """ + Convert an IngestWithConfigDataChannelValue to IngestWithConfigDataChannelValuePy. 
+ + Args: + channel_value: The IngestWithConfigDataChannelValue to convert + + Returns: + IngestWithConfigDataChannelValuePy: The converted channel value + """ + if channel_value.HasField("string"): + return IngestWithConfigDataChannelValuePy.string(channel_value.string) + elif channel_value.HasField("double"): + return IngestWithConfigDataChannelValuePy.double(channel_value.double) + elif channel_value.HasField("float"): + return IngestWithConfigDataChannelValuePy.float(channel_value.float) + elif channel_value.HasField("bool"): + return IngestWithConfigDataChannelValuePy.bool(channel_value.bool) + elif channel_value.HasField("int32"): + return IngestWithConfigDataChannelValuePy.int32(channel_value.int32) + elif channel_value.HasField("uint32"): + return IngestWithConfigDataChannelValuePy.uint32(channel_value.uint32) + elif channel_value.HasField("int64"): + return IngestWithConfigDataChannelValuePy.int64(channel_value.int64) + elif channel_value.HasField("uint64"): + return IngestWithConfigDataChannelValuePy.uint64(channel_value.uint64) + elif channel_value.HasField("enum"): + return IngestWithConfigDataChannelValuePy.enum_value(channel_value.enum) + elif channel_value.HasField("bit_field"): + return IngestWithConfigDataChannelValuePy.bitfield(channel_value.bit_field) + elif channel_value.HasField("bytes"): + # For bytes values, we'll convert to a string representation + return IngestWithConfigDataChannelValuePy.string(str(channel_value.bytes)) + elif channel_value.HasField("empty"): + # For empty values, we'll return a default value + return IngestWithConfigDataChannelValuePy.empty() + else: + raise ValueError(f"{channel_value} missing type field.") diff --git a/python/lib/sift_py/ingestion/buffer.py b/python/lib/sift_py/ingestion/buffer.py index 193d5aec5..0e9749dec 100644 --- a/python/lib/sift_py/ingestion/buffer.py +++ b/python/lib/sift_py/ingestion/buffer.py @@ -71,9 +71,13 @@ def __exit__( else: self.flush() + # Wait for async ingestion threads to complete 
before re-raising + self._ingestion_service.wait_for_async_ingestion(timeout=30.0) raise exc_val else: self.flush() + # Wait for async ingestion threads to complete before exiting + self._ingestion_service.wait_for_async_ingestion(timeout=30.0) return True diff --git a/python/lib/sift_py/ingestion/channel.py b/python/lib/sift_py/ingestion/channel.py index c1ab71dc1..dc7cef89e 100644 --- a/python/lib/sift_py/ingestion/channel.py +++ b/python/lib/sift_py/ingestion/channel.py @@ -209,6 +209,7 @@ class ChannelDataTypeStrRep(Enum): INT_64 = "int64" UINT_32 = "uint32" UINT_64 = "uint64" + BYTES = "bytes" @staticmethod def from_api_format(val: str) -> Optional["ChannelDataTypeStrRep"]: @@ -224,6 +225,7 @@ def from_api_format(val: str) -> Optional["ChannelDataTypeStrRep"]: "CHANNEL_DATA_TYPE_INT_64": ChannelDataTypeStrRep.INT_64, "CHANNEL_DATA_TYPE_UINT_32": ChannelDataTypeStrRep.UINT_32, "CHANNEL_DATA_TYPE_UINT_64": ChannelDataTypeStrRep.UINT_64, + "CHANNEL_DATA_TYPE_BYTES": ChannelDataTypeStrRep.BYTES, }[val] except KeyError: return None @@ -244,6 +246,7 @@ class ChannelDataType(Enum): INT_64 = channel_pb.CHANNEL_DATA_TYPE_INT_64 UINT_32 = channel_pb.CHANNEL_DATA_TYPE_UINT_32 UINT_64 = channel_pb.CHANNEL_DATA_TYPE_UINT_64 + BYTES = channel_pb.CHANNEL_DATA_TYPE_BYTES @classmethod def from_pb(cls, val: channel_pb.ChannelDataType.ValueType) -> "ChannelDataType": @@ -267,6 +270,8 @@ def from_pb(cls, val: channel_pb.ChannelDataType.ValueType) -> "ChannelDataType" return cls.UINT_32 elif val == cls.UINT_64.value: return cls.UINT_64 + elif val == cls.BYTES.value: + return cls.BYTES else: raise ValueError(f"Unknown channel data type '{val}'.") @@ -302,6 +307,8 @@ def from_str(cls, raw: str) -> Optional["ChannelDataType"]: return cls.UINT_32 elif val == ChannelDataTypeStrRep.UINT_64: return cls.UINT_64 + elif val == ChannelDataTypeStrRep.BYTES: + return cls.BYTES else: raise Exception("Unreachable") @@ -334,6 +341,8 @@ def as_human_str(self, api_format: bool = False) -> str: 
return ( "CHANNEL_DATA_TYPE_UINT_64" if api_format else ChannelDataTypeStrRep.UINT_64.value ) + elif self == ChannelDataType.BYTES: + return "CHANNEL_DATA_TYPE_BYTES" if api_format else ChannelDataTypeStrRep.BYTES.value else: raise Exception("Unreachable.") @@ -442,3 +451,6 @@ def is_data_type(val: IngestWithConfigDataChannelValue, target_type: ChannelData return val.HasField("uint32") elif target_type == ChannelDataType.UINT_64: return val.HasField("uint64") + elif target_type == ChannelDataType.BYTES: + return val.HasField("bytes") + raise ValueError(f"Unknown channel data type '{target_type}'.") diff --git a/python/lib/sift_py/ingestion/service.py b/python/lib/sift_py/ingestion/service.py index 773dc07f9..6eb82fd7e 100644 --- a/python/lib/sift_py/ingestion/service.py +++ b/python/lib/sift_py/ingestion/service.py @@ -68,6 +68,7 @@ def attach_run( tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Union[str, float, bool]]] = None, force_new: bool = False, + client_key: Optional[str] = None, ): """ Retrieve an existing run or create one to use during this period of ingestion. @@ -75,7 +76,7 @@ def attach_run( Include `force_new=True` to force the creation of a new run, which will allow creation of a new run using an existing name. 
""" super().attach_run( - channel, run_name, description, organization_id, tags, metadata, force_new + channel, run_name, description, organization_id, tags, metadata, force_new, client_key ) def detach_run(self): diff --git a/python/pyproject.toml b/python/pyproject.toml index a05067997..4f4a813e7 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sift_stack_py" -version = "0.8.3" +version = "0.8.6-rc.1" description = "Python client library for the Sift API" requires-python = ">=3.8" readme = { file = "README.md", content-type = "text/markdown" } @@ -28,6 +28,7 @@ dependencies = [ "pydantic_core~=2.3", "requests~=2.25", "requests-toolbelt~=1.0", + "sift-stream-bindings>=0.1.2", "alive-progress~=3.0", # May move these to optional dependencies in the future. "pandas-stubs~=2.0",