diff --git a/monaistream/bin/standalone_multi_inout_no_plugin.py b/monaistream/bin/standalone_multi_inout_no_plugin.py deleted file mode 100644 index 540ba09..0000000 --- a/monaistream/bin/standalone_multi_inout_no_plugin.py +++ /dev/null @@ -1,122 +0,0 @@ -import numpy as np -import gi -import traceback -gi.require_version('Gst', '1.0') -from gi.repository import Gst, GLib - -Gst.init(None) - -n = 0 -def process_frame(processor_id, app_src): - def _inner(sink): - """Callback function to process each video frame.""" - global n - print(f"frame {n} for processor_id {processor_id}") - n += 1 - - sample = sink.emit("pull-sample") - if not sample: - return Gst.FlowReturn.ERROR - - buffer = sample.get_buffer() - caps = sample.get_caps() - width = caps.get_structure(0).get_int("width")[1] - height = caps.get_structure(0).get_int("height")[1] - - # Extract data from buffer - success, map_info = buffer.map(Gst.MapFlags.READ) - if not success: - return Gst.FlowReturn.ERROR - - frame = np.frombuffer(map_info.data, dtype=np.uint8).reshape((height, width, 3)) - buffer.unmap(map_info) - dframe = np.array(frame) - # Blank out top-left corner (e.g., 100x100 pixels) - dframe[:100, :100] = (0, 0, 0) # Set pixels to black (BGR) - - # Push modified frame to appsrc - new_buffer = Gst.Buffer.new_wrapped(dframe.tobytes()) - app_src.emit("push-buffer", new_buffer) - - return Gst.FlowReturn.OK - - return _inner - - -# Create pipeline -pipeline_in_0 = Gst.parse_launch( - "videotestsrc is-live=true " - "! videoconvert " - "! video/x-raw,format=BGR " - "! queue " - "! appsink name=app_sink_0 " -) - -pipeline_in_1 = Gst.parse_launch( - "videotestsrc is-live=true " - "! videoconvert " - "! video/x-raw,format=BGR " - "! queue " - "! appsink name=app_sink_1 " -) - -pipeline_out_0 = Gst.parse_launch( - "appsrc name=mysrc " - "! queue " - "! videoconvert " - "! x264enc " - "! mp4mux " - "! fakesink" -) - -# Get elements -appsink_0 = pipeline_in_0.get_by_name("app_sink_0") -appsink_1 = pipeline_in_1.get_by_name("app_sink_1") -appsrc = pipeline_out_0.get_by_name("mysrc") - -# Configure appsink_0 -appsink_0.set_property("emit-signals", True) -appsink_0.set_property("max-buffers", 1) -appsink_0.set_property("drop", True) -appsink_0.connect("new-sample", process_frame(0, appsrc)) - -# Configure appsink_1 -appsink_1.set_property("emit-signals", True) -appsink_1.set_property("max-buffers", 1) -appsink_1.set_property("drop", True) -appsink_1.connect("new-sample", process_frame(1, appsrc)) - -# Configure appsrc -caps = Gst.Caps.from_string("video/x-raw, format=BGR, width=640, height=480, framerate=30/1") -appsrc.set_property("caps", caps) -appsrc.set_property("format", Gst.Format.TIME) -appsrc.set_property("block", True) -appsrc.set_property("is-live", True) - -# Start pipeline -pipeline_out_0.set_state(Gst.State.PLAYING) -pipeline_in_0.set_state(Gst.State.PLAYING) -pipeline_in_1.set_state(Gst.State.PLAYING) - -# Run main loop -loop = GLib.MainLoop() -print("set up main loop") -try: - loop.run() -except KeyboardInterrupt: - raise -except Exception as e: - print(f"Exiting due to: {traceback.format_exc()}") -finally: - if loop and loop.is_running(): - loop.quit() - if pipeline_in_0: - pipeline_in_0.set_state(Gst.State.NULL) - if pipeline_in_1: - pipeline_in_1.set_state(Gst.State.NULL) - if pipeline_out_0: - pipeline_out_0.set_state(Gst.State.NULL) - print("Pipeline stopped.") - -# Stop pipeline -# pipeline.set_state(Gst.State.NULL) diff --git a/monaistream/bin/standalone_multi_input_plugin.py b/monaistream/bin/standalone_multi_input_plugin.py deleted file mode 100644 index 82419ea..0000000 --- a/monaistream/bin/standalone_multi_input_plugin.py +++ /dev/null @@ -1,101 +0,0 @@ -import inspect - -import gi - -gi.require_version("Gst", "1.0") -gi.require_version("GstBase", "1.0") -from gi.repository import Gst, GLib, GObject, GstBase - -import numpy as np - -Gst.init() -from monaistream.streamrunners.gstreamer_plugin import ( - GstMultiInputStreamRunner, - GstMultiInputStreamRunner2, - GstMultiInputStreamRunner3, - GstMultiInOutStreamRunner2, - GstMultiInOutStreamRunner2_5, - GstMultiInOutStreamRunner3, -) - -FORMATS = "{RGBx,BGRx,xRGB,xBGR,RGBA,BGRA,ARGB,ABGR,RGB,BGR}" - - -class MyAdaptorOp(GstMultiInputStreamRunner): - - def do_op(self, input_data): - print(f"got input data with shapes {[input_data[i].shape for i in range(len(input_data))]}") - output_data = np.array(input_data[0]) - output_data[:128, :128, :] = input_data[1] - return output_data - - -def register(runner_type, runner_alias): - RunnerType = GObject.type_register(runner_type) - if not Gst.Element.register(None, runner_alias, Gst.Rank.NONE, RunnerType): - raise RuntimeError(f"Failed to register {runner_alias}; you may be missing gst-python plugins") - - -def run_pipeline(pipeline_descriptor): - - # register(MyInPlaceOp, 'myop') - register(MyAdaptorOp, 'myop') - - pipeline = Gst.parse_launch(pipeline_descriptor) - - pipeline.set_state(Gst.State.PLAYING) - - loop = GLib.MainLoop() - try: - print("running loop") - loop.run() - except KeyboardInterrupt: - pass - finally: - pipeline.set_state(Gst.State.NULL) - - -if __name__ == '__main__': - """ - TODO: - - specify source/sink one of several ways: - - pipeline descriptor string - - presets that look up descriptor strings (with argument specification) - - move runner class inside a factory class - - the runner is constructed - """ - - - - # run_pipeline( - # 'udpsrc address=0.0.0.0 port=5001 caps=application/x-rtp,media=video,payout=96,encoding-name=H264', - # 'udpsink host=255.255.255.255 port=5002' - # ) - # pipeline_descriptor = ( - # "videotestsrc pattern=0 ! video/x-raw, width=256, height=256, format=RGB ! queue ! myop.sink_0 " - # "videotestsrc pattern=1 ! video/x-raw, width=128, height=128, format=RGB ! queue ! myop.sink_1 " - # "myop ! videoconvert ! fakesink" - # ) - pipeline_descriptor = ( - "myop name=myop " - "videotestsrc pattern=0 ! video/x-raw, width=256, height=256, format=RGB ! queue ! myop. " - "videotestsrc pattern=1 ! video/x-raw, width=128, height=128, format=RGB ! queue ! myop. " - "myop ! queue ! videoconvert ! fakesink" - ) - - # pipeline_descriptor = ( - # "videotestsrc pattern=0 ! video/x-raw,width=256,height=256 ! myop.sink_1 " - # "videotestsrc pattern=1 ! video/x-raw,width=128,height=128 ! myop.sink_2 " - # "myop. " - # "tee name=t ! queue ! videoconvert ! fakesink t. ! queue ! videoconvert ! fakesink" - # ) - # pipeline_descriptor = ( - # "myop name=myop " - # "videotestsrc pattern=0 ! video/x-raw,width=256,height=256 ! myop.sink_1 " - # "videotestsrc pattern=1 ! video/x-raw,width=128,height=128 ! myop.sink_2 " - # "myop.src_1 ! queue ! videoconvert ! fakesink " - # "myop.src_2 ! queue ! videoconvert ! fakesink" - # ) - print(pipeline_descriptor) - - run_pipeline(pipeline_descriptor) diff --git a/monaistream/bin/standalone_single_inout_no_plugin.py b/monaistream/bin/standalone_single_inout_no_plugin.py deleted file mode 100644 index 95db5ec..0000000 --- a/monaistream/bin/standalone_single_inout_no_plugin.py +++ /dev/null @@ -1,100 +0,0 @@ -import numpy as np -import gi -import traceback -gi.require_version('Gst', '1.0') -from gi.repository import Gst, GLib - -Gst.init(None) - -n = 0 -def process_frame(sink): - """Callback function to process each video frame.""" - global n - print(f"frame {n}") - n += 1 - - sample = sink.emit("pull-sample") - if not sample: - return Gst.FlowReturn.ERROR - - buffer = sample.get_buffer() - caps = sample.get_caps() - width = caps.get_structure(0).get_int("width")[1] - height = caps.get_structure(0).get_int("height")[1] - - # Extract data from buffer - success, map_info = buffer.map(Gst.MapFlags.READ) - if not success: - return Gst.FlowReturn.ERROR - - frame = np.frombuffer(map_info.data, dtype=np.uint8).reshape((height, width, 3)) - buffer.unmap(map_info) - dframe = np.array(frame) - # Blank out top-left corner (e.g., 100x100 pixels) - dframe[:100, :100] = (0, 0, 0) # Set pixels to black (BGR) - - # Push modified frame to appsrc - new_buffer = Gst.Buffer.new_wrapped(dframe.tobytes()) - appsrc.emit("push-buffer", new_buffer) - - return Gst.FlowReturn.OK - -# Create pipeline -pipeline = Gst.parse_launch( - "videotestsrc is-live=true " - "! videoconvert " - "! video/x-raw,format=BGR " - "! queue " - "! appsink name=mysink " -) - -pipeline2 = Gst.parse_launch( - "appsrc name=mysrc " - "! queue " - "! videoconvert " - "! x264enc " - "! mp4mux " - "! fakesink" -) - -# Get elements -appsink = pipeline.get_by_name("mysink") -appsrc = pipeline2.get_by_name("mysrc") - -# Configure appsink -appsink.set_property("emit-signals", True) -appsink.set_property("max-buffers", 1) -appsink.set_property("drop", True) -appsink.connect("new-sample", process_frame) - -# Configure appsrc -caps = Gst.Caps.from_string("video/x-raw, format=BGR, width=640, height=480, framerate=30/1") -appsrc.set_property("caps", caps) -appsrc.set_property("format", Gst.Format.TIME) -appsrc.set_property("block", True) -appsrc.set_property("is-live", True) - -# Start pipeline -pipeline2.set_state(Gst.State.PLAYING) -pipeline.set_state(Gst.State.PLAYING) - -# Run main loop -loop = GLib.MainLoop() -print("set up main loop") -try: - loop.run() -except KeyboardInterrupt: - raise -except Exception as e: - print(f"Exiting due to: {traceback.format_exc()}") -finally: - if loop and loop.is_running(): - loop.quit() - if pipeline: - pipeline.set_state(Gst.State.NULL) - if pipeline2: - pipeline2.set_state(Gst.State.NULL) - print("Pipeline stopped.") - -# Stop pipeline -pipeline.set_state(Gst.State.NULL) diff --git a/monaistream/bin/standalone_single_inout_plugin.py b/monaistream/bin/standalone_single_inout_plugin.py deleted file mode 100644 index cf148b5..0000000 --- a/monaistream/bin/standalone_single_inout_plugin.py +++ /dev/null @@ -1,77 +0,0 @@ -import inspect - -import gi - -gi.require_version("Gst", "1.0") -gi.require_version("GstBase", "1.0") -from gi.repository import Gst, GLib, GObject, GstBase - -import numpy as np - -Gst.init() -from monaistream.streamrunners.gstreamer_plugin import GstAdaptorStreamRunner - -FORMATS = "{RGBx,BGRx,xRGB,xBGR,RGBA,BGRA,ARGB,ABGR,RGB,BGR}" - - -class MyAdaptorOp(GstAdaptorStreamRunner): - - def do_op(self, src_data, snk_data): - print(f"got source data with shape {src_data.shape}") - snk_data[...] = src_data[:snk_data.shape[0], :snk_data.shape[1], :] - - -def register(runner_type, runner_alias): - RunnerType = GObject.type_register(runner_type) - if not Gst.Element.register(None, runner_alias, Gst.Rank.NONE, RunnerType): - raise RuntimeError(f"Failed to register {runner_alias}; you may be missing gst-python plugins") - - -def run_pipeline(pipeline_descriptor): - - # register(MyInPlaceOp, 'myop') - register(MyAdaptorOp, 'myop') - - pipeline = Gst.parse_launch(pipeline_descriptor) - - pipeline.set_state(Gst.State.PLAYING) - - loop = GLib.MainLoop() - try: - print("running loop") - loop.run() - except KeyboardInterrupt: - pass - finally: - pipeline.set_state(Gst.State.NULL) - - -if __name__ == '__main__': - """ - TODO: - - specify source/sink one of several ways: - - pipeline descriptor string - - presets that look up descriptor strings (with argument specification) - - move runner class inside a factory class - - the runner is constructed - """ - - - - # run_pipeline( - # 'udpsrc address=0.0.0.0 port=5001 caps=application/x-rtp,media=video,payout=96,encoding-name=H264', - # 'udpsink host=255.255.255.255 port=5002' - # ) - pipeline_descriptor = ( - 'videotestsrc is-live=true ' - '! videoconvert ' - # '! myop message=foo ' - '! queue ' - '! myop ' - '! queue ' - '! videoconvert ' - '! fakesink' - ) - print(pipeline_descriptor) - - run_pipeline(pipeline_descriptor) diff --git a/monaistream/streamrunners/utils.py b/monaistream/datasets/__init__.py similarity index 100% rename from monaistream/streamrunners/utils.py rename to monaistream/datasets/__init__.py diff --git a/monaistream/datasets/gstreamer/__init__.py b/monaistream/datasets/gstreamer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/monaistream/threadsafe.py b/monaistream/datasets/gstreamer/iterable_buffer_dataset.py similarity index 80% rename from monaistream/threadsafe.py rename to monaistream/datasets/gstreamer/iterable_buffer_dataset.py index 3074836..1d3c94f 100644 --- a/monaistream/threadsafe.py +++ b/monaistream/datasets/gstreamer/iterable_buffer_dataset.py @@ -14,9 +14,8 @@ import torch from queue import Empty, Queue -from threading import Thread, RLock -from monai.transforms import Transform -from monai.utils.enums import CommonKeys +from threading import RLock + __all__ = ["IterableBufferDataset", "StreamSinkTransform"] @@ -72,19 +71,3 @@ def __iter__(self): pass # queue was empty this time, try again finally: self.stop() - - -class StreamSinkTransform(Transform): - def __init__(self, result_key: str = CommonKeys.PRED, buffer_size: int = 0, timeout: float = 1.0): - super().__init__() - self.result_key = result_key - self.buffer_size = buffer_size - self.timeout = timeout - self.queue: Queue = Queue(self.buffer_size) - - def __call__(self, data): - self.queue.put(data[self.result_key], timeout=self.timeout) - return data - - def get_result(self): - return self.queue.get(timeout=self.timeout) diff --git a/monaistream/gstreamer/__init__.py b/monaistream/gstreamer/__init__.py deleted file mode 100644 index aceb2d4..0000000 --- a/monaistream/gstreamer/__init__.py +++ /dev/null @@ -1,49 +0,0 @@ -# Copyright (c) MONAI Consortium -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -This module contains code for GStreamer related components and plugins. It contains the utility definitions for use with -extension classes as well as Python based plugins which can be loaded with the gst-python module. This module requires -a list of directories be provided in the environment variable GST_PLUGIN_PATH. In these directories it looks for a -subdirectory called "python" in which it expects plugin source files to load. This module adds its directory to this -variable and has a "python" symlink linking to its directory so that this mechanism works on import. -""" - -import os -from monai.utils.module import optional_import - -gi, HAS_GI = optional_import("gi") - -if HAS_GI: - plugin_path = os.environ.get("GST_PLUGIN_PATH", None) - if plugin_path: - plugin_path += ":" + os.path.dirname(__file__) - else: - plugin_path = os.path.dirname(__file__) - - os.environ["GST_PLUGIN_PATH"] = plugin_path # set the plugin path so that this directory is searched - - gi.require_version("Gst", "1.0") - gi.require_version("GstBase", "1.0") - gi.require_version("GstVideo", "1.0") - from gi.repository import Gst - - Gst.init([]) - # use GST_DEBUG instead https://gstreamer.freedesktop.org/documentation/gstreamer/running.html - # Gst.debug_set_active(True) - # Gst.debug_set_default_threshold(5) - - from monaistream.gstreamer.utils import * - from monaistream.gstreamer.numpy_transforms import * - - # TODO: import more things here - -# Silently import nothing if gi not present? If gi not present don't annoy user with warning on every import. diff --git a/monaistream/gstreamer/numpy_transforms.py b/monaistream/gstreamer/numpy_transforms.py deleted file mode 100644 index 809d858..0000000 --- a/monaistream/gstreamer/numpy_transforms.py +++ /dev/null @@ -1,42 +0,0 @@ -# Copyright (c) MONAI Consortium -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from contextlib import contextmanager -import numpy as np - -from gi.repository import Gst, GObject, GstBase - -from monaistream.gstreamer.utils import map_buffer_to_numpy, get_video_pad_template - -__all__=["NumpyInplaceTransform"] - -class NumpyInplaceTransform(GstBase.BaseTransform): - """ - """ - __gstmetadata__ = ("Numpy Inplace Transform", "Transform", "Description", "Author") - - __gsttemplates__ = ( - get_video_pad_template("src", Gst.PadDirection.SRC), - get_video_pad_template("sink", Gst.PadDirection.SINK), - ) - - def do_transform_ip(self, buffer: Gst.Buffer) -> Gst.FlowReturn: - """ - """ - with map_buffer_to_numpy(buffer, Gst.MapFlags.WRITE, self.sinkpad.get_current_caps()) as image_array: - height, width, _ = image_array.shape - image_array[: height // 2, : width // 2] = 128 - - return Gst.FlowReturn.OK - - -GObject.type_register(NumpyInplaceTransform) -__gstelementfactory__ = ("numpyinplacetransform", Gst.Rank.NONE, NumpyInplaceTransform) diff --git a/monaistream/gstreamer/python b/monaistream/gstreamer/python deleted file mode 120000 index 945c9b4..0000000 --- a/monaistream/gstreamer/python +++ /dev/null @@ -1 +0,0 @@ -. \ No newline at end of file diff --git a/monaistream/simple_inference.py b/monaistream/simple_inference.py deleted file mode 100644 index 1c32f13..0000000 --- a/monaistream/simple_inference.py +++ /dev/null @@ -1,151 +0,0 @@ -# Copyright (c) MONAI Consortium -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import TYPE_CHECKING, Any, Callable, Sequence -from monai.inferers import Inferer -from monai.transforms import apply_transform, Transform -from monai.engines import SupervisedEvaluator, default_metric_cmp_fn, default_prepare_batch -from monai.utils import ForwardMode,CommonKeys -from monai.data import Dataset -from monai.handlers import MeanSquaredError, from_engine -import torch -from torch.nn import Module - -from monai.utils import IgniteInfo, min_version, optional_import - -if TYPE_CHECKING: - from ignite.engine import Engine, EventEnum - from ignite.metrics import Metric -else: - version = IgniteInfo.OPT_IMPORT_VERSION - Engine, _ = optional_import("ignite.engine", version, min_version, "Engine", as_type="decorator") - Metric, _ = optional_import("ignite.metrics", version, min_version, "Metric", as_type="decorator") - EventEnum, _ = optional_import("ignite.engine", version, min_version, "EventEnum", as_type="decorator") - - -class SimpleInferenceEngine: - """ - A simple engine-like class is for running inference on a per-input basis, such as with per-frame data in a - video stream. It relies on a supplied Inferer instance and a network. - """ - - def __init__( - self, inferer: Inferer, network: Module, preprocess: Callable | None = None, postprocess: Callable | None = None - ): - self.inferer = inferer - self.network = network - self.preprocess = preprocess - self.postprocess = postprocess - - def __call__(self, inputs: torch.Tensor, *args: Any, **kwargs: Any) -> Any: - if self.preprocess: - inputs = apply_transform(self.preprocess, inputs) - - outputs = self.inferer(inputs, self.network, *args, **kwargs) - - if self.postprocess: - outputs = apply_transform(self.postprocess, outputs) - - return outputs - - -class SingleItemDataset(Dataset): - """ - This simple dataset only ever has one item and acts as its own iterable. This is used with InferenceEngine to - represent a changeable single item epoch. - """ - def __init__(self, transform: Sequence[Callable] | Callable | None = None) -> None: - super().__init__([None], transform) - - def set_item(self, item): - self.data[0] = item - - def __iter__(self): - yield self.data[0] - - -class InferenceEngine(SupervisedEvaluator): - """ - A simple inference engine type for applying inference to one input at a time as a callable. This is meant to be used - for inference on per-frame video stream data where the state of the engine and other setup should be done initially - but reused for every frame. This allows for synchronous use of an engine class rather than running one in a separate - thread. - """ - - def __init__( - self, - device: torch.device, - network: torch.nn.Module, - preprocessing: Transform | None = None, - non_blocking: bool = False, - prepare_batch: Callable = default_prepare_batch, - iteration_update: Callable[[Engine, Any], Any] | None = None, - inferer: Inferer | None = None, - postprocessing: Transform | None = None, - key_val_metric: dict[str, Metric] | None = None, - additional_metrics: dict[str, Metric] | None = None, - metric_cmp_fn: Callable = default_metric_cmp_fn, - handlers: Sequence | None = None, - amp: bool = False, - event_names: list[str | EventEnum | type[EventEnum]] | None = None, - event_to_attr: dict | None = None, - decollate: bool = True, - to_kwargs: dict | None = None, - amp_kwargs: dict | None = None, - compile: bool = False, - compile_kwargs: dict | None = None, - ) -> None: - super().__init__( - device=device, - val_data_loader=SingleItemDataset(preprocessing), - epoch_length=1, - network=network, - inferer=inferer, - non_blocking=non_blocking, - prepare_batch=prepare_batch, - iteration_update=iteration_update, - postprocessing=postprocessing, - key_val_metric=key_val_metric, - additional_metrics=additional_metrics, - metric_cmp_fn=metric_cmp_fn, - val_handlers=handlers, - amp=amp, - mode=ForwardMode.EVAL, - event_names=event_names, - event_to_attr=event_to_attr, - decollate=decollate, - to_kwargs=to_kwargs, - amp_kwargs=amp_kwargs, - compile=compile, - compile_kwargs=compile_kwargs, - ) - - def __call__(self, item: Any, include_metrics: bool = False) -> Any: - self.data_loader.set_item(item) - self.run() - - out = self.state.output[0][CommonKeys.PRED] - - if include_metrics: - return out, dict(engine.state.metrics) - else: - return out - - -if __name__ == "__main__": - net = torch.nn.Identity() - engine = InferenceEngine( - network=net, - device="cpu", - key_val_metric={"mse": MeanSquaredError(output_transform=from_engine([CommonKeys.IMAGE, CommonKeys.PRED]))}, - ) - print(engine(torch.rand(1, 5, 5))) - print(engine(torch.rand(1, 6, 6), True)) diff --git a/monaistream/streamrunner/__init__.py b/monaistream/streamrunner/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/monaistream/streamrunners/adaptors.py b/monaistream/streamrunner/adaptors.py similarity index 100% rename from monaistream/streamrunners/adaptors.py rename to monaistream/streamrunner/adaptors.py diff --git a/monaistream/streamrunners/gstreamer/backend.py b/monaistream/streamrunner/gstreamer/backend.py similarity index 99% rename from monaistream/streamrunners/gstreamer/backend.py rename to monaistream/streamrunner/gstreamer/backend.py index d50cd78..8eabbe4 100644 --- a/monaistream/streamrunners/gstreamer/backend.py +++ b/monaistream/streamrunner/gstreamer/backend.py @@ -10,7 +10,7 @@ import torch -from monaistream.streamrunners.gstreamer.utils import PadEntry +from monaistream.streamrunner.gstreamer.utils import PadEntry Gst.init(None) diff --git a/monaistream/streamrunners/gstreamer/subnet.py b/monaistream/streamrunner/gstreamer/subnet.py similarity index 97% rename from monaistream/streamrunners/gstreamer/subnet.py rename to monaistream/streamrunner/gstreamer/subnet.py index bf56b7f..20530c2 100644 --- a/monaistream/streamrunners/gstreamer/subnet.py +++ b/monaistream/streamrunner/gstreamer/subnet.py @@ -2,7 +2,7 @@ gi.require_version('Gst', '1.0') from gi.repository import Gst -from monaistream.streamrunners.gstreamer.utils import parse_node_entry +from monaistream.streamrunner.gstreamer.utils import parse_node_entry class GstStreamRunnerSubnet: diff --git a/monaistream/gstreamer/utils.py b/monaistream/streamrunner/gstreamer/utils.py similarity index 62% rename from monaistream/gstreamer/utils.py rename to monaistream/streamrunner/gstreamer/utils.py index 7b92b01..68e86ca 100644 --- a/monaistream/gstreamer/utils.py +++ b/monaistream/streamrunner/gstreamer/utils.py @@ -9,6 +9,14 @@ # See the License for the specific language governing permissions and # limitations under the License. + +from dataclasses import dataclass + +import gi +gi.require_version('Gst', '1.0') +from gi.repository import Gst, GLib, GObject + + from contextlib import contextmanager import numpy as np @@ -20,6 +28,80 @@ BYTE_FORMATS = "{RGBx,BGRx,xRGB,xBGR,RGBA,BGRA,ARGB,ABGR,RGB,BGR,GRAY8,GRAY16_BE,GRAY16_LE}" +def parse_node_entry(entry): + element = Gst.parse_bin_from_description_full( + # entry.description, False, None, Gst.ParseFlags.NO_SINGLE_ELEMENT_BINS) + entry.description, True, None, Gst.ParseFlags.PLACE_IN_BIN) + + if not element: + raise ValueError(f"Failed to parse element {entry.description}") + + return element + + + +def register(runner_type, runner_alias): + RunnerType = GObject.type_register(runner_type) + if not Gst.Element.register(None, runner_alias, Gst.Rank.NONE, RunnerType): + raise ValueError(f"Failed to register {runner_alias}; you may be missing gst-python plugins") + + + +def create_registerable_plugin(base_type, class_name, inputs, outputs, do_op): + # TODO: is this class actually gstreamer specific? + def init_with_do_op(self): + base_type.__init__(self, inputs=inputs, outputs=outputs, do_op=do_op) + + sub_class_type = type( + class_name, + (base_type,), + { + "__init__": init_with_do_op, + } + ) + + return sub_class_type + + + +def run_pipeline(pipeline): + pipeline.set_state(Gst.State.PLAYING) + print("logging pipeline graph") + Gst.debug_bin_to_dot_file(pipeline, Gst.DebugGraphDetails.ALL, 'pipeline_state') + + loop = GLib.MainLoop() + try: + print("running loop") + loop.run() + except KeyboardInterrupt: + pass + finally: + print("shutting down") + if pipeline: + pipeline.send_event(Gst.Event.new_eos()) + bus = pipeline.get_bus() + msg = bus.timed_pop_filtered(2 * Gst.SECOND, Gst.MessageType.EOS) + pipeline.set_state(Gst.State.NULL) + # pipeline.get_state(Gst.CLOCK_TIME_NONE) + if loop and loop.is_running(): + loop.quit() + + + +@dataclass(frozen=True) +class PadEntry: + name: str + format: str + + + +@dataclass(frozen=True) +class SubnetEntry: + name: str + description: str + + + def get_video_pad_template( name, direction=Gst.PadDirection.SRC, presence=Gst.PadPresence.ALWAYS, caps_str=f"video/x-raw,format={BYTE_FORMATS}" ): @@ -97,3 +179,5 @@ def map_buffer_to_numpy(buffer, flags, caps, dtype=None): yield bufarray finally: buffer.unmap(map_info) + + diff --git a/monaistream/streamrunners/gstreamer_noplugin.py b/monaistream/streamrunner/gstreamer_noplugin.py similarity index 100% rename from monaistream/streamrunners/gstreamer_noplugin.py rename to monaistream/streamrunner/gstreamer_noplugin.py diff --git a/monaistream/streamrunners/gstreamer_plugin.py b/monaistream/streamrunner/gstreamer_plugin.py similarity index 100% rename from monaistream/streamrunners/gstreamer_plugin.py rename to monaistream/streamrunner/gstreamer_plugin.py diff --git a/monaistream/streamrunners/streamrunner.py b/monaistream/streamrunner/streamrunner.py similarity index 97% rename from monaistream/streamrunners/streamrunner.py rename to monaistream/streamrunner/streamrunner.py index 475820b..57e8585 100644 --- a/monaistream/streamrunners/streamrunner.py +++ b/monaistream/streamrunner/streamrunner.py @@ -1,6 +1,6 @@ from dataclasses import dataclass -from monaistream.streamrunners.gstreamer.backend import GstStreamRunnerBackend +from monaistream.streamrunner.gstreamer.backend import GstStreamRunnerBackend diff --git a/monaistream/streamrunner/utils.py b/monaistream/streamrunner/utils.py new file mode 100644 index 0000000..e69de29 diff --git a/monaistream/streamrunners/gstreamer/utils.py b/monaistream/streamrunners/gstreamer/utils.py deleted file mode 100644 index c0e0733..0000000 --- a/monaistream/streamrunners/gstreamer/utils.py +++ /dev/null @@ -1,78 +0,0 @@ -from dataclasses import dataclass - -import gi -gi.require_version('Gst', '1.0') -from gi.repository import Gst, GLib, GObject - - -def parse_node_entry(entry): - element = Gst.parse_bin_from_description_full( - # entry.description, False, None, Gst.ParseFlags.NO_SINGLE_ELEMENT_BINS) - entry.description, True, None, Gst.ParseFlags.PLACE_IN_BIN) - - if not element: - raise ValueError(f"Failed to parse element {entry.description}") - - return element - - - -def register(runner_type, runner_alias): - RunnerType = GObject.type_register(runner_type) - if not Gst.Element.register(None, runner_alias, Gst.Rank.NONE, RunnerType): - raise ValueError(f"Failed to register {runner_alias}; you may be missing gst-python plugins") - - - -def create_registerable_plugin(base_type, class_name, inputs, outputs, do_op): - # TODO: is this class actually gstreamer specific? - def init_with_do_op(self): - base_type.__init__(self, inputs=inputs, outputs=outputs, do_op=do_op) - - sub_class_type = type( - class_name, - (base_type,), - { - "__init__": init_with_do_op, - } - ) - - return sub_class_type - - - -def run_pipeline(pipeline): - pipeline.set_state(Gst.State.PLAYING) - print("logging pipeline graph") - Gst.debug_bin_to_dot_file(pipeline, Gst.DebugGraphDetails.ALL, 'pipeline_state') - - loop = GLib.MainLoop() - try: - print("running loop") - loop.run() - except KeyboardInterrupt: - pass - finally: - print("shutting down") - if pipeline: - pipeline.send_event(Gst.Event.new_eos()) - bus = pipeline.get_bus() - msg = bus.timed_pop_filtered(2 * Gst.SECOND, Gst.MessageType.EOS) - pipeline.set_state(Gst.State.NULL) - # pipeline.get_state(Gst.CLOCK_TIME_NONE) - if loop and loop.is_running(): - loop.quit() - - - -@dataclass(frozen=True) -class PadEntry: - name: str - format: str - - - -@dataclass(frozen=True) -class SubnetEntry: - name: str - description: str diff --git a/monaistream/transforms/__init__.py b/monaistream/transforms/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/monaistream/transforms/gstreamer/__init__.py b/monaistream/transforms/gstreamer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/monaistream/transforms/gstreamer/streaming_sink_transform.py b/monaistream/transforms/gstreamer/streaming_sink_transform.py new file mode 100644 index 0000000..03643bf --- /dev/null +++ b/monaistream/transforms/gstreamer/streaming_sink_transform.py @@ -0,0 +1,30 @@ +# Copyright (c) MONAI Consortium +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from queue import Queue +from monai.transforms import Transform +from monai.utils.enums import CommonKeys + + +class StreamSinkTransform(Transform): + def __init__(self, result_key: str = CommonKeys.PRED, buffer_size: int = 0, timeout: float = 1.0): + super().__init__() + self.result_key = result_key + self.buffer_size = buffer_size + self.timeout = timeout + self.queue: Queue = Queue(self.buffer_size) + + def __call__(self, data): + self.queue.put(data[self.result_key], timeout=self.timeout) + return data + + def get_result(self): + return self.queue.get(timeout=self.timeout) diff --git a/tests/adaptors/test_workflow_adaptor.py b/tests/adaptors/test_workflow_adaptor.py index 75ec646..5959b44 100644 --- a/tests/adaptors/test_workflow_adaptor.py +++ b/tests/adaptors/test_workflow_adaptor.py @@ -5,7 +5,7 @@ import monai from monai.engines import Workflow -from monaistream.streamrunners.adaptors import IgniteEngineAdaptor +from monaistream.streamrunner.adaptors import IgniteEngineAdaptor class TestIgniteEngineAdaptor(unittest.TestCase): diff --git a/tests/standalone/__init__.py b/tests/standalone/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/monaistream/bin/standalone_ignite_demo.py b/tests/standalone/standalone_ignite_demo.py similarity index 82% rename from monaistream/bin/standalone_ignite_demo.py rename to tests/standalone/standalone_ignite_demo.py index 30c8bd8..f5f9052 100644 --- a/monaistream/bin/standalone_ignite_demo.py +++ b/tests/standalone/standalone_ignite_demo.py @@ -4,10 +4,10 @@ from ignite.engine import Engine -from monaistream.streamrunners.streamrunner import StreamRunner -from monaistream.streamrunners.adaptors import StreamingDataLoader, IgniteEngineAdaptor -from monaistream.streamrunners.gstreamer.subnet import GstStreamRunnerSubnet -from monaistream.streamrunners.gstreamer.utils import run_pipeline, PadEntry, SubnetEntry +from monaistream.streamrunner.streamrunner import StreamRunner +from monaistream.streamrunner.adaptors import StreamingDataLoader, IgniteEngineAdaptor +from monaistream.streamrunner.gstreamer.subnet import GstStreamRunnerSubnet +from monaistream.streamrunner.gstreamer.utils import run_pipeline, PadEntry, SubnetEntry diff --git a/monaistream/bin/standalone_multi_input_output_plugin.py b/tests/standalone/standalone_multi_input_output_plugin.py similarity index 95% rename from monaistream/bin/standalone_multi_input_output_plugin.py rename to tests/standalone/standalone_multi_input_output_plugin.py index 8b6a1bc..434857d 100644 --- a/monaistream/bin/standalone_multi_input_output_plugin.py +++ b/tests/standalone/standalone_multi_input_output_plugin.py @@ -9,7 +9,7 @@ import numpy as np -from monaistream.streamrunners.gstreamer.utils import ( +from monaistream.streamrunner.gstreamer.utils import ( create_registerable_plugin, register, run_pipeline @@ -19,8 +19,8 @@ Gst.init() -from monaistream.streamrunners.gstreamer.backend import GstStreamRunnerBackend, GstStreamRunnerBackendStatic -from monaistream.streamrunners.gstreamer.utils import PadEntry +from monaistream.streamrunner.gstreamer.backend import GstStreamRunnerBackend, GstStreamRunnerBackendStatic +from monaistream.streamrunner.gstreamer.utils import PadEntry FORMATS = "{RGBx,BGRx,xRGB,xBGR,RGBA,BGRA,ARGB,ABGR,RGB,BGR}" diff --git a/tests/test_numpy_transforms.py b/tests/test_numpy_transforms.py deleted file mode 100644 index d7a4c14..0000000 --- a/tests/test_numpy_transforms.py +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright (c) MONAI Consortium -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import sys -from tempfile import TemporaryDirectory -from subprocess import check_call, CalledProcessError -import unittest -from tests.utils import SkipIfNoModule - - -@SkipIfNoModule("gi") -class TestNumpyInplaceTransform(unittest.TestCase): - def test_import(self): - """ - Test importation of the transform. - """ - from monaistream.gstreamer import NumpyInplaceTransform - - def test_pipeline(self): - """ - Test the transform can be loaded with `parse_launchv`. - """ - from gi.repository import Gst - - pipeline = Gst.parse_launchv(["videotestsrc", "numpyinplacetransform"]) - self.assertIsNotNone(pipeline) - - def test_gst_launch(self): - """ - Test launching a separate pipeline subprocess with gst-launch-1.0 correctly imports the transform. - """ - pipeline = "videotestsrc num-buffers=1 ! numpyinplacetransform ! jpegenc ! filesink location=img.jpg" - - with TemporaryDirectory() as td: - try: - check_call(["gst-launch-1.0"] + list(pipeline.split()), cwd=td) - except CalledProcessError as cpe: - print("Output gst-launch-1.0:\n", repr(cpe.output), file=sys.stderr) - raise - - self.assertTrue(os.path.isfile(os.path.join(td, "img.jpg"))) - - -if __name__ == "__main__": - unittest.main()