diff --git a/api/lua/build/grpc_client.lua b/api/lua/build/grpc_client.lua index fb64d2e94..0c6da121f 100644 --- a/api/lua/build/grpc_client.lua +++ b/api/lua/build/grpc_client.lua @@ -43,10 +43,31 @@ function extension_methods:get_headers_with_retries(timeout, retries) end end +---Call h2_stream:get_next_chunk() without timeout, retrying on spurious wakeup. +local function _each_chunk_helper(self) + local chunk, err, errno + repeat + chunk, err, errno = self:get_next_chunk() + until chunk ~= nil or errno ~= ce.ETIMEDOUT + + return chunk, err, errno +end + +---Iterate over stream's chunks. +--- +---The difference between this function and h2_stream:each_chunk is that this function +---will retry calling get_next_chunk if the function fails due to a spurious wakeup. +---@return function +---@return grpc_client.h2.Stream +function extension_methods:each_chunk2() + --return self:each_chunk() + return _each_chunk_helper, self +end + ---Extend a stream with new methods --- ----@param s http.h2_stream.stream ----@return http.h2_stream.stream +---@param s grpc_client.h2.Stream +---@return grpc_client.h2.Stream function StreamExtension.extend(s) for k, v in pairs(extension_methods) do s[k] = v @@ -354,7 +375,7 @@ function Client:server_streaming_request(request_specifier, data, callback, done end self.loop:wrap(function() - for response_body in stream:each_chunk() do + for response_body in stream:each_chunk2() do while response_body:len() > 0 do local msg_len = string.unpack(">I4", response_body:sub(2, 5)) @@ -432,7 +453,7 @@ function Client:bidirectional_streaming_request(request_specifier, callback, don end self.loop:wrap(function() - for response_body in stream:each_chunk() do + for response_body in stream:each_chunk2() do while response_body:len() > 0 do local msg_len = string.unpack(">I4", response_body:sub(2, 5)) diff --git a/api/lua/pinnacle/grpc/defs.lua b/api/lua/pinnacle/grpc/defs.lua index 774b8da80..862187734 100644 --- a/api/lua/pinnacle/grpc/defs.lua +++ b/api/lua/pinnacle/grpc/defs.lua @@ -45,10 +45,31 @@ function extension_methods:get_headers_with_retries(timeout, retries) end end +---Call h2_stream:get_next_chunk() without timeout, retrying on spurious wakeup. +local function _each_chunk_helper(self) + local chunk, err, errno + repeat + chunk, err, errno = self:get_next_chunk() + until chunk ~= nil or errno ~= ce.ETIMEDOUT + + return chunk, err, errno +end + +---Iterate over stream's chunks. +--- +---The difference between this function and h2_stream:each_chunk is that this function +---will retry calling get_next_chunk if the function fails due to a spurious wakeup. +---@return function +---@return grpc_client.h2.Stream +function extension_methods:each_chunk2() + --return self:each_chunk() + return _each_chunk_helper, self +end + ---Extend a stream with new methods --- ----@param s http.h2_stream.stream ----@return http.h2_stream.stream +---@param s grpc_client.h2.Stream +---@return grpc_client.h2.Stream function StreamExtension.extend(s) for k, v in pairs(extension_methods) do s[k] = v @@ -356,7 +377,7 @@ function Client:server_streaming_request(request_specifier, data, callback, done end self.loop:wrap(function() - for response_body in stream:each_chunk() do + for response_body in stream:each_chunk2() do while response_body:len() > 0 do local msg_len = string.unpack(">I4", response_body:sub(2, 5)) @@ -434,7 +455,7 @@ function Client:bidirectional_streaming_request(request_specifier, callback, don end self.loop:wrap(function() - for response_body in stream:each_chunk() do + for response_body in stream:each_chunk2() do while response_body:len() > 0 do local msg_len = string.unpack(">I4", response_body:sub(2, 5)) diff --git a/snowcap/api/lua/snowcap/grpc/defs.lua b/snowcap/api/lua/snowcap/grpc/defs.lua index c3a1d4c89..7820b9b4c 100644 --- a/snowcap/api/lua/snowcap/grpc/defs.lua +++ b/snowcap/api/lua/snowcap/grpc/defs.lua @@ -45,10 +45,31 @@ function extension_methods:get_headers_with_retries(timeout, retries) end end +---Call h2_stream:get_next_chunk() without timeout, retrying on spurious wakeup. +local function _each_chunk_helper(self) + local chunk, err, errno + repeat + chunk, err, errno = self:get_next_chunk() + until chunk ~= nil or errno ~= ce.ETIMEDOUT + + return chunk, err, errno +end + +---Iterate over stream's chunks. +--- +---The difference between this function and h2_stream:each_chunk is that this function +---will retry calling get_next_chunk if the function fails due to a spurious wakeup. +---@return function +---@return grpc_client.h2.Stream +function extension_methods:each_chunk2() + --return self:each_chunk() + return _each_chunk_helper, self +end + ---Extend a stream with new methods --- ----@param s http.h2_stream.stream ----@return http.h2_stream.stream +---@param s grpc_client.h2.Stream +---@return grpc_client.h2.Stream function StreamExtension.extend(s) for k, v in pairs(extension_methods) do s[k] = v @@ -356,7 +377,7 @@ function Client:server_streaming_request(request_specifier, data, callback, done end self.loop:wrap(function() - for response_body in stream:each_chunk() do + for response_body in stream:each_chunk2() do while response_body:len() > 0 do local msg_len = string.unpack(">I4", response_body:sub(2, 5)) @@ -434,7 +455,7 @@ function Client:bidirectional_streaming_request(request_specifier, callback, don end self.loop:wrap(function() - for response_body in stream:each_chunk() do + for response_body in stream:each_chunk2() do while response_body:len() > 0 do local msg_len = string.unpack(">I4", response_body:sub(2, 5))