Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions api/lua/build/grpc_client.lua
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,31 @@ function extension_methods:get_headers_with_retries(timeout, retries)
end
end

---Call h2_stream:get_next_chunk() without timeout, retrying on spurious wakeup.
local function _each_chunk_helper(self)
local chunk, err, errno
repeat
chunk, err, errno = self:get_next_chunk()
until chunk ~= nil or errno ~= ce.ETIMEDOUT

return chunk, err, errno
end

---Iterate over stream's chunks.
---
---The difference between this function and h2_stream:each_chunk is that this function
---will retry calling get_next_chunk if the function fails due to a spurious wakeup.
---@return function
---@return grpc_client.h2.Stream
function extension_methods:each_chunk2()
--return self:each_chunk()
return _each_chunk_helper, self
end

---Extend a stream with new methods
---
---@param s http.h2_stream.stream
---@return http.h2_stream.stream
---@param s grpc_client.h2.Stream
---@return grpc_client.h2.Stream
function StreamExtension.extend(s)
for k, v in pairs(extension_methods) do
s[k] = v
Expand Down Expand Up @@ -354,7 +375,7 @@ function Client:server_streaming_request(request_specifier, data, callback, done
end

self.loop:wrap(function()
for response_body in stream:each_chunk() do
for response_body in stream:each_chunk2() do
while response_body:len() > 0 do
local msg_len = string.unpack(">I4", response_body:sub(2, 5))

Expand Down Expand Up @@ -432,7 +453,7 @@ function Client:bidirectional_streaming_request(request_specifier, callback, don
end

self.loop:wrap(function()
for response_body in stream:each_chunk() do
for response_body in stream:each_chunk2() do
while response_body:len() > 0 do
local msg_len = string.unpack(">I4", response_body:sub(2, 5))

Expand Down
1 change: 1 addition & 0 deletions api/lua/pinnacle-api-dev-1.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ build = {
["pinnacle.snowcap.snowcap.popup"] = "pinnacle/snowcap/snowcap/popup.lua",
["pinnacle.snowcap.snowcap.signal"] = "pinnacle/snowcap/snowcap/signal.lua",
["pinnacle.snowcap.snowcap.util"] = "pinnacle/snowcap/snowcap/util.lua",
["pinnacle.snowcap.snowcap.util.channel"] = "pinnacle/snowcap/snowcap/util/channel.lua",
["pinnacle.snowcap.snowcap.log"] = "pinnacle/snowcap/snowcap/log.lua",
},
}
29 changes: 25 additions & 4 deletions api/lua/pinnacle/grpc/defs.lua
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,31 @@ function extension_methods:get_headers_with_retries(timeout, retries)
end
end

---Call h2_stream:get_next_chunk() without timeout, retrying on spurious wakeup.
local function _each_chunk_helper(self)
local chunk, err, errno
repeat
chunk, err, errno = self:get_next_chunk()
until chunk ~= nil or errno ~= ce.ETIMEDOUT

return chunk, err, errno
end

---Iterate over stream's chunks.
---
---The difference between this function and h2_stream:each_chunk is that this function
---will retry calling get_next_chunk if the function fails due to a spurious wakeup.
---@return function
---@return grpc_client.h2.Stream
function extension_methods:each_chunk2()
--return self:each_chunk()
return _each_chunk_helper, self
end

---Extend a stream with new methods
---
---@param s http.h2_stream.stream
---@return http.h2_stream.stream
---@param s grpc_client.h2.Stream
---@return grpc_client.h2.Stream
function StreamExtension.extend(s)
for k, v in pairs(extension_methods) do
s[k] = v
Expand Down Expand Up @@ -356,7 +377,7 @@ function Client:server_streaming_request(request_specifier, data, callback, done
end

self.loop:wrap(function()
for response_body in stream:each_chunk() do
for response_body in stream:each_chunk2() do
while response_body:len() > 0 do
local msg_len = string.unpack(">I4", response_body:sub(2, 5))

Expand Down Expand Up @@ -434,7 +455,7 @@ function Client:bidirectional_streaming_request(request_specifier, callback, don
end

self.loop:wrap(function()
for response_body in stream:each_chunk() do
for response_body in stream:each_chunk2() do
while response_body:len() > 0 do
local msg_len = string.unpack(">I4", response_body:sub(2, 5))

Expand Down
161 changes: 110 additions & 51 deletions snowcap/api/lua/snowcap/decoration.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@ local decoration_handle = {}

---@class snowcap.decoration.DecorationHandle
---@field id integer
---@field private update fun(msg: any)
---@field private _update fun(msg: any)
---@field private _operate fun(oper: snowcap.widget.operation.Operation)
local DecorationHandle = {}

---@param id integer
---@param update fun(msg: any?)
---@param operate fun(oper: snowcap.widget.operation.Operation)
---@return snowcap.decoration.DecorationHandle
function decoration_handle.new(id, update)
function decoration_handle.new(id, update, operate)
---@type snowcap.decoration.DecorationHandle
local self = {
id = id,
update = update,
_update = update,
_operate = operate,
}
setmetatable(self, { __index = DecorationHandle })
return self
Expand Down Expand Up @@ -80,29 +83,30 @@ function decoration.new_widget(args)

local decoration_id = response.decoration_id or 0

local sender, receiver = require("snowcap.util.channel").spsc()

---@type fun(msg: any?)
local update_on_msg = function(msg)
if msg ~= nil then
args.program:update(msg)
end

---@diagnostic disable-next-line: redefined-local
local _, err = client:snowcap_decoration_v1_DecorationService_RequestView({
decoration_id = decoration_id,
})

if err then
log.error(err)
sender:send({
message = msg,
})
else
sender:send({
redraw = {}
})
end
end

local handle = decoration_handle.new(decoration_id, update_on_msg)

---@type fun(oper: snowcap.widget.operation.Operation)
local forward_operation = function(oper)
handle:operate(oper)
local operate_surface = function(oper)
sender:send({
operation = oper,
})
end

local handle = decoration_handle.new(decoration_id, update_on_msg, operate_surface)

---@type fun(): snowcap.signal.HandlerPolicy
local close_surface = function()
handle:close()
Expand All @@ -112,48 +116,110 @@ function decoration.new_widget(args)

args.program:connect(widget_signal.redraw_needed, update_on_msg)
args.program:connect(widget_signal.send_message, update_on_msg)
args.program:connect(widget_signal.operation, forward_operation)
args.program:connect(widget_signal.operation, operate_surface)
args.program:connect(widget_signal.request_close, close_surface)

args.program:event({
created = widget.SurfaceHandle.from_decoration_handle(handle),
})

err = client:snowcap_decoration_v1_DecorationService_GetDecorationEvents({
decoration_id = decoration_id,
}, function(response) ---@diagnostic disable-line:redefined-local
sender:send({
decoration_events = response.decoration_events or {}
})
end)

err = client:snowcap_widget_v1_WidgetService_GetWidgetEvents({
decoration_id = decoration_id,
}, function(response) ---@diagnostic disable-line: redefined-local
for _, event in ipairs(response.widget_events) do
---@diagnostic disable-next-line:invisible
local msg = widget._message_from_event(callbacks, event)

if msg then
local ok, update_err = pcall(function()
args.program:update(msg)
end)
if not ok then
log.error(update_err)
sender:send({
widget_events = response.widget_events or {}
})
end)

client.loop:wrap(function()
local pending_operations = {}

local msg = receiver:recv()
while msg do
local update_view = false

if msg.widget_events then
for _, event in ipairs(msg.widget_events) do
---@diagnostic disable-next-line:invisible
local msg = widget._message_from_event(callbacks, event)

if msg then
local ok, update_err = pcall(function()
args.program:update(msg)
end)
if not ok then
log.error(update_err)
end
end
end

update_view = true
elseif msg.message then
args.program:update(msg)
elseif msg.operation then
table.insert(pending_operations, msg.operation)
elseif msg.decoration_events then
for _, decoration_event in ipairs(msg.decoration_events) do
if decoration_event.closing ~= nil then
goto main_loop_break
end
end
end
end

---@diagnostic disable-next-line:redefined-local
local widget_def = args.program:view() or widget.row({ children = {} })
callbacks = {}
if not update_view then
---@diagnostic disable-next-line: redefined-local
local _, err = client:snowcap_decoration_v1_DecorationService_RequestView({
decoration_id = decoration_id,
})

widget._traverse_widget_tree(widget_def, callbacks, widget._collect_callbacks)
if err then
log.error(err)
end
else
---@diagnostic disable-next-line:redefined-local
local widget_def = args.program:view() or widget.row({ children = {} })
callbacks = {}

---@diagnostic disable-next-line:redefined-local
local _, err = client:snowcap_decoration_v1_DecorationService_UpdateDecoration({
decoration_id = decoration_id,
widget_def = widget.widget_def_into_api(widget_def),
})
widget._traverse_widget_tree(widget_def, callbacks, widget._collect_callbacks)

if err then
log.error(err)
---@diagnostic disable-next-line:redefined-local
local _, err = client:snowcap_decoration_v1_DecorationService_UpdateDecoration({
decoration_id = decoration_id,
widget_def = widget.widget_def_into_api(widget_def),
})

if err then
log.error(err)
end

for _, oper in ipairs(pending_operations) do
---@diagnostic disable-next-line:redefined-local
local _, err = client:snowcap_decoration_v1_DecorationService_OperateDecoration({
decoration_id = decoration_id,
operation = require("snowcap.widget.operation")._to_api(oper), ---@diagnostic disable-line: invisible
})

if err then
log.error(err)
end
end
pending_operations = {}
end

msg = receiver:recv()
end
end, function()
::main_loop_break::

args.program:event({
closing = {},
closing = {}
})
end)

Expand All @@ -178,20 +244,13 @@ end
---
---@param message any
function DecorationHandle:send_message(message)
self.update(message)
self._update(message)
end

---Sends an `Operation` to this decoration.
---@param operation snowcap.widget.operation.Operation
function DecorationHandle:operate(operation)
local _, err = client:snowcap_decoration_v1_DecorationService_OperateDecoration({
decoration_id = self.id,
operation = require("snowcap.widget.operation")._to_api(operation), ---@diagnostic disable-line: invisible
})

if err then
log.error(err)
end
self._operate(operation)
end

---Sets the z-index at which this decoration will render.
Expand Down
Loading
Loading