Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 197cbde

Browse files
committed
test RemoteConnection
1 parent 8f4bade commit 197cbde

File tree

4 files changed

+183
-46
lines changed

4 files changed

+183
-46
lines changed

sqld/src/connection/write_proxy.rs

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ use std::path::PathBuf;
22
use std::sync::Arc;
33

44
use futures_core::future::BoxFuture;
5+
use futures_core::Stream;
56
use parking_lot::Mutex as PMutex;
67
use sqld_libsql_bindings::wal_hook::{TransparentMethods, TRANSPARENT_METHODS};
78
use tokio::sync::{mpsc, watch, Mutex};
89
use tokio_stream::StreamExt;
910
use tonic::metadata::BinaryMetadataValue;
1011
use tonic::transport::Channel;
1112
use tonic::{Request, Streaming};
12-
use uuid::Uuid;
1313

1414
use crate::auth::Authenticated;
1515
use crate::connection::program::{DescribeCol, DescribeParam};
@@ -19,7 +19,7 @@ use crate::query_analysis::TxnStatus;
1919
use crate::query_result_builder::{QueryBuilderConfig, QueryResultBuilder};
2020
use crate::replication::FrameNo;
2121
use crate::rpc::proxy::rpc::proxy_client::ProxyClient;
22-
use crate::rpc::proxy::rpc::{self, exec_req, exec_resp, DisconnectMessage, ExecReq, ExecResp};
22+
use crate::rpc::proxy::rpc::{self, exec_req, exec_resp, ExecReq, ExecResp};
2323
use crate::rpc::NAMESPACE_METADATA_KEY;
2424
use crate::stats::Stats;
2525
use crate::{Result, DEFAULT_AUTO_CHECKPOINT};
@@ -102,12 +102,11 @@ impl MakeConnection for MakeWriteProxyConn {
102102
}
103103
}
104104

105-
pub struct WriteProxyConnection {
105+
pub struct WriteProxyConnection<R = Streaming<ExecResp>> {
106106
/// Lazily initialized read connection
107107
read_conn: LibSqlConnection<TransparentMethods>,
108108
write_proxy: ProxyClient<Channel>,
109109
state: Mutex<TxnStatus>,
110-
client_id: Uuid,
111110
/// FrameNo of the last write performed by this connection on the primary.
112111
/// any subsequent read on this connection must wait for the replicator to catch up with this
113112
/// frame_no
@@ -118,7 +117,7 @@ pub struct WriteProxyConnection {
118117
stats: Arc<Stats>,
119118
namespace: NamespaceName,
120119

121-
remote_conn: Mutex<Option<RemoteConnection>>,
120+
remote_conn: Mutex<Option<RemoteConnection<R>>>,
122121
}
123122

124123
impl WriteProxyConnection {
@@ -135,7 +134,6 @@ impl WriteProxyConnection {
135134
read_conn,
136135
write_proxy,
137136
state: Mutex::new(TxnStatus::Init),
138-
client_id: Uuid::new_v4(),
139137
last_write_frame_no: Default::default(),
140138
applied_frame_no_receiver,
141139
builder_config,
@@ -234,8 +232,8 @@ impl WriteProxyConnection {
234232
}
235233
}
236234

237-
struct RemoteConnection {
238-
response_stream: Streaming<ExecResp>,
235+
struct RemoteConnection<R = Streaming<ExecResp>> {
236+
response_stream: R,
239237
request_sender: mpsc::Sender<ExecReq>,
240238
current_request_id: u32,
241239
builder_config: QueryBuilderConfig,
@@ -265,7 +263,12 @@ impl RemoteConnection {
265263
builder_config,
266264
})
267265
}
266+
}
268267

268+
impl<R> RemoteConnection<R>
269+
where
270+
R: Stream<Item = Result<ExecResp, tonic::Status>> + Unpin,
271+
{
269272
/// Perform a request on to the remote peer, and call message_cb for every message received for
270273
/// that request. message cb should return whether to expect more message for that request.
271274
async fn make_request(
@@ -458,17 +461,6 @@ impl Connection for WriteProxyConnection {
458461
}
459462
}
460463

461-
impl Drop for WriteProxyConnection {
462-
fn drop(&mut self) {
463-
// best effort attempt to disconnect
464-
let mut remote = self.write_proxy.clone();
465-
let client_id = self.client_id.to_string();
466-
tokio::spawn(async move {
467-
let _ = remote.disconnect(DisconnectMessage { client_id }).await;
468-
});
469-
}
470-
}
471-
472464
#[cfg(test)]
473465
pub mod test {
474466
use arbitrary::{Arbitrary, Unstructured};
@@ -479,7 +471,7 @@ pub mod test {
479471
use super::*;
480472
use crate::{
481473
query_result_builder::{test::test_driver, Column, QueryResultBuilderError},
482-
rpc::proxy::rpc::{query_result::RowResult, ExecuteResults},
474+
rpc::{proxy::rpc::{query_result::RowResult, ExecuteResults}, streaming_exec::test::random_valid_program_resp},
483475
};
484476

485477
/// generate an arbitraty rpc value. see build.rs for usage.
@@ -555,4 +547,21 @@ pub mod test {
555547
},
556548
);
557549
}
550+
551+
#[tokio::test]
552+
// in this test we do a roundtrip: generate a random valid program, stream it to
553+
// RemoteConnection, and make sure that the remote connection drives the builder with the same
554+
// state transitions.
555+
async fn validate_random_stream_response() {
556+
let (response_stream, validator) = random_valid_program_resp(500, 150);
557+
let (request_sender, _request_recver) = mpsc::channel(1);
558+
let mut remote = RemoteConnection {
559+
response_stream: response_stream.map(Ok),
560+
request_sender,
561+
current_request_id: 0,
562+
builder_config: QueryBuilderConfig::default(),
563+
};
564+
565+
remote.execute(Program::seq(&[]), validator).await.unwrap().0.into_ret();
566+
}
558567
}

sqld/src/http/user/result_builder.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,15 +311,17 @@ impl QueryResultBuilder for JsonHttpPayloadBuilder {
311311

312312
#[cfg(test)]
313313
mod test {
314-
use crate::query_result_builder::test::random_builder_driver;
314+
315+
use crate::query_result_builder::test::{random_transition, fsm_builder_driver};
315316

316317
use super::*;
317318

318319
#[test]
319320
fn test_json_builder() {
320321
for _ in 0..1000 {
321322
let builder = JsonHttpPayloadBuilder::new();
322-
let ret = random_builder_driver(100, builder).into_ret();
323+
let trace = random_transition(100);
324+
let ret = fsm_builder_driver(&trace, builder).into_ret();
323325
println!("{}", std::str::from_utf8(&ret).unwrap());
324326
// we produce valid json
325327
serde_json::from_slice::<Vec<serde_json::Value>>(&ret).unwrap();

sqld/src/query_result_builder.rs

Lines changed: 118 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,7 @@ pub mod test {
633633
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
634634
#[repr(usize)]
635635
// do not reorder!
636-
enum FsmState {
636+
pub enum FsmState {
637637
Init = 0,
638638
Finish,
639639
BeginStep,
@@ -702,11 +702,31 @@ pub mod test {
702702
}
703703
}
704704

705-
pub fn random_builder_driver<B: QueryResultBuilder>(mut max_steps: usize, mut b: B) -> B {
705+
pub fn random_transition(mut max_steps: usize) -> Vec<FsmState> {
706+
let mut trace = Vec::with_capacity(max_steps);
707+
let mut state = Init;
708+
trace.push(state);
709+
loop {
710+
if max_steps > 0 {
711+
state = state.rand_transition(false);
712+
} else {
713+
state = state.toward_finish()
714+
}
715+
716+
trace.push(state);
717+
if state == FsmState::Finish {
718+
break
719+
}
720+
721+
max_steps = max_steps.saturating_sub(1);
722+
}
723+
trace
724+
}
725+
726+
pub fn fsm_builder_driver<B: QueryResultBuilder>(trace: &[FsmState], mut b: B) -> B {
706727
let mut rand_data = [0; 10_000];
707728
rand_data.try_fill(&mut rand::thread_rng()).unwrap();
708729
let mut u = Unstructured::new(&rand_data);
709-
let mut trace = Vec::new();
710730

711731
#[derive(Arbitrary)]
712732
pub enum ValueRef<'a> {
@@ -729,9 +749,7 @@ pub mod test {
729749
}
730750
}
731751

732-
let mut state = Init;
733-
trace.push(state);
734-
loop {
752+
for state in trace {
735753
match state {
736754
Init => b.init(&QueryBuilderConfig::default()).unwrap(),
737755
BeginStep => b.begin_step().unwrap(),
@@ -758,22 +776,106 @@ pub mod test {
758776
}
759777
BuilderError => return b,
760778
}
779+
}
761780

762-
if max_steps > 0 {
763-
state = state.rand_transition(false);
764-
} else {
765-
state = state.toward_finish()
766-
}
781+
b
782+
}
767783

768-
trace.push(state);
784+
/// A Builder that validates a given execution trace
785+
pub struct ValidateTraceBuilder {
786+
trace: Vec<FsmState>,
787+
current: usize,
788+
}
769789

770-
max_steps = max_steps.saturating_sub(1);
790+
impl ValidateTraceBuilder {
791+
pub fn new(trace: Vec<FsmState>) -> Self {
792+
Self { trace, current: 0 }
771793
}
794+
}
772795

773-
// this can be usefull to help debug the generated test case
774-
dbg!(trace);
796+
impl QueryResultBuilder for ValidateTraceBuilder {
797+
type Ret = ();
775798

776-
b
799+
fn init(&mut self, _config: &QueryBuilderConfig) -> Result<(), QueryResultBuilderError> {
800+
assert_eq!(self.trace[self.current], FsmState::Init);
801+
self.current += 1;
802+
Ok(())
803+
}
804+
805+
fn begin_step(&mut self) -> Result<(), QueryResultBuilderError> {
806+
assert_eq!(self.trace[self.current], FsmState::BeginStep);
807+
self.current += 1;
808+
Ok(())
809+
}
810+
811+
fn finish_step(
812+
&mut self,
813+
_affected_row_count: u64,
814+
_last_insert_rowid: Option<i64>,
815+
) -> Result<(), QueryResultBuilderError> {
816+
assert_eq!(self.trace[self.current], FsmState::FinishStep);
817+
self.current += 1;
818+
Ok(())
819+
}
820+
821+
fn step_error(&mut self, _error: crate::error::Error) -> Result<(), QueryResultBuilderError> {
822+
assert_eq!(self.trace[self.current], FsmState::StepError);
823+
self.current += 1;
824+
Ok(())
825+
}
826+
827+
fn cols_description<'a>(
828+
&mut self,
829+
_cols: impl IntoIterator<Item = impl Into<Column<'a>>>,
830+
) -> Result<(), QueryResultBuilderError> {
831+
assert_eq!(self.trace[self.current], FsmState::ColsDescription);
832+
self.current += 1;
833+
Ok(())
834+
}
835+
836+
fn begin_rows(&mut self) -> Result<(), QueryResultBuilderError> {
837+
assert_eq!(self.trace[self.current], FsmState::BeginRows);
838+
self.current += 1;
839+
Ok(())
840+
}
841+
842+
fn begin_row(&mut self) -> Result<(), QueryResultBuilderError> {
843+
assert_eq!(self.trace[self.current], FsmState::BeginRow);
844+
self.current += 1;
845+
Ok(())
846+
}
847+
848+
fn add_row_value(&mut self, _v: ValueRef) -> Result<(), QueryResultBuilderError> {
849+
assert_eq!(self.trace[self.current], FsmState::AddRowValue);
850+
self.current += 1;
851+
Ok(())
852+
}
853+
854+
fn finish_row(&mut self) -> Result<(), QueryResultBuilderError> {
855+
assert_eq!(self.trace[self.current], FsmState::FinishRow);
856+
self.current += 1;
857+
Ok(())
858+
}
859+
860+
fn finish_rows(&mut self) -> Result<(), QueryResultBuilderError> {
861+
assert_eq!(self.trace[self.current], FsmState::FinishRows);
862+
self.current += 1;
863+
Ok(())
864+
}
865+
866+
fn finish(
867+
&mut self,
868+
_last_frame_no: Option<FrameNo>,
869+
_state: TxnStatus,
870+
) -> Result<(), QueryResultBuilderError> {
871+
assert_eq!(self.trace[self.current], FsmState::Finish);
872+
self.current += 1;
873+
Ok(())
874+
}
875+
876+
fn into_ret(self) -> Self::Ret {
877+
assert_eq!(self.current, self.trace.len());
878+
}
777879
}
778880

779881
pub struct FsmQueryBuilder {

0 commit comments

Comments
 (0)