Skip to content

Commit

Permalink
add support for feeding gstreamer pipelines into folk image frames
Browse files Browse the repository at this point in the history
  • Loading branch information
s-ol committed Jun 13, 2024
1 parent ff165ca commit 85698b4
Show file tree
Hide file tree
Showing 2 changed files with 234 additions and 0 deletions.
29 changes: 29 additions & 0 deletions test/gstreamer.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
loadVirtualPrograms [list "virtual-programs/gstreamer.folk" "virtual-programs/images.folk"]
Step

# namespace eval Pipeline $::makePipeline
# set pl [Pipeline::create "videotestsrc"]
# set img [Pipeline::frame $pl]
# Pipeline::freeImage $img
# Pipeline::destroy $pl

Step

When the gstreamer pipeline "videotestsrc" frame is /frame/ at /ts/ {
Wish the web server handles route "/gst-image/$" with handler [list apply {{im} {
# set width [dict get $im width]
# set height [dict get $im height]
set filename "/tmp/web-image-frame.png"
image saveAsPng $im $filename
set fsize [file size $filename]
set fd [open $filename r]
fconfigure $fd -encoding binary -translation binary
set body [read $fd $fsize]
close $fd
dict create statusAndHeaders "HTTP/1.1 200 OK\nConnection: close\nContent-Type: image/png\nContent-Length: $fsize\n\n" body $body
}} $frame]
}

Step

forever { Step }
205 changes: 205 additions & 0 deletions virtual-programs/gstreamer.folk
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
set makePipeline {
rename [c create] cc

cc cflags {*}[exec pkg-config --cflags --libs gstreamer-1.0]
cc include <gst/gst.h>
cc include <assert.h>

proc defineGObjectType {cc type cast} {
set cc [uplevel {namespace current}]::$cc
$cc argtype $type* [format {
%s* $argname;
GObject* _$argname;
sscanf(Tcl_GetString($obj), "(%s) 0x%%p", &_$argname);
$argname = %s(_$argname);
} $type $type $cast]

# Tcl_ObjPrintf doesn't work with %lld/%llx for some reason,
# so we do it by hand.
$cc rtype $type* [format {
$robj = Tcl_ObjPrintf("(%s) 0x%%" PRIxPTR, (uintptr_t) G_OBJECT($rvalue));
} $type]
}

defineImageType cc
defineGObjectType cc GstElement GST_ELEMENT
defineGObjectType cc GstBus GST_BUS

cc struct pipeline_t {
GstElement* pipeline;
GstElement* sink;
GstBus* bus;
}

cc struct frame_t {
bool valid;
uint64_t timestamp;
image_t image;
}

cc code {
void quit(const char* msg) {
fprintf(stderr, "[%s] %d: %s\n", msg, errno, strerror(errno));
exit(1);
}

void log_messages(GstBus* bus) {
GstMessage* msg;
GError *err = NULL;
gchar *dbg_info = NULL;
while ((msg = gst_bus_pop_filtered(bus, GST_MESSAGE_ERROR | GST_MESSAGE_WARNING))) {
switch (GST_MESSAGE_TYPE (msg)) {
case GST_MESSAGE_ERROR: {
gst_message_parse_error(msg, &err, &dbg_info);
g_printerr("ERROR from element %s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
g_printerr("Debugging info: %s\n", (dbg_info) ? dbg_info : "none");
g_error_free(err);
g_free(dbg_info);
break;
}
case GST_MESSAGE_WARNING: {
gst_message_parse_warning(msg, &err, &dbg_info);
g_printerr("WARNING from element %s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
g_printerr("Debugging info: %s\n", (dbg_info) ? dbg_info : "none");
g_error_free(err);
g_free(dbg_info);
break;
}
default:
break;
}
}
}
}

cc proc destroy {pipeline_t* p} void {
gst_object_unref(p->bus);
gst_object_unref(p->sink);
gst_element_set_state(p->pipeline, GST_STATE_NULL);
gst_object_unref(p->pipeline);
ckfree(p);
}

cc proc create {char* srcdec} pipeline_t* {
GError* err = NULL;
gst_init(NULL, NULL);

char buf[512];
snprintf(buf, sizeof(buf), "%s ! videoconvert ! appsink caps=video/x-raw,format=RGBA name=output", srcdec);
GstElement* pipeline = gst_parse_launch(buf, &err);
if (err) {
g_printerr("ERROR launching gst pipeline: %s\n", err->message);
return NULL;
}

pipeline_t* p = ckalloc(sizeof (pipeline_t));
p->pipeline = pipeline;
p->bus = gst_element_get_bus(p->pipeline);
p->sink = gst_bin_get_by_name(GST_BIN(p->pipeline), "output");

GstState state;
gst_element_set_state(p->pipeline, GST_STATE_PLAYING);
gst_element_get_state(p->pipeline, &state, NULL, GST_CLOCK_TIME_NONE);
log_messages(p->bus);

if (state != GST_STATE_PLAYING) {
g_printerr("ERROR launching gst pipeline: pipeline failed to start\n");
destroy(p);
return NULL;
}

return p;
}

if {[namespace exists ::Heap]} {
cc import ::Heap::cc folkHeapAlloc as folkHeapAlloc
cc import ::Heap::cc folkHeapFree as folkHeapFree
} else {
cc code {
#define folkHeapAlloc malloc
#define folkHeapFree free
}
}
cc proc frame {pipeline_t* p} frame_t {
frame_t frame;
frame.valid = TRUE;

GstSample* sample;
g_signal_emit_by_name(p->sink, "pull-sample", &sample);
if (!sample) {
frame.valid = FALSE;
return frame;
}

GstCaps* caps = gst_sample_get_caps(sample);
// gst_println("caps are %" GST_PTR_FORMAT, caps);

GstStructure* s = gst_caps_get_structure(caps, 0);
assert(gst_structure_get_int(s, "width", (gint*)&frame.image.width));
assert(gst_structure_get_int(s, "height", (gint*)&frame.image.height));
const gchar* format = gst_structure_get_string(s, "format");
if (g_str_equal(format, "RGB")) {
frame.image.components = 3;
} else if (g_str_equal(format, "RGBA")) {
frame.image.components = 4;
} else {
fprintf(stderr, "frame: invalid cap format '%s'\n", format);
assert(0);
}
frame.image.bytesPerRow = frame.image.width * frame.image.components;

GstMapInfo map;
GstBuffer* buffer = gst_sample_get_buffer(sample);
gst_buffer_map(buffer, &map, GST_MAP_READ);

frame.image.data = folkHeapAlloc(map.size);
memmove(frame.image.data, map.data, map.size);
frame.timestamp = (uint64_t) GST_BUFFER_DTS(buffer);

gst_buffer_unmap(buffer, &map);
gst_sample_unref(sample);

return frame;
}

cc proc freeImage {image_t image} void {
folkHeapFree(image.data);
}

cc compile
}

set ::pipelineIndex 0
When when the gstreamer pipeline /pl/ frame is /frame/ at /ts/ /lambda/ with environment /e/ {
Start process "gstreamer-[incr ::pipelineIndex]" {
puts "starting gst-$pl"
Wish $::thisProcess shares statements like \
[list /someone/ claims the gstreamer pipeline /...anything/]

namespace eval Pipeline $makePipeline

set pipe [Pipeline::create $pl]

set ::oldFrames [list]
When $::thisProcess has step count /c/ {
set frame [Pipeline::frame $pipe]
dict with frame {
if {!$valid} {
Commit {}
return
}

Commit {
Claim the gstreamer pipeline $pl time is $timestamp
Claim the gstreamer pipeline $pl frame is $image at [clock milliseconds]
}

lappend ::oldFrames $image
if {[llength $::oldFrames] >= 10} {
set ::oldFrames [lassign $::oldFrames oldestFrame]
Pipeline::freeImage $oldestFrame
}
}
}
}
}

0 comments on commit 85698b4

Please sign in to comment.