Skip to content

Commit 1d2e22d

Browse files
authored
Refactor Tower.Service call method (#147)
1 parent f5c9909 commit 1d2e22d

File tree

1 file changed

+58
-83
lines changed

1 file changed

+58
-83
lines changed

src/lib.rs

Lines changed: 58 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ impl AdapterOptions {
8383
}
8484
}
8585

86+
#[derive(Clone)]
8687
pub struct Adapter {
8788
client: Arc<Client<HttpConnector>>,
8889
healthcheck_url: Uri,
@@ -101,13 +102,15 @@ impl Adapter {
101102
let client = Client::builder().pool_idle_timeout(Duration::from_secs(4)).build_http();
102103

103104
let healthcheck_url = format!(
104-
"http://{}:{}{}",
105-
options.host, options.readiness_check_port, options.readiness_check_path
105+
"{}://{}:{}{}",
106+
"http", options.host, options.readiness_check_port, options.readiness_check_path
106107
)
107108
.parse()
108109
.unwrap();
109110

110-
let domain = format!("http://{}:{}", options.host, options.port).parse().unwrap();
111+
let domain = format!("{}://{}:{}", "http", options.host, options.port)
112+
.parse()
113+
.unwrap();
111114

112115
Adapter {
113116
client: Arc::new(client),
@@ -186,101 +189,73 @@ impl Adapter {
186189
pub async fn run(self) -> Result<(), Error> {
187190
lambda_http::run(self).await
188191
}
189-
}
190192

191-
/// Implement a `Tower.Service` that sends the requests
192-
/// to the web server.
193-
impl Service<Request> for Adapter {
194-
type Response = Response<Body>;
195-
type Error = Error;
196-
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
193+
async fn fetch_response(&self, event: Request) -> Result<Response<Body>, Error> {
194+
if self.async_init && !self.ready_at_init.load(Ordering::SeqCst) {
195+
is_web_ready(&self.healthcheck_url, &self.healthcheck_protocol).await;
196+
self.ready_at_init.store(true, Ordering::SeqCst);
197+
}
197198

198-
fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll<Result<(), Self::Error>> {
199-
core::task::Poll::Ready(Ok(()))
200-
}
199+
let request_context = event.request_context();
200+
let path = event.raw_http_path();
201+
let mut path = path.as_str();
202+
let (parts, body) = event.into_parts();
201203

202-
fn call(&mut self, event: Request) -> Self::Future {
203-
let async_init = self.async_init;
204-
let client = self.client.clone();
205-
let ready_at_init = self.ready_at_init.clone();
206-
let healthcheck_url = self.healthcheck_url.clone();
207-
let healthcheck_protocol = self.healthcheck_protocol;
208-
let domain = self.domain.clone();
209-
let base_path = self.base_path.clone();
210-
211-
Box::pin(async move {
212-
fetch_response(
213-
async_init,
214-
ready_at_init,
215-
client,
216-
base_path,
217-
domain,
218-
healthcheck_url,
219-
healthcheck_protocol,
220-
event,
221-
)
222-
.await
223-
})
224-
}
225-
}
204+
// strip away Base Path if environment variable REMOVE_BASE_PATH is set.
205+
if let Some(base_path) = self.base_path.as_deref() {
206+
path = path.trim_start_matches(base_path);
207+
}
226208

227-
#[allow(clippy::too_many_arguments)]
228-
async fn fetch_response(
229-
async_init: bool,
230-
ready_at_init: Arc<AtomicBool>,
231-
client: Arc<Client<HttpConnector>>,
232-
base_path: Option<String>,
233-
domain: Uri,
234-
healthcheck_url: Uri,
235-
healthcheck_protocol: Protocol,
236-
event: Request,
237-
) -> Result<Response<Body>, Error> {
238-
if async_init && !ready_at_init.load(Ordering::SeqCst) {
239-
is_web_ready(&healthcheck_url, &healthcheck_protocol).await;
240-
ready_at_init.store(true, Ordering::SeqCst);
241-
}
209+
let mut req_headers = parts.headers;
242210

243-
let request_context = event.request_context();
244-
let path = event.raw_http_path();
245-
let mut path = path.as_str();
246-
let (parts, body) = event.into_parts();
211+
// include request context in http header "x-amzn-request-context"
212+
req_headers.append(
213+
HeaderName::from_static("x-amzn-request-context"),
214+
HeaderValue::from_bytes(serde_json::to_string(&request_context)?.as_bytes())?,
215+
);
247216

248-
// strip away Base Path if environment variable REMOVE_BASE_PATH is set.
249-
if let Some(base_path) = base_path.as_deref() {
250-
path = path.trim_start_matches(base_path);
251-
}
217+
let mut pq = path.to_string();
218+
if let Some(q) = parts.uri.query() {
219+
pq.push('?');
220+
pq.push_str(q);
221+
}
252222

253-
let mut req_headers = parts.headers;
223+
let mut app_parts = self.domain.clone().into_parts();
224+
app_parts.path_and_query = Some(pq.parse()?);
225+
let app_url = Uri::from_parts(app_parts)?;
254226

255-
// include request context in http header "x-amzn-request-context"
256-
req_headers.append(
257-
HeaderName::from_static("x-amzn-request-context"),
258-
HeaderValue::from_bytes(serde_json::to_string(&request_context)?.as_bytes())?,
259-
);
227+
tracing::debug!(app_url = %app_url, req_headers = ?req_headers, "sending request to app server");
260228

261-
let mut pq = path.to_string();
262-
if let Some(q) = parts.uri.query() {
263-
pq.push('?');
264-
pq.push_str(q);
265-
}
229+
let mut builder = hyper::Request::builder().method(parts.method).uri(app_url);
230+
if let Some(headers) = builder.headers_mut() {
231+
headers.extend(req_headers);
232+
}
266233

267-
let mut app_parts = domain.into_parts();
268-
app_parts.path_and_query = Some(pq.parse()?);
269-
let app_url = Uri::from_parts(app_parts)?;
234+
let request = builder.body(hyper::Body::from(body.to_vec()))?;
270235

271-
tracing::debug!(app_url = %app_url, req_headers = ?req_headers, "sending request to app server");
236+
let app_response = self.client.request(request).await?;
237+
tracing::debug!(status = %app_response.status(), body_size = app_response.body().size_hint().lower(),
238+
app_headers = ?app_response.headers().clone(), "responding to lambda event");
272239

273-
let mut builder = hyper::Request::builder().method(parts.method).uri(app_url);
274-
if let Some(headers) = builder.headers_mut() {
275-
headers.extend(req_headers);
240+
Ok(app_response)
276241
}
242+
}
243+
244+
/// Implement a `Tower.Service` that sends the requests
245+
/// to the web server.
246+
impl Service<Request> for Adapter {
247+
type Response = Response<Body>;
248+
type Error = Error;
249+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
277250

278-
let request = builder.body(hyper::Body::from(body.to_vec()))?;
251+
fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll<Result<(), Self::Error>> {
252+
core::task::Poll::Ready(Ok(()))
253+
}
279254

280-
let app_response = client.request(request).await?;
281-
tracing::debug!(status = %app_response.status(), body_size = app_response.body().size_hint().lower(),
282-
app_headers = ?app_response.headers().clone(), "responding to lambda event");
283-
Ok(app_response)
255+
fn call(&mut self, event: Request) -> Self::Future {
256+
let adapter = self.clone();
257+
Box::pin(async move { adapter.fetch_response(event).await })
258+
}
284259
}
285260

286261
async fn is_web_ready(url: &Uri, protocol: &Protocol) -> bool {

0 commit comments

Comments
 (0)