diff --git a/datafusion-postgres/src/pg_catalog.rs b/datafusion-postgres/src/pg_catalog.rs
index e417abb..851bd2a 100644
--- a/datafusion-postgres/src/pg_catalog.rs
+++ b/datafusion-postgres/src/pg_catalog.rs
@@ -4,7 +4,8 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use datafusion::arrow::array::{
-    as_boolean_array, ArrayRef, AsArray, BooleanBuilder, RecordBatch, StringArray, StringBuilder,
+    as_boolean_array, ArrayRef, AsArray, BooleanBuilder, Int32Builder, RecordBatch, StringArray,
+    StringBuilder,
 };
 use datafusion::arrow::datatypes::{DataType, Field, Int32Type, SchemaRef};
 use datafusion::arrow::ipc::reader::FileReader;
@@ -32,6 +33,7 @@ pub mod pg_get_expr_udf;
 pub mod pg_namespace;
 pub mod pg_replication_slot;
 pub mod pg_settings;
+pub mod pg_stat_gssapi;
 pub mod pg_tables;
 pub mod pg_views;
 
@@ -100,6 +102,7 @@ const PG_CATALOG_VIEW_PG_SETTINGS: &str = "pg_settings";
 const PG_CATALOG_VIEW_PG_VIEWS: &str = "pg_views";
 const PG_CATALOG_VIEW_PG_MATVIEWS: &str = "pg_matviews";
 const PG_CATALOG_VIEW_PG_TABLES: &str = "pg_tables";
+const PG_CATALOG_VIEW_PG_STAT_GSSAPI: &str = "pg_stat_gssapi";
 const PG_CATALOG_VIEW_PG_STAT_USER_TABLES: &str = "pg_stat_user_tables";
 const PG_CATALOG_VIEW_PG_REPLICATION_SLOTS: &str = "pg_replication_slots";
 
@@ -339,6 +342,13 @@ impl SchemaProvider for PgCatalogSchemaProvider {
                     vec![table],
                 )?)))
             }
+            PG_CATALOG_VIEW_PG_STAT_GSSAPI => {
+                let table = Arc::new(pg_stat_gssapi::PgStatGssApiTable::new());
+                Ok(Some(Arc::new(StreamingTable::try_new(
+                    Arc::clone(table.schema()),
+                    vec![table],
+                )?)))
+            }
             PG_CATALOG_VIEW_PG_TABLES => {
                 let table = Arc::new(pg_tables::PgTablesTable::new(self.catalog_list.clone()));
                 Ok(Some(Arc::new(StreamingTable::try_new(
@@ -1162,6 +1172,28 @@ pub fn create_pg_encoding_to_char_udf() -> ScalarUDF {
     )
 }
 
+/// Create the `pg_backend_pid()` UDF, which reports the current backend's
+/// process id. This emulation always returns the fixed `BACKEND_PID`.
+pub fn create_pg_backend_pid_udf() -> ScalarUDF {
+    let func = move |_args: &[ColumnarValue]| {
+        let mut builder = Int32Builder::new();
+        builder.append_value(BACKEND_PID);
+        let array: ArrayRef = Arc::new(builder.finish());
+        Ok(ColumnarValue::Array(array))
+    };
+
+    create_udf(
+        "pg_backend_pid",
+        vec![],
+        DataType::Int32,
+        Volatility::Stable,
+        Arc::new(func),
+    )
+}
+
+/// Fixed process id reported by `pg_backend_pid()`; there is no real PG backend.
+const BACKEND_PID: i32 = 1;
+
 /// Install pg_catalog and postgres UDFs to current `SessionContext`
 pub fn setup_pg_catalog(
     session_context: &SessionContext,
@@ -1207,6 +1239,7 @@ pub fn setup_pg_catalog(
     session_context.register_udf(create_pg_relation_is_publishable_udf());
     session_context.register_udf(create_pg_get_statisticsobjdef_columns_udf());
     session_context.register_udf(create_pg_encoding_to_char_udf());
+    session_context.register_udf(create_pg_backend_pid_udf());
 
     Ok(())
 }
diff --git a/datafusion-postgres/src/pg_catalog/pg_stat_gssapi.rs b/datafusion-postgres/src/pg_catalog/pg_stat_gssapi.rs
new file mode 100644
index 0000000..777fe28
--- /dev/null
+++ b/datafusion-postgres/src/pg_catalog/pg_stat_gssapi.rs
@@ -0,0 +1,66 @@
+use datafusion::arrow::array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
+use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::error::Result;
+use datafusion::execution::{SendableRecordBatchStream, TaskContext};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::streaming::PartitionStream;
+use std::sync::Arc;
+
+use crate::pg_catalog::BACKEND_PID;
+
+/// In-memory `pg_stat_gssapi` view: one row for the current backend, with GSSAPI unused.
+#[derive(Debug, Clone)]
+pub(crate) struct PgStatGssApiTable {
+    schema: SchemaRef,
+}
+
+impl PgStatGssApiTable {
+    pub(crate) fn new() -> Self {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("pid", DataType::Int32, true),
+            Field::new("gss_authenticated", DataType::Boolean, false),
+            Field::new("principal", DataType::Utf8, true),
+            Field::new("encrypted", DataType::Boolean, false),
+            Field::new("credentials_delegated", DataType::Boolean, false),
+        ]));
+
+        Self { schema }
+    }
+
+    /// Generate record batches based on the current state of the catalog
+    async fn get_data(this: Self) -> Result<RecordBatch> {
+        let pid = vec![BACKEND_PID];
+        let gss_authenticated = vec![false];
+        let principal: Vec<Option<String>> = vec![None];
+        let encrypted = vec![false];
+        let credentials_delegated = vec![false];
+
+        // Create Arrow arrays from the collected data
+        let arrays: Vec<ArrayRef> = vec![
+            Arc::new(Int32Array::from(pid)),
+            Arc::new(BooleanArray::from(gss_authenticated)),
+            Arc::new(StringArray::from(principal)),
+            Arc::new(BooleanArray::from(encrypted)),
+            Arc::new(BooleanArray::from(credentials_delegated)),
+        ];
+
+        // Create a record batch
+        let batch = RecordBatch::try_new(this.schema.clone(), arrays)?;
+
+        Ok(batch)
+    }
+}
+
+impl PartitionStream for PgStatGssApiTable {
+    fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
+        let this = self.clone();
+        Box::pin(RecordBatchStreamAdapter::new(
+            this.schema.clone(),
+            futures::stream::once(async move { PgStatGssApiTable::get_data(this).await }),
+        ))
+    }
+}