Skip to content

Commit

Permalink
Rewrite schema fetching to use stream
Browse files Browse the repository at this point in the history
  • Loading branch information
akrantz01 committed May 5, 2024
1 parent 6b78d3e commit 26e5b61
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 21 deletions.
38 changes: 20 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ multimap = "0.9"
opentelemetry-http = "0.9"
opentelemetry_api = "0.20.0"
pin-project-lite = "0.2"
reqwest = { version = "0.11.24", default-features = false, features = ["json", "rustls-tls"] }
rustls = "0.21"
schemars = { version = "0.8", features = ["url"] }
serde = "1"
serde_json = "1"
sha2 = "0.10.8"
tokio = { version = "1", default-features = false, features = ["io-util"] }
tokio-stream = "0.1.14"
tower = { version = "0.4", default-features = false }
tower-http = { version = "0.4", features = ["compression-br", "compression-deflate", "compression-gzip", "decompression-br", "decompression-deflate", "decompression-gzip"] }
tracing = "0.1"
Expand Down
126 changes: 126 additions & 0 deletions src/hive.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use anyhow::{Context, Result};
use futures::{Stream, StreamExt};
use http::{header, HeaderMap, HeaderValue, StatusCode};
use reqwest::Client;
use sha2::{Digest, Sha256};
use std::{env, time::Duration};
use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{info_span, Instrument};
use url::Url;

static COMMIT: Option<&'static str> = option_env!("GITHUB_SHA");

pub(crate) fn schema() -> Result<impl Stream<Item = String> + Send> {
let config = RegistryConfig::from_env()?;
let (sender, receiver) = channel(2);

let headers = {
let mut map = HeaderMap::new();
map.insert(
header::USER_AGENT,
HeaderValue::from_str(&format!("apollo-router/{}", COMMIT.unwrap_or("local"))).unwrap(),
);
map.insert(
"X-Hive-CDN-Key",
HeaderValue::from_str(&config.key).unwrap(),
);
map
};
let client = Client::builder().default_headers(headers).build()?;

let task = async move {
let mut etag = None;
let mut last_schema = None;

loop {
let request = match etag.as_deref() {
Some(etag) => client
.get(config.endpoint.as_str())
.header(header::IF_NONE_MATCH, HeaderValue::from_str(etag).unwrap()),
None => client.get(config.endpoint.as_str()),
};

match request.send().await {
Ok(response) => {
tracing::info!(
monotonic_counter.hive_registry_fetch_count_total = 1u64,
status = "success",
);

etag = response
.headers()
.get("etag")
.and_then(|etag| etag.to_str().ok())
.map(ToOwned::to_owned);

if response.status() != StatusCode::NOT_MODIFIED {
match response.text().await {
Ok(schema) => {
let schema_hash = Some(hash(schema.as_bytes()));
if schema_hash != last_schema {
last_schema = schema_hash;
if let Err(e) = sender.send(schema).await {
tracing::debug!("failed to push to stream, router is likely shutting down: {e}");
break;
}
}
}
Err(err) => log_fetch_failure(err),
}
}
}
Err(err) => log_fetch_failure(err),
};

tokio::time::sleep(config.poll_interval).await;
}
};
drop(tokio::task::spawn(task.instrument(info_span!("registry"))));

Ok(ReceiverStream::new(receiver).boxed())
}

struct RegistryConfig {
endpoint: Url,
key: String,
poll_interval: Duration,
}

impl RegistryConfig {
fn from_env() -> Result<RegistryConfig> {
let endpoint = env::var("HIVE_CDN_ENDPOINT")
.context("missing HIVE_CDN_ENDPOINT environment variable")?;
let endpoint = Url::parse(&endpoint).context("invalid CDN endpoint")?;

let key = env::var("HIVE_CDN_KEY").context("missing HIVE_CDN_KEY environment variable")?;

let poll_interval =
env::var("HIVE_CDN_POLL_INTERVAL").unwrap_or_else(|_| String::from("10"));
let poll_interval = Duration::from_secs(
poll_interval
.parse()
.context("invalid poll interval format")?,
);

Ok(RegistryConfig {
endpoint,
key,
poll_interval,
})
}
}

fn log_fetch_failure(err: impl std::fmt::Display) {
tracing::info!(
monotonic_counter.hive_registry_fetch_count_total = 1u64,
status = "failure"
);
tracing::error!(code = "HIVE_REGISTRY_FETCH_FAILURE", "{}", err);
}

fn hash(bytes: &[u8]) -> [u8; 32] {
let mut hasher = Sha256::new();
hasher.update(bytes);
hasher.finalize().into()
}
27 changes: 24 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,35 @@
mod hive;
mod http;
mod plugins;
mod responses;

use anyhow::{Context, Result};
use graphql_hive_router::{registry::HiveRegistry, usage};
use apollo_router::{Executable, SchemaSource};
use graphql_hive_router::usage;
use tokio::runtime;

fn main() -> Result<()> {
let mut builder = runtime::Builder::new_multi_thread();
builder.enable_all();

if let Some(nb) = std::env::var("APOLLO_ROUTER_NUM_CORES")
.ok()
.and_then(|value| value.parse::<usize>().ok())
{
builder.worker_threads(nb);
}

let runtime = builder.build().context("failed to configure runtime")?;
runtime.block_on(inner_main())
}

async fn inner_main() -> Result<()> {
usage::register();

HiveRegistry::new(None).context("failed to load GraphQL Hive registry")?;
let schema = Box::pin(hive::schema().context("failed to load schema")?);

apollo_router::main()
Executable::builder()
.schema(SchemaSource::Stream(schema))
.start()
.await
}

0 comments on commit 26e5b61

Please sign in to comment.