-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Avoid using errors to communicate the state of other shards to the leader during query-status API. #1520
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is better than what we have right now, but would love @andyleiserson opinion on this. Particularly the changes to transport API
type RecordsStream: BytesStream; | ||
/// The type used for responses to [`send`] and [`broadcast`]. | ||
type SendResponse: BytesStream; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure this is the right way to do it. Send requests could be very different in nature and may not have the common response type. One may return a json while others can return a stream. Ideally, the return type should be defined by R: RouteParams
trait but I am not sure if I have a suggestion how to use it here
/// # Errors | ||
/// | ||
/// If the `BytesStream` cannot be collected into a `BytesMut`, an error is returned. | ||
pub async fn from_bytesstream<B: BytesStream>(value: B) -> Result<HelperResponse, BoxError> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pub async fn from_bytesstream<B: BytesStream>(value: B) -> Result<HelperResponse, BoxError> { | |
pub async fn from_bytes_stream<B: BytesStream>(value: B) -> Result<HelperResponse, BoxError> { |
/// | ||
/// If the `BytesStream` cannot be collected into a `BytesMut`, an error is returned. | ||
pub async fn from_bytesstream<B: BytesStream>(value: B) -> Result<HelperResponse, BoxError> { | ||
let bytes: bytes::BytesMut = value.try_collect().await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't remember this API in detail, but is BytesMut
required or Bytes
would work too?
Self { body: v } | ||
} | ||
} | ||
|
||
impl From<HelperResponse> for QueryStatus { | ||
fn from(value: HelperResponse) -> Self { | ||
let response: QueryStatusResponse = serde_json::from_slice(value.body.as_ref()).unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is likely a fallible conversion, so a method on HelperResponse
will do better imo. Or a TryFrom
implementation
Before this change, when the leader shard received a query-status API, it would call a match-status API on its shards, which would return an error if the state was different. The error contained the status of each shard, that the leader would use to compute the overall helper state. This wasn't the intended behavior, and was only done as a stop-gap solution, because the
Transport::send
(and consequentlyTransport::broadcast
) returned unit "()", prohibiting from receiving the state from other shards. This PR changes this:Transport::send
now returns a configurable associated type for all requests (Transport::SendReponse
)BodyStream
.I also tried a bunch of things so that
Transport::send
would return a specific type, bound to the route, but it all seemed very complex. The goal was that QueryStatus route would return a QueryStatusResponse, the rest could just return unit. That task can be tackled separately in the future.