Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions api/lua/build/grpc_client.lua
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,31 @@ function extension_methods:get_headers_with_retries(timeout, retries)
end
end

---Call h2_stream:get_next_chunk() without timeout, retrying on spurious wakeup.
local function _each_chunk_helper(self)
local chunk, err, errno
repeat
chunk, err, errno = self:get_next_chunk()
until chunk ~= nil or errno ~= ce.ETIMEDOUT

return chunk, err, errno
end

---Iterate over stream's chunks.
---
---The difference between this function and h2_stream:each_chunk is that this function
---will retry calling get_next_chunk if the function fails due to a spurious wakeup.
---@return function
---@return grpc_client.h2.Stream
function extension_methods:each_chunk2()
--return self:each_chunk()
return _each_chunk_helper, self
end

---Extend a stream with new methods
---
---@param s http.h2_stream.stream
---@return http.h2_stream.stream
---@param s grpc_client.h2.Stream
---@return grpc_client.h2.Stream
function StreamExtension.extend(s)
for k, v in pairs(extension_methods) do
s[k] = v
Expand Down Expand Up @@ -354,7 +375,7 @@ function Client:server_streaming_request(request_specifier, data, callback, done
end

self.loop:wrap(function()
for response_body in stream:each_chunk() do
for response_body in stream:each_chunk2() do
while response_body:len() > 0 do
local msg_len = string.unpack(">I4", response_body:sub(2, 5))

Expand Down Expand Up @@ -432,7 +453,7 @@ function Client:bidirectional_streaming_request(request_specifier, callback, don
end

self.loop:wrap(function()
for response_body in stream:each_chunk() do
for response_body in stream:each_chunk2() do
while response_body:len() > 0 do
local msg_len = string.unpack(">I4", response_body:sub(2, 5))

Expand Down
29 changes: 25 additions & 4 deletions api/lua/pinnacle/grpc/defs.lua
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,31 @@ function extension_methods:get_headers_with_retries(timeout, retries)
end
end

---Call h2_stream:get_next_chunk() without timeout, retrying on spurious wakeup.
local function _each_chunk_helper(self)
local chunk, err, errno
repeat
chunk, err, errno = self:get_next_chunk()
until chunk ~= nil or errno ~= ce.ETIMEDOUT

return chunk, err, errno
end

---Iterate over stream's chunks.
---
---The difference between this function and h2_stream:each_chunk is that this function
---will retry calling get_next_chunk if the function fails due to a spurious wakeup.
---@return function
---@return grpc_client.h2.Stream
function extension_methods:each_chunk2()
--return self:each_chunk()
return _each_chunk_helper, self
end

---Extend a stream with new methods
---
---@param s http.h2_stream.stream
---@return http.h2_stream.stream
---@param s grpc_client.h2.Stream
---@return grpc_client.h2.Stream
function StreamExtension.extend(s)
for k, v in pairs(extension_methods) do
s[k] = v
Expand Down Expand Up @@ -356,7 +377,7 @@ function Client:server_streaming_request(request_specifier, data, callback, done
end

self.loop:wrap(function()
for response_body in stream:each_chunk() do
for response_body in stream:each_chunk2() do
while response_body:len() > 0 do
local msg_len = string.unpack(">I4", response_body:sub(2, 5))

Expand Down Expand Up @@ -434,7 +455,7 @@ function Client:bidirectional_streaming_request(request_specifier, callback, don
end

self.loop:wrap(function()
for response_body in stream:each_chunk() do
for response_body in stream:each_chunk2() do
while response_body:len() > 0 do
local msg_len = string.unpack(">I4", response_body:sub(2, 5))

Expand Down
29 changes: 25 additions & 4 deletions snowcap/api/lua/snowcap/grpc/defs.lua
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,31 @@ function extension_methods:get_headers_with_retries(timeout, retries)
end
end

---Call h2_stream:get_next_chunk() without timeout, retrying on spurious wakeup.
local function _each_chunk_helper(self)
local chunk, err, errno
repeat
chunk, err, errno = self:get_next_chunk()
until chunk ~= nil or errno ~= ce.ETIMEDOUT

return chunk, err, errno
end

---Iterate over stream's chunks.
---
---The difference between this function and h2_stream:each_chunk is that this function
---will retry calling get_next_chunk if the function fails due to a spurious wakeup.
---@return function
---@return grpc_client.h2.Stream
function extension_methods:each_chunk2()
--return self:each_chunk()
return _each_chunk_helper, self
end

---Extend a stream with new methods
---
---@param s http.h2_stream.stream
---@return http.h2_stream.stream
---@param s grpc_client.h2.Stream
---@return grpc_client.h2.Stream
function StreamExtension.extend(s)
for k, v in pairs(extension_methods) do
s[k] = v
Expand Down Expand Up @@ -356,7 +377,7 @@ function Client:server_streaming_request(request_specifier, data, callback, done
end

self.loop:wrap(function()
for response_body in stream:each_chunk() do
for response_body in stream:each_chunk2() do
while response_body:len() > 0 do
local msg_len = string.unpack(">I4", response_body:sub(2, 5))

Expand Down Expand Up @@ -434,7 +455,7 @@ function Client:bidirectional_streaming_request(request_specifier, callback, don
end

self.loop:wrap(function()
for response_body in stream:each_chunk() do
for response_body in stream:each_chunk2() do
while response_body:len() > 0 do
local msg_len = string.unpack(">I4", response_body:sub(2, 5))

Expand Down
Loading