Skip to content

Commit bfbfa0d

Browse files
committed
feat(pool): add a Singleton pool type
1 parent b9dc3d2 commit bfbfa0d

File tree

6 files changed

+654
-0
lines changed

6 files changed

+654
-0
lines changed

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ futures-util = { version = "0.3.16", default-features = false, features = ["allo
4242
http-body-util = "0.1.0"
4343
tokio = { version = "1", features = ["macros", "test-util", "signal"] }
4444
tokio-test = "0.4"
45+
tower-test = "0.4"
4546
pretty_env_logger = "0.5"
4647

4748
[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies]
@@ -60,6 +61,7 @@ default = []
6061
full = [
6162
"client",
6263
"client-legacy",
64+
"client-pool",
6365
"client-proxy",
6466
"client-proxy-system",
6567
"server",
@@ -74,6 +76,7 @@ full = [
7476

7577
client = ["hyper/client", "tokio/net", "dep:tracing", "dep:futures-channel", "dep:tower-service"]
7678
client-legacy = ["client", "dep:socket2", "tokio/sync", "dep:libc", "dep:futures-util"]
79+
client-pool = []
7780
client-proxy = ["client", "dep:base64", "dep:ipnet", "dep:percent-encoding"]
7881
client-proxy-system = ["dep:system-configuration", "dep:windows-registry"]
7982

src/client/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,8 @@
44
#[cfg(feature = "client-legacy")]
55
pub mod legacy;
66

7+
#[cfg(feature = "client-pool")]
8+
pub mod pool;
9+
710
#[cfg(feature = "client-proxy")]
811
pub mod proxy;

src/client/pool/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
//! Composable pool services
2+
3+
pub mod singleton;

src/client/pool/singleton.rs

Lines changed: 350 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
//! Singleton pools
2+
//!
3+
//! The singleton pool combines a MakeService that should only produce a single
4+
//! active connection. It can bundle all concurrent calls to it, so that only
5+
//! one connection is made. All calls to the singleton will return a clone of
6+
//! the inner service once established.
7+
//!
8+
//! This fits the HTTP/2 case well.
9+
10+
use std::sync::{Arc, Mutex};
11+
use std::task::{self, Poll};
12+
13+
use tokio::sync::oneshot;
14+
use tower_service::Service;
15+
16+
use self::internal::{DitchGuard, SingletonError, SingletonFuture};
17+
18+
type BoxError = Box<dyn std::error::Error + Send + Sync>;
19+
20+
/// A singleton pool over an inner service.
21+
///
22+
/// The singleton wraps an inner service maker, bundling all calls to ensure
23+
/// only one service is created. Once made, it returns clones of the made
24+
/// service.
25+
#[derive(Debug)]
26+
pub struct Singleton<M, Dst>
27+
where
28+
M: Service<Dst>,
29+
{
30+
mk_svc: M,
31+
state: Arc<Mutex<State<M::Response>>>,
32+
}
33+
34+
#[derive(Debug)]
35+
enum State<S> {
36+
Empty,
37+
Making(Vec<oneshot::Sender<S>>),
38+
Made(S),
39+
}
40+
41+
impl<M, Target> Singleton<M, Target>
42+
where
43+
M: Service<Target>,
44+
M::Response: Clone,
45+
{
46+
/// Create a new singleton pool over an inner make service.
47+
pub fn new(mk_svc: M) -> Self {
48+
Singleton {
49+
mk_svc,
50+
state: Arc::new(Mutex::new(State::Empty)),
51+
}
52+
}
53+
54+
// pub fn reset?
55+
56+
/*
57+
/// Retains the inner made service if specified by the predicate.
58+
pub fn retain<F>(&mut self, predicate: F)
59+
where
60+
F: FnMut(&mut M::Response) -> bool,
61+
{
62+
let mut locked = self.state.lock().unwrap();
63+
match *locked {
64+
State::Empty => {},
65+
State::Making(..) => {},
66+
State::Made(mut svc) => {
67+
68+
}
69+
}
70+
}
71+
*/
72+
}
73+
74+
impl<M, Target> Service<Target> for Singleton<M, Target>
75+
where
76+
M: Service<Target>,
77+
M::Response: Clone,
78+
M::Error: Into<BoxError>,
79+
{
80+
type Response = M::Response;
81+
type Error = SingletonError;
82+
type Future = SingletonFuture<M::Future, M::Response>;
83+
84+
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
85+
if let State::Empty = *self.state.lock().unwrap() {
86+
return self
87+
.mk_svc
88+
.poll_ready(cx)
89+
.map_err(|e| SingletonError(e.into()));
90+
}
91+
Poll::Ready(Ok(()))
92+
}
93+
94+
fn call(&mut self, dst: Target) -> Self::Future {
95+
let mut locked = self.state.lock().unwrap();
96+
match *locked {
97+
State::Empty => {
98+
let fut = self.mk_svc.call(dst);
99+
*locked = State::Making(Vec::new());
100+
SingletonFuture::Driving {
101+
future: fut,
102+
singleton: DitchGuard(Arc::downgrade(&self.state)),
103+
}
104+
}
105+
State::Making(ref mut waiters) => {
106+
let (tx, rx) = oneshot::channel();
107+
waiters.push(tx);
108+
SingletonFuture::Waiting { rx }
109+
}
110+
State::Made(ref svc) => SingletonFuture::Made {
111+
svc: Some(svc.clone()),
112+
},
113+
}
114+
}
115+
}
116+
117+
impl<M, Target> Clone for Singleton<M, Target>
118+
where
119+
M: Service<Target> + Clone,
120+
{
121+
fn clone(&self) -> Self {
122+
Self {
123+
mk_svc: self.mk_svc.clone(),
124+
state: self.state.clone(),
125+
}
126+
}
127+
}
128+
129+
// Holds some "pub" items that otherwise shouldn't be public.
130+
mod internal {
131+
use std::future::Future;
132+
use std::pin::Pin;
133+
use std::sync::{Mutex, Weak};
134+
use std::task::{self, Poll};
135+
136+
use futures_core::ready;
137+
use pin_project_lite::pin_project;
138+
use tokio::sync::oneshot;
139+
140+
use super::{BoxError, State};
141+
142+
pin_project! {
143+
#[project = SingletonFutureProj]
144+
pub enum SingletonFuture<F, S> {
145+
Driving {
146+
#[pin]
147+
future: F,
148+
singleton: DitchGuard<S>,
149+
},
150+
Waiting {
151+
rx: oneshot::Receiver<S>,
152+
},
153+
Made {
154+
svc: Option<S>,
155+
},
156+
}
157+
}
158+
159+
// XXX: pub because of the enum SingletonFuture
160+
pub struct DitchGuard<S>(pub(super) Weak<Mutex<State<S>>>);
161+
162+
impl<F, S, E> Future for SingletonFuture<F, S>
163+
where
164+
F: Future<Output = Result<S, E>>,
165+
E: Into<BoxError>,
166+
S: Clone,
167+
{
168+
type Output = Result<S, SingletonError>;
169+
170+
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
171+
match self.project() {
172+
SingletonFutureProj::Driving { future, singleton } => {
173+
match ready!(future.poll(cx)) {
174+
Ok(svc) => {
175+
if let Some(state) = singleton.0.upgrade() {
176+
let mut locked = state.lock().unwrap();
177+
singleton.0 = Weak::new();
178+
match std::mem::replace(&mut *locked, State::Made(svc.clone())) {
179+
State::Making(waiters) => {
180+
for tx in waiters {
181+
let _ = tx.send(svc.clone());
182+
}
183+
}
184+
State::Empty | State::Made(_) => {
185+
// shouldn't happen!
186+
unreachable!()
187+
}
188+
}
189+
}
190+
Poll::Ready(Ok(svc))
191+
}
192+
Err(e) => {
193+
if let Some(state) = singleton.0.upgrade() {
194+
let mut locked = state.lock().unwrap();
195+
singleton.0 = Weak::new();
196+
*locked = State::Empty;
197+
}
198+
Poll::Ready(Err(SingletonError(e.into())))
199+
}
200+
}
201+
}
202+
SingletonFutureProj::Waiting { rx } => match ready!(Pin::new(rx).poll(cx)) {
203+
Ok(svc) => Poll::Ready(Ok(svc)),
204+
Err(_canceled) => Poll::Ready(Err(SingletonError(Canceled.into()))),
205+
},
206+
SingletonFutureProj::Made { svc } => Poll::Ready(Ok(svc.take().unwrap())),
207+
}
208+
}
209+
}
210+
211+
impl<S> Drop for DitchGuard<S> {
212+
fn drop(&mut self) {
213+
if let Some(state) = self.0.upgrade() {
214+
if let Ok(mut locked) = state.lock() {
215+
*locked = State::Empty;
216+
}
217+
}
218+
}
219+
}
220+
221+
// An opaque error type. By not exposing the type, nor being specifically
222+
// Box<dyn Error>, we can _change_ the type once we no longer need the Canceled
223+
// error type. This will be possible with the refactor to baton passing.
224+
#[derive(Debug)]
225+
pub struct SingletonError(pub(super) BoxError);
226+
227+
impl std::fmt::Display for SingletonError {
228+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229+
f.write_str("singleton connection error")
230+
}
231+
}
232+
233+
impl std::error::Error for SingletonError {
234+
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
235+
Some(&*self.0)
236+
}
237+
}
238+
239+
#[derive(Debug)]
240+
struct Canceled;
241+
242+
impl std::fmt::Display for Canceled {
243+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
244+
f.write_str("singleton connection canceled")
245+
}
246+
}
247+
248+
impl std::error::Error for Canceled {}
249+
}
250+
251+
#[cfg(test)]
252+
mod tests {
253+
use std::future::Future;
254+
use std::pin::Pin;
255+
use std::task::Poll;
256+
257+
use tower_service::Service;
258+
259+
use super::Singleton;
260+
261+
#[tokio::test]
262+
async fn first_call_drives_subsequent_wait() {
263+
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
264+
265+
let mut singleton = Singleton::new(mock_svc);
266+
267+
handle.allow(1);
268+
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
269+
.await
270+
.unwrap();
271+
// First call: should go into Driving
272+
let fut1 = singleton.call(());
273+
// Second call: should go into Waiting
274+
let fut2 = singleton.call(());
275+
276+
// Expect exactly one request to the inner service
277+
let ((), send_response) = handle.next_request().await.unwrap();
278+
send_response.send_response("svc");
279+
280+
// Both futures should resolve to the same value
281+
assert_eq!(fut1.await.unwrap(), "svc");
282+
assert_eq!(fut2.await.unwrap(), "svc");
283+
}
284+
285+
#[tokio::test]
286+
async fn made_state_returns_immediately() {
287+
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
288+
let mut singleton = Singleton::new(mock_svc);
289+
290+
handle.allow(1);
291+
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
292+
.await
293+
.unwrap();
294+
// Drive first call to completion
295+
let fut1 = singleton.call(());
296+
let ((), send_response) = handle.next_request().await.unwrap();
297+
send_response.send_response("svc");
298+
assert_eq!(fut1.await.unwrap(), "svc");
299+
300+
// Second call should not hit inner service
301+
let res = singleton.call(()).await.unwrap();
302+
assert_eq!(res, "svc");
303+
}
304+
305+
#[tokio::test]
306+
async fn cancel_waiter_does_not_affect_others() {
307+
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
308+
let mut singleton = Singleton::new(mock_svc);
309+
310+
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
311+
.await
312+
.unwrap();
313+
let fut1 = singleton.call(());
314+
let fut2 = singleton.call(());
315+
drop(fut2); // cancel one waiter
316+
317+
let ((), send_response) = handle.next_request().await.unwrap();
318+
send_response.send_response("svc");
319+
320+
assert_eq!(fut1.await.unwrap(), "svc");
321+
}
322+
323+
// TODO: this should be able to be improved with a cooperative baton refactor
324+
#[tokio::test]
325+
async fn cancel_driver_cancels_all() {
326+
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
327+
let mut singleton = Singleton::new(mock_svc);
328+
329+
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
330+
.await
331+
.unwrap();
332+
let mut fut1 = singleton.call(());
333+
let fut2 = singleton.call(());
334+
335+
// poll driver just once, and then drop
336+
crate::common::future::poll_fn(move |cx| {
337+
let _ = Pin::new(&mut fut1).poll(cx);
338+
Poll::Ready(())
339+
})
340+
.await;
341+
342+
let ((), send_response) = handle.next_request().await.unwrap();
343+
send_response.send_response("svc");
344+
345+
assert_eq!(
346+
fut2.await.unwrap_err().0.to_string(),
347+
"singleton connection canceled"
348+
);
349+
}
350+
}

0 commit comments

Comments
 (0)