Skip to content

Commit 59306e4

Browse files
committed
Apply patch to upgrade pg_wire
1 parent 5d120b8 commit 59306e4

5 files changed

Lines changed: 37 additions & 52 deletions

File tree

Cargo.lock

Lines changed: 10 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ paste = "1.0"
215215
percent-encoding = "2.3"
216216
petgraph = { version = "0.6.5", default-features = false }
217217
pin-project-lite = "0.2.9"
218-
pgwire = { version = "0.28.0", features = ["server-api"] }
218+
pgwire = { version = "0.32", features = ["server-api"] }
219219
postgres-types = "0.2.5"
220220
pretty_assertions = { version = "1.4", features = ["unstable"] }
221221
proc-macro2 = "1.0"

crates/client-api/src/routes/database.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,7 @@ pub async fn publish<S: NodeDelegate + ControlStateDelegate>(
518518
// so, unless you are the owner, this will fail.
519519

520520
let (database_identity, db_name) = match &name_or_identity {
521-
Some(noa) => match noa.try_resolve(&ctx).await? {
521+
Some(noa) => match noa.try_resolve(&ctx).await.map_err(log_and_500)? {
522522
Ok(resolved) => (resolved, noa.name()),
523523
Err(name) => {
524524
// `name_or_identity` was a `NameOrIdentity::Name`, but no record

crates/client-api/src/util.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,18 +97,21 @@ impl NameOrIdentity {
9797
pub async fn try_resolve(
9898
&self,
9999
ctx: &(impl ControlStateReadAccess + ?Sized),
100-
) -> axum::response::Result<Result<Identity, &DatabaseName>> {
100+
) -> anyhow::Result<Result<Identity, &DatabaseName>> {
101101
Ok(match self {
102102
Self::Identity(identity) => Ok(Identity::from(*identity)),
103-
Self::Name(name) => ctx.lookup_identity(name.as_ref()).map_err(log_and_500)?.ok_or(name),
103+
Self::Name(name) => ctx.lookup_identity(name.as_ref())?.ok_or(name),
104104
})
105105
}
106106

107107
/// A variant of [`Self::try_resolve()`] which maps to a 404 (Not Found)
108108
/// response if `self` is a [`NameOrIdentity::Name`] for which no
109109
/// corresponding [`Identity`] is found in the SpacetimeDB DNS.
110110
pub async fn resolve(&self, ctx: &(impl ControlStateReadAccess + ?Sized)) -> axum::response::Result<Identity> {
111-
self.try_resolve(ctx).await?.map_err(|_| StatusCode::NOT_FOUND.into())
111+
self.try_resolve(ctx)
112+
.await
113+
.map_err(log_and_500)?
114+
.map_err(|_| StatusCode::NOT_FOUND.into())
112115
}
113116
}
114117

crates/pg/src/pg_server.rs

Lines changed: 19 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,10 @@ use pgwire::api::auth::{
1212
finish_authentication, save_startup_parameters_to_metadata, DefaultServerParameterProvider, LoginInfo,
1313
StartupHandler,
1414
};
15-
use pgwire::api::copy::NoopCopyHandler;
1615
use pgwire::api::portal::Format;
17-
use pgwire::api::query::{PlaceholderExtendedQueryHandler, SimpleQueryHandler};
16+
use pgwire::api::query::SimpleQueryHandler;
1817
use pgwire::api::results::{DataRowEncoder, FieldInfo, QueryResponse, Response, Tag};
19-
use pgwire::api::{ClientInfo, NoopErrorHandler, METADATA_DATABASE};
18+
use pgwire::api::{ClientInfo, METADATA_DATABASE};
2019
use pgwire::api::{PgWireConnectionState, PgWireServerHandlers};
2120
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
2221
use pgwire::messages::data::DataRow;
@@ -145,12 +144,12 @@ async fn response<T>(res: axum::response::Result<T>, database: &str) -> Result<T
145144
}
146145

147146
struct PgSpacetimeDB<T> {
148-
ctx: Arc<T>,
147+
ctx: T,
149148
cached: Mutex<Option<Metadata>>,
150149
parameter_provider: DefaultServerParameterProvider,
151150
}
152151

153-
impl<T: ControlStateReadAccess + ControlStateWriteAccess + NodeDelegate> PgSpacetimeDB<T> {
152+
impl<T: ControlStateReadAccess + ControlStateWriteAccess + NodeDelegate + Clone> PgSpacetimeDB<T> {
154153
async fn exe_sql<'a>(&self, query: String) -> PgWireResult<Vec<Response<'a>>> {
155154
let params = self.cached.lock().await.clone().unwrap();
156155
let db = SqlParams {
@@ -283,13 +282,11 @@ impl<T: Sync + Send + ControlStateReadAccess + ControlStateWriteAccess + NodeDel
283282
self.cached.lock().await.clone_from(&Some(metadata));
284283
finish_authentication(client, &self.parameter_provider).await?;
285284
}
286-
PgWireFrontendMessage::SslRequest(ssl) => {
287-
if ssl.is_some() {
288-
let err = PgError::SSLNotSupported;
289-
log::error!("{err}");
290-
let err = ErrorInfo::new("FATAL".to_owned(), "28P01".to_owned(), err.to_string());
291-
return close_client(client, err).await;
292-
}
285+
PgWireFrontendMessage::SslRequest(_) => {
286+
let err = PgError::SSLNotSupported;
287+
log::error!("{err}");
288+
let err = ErrorInfo::new("FATAL".to_owned(), "28P01".to_owned(), err.to_string());
289+
return close_client(client, err).await;
293290
}
294291
// The other messages are for features not supported by SpacetimeDB, that are rejected by the parser.
295292
_ => {
@@ -301,10 +298,10 @@ impl<T: Sync + Send + ControlStateReadAccess + ControlStateWriteAccess + NodeDel
301298
}
302299

303300
#[async_trait]
304-
impl<T: Sync + Send + ControlStateReadAccess + ControlStateWriteAccess + NodeDelegate> SimpleQueryHandler
301+
impl<T: Sync + Send + ControlStateReadAccess + ControlStateWriteAccess + NodeDelegate + Clone> SimpleQueryHandler
305302
for PgSpacetimeDB<T>
306303
{
307-
async fn do_query<'a, C>(&self, _client: &mut C, query: &'a str) -> PgWireResult<Vec<Response<'a>>>
304+
async fn do_query<'a, C>(&self, _client: &mut C, query: &str) -> PgWireResult<Vec<Response<'a>>>
308305
where
309306
C: ClientInfo + Unpin + Send + Sync,
310307
{
@@ -313,12 +310,12 @@ impl<T: Sync + Send + ControlStateReadAccess + ControlStateWriteAccess + NodeDel
313310
}
314311

315312
#[derive(Clone)]
316-
struct PgSpacetimeDBFactory<T> {
313+
pub struct PgSpacetimeDBFactory<T> {
317314
handler: Arc<PgSpacetimeDB<T>>,
318315
}
319316

320317
impl<T> PgSpacetimeDBFactory<T> {
321-
pub fn new(ctx: Arc<T>) -> Self {
318+
pub fn new(ctx: T) -> Self {
322319
let mut parameter_provider = DefaultServerParameterProvider::default();
323320
parameter_provider.server_version = format!("spacetime {}", spacetimedb_lib_version());
324321

@@ -333,39 +330,23 @@ impl<T> PgSpacetimeDBFactory<T> {
333330
}
334331
}
335332

336-
impl<T: Sync + Send + ControlStateReadAccess + ControlStateWriteAccess + NodeDelegate> PgWireServerHandlers
333+
impl<T: Sync + Send + ControlStateReadAccess + ControlStateWriteAccess + NodeDelegate + Clone> PgWireServerHandlers
337334
for PgSpacetimeDBFactory<T>
338335
{
339-
type StartupHandler = PgSpacetimeDB<T>;
340-
type SimpleQueryHandler = PgSpacetimeDB<T>;
341-
type ExtendedQueryHandler = PlaceholderExtendedQueryHandler;
342-
type CopyHandler = NoopCopyHandler;
343-
type ErrorHandler = NoopErrorHandler;
344-
345-
fn simple_query_handler(&self) -> Arc<Self::SimpleQueryHandler> {
336+
fn simple_query_handler(&self) -> Arc<impl SimpleQueryHandler> {
346337
self.handler.clone()
347338
}
348339

349-
fn extended_query_handler(&self) -> Arc<Self::ExtendedQueryHandler> {
350-
Arc::new(PlaceholderExtendedQueryHandler)
351-
}
340+
// TODO: fn extended_query_handler(&self) -> Arc<impl ExtendedQueryHandler> {}
352341

353-
fn startup_handler(&self) -> Arc<Self::StartupHandler> {
342+
fn startup_handler(&self) -> Arc<impl StartupHandler> {
354343
self.handler.clone()
355344
}
356-
357-
fn copy_handler(&self) -> Arc<Self::CopyHandler> {
358-
Arc::new(NoopCopyHandler)
359-
}
360-
361-
fn error_handler(&self) -> Arc<Self::ErrorHandler> {
362-
Arc::new(NoopErrorHandler)
363-
}
364345
}
365346

366-
pub async fn start_pg<T: ControlStateReadAccess + ControlStateWriteAccess + NodeDelegate + 'static>(
347+
pub async fn start_pg<T: ControlStateReadAccess + ControlStateWriteAccess + NodeDelegate + Clone + 'static>(
367348
shutdown: Arc<Notify>,
368-
ctx: Arc<T>,
349+
ctx: T,
369350
tcp: TcpListener,
370351
) {
371352
let factory = Arc::new(PgSpacetimeDBFactory::new(ctx));

0 commit comments

Comments
 (0)