diff --git a/datafusion-postgres/src/pg_catalog.rs b/datafusion-postgres/src/pg_catalog.rs index 286f61a..cbf0efe 100644 --- a/datafusion-postgres/src/pg_catalog.rs +++ b/datafusion-postgres/src/pg_catalog.rs @@ -22,6 +22,7 @@ use tokio::sync::RwLock; use crate::auth::AuthManager; use crate::pg_catalog::catalog_info::CatalogInfo; +use crate::pg_catalog::empty_table::EmptyTable; pub mod catalog_info; pub mod empty_table; @@ -171,7 +172,10 @@ pub const PG_CATALOG_TABLES: &[&str] = &[ PG_CATALOG_TABLE_PG_TABLESPACE, PG_CATALOG_TABLE_PG_TRIGGER, PG_CATALOG_TABLE_PG_USER_MAPPING, + PG_CATALOG_VIEW_PG_ROLES, PG_CATALOG_VIEW_PG_SETTINGS, + PG_CATALOG_VIEW_PG_STAT_GSSAPI, + PG_CATALOG_VIEW_PG_TABLES, PG_CATALOG_VIEW_PG_VIEWS, PG_CATALOG_VIEW_PG_MATVIEWS, PG_CATALOG_VIEW_PG_STAT_USER_TABLES, @@ -207,99 +211,160 @@ impl SchemaProvider for PgCatalogSchemaProvider { } async fn table(&self, name: &str) -> Result>> { + if let Some(table) = self.build_table_by_name(name)? { + let table_provider = table.try_into_table_provider()?; + Ok(Some(table_provider)) + } else { + Ok(None) + } + } + + fn table_exist(&self, name: &str) -> bool { + PG_CATALOG_TABLES.contains(&name.to_ascii_lowercase().as_str()) + } +} + +impl PgCatalogSchemaProvider { + pub fn try_new( + catalog_list: C, + static_tables: Arc, + auth_manager: Arc, + ) -> Result> { + Ok(Self { + catalog_list, + oid_counter: Arc::new(AtomicU32::new(16384)), + oid_cache: Arc::new(RwLock::new(HashMap::new())), + static_tables, + auth_manager, + }) + } + + pub fn build_table_by_name(&self, name: &str) -> Result> { match name.to_ascii_lowercase().as_str() { - PG_CATALOG_TABLE_PG_AGGREGATE => Ok(Some(self.static_tables.pg_aggregate.clone())), - PG_CATALOG_TABLE_PG_AM => Ok(Some(self.static_tables.pg_am.clone())), - PG_CATALOG_TABLE_PG_AMOP => Ok(Some(self.static_tables.pg_amop.clone())), - PG_CATALOG_TABLE_PG_AMPROC => Ok(Some(self.static_tables.pg_amproc.clone())), - PG_CATALOG_TABLE_PG_CAST => Ok(Some(self.static_tables.pg_cast.clone())), - PG_CATALOG_TABLE_PG_COLLATION => Ok(Some(self.static_tables.pg_collation.clone())), - PG_CATALOG_TABLE_PG_CONVERSION => Ok(Some(self.static_tables.pg_conversion.clone())), - PG_CATALOG_TABLE_PG_LANGUAGE => Ok(Some(self.static_tables.pg_language.clone())), - PG_CATALOG_TABLE_PG_OPCLASS => Ok(Some(self.static_tables.pg_opclass.clone())), - PG_CATALOG_TABLE_PG_OPERATOR => Ok(Some(self.static_tables.pg_operator.clone())), - PG_CATALOG_TABLE_PG_OPFAMILY => Ok(Some(self.static_tables.pg_opfamily.clone())), - PG_CATALOG_TABLE_PG_PROC => Ok(Some(self.static_tables.pg_proc.clone())), - PG_CATALOG_TABLE_PG_RANGE => Ok(Some(self.static_tables.pg_range.clone())), - PG_CATALOG_TABLE_PG_TS_CONFIG => Ok(Some(self.static_tables.pg_ts_config.clone())), - PG_CATALOG_TABLE_PG_TS_DICT => Ok(Some(self.static_tables.pg_ts_dict.clone())), - PG_CATALOG_TABLE_PG_TS_PARSER => Ok(Some(self.static_tables.pg_ts_parser.clone())), - PG_CATALOG_TABLE_PG_TS_TEMPLATE => Ok(Some(self.static_tables.pg_ts_template.clone())), - PG_CATALOG_TABLE_PG_TYPE => Ok(Some(self.static_tables.pg_type.clone())), - PG_CATALOG_TABLE_PG_ATTRDEF => Ok(Some(self.static_tables.pg_attrdef.clone())), + PG_CATALOG_TABLE_PG_AGGREGATE => { + Ok(Some(self.static_tables.pg_aggregate.clone().into())) + } + PG_CATALOG_TABLE_PG_AM => Ok(Some(self.static_tables.pg_am.clone().into())), + PG_CATALOG_TABLE_PG_AMOP => Ok(Some(self.static_tables.pg_amop.clone().into())), + PG_CATALOG_TABLE_PG_AMPROC => Ok(Some(self.static_tables.pg_amproc.clone().into())), + PG_CATALOG_TABLE_PG_CAST => Ok(Some(self.static_tables.pg_cast.clone().into())), + PG_CATALOG_TABLE_PG_COLLATION => { + Ok(Some(self.static_tables.pg_collation.clone().into())) + } + PG_CATALOG_TABLE_PG_CONVERSION => { + Ok(Some(self.static_tables.pg_conversion.clone().into())) + } + PG_CATALOG_TABLE_PG_LANGUAGE => Ok(Some(self.static_tables.pg_language.clone().into())), + PG_CATALOG_TABLE_PG_OPCLASS => Ok(Some(self.static_tables.pg_opclass.clone().into())), + PG_CATALOG_TABLE_PG_OPERATOR => Ok(Some(self.static_tables.pg_operator.clone().into())), + PG_CATALOG_TABLE_PG_OPFAMILY => Ok(Some(self.static_tables.pg_opfamily.clone().into())), + PG_CATALOG_TABLE_PG_PROC => Ok(Some(self.static_tables.pg_proc.clone().into())), + PG_CATALOG_TABLE_PG_RANGE => Ok(Some(self.static_tables.pg_range.clone().into())), + PG_CATALOG_TABLE_PG_TS_CONFIG => { + Ok(Some(self.static_tables.pg_ts_config.clone().into())) + } + PG_CATALOG_TABLE_PG_TS_DICT => Ok(Some(self.static_tables.pg_ts_dict.clone().into())), + PG_CATALOG_TABLE_PG_TS_PARSER => { + Ok(Some(self.static_tables.pg_ts_parser.clone().into())) + } + PG_CATALOG_TABLE_PG_TS_TEMPLATE => { + Ok(Some(self.static_tables.pg_ts_template.clone().into())) + } + PG_CATALOG_TABLE_PG_TYPE => Ok(Some(self.static_tables.pg_type.clone().into())), + PG_CATALOG_TABLE_PG_ATTRDEF => Ok(Some(self.static_tables.pg_attrdef.clone().into())), PG_CATALOG_TABLE_PG_AUTH_MEMBERS => { - Ok(Some(self.static_tables.pg_auth_members.clone())) + Ok(Some(self.static_tables.pg_auth_members.clone().into())) } - PG_CATALOG_TABLE_PG_AUTHID => Ok(Some(self.static_tables.pg_authid.clone())), + PG_CATALOG_TABLE_PG_AUTHID => Ok(Some(self.static_tables.pg_authid.clone().into())), - PG_CATALOG_TABLE_PG_CONSTRAINT => Ok(Some(self.static_tables.pg_constraint.clone())), + PG_CATALOG_TABLE_PG_CONSTRAINT => { + Ok(Some(self.static_tables.pg_constraint.clone().into())) + } PG_CATALOG_TABLE_PG_DB_ROLE_SETTING => { - Ok(Some(self.static_tables.pg_db_role_setting.clone())) + Ok(Some(self.static_tables.pg_db_role_setting.clone().into())) + } + PG_CATALOG_TABLE_PG_DEFAULT_ACL => { + Ok(Some(self.static_tables.pg_default_acl.clone().into())) } - PG_CATALOG_TABLE_PG_DEFAULT_ACL => Ok(Some(self.static_tables.pg_default_acl.clone())), - PG_CATALOG_TABLE_PG_DEPEND => Ok(Some(self.static_tables.pg_depend.clone())), - PG_CATALOG_TABLE_PG_DESCRIPTION => Ok(Some(self.static_tables.pg_description.clone())), - PG_CATALOG_TABLE_PG_ENUM => Ok(Some(self.static_tables.pg_enum.clone())), + PG_CATALOG_TABLE_PG_DEPEND => Ok(Some(self.static_tables.pg_depend.clone().into())), + PG_CATALOG_TABLE_PG_DESCRIPTION => { + Ok(Some(self.static_tables.pg_description.clone().into())) + } + PG_CATALOG_TABLE_PG_ENUM => Ok(Some(self.static_tables.pg_enum.clone().into())), PG_CATALOG_TABLE_PG_EVENT_TRIGGER => { - Ok(Some(self.static_tables.pg_event_trigger.clone())) + Ok(Some(self.static_tables.pg_event_trigger.clone().into())) } - PG_CATALOG_TABLE_PG_EXTENSION => Ok(Some(self.static_tables.pg_extension.clone())), - PG_CATALOG_TABLE_PG_FOREIGN_DATA_WRAPPER => { - Ok(Some(self.static_tables.pg_foreign_data_wrapper.clone())) + PG_CATALOG_TABLE_PG_EXTENSION => { + Ok(Some(self.static_tables.pg_extension.clone().into())) } + PG_CATALOG_TABLE_PG_FOREIGN_DATA_WRAPPER => Ok(Some( + self.static_tables.pg_foreign_data_wrapper.clone().into(), + )), PG_CATALOG_TABLE_PG_FOREIGN_SERVER => { - Ok(Some(self.static_tables.pg_foreign_server.clone())) + Ok(Some(self.static_tables.pg_foreign_server.clone().into())) } PG_CATALOG_TABLE_PG_FOREIGN_TABLE => { - Ok(Some(self.static_tables.pg_foreign_table.clone())) + Ok(Some(self.static_tables.pg_foreign_table.clone().into())) + } + PG_CATALOG_TABLE_PG_INDEX => Ok(Some(self.static_tables.pg_index.clone().into())), + PG_CATALOG_TABLE_PG_INHERITS => Ok(Some(self.static_tables.pg_inherits.clone().into())), + PG_CATALOG_TABLE_PG_INIT_PRIVS => { + Ok(Some(self.static_tables.pg_init_privs.clone().into())) } - PG_CATALOG_TABLE_PG_INDEX => Ok(Some(self.static_tables.pg_index.clone())), - PG_CATALOG_TABLE_PG_INHERITS => Ok(Some(self.static_tables.pg_inherits.clone())), - PG_CATALOG_TABLE_PG_INIT_PRIVS => Ok(Some(self.static_tables.pg_init_privs.clone())), - PG_CATALOG_TABLE_PG_LARGEOBJECT => Ok(Some(self.static_tables.pg_largeobject.clone())), - PG_CATALOG_TABLE_PG_LARGEOBJECT_METADATA => { - Ok(Some(self.static_tables.pg_largeobject_metadata.clone())) + PG_CATALOG_TABLE_PG_LARGEOBJECT => { + Ok(Some(self.static_tables.pg_largeobject.clone().into())) } + PG_CATALOG_TABLE_PG_LARGEOBJECT_METADATA => Ok(Some( + self.static_tables.pg_largeobject_metadata.clone().into(), + )), PG_CATALOG_TABLE_PG_PARTITIONED_TABLE => { - Ok(Some(self.static_tables.pg_partitioned_table.clone())) + Ok(Some(self.static_tables.pg_partitioned_table.clone().into())) } - PG_CATALOG_TABLE_PG_POLICY => Ok(Some(self.static_tables.pg_policy.clone())), - PG_CATALOG_TABLE_PG_PUBLICATION => Ok(Some(self.static_tables.pg_publication.clone())), - PG_CATALOG_TABLE_PG_PUBLICATION_NAMESPACE => { - Ok(Some(self.static_tables.pg_publication_namespace.clone())) + PG_CATALOG_TABLE_PG_POLICY => Ok(Some(self.static_tables.pg_policy.clone().into())), + PG_CATALOG_TABLE_PG_PUBLICATION => { + Ok(Some(self.static_tables.pg_publication.clone().into())) } + PG_CATALOG_TABLE_PG_PUBLICATION_NAMESPACE => Ok(Some( + self.static_tables.pg_publication_namespace.clone().into(), + )), PG_CATALOG_TABLE_PG_PUBLICATION_REL => { - Ok(Some(self.static_tables.pg_publication_rel.clone())) - } - PG_CATALOG_TABLE_PG_REPLICATION_ORIGIN => { - Ok(Some(self.static_tables.pg_replication_origin.clone())) + Ok(Some(self.static_tables.pg_publication_rel.clone().into())) } - PG_CATALOG_TABLE_PG_REWRITE => Ok(Some(self.static_tables.pg_rewrite.clone())), - PG_CATALOG_TABLE_PG_SECLABEL => Ok(Some(self.static_tables.pg_seclabel.clone())), - PG_CATALOG_TABLE_PG_SEQUENCE => Ok(Some(self.static_tables.pg_sequence.clone())), - PG_CATALOG_TABLE_PG_SHDEPEND => Ok(Some(self.static_tables.pg_shdepend.clone())), + PG_CATALOG_TABLE_PG_REPLICATION_ORIGIN => Ok(Some( + self.static_tables.pg_replication_origin.clone().into(), + )), + PG_CATALOG_TABLE_PG_REWRITE => Ok(Some(self.static_tables.pg_rewrite.clone().into())), + PG_CATALOG_TABLE_PG_SECLABEL => Ok(Some(self.static_tables.pg_seclabel.clone().into())), + PG_CATALOG_TABLE_PG_SEQUENCE => Ok(Some(self.static_tables.pg_sequence.clone().into())), + PG_CATALOG_TABLE_PG_SHDEPEND => Ok(Some(self.static_tables.pg_shdepend.clone().into())), PG_CATALOG_TABLE_PG_SHDESCRIPTION => { - Ok(Some(self.static_tables.pg_shdescription.clone())) + Ok(Some(self.static_tables.pg_shdescription.clone().into())) } - PG_CATALOG_TABLE_PG_SHSECLABEL => Ok(Some(self.static_tables.pg_shseclabel.clone())), - PG_CATALOG_TABLE_PG_STATISTIC => Ok(Some(self.static_tables.pg_statistic.clone())), - PG_CATALOG_TABLE_PG_STATISTIC_EXT => { - Ok(Some(self.static_tables.pg_statistic_ext.clone())) + PG_CATALOG_TABLE_PG_SHSECLABEL => { + Ok(Some(self.static_tables.pg_shseclabel.clone().into())) + } + PG_CATALOG_TABLE_PG_STATISTIC => { + Ok(Some(self.static_tables.pg_statistic.clone().into())) } - PG_CATALOG_TABLE_PG_STATISTIC_EXT_DATA => { - Ok(Some(self.static_tables.pg_statistic_ext_data.clone())) + PG_CATALOG_TABLE_PG_STATISTIC_EXT => { + Ok(Some(self.static_tables.pg_statistic_ext.clone().into())) } + PG_CATALOG_TABLE_PG_STATISTIC_EXT_DATA => Ok(Some( + self.static_tables.pg_statistic_ext_data.clone().into(), + )), PG_CATALOG_TABLE_PG_SUBSCRIPTION => { - Ok(Some(self.static_tables.pg_subscription.clone())) + Ok(Some(self.static_tables.pg_subscription.clone().into())) } PG_CATALOG_TABLE_PG_SUBSCRIPTION_REL => { - Ok(Some(self.static_tables.pg_subscription_rel.clone())) + Ok(Some(self.static_tables.pg_subscription_rel.clone().into())) } - PG_CATALOG_TABLE_PG_TABLESPACE => Ok(Some(self.static_tables.pg_tablespace.clone())), - PG_CATALOG_TABLE_PG_TRIGGER => Ok(Some(self.static_tables.pg_trigger.clone())), + PG_CATALOG_TABLE_PG_TABLESPACE => { + Ok(Some(self.static_tables.pg_tablespace.clone().into())) + } + PG_CATALOG_TABLE_PG_TRIGGER => Ok(Some(self.static_tables.pg_trigger.clone().into())), PG_CATALOG_TABLE_PG_USER_MAPPING => { - Ok(Some(self.static_tables.pg_user_mapping.clone())) + Ok(Some(self.static_tables.pg_user_mapping.clone().into())) } PG_CATALOG_TABLE_PG_ATTRIBUTE => { @@ -308,10 +373,7 @@ impl SchemaProvider for PgCatalogSchemaProvider { self.oid_counter.clone(), self.oid_cache.clone(), )); - Ok(Some(Arc::new(StreamingTable::try_new( - Arc::clone(table.schema()), - vec![table], - )?))) + Ok(Some(PgCatalogTable::Dynamic(table))) } PG_CATALOG_TABLE_PG_CLASS => { let table = Arc::new(pg_class::PgClassTable::new( @@ -319,10 +381,7 @@ impl SchemaProvider for PgCatalogSchemaProvider { self.oid_counter.clone(), self.oid_cache.clone(), )); - Ok(Some(Arc::new(StreamingTable::try_new( - Arc::clone(table.schema()), - vec![table], - )?))) + Ok(Some(PgCatalogTable::Dynamic(table))) } PG_CATALOG_TABLE_PG_DATABASE => { let table = Arc::new(pg_database::PgDatabaseTable::new( @@ -330,10 +389,7 @@ impl SchemaProvider for PgCatalogSchemaProvider { self.oid_counter.clone(), self.oid_cache.clone(), )); - Ok(Some(Arc::new(StreamingTable::try_new( - Arc::clone(table.schema()), - vec![table], - )?))) + Ok(Some(PgCatalogTable::Dynamic(table))) } PG_CATALOG_TABLE_PG_NAMESPACE => { let table = Arc::new(pg_namespace::PgNamespaceTable::new( @@ -341,73 +397,41 @@ impl SchemaProvider for PgCatalogSchemaProvider { self.oid_counter.clone(), self.oid_cache.clone(), )); - Ok(Some(Arc::new(StreamingTable::try_new( - Arc::clone(table.schema()), - vec![table], - )?))) - } - PG_CATALOG_VIEW_PG_STAT_GSSAPI => { - let table = Arc::new(pg_stat_gssapi::PgStatGssApiTable::new()); - Ok(Some(Arc::new(StreamingTable::try_new( - Arc::clone(table.schema()), - vec![table], - )?))) - } - PG_CATALOG_VIEW_PG_ROLES => { - let table = Arc::new(pg_roles::PgRolesTable::new(Arc::clone(&self.auth_manager))); - Ok(Some(Arc::new(StreamingTable::try_new( - Arc::clone(table.schema()), - vec![table], - )?))) + Ok(Some(PgCatalogTable::Dynamic(table))) } PG_CATALOG_VIEW_PG_TABLES => { let table = Arc::new(pg_tables::PgTablesTable::new(self.catalog_list.clone())); - Ok(Some(Arc::new(StreamingTable::try_new( - Arc::clone(table.schema()), - vec![table], - )?))) + Ok(Some(PgCatalogTable::Dynamic(table))) } PG_CATALOG_VIEW_PG_SETTINGS => { - let table = pg_settings::PgSettingsView::try_new()?; - Ok(Some(Arc::new(table.try_into_memtable()?))) + let table = Arc::new(pg_settings::PgSettingsView::new()); + Ok(Some(PgCatalogTable::Dynamic(table))) + } + + PG_CATALOG_VIEW_PG_STAT_GSSAPI => { + let table = Arc::new(pg_stat_gssapi::PgStatGssApiTable::new()); + Ok(Some(PgCatalogTable::Dynamic(table))) } - PG_CATALOG_VIEW_PG_VIEWS => Ok(Some(Arc::new(pg_views::pg_views()?))), - PG_CATALOG_VIEW_PG_MATVIEWS => Ok(Some(Arc::new(pg_views::pg_matviews()?))), - PG_CATALOG_VIEW_PG_STAT_USER_TABLES => { - Ok(Some(Arc::new(pg_views::pg_stat_user_tables()?))) + PG_CATALOG_VIEW_PG_ROLES => { + let table = Arc::new(pg_roles::PgRolesTable::new(Arc::clone(&self.auth_manager))); + Ok(Some(PgCatalogTable::Dynamic(table))) } + + PG_CATALOG_VIEW_PG_VIEWS => Ok(Some(pg_views::pg_views().into())), + PG_CATALOG_VIEW_PG_MATVIEWS => Ok(Some(pg_views::pg_matviews().into())), + PG_CATALOG_VIEW_PG_STAT_USER_TABLES => Ok(Some(pg_views::pg_stat_user_tables().into())), PG_CATALOG_VIEW_PG_REPLICATION_SLOTS => { - let table = pg_replication_slot::pg_replication_slots()?; - Ok(Some(table)) + Ok(Some(pg_replication_slot::pg_replication_slots().into())) } + _ => Ok(None), } } - - fn table_exist(&self, name: &str) -> bool { - PG_CATALOG_TABLES.contains(&name.to_ascii_lowercase().as_str()) - } -} - -impl PgCatalogSchemaProvider { - pub fn try_new( - catalog_list: C, - static_tables: Arc, - auth_manager: Arc, - ) -> Result> { - Ok(Self { - catalog_list, - oid_counter: Arc::new(AtomicU32::new(16384)), - oid_cache: Arc::new(RwLock::new(HashMap::new())), - static_tables, - auth_manager, - }) - } } /// A table that reads data from Avro bytes #[derive(Debug, Clone)] -struct ArrowTable { +pub struct ArrowTable { schema: SchemaRef, data: Vec, } @@ -433,15 +457,71 @@ impl ArrowTable { } /// Convert the arrow data into datafusion MemTable - pub fn try_into_memtable(self) -> Result { - MemTable::try_new(self.schema, vec![self.data]) + pub fn try_into_memtable(&self) -> Result> { + let mem_table = MemTable::try_new(self.schema.clone(), vec![self.data.clone()])?; + Ok(Arc::new(mem_table)) + } + + pub fn data(&self) -> &[RecordBatch] { + &self.data + } + + pub fn schema(&self) -> SchemaRef { + self.schema.clone() } } impl TableFunctionImpl for ArrowTable { fn call(&self, _args: &[Expr]) -> Result> { - let table = self.clone().try_into_memtable()?; - Ok(Arc::new(table)) + let table = self.try_into_memtable()?; + Ok(table) + } +} + +/// an enum to wrap all kinds of tables +pub enum PgCatalogTable { + Static(Arc), + Dynamic(Arc), + Empty(EmptyTable), +} + +impl From> for PgCatalogTable { + fn from(t: Arc) -> PgCatalogTable { + PgCatalogTable::Static(t) + } +} + +impl From for PgCatalogTable { + fn from(t: EmptyTable) -> PgCatalogTable { + Self::Empty(t) + } +} + +impl PgCatalogTable { + pub fn try_into_table_provider(&self) -> Result> { + match self { + Self::Static(t) => { + let memtable = t.try_into_memtable()?; + Ok(memtable) + } + Self::Dynamic(t) => { + let streaming_table = + StreamingTable::try_new(Arc::clone(t.schema()), vec![t.clone()])?; + Ok(Arc::new(streaming_table)) + } + Self::Empty(t) => { + let memtable = t.try_into_memtable()?; + Ok(memtable) + } + } + } + + pub fn schema(&self) -> SchemaRef { + match self { + Self::Static(t) => t.schema(), + Self::Dynamic(t) => t.schema().clone(), + Self::Empty(t) => t.schema(), + } } } @@ -450,65 +530,65 @@ impl TableFunctionImpl for ArrowTable { /// This implementation only contains static tables #[derive(Debug, Clone)] pub struct PgCatalogStaticTables { - pub pg_aggregate: Arc, - pub pg_am: Arc, - pub pg_amop: Arc, - pub pg_amproc: Arc, - pub pg_cast: Arc, - pub pg_collation: Arc, - pub pg_conversion: Arc, - pub pg_language: Arc, - pub pg_opclass: Arc, - pub pg_operator: Arc, - pub pg_opfamily: Arc, - pub pg_proc: Arc, - pub pg_range: Arc, - pub pg_ts_config: Arc, - pub pg_ts_dict: Arc, - pub pg_ts_parser: Arc, - pub pg_ts_template: Arc, - pub pg_type: Arc, - pub pg_attrdef: Arc, - pub pg_auth_members: Arc, - pub pg_authid: Arc, - pub pg_constraint: Arc, - pub pg_db_role_setting: Arc, - pub pg_default_acl: Arc, - pub pg_depend: Arc, - pub pg_description: Arc, - pub pg_enum: Arc, - pub pg_event_trigger: Arc, - pub pg_extension: Arc, - pub pg_foreign_data_wrapper: Arc, - pub pg_foreign_server: Arc, - pub pg_foreign_table: Arc, - pub pg_index: Arc, - pub pg_inherits: Arc, - pub pg_init_privs: Arc, - pub pg_largeobject: Arc, - pub pg_largeobject_metadata: Arc, - pub pg_partitioned_table: Arc, - pub pg_policy: Arc, - pub pg_publication: Arc, - pub pg_publication_namespace: Arc, - pub pg_publication_rel: Arc, - pub pg_replication_origin: Arc, - pub pg_rewrite: Arc, - pub pg_seclabel: Arc, - pub pg_sequence: Arc, - pub pg_shdepend: Arc, - pub pg_shdescription: Arc, - pub pg_shseclabel: Arc, - pub pg_statistic: Arc, - pub pg_statistic_ext: Arc, - pub pg_statistic_ext_data: Arc, - pub pg_subscription: Arc, - pub pg_subscription_rel: Arc, - pub pg_tablespace: Arc, - pub pg_trigger: Arc, - pub pg_user_mapping: Arc, - - pub pg_get_keywords: Arc, + pub pg_aggregate: Arc, + pub pg_am: Arc, + pub pg_amop: Arc, + pub pg_amproc: Arc, + pub pg_cast: Arc, + pub pg_collation: Arc, + pub pg_conversion: Arc, + pub pg_language: Arc, + pub pg_opclass: Arc, + pub pg_operator: Arc, + pub pg_opfamily: Arc, + pub pg_proc: Arc, + pub pg_range: Arc, + pub pg_ts_config: Arc, + pub pg_ts_dict: Arc, + pub pg_ts_parser: Arc, + pub pg_ts_template: Arc, + pub pg_type: Arc, + pub pg_attrdef: Arc, + pub pg_auth_members: Arc, + pub pg_authid: Arc, + pub pg_constraint: Arc, + pub pg_db_role_setting: Arc, + pub pg_default_acl: Arc, + pub pg_depend: Arc, + pub pg_description: Arc, + pub pg_enum: Arc, + pub pg_event_trigger: Arc, + pub pg_extension: Arc, + pub pg_foreign_data_wrapper: Arc, + pub pg_foreign_server: Arc, + pub pg_foreign_table: Arc, + pub pg_index: Arc, + pub pg_inherits: Arc, + pub pg_init_privs: Arc, + pub pg_largeobject: Arc, + pub pg_largeobject_metadata: Arc, + pub pg_partitioned_table: Arc, + pub pg_policy: Arc, + pub pg_publication: Arc, + pub pg_publication_namespace: Arc, + pub pg_publication_rel: Arc, + pub pg_replication_origin: Arc, + pub pg_rewrite: Arc, + pub pg_seclabel: Arc, + pub pg_sequence: Arc, + pub pg_shdepend: Arc, + pub pg_shdescription: Arc, + pub pg_shseclabel: Arc, + pub pg_statistic: Arc, + pub pg_statistic_ext: Arc, + pub pg_statistic_ext_data: Arc, + pub pg_subscription: Arc, + pub pg_subscription_rel: Arc, + pub pg_tablespace: Arc, + pub pg_trigger: Arc, + pub pg_user_mapping: Arc, + + pub pg_get_keywords: Arc, } impl PgCatalogStaticTables { @@ -915,7 +995,7 @@ impl PgCatalogStaticTables { .to_vec(), )?, - pg_get_keywords: Self::create_arrow_table_function( + pg_get_keywords: Self::create_arrow_table( include_bytes!(concat!( env!("CARGO_MANIFEST_DIR"), "/pg_catalog_arrow_exports/pg_get_keywords.feather" @@ -926,15 +1006,8 @@ impl PgCatalogStaticTables { } /// Create table from dumped arrow data - fn create_arrow_table(data_bytes: Vec) -> Result> { - let table = ArrowTable::from_ipc_data(data_bytes)?; - let mem_table = table.try_into_memtable()?; - Ok(Arc::new(mem_table)) - } - - fn create_arrow_table_function(data_bytes: Vec) -> Result> { - let table = ArrowTable::from_ipc_data(data_bytes)?; - Ok(Arc::new(table)) + fn create_arrow_table(data_bytes: Vec) -> Result> { + ArrowTable::from_ipc_data(data_bytes).map(Arc::new) } } diff --git a/datafusion-postgres/src/pg_catalog/empty_table.rs b/datafusion-postgres/src/pg_catalog/empty_table.rs index e12032d..256e43d 100644 --- a/datafusion-postgres/src/pg_catalog/empty_table.rs +++ b/datafusion-postgres/src/pg_catalog/empty_table.rs @@ -1,9 +1,11 @@ +use std::sync::Arc; + use datafusion::arrow::datatypes::SchemaRef; use datafusion::catalog::MemTable; use datafusion::error::Result; #[derive(Debug, Clone)] -pub(crate) struct EmptyTable { +pub struct EmptyTable { schema: SchemaRef, } @@ -12,7 +14,11 @@ impl EmptyTable { Self { schema } } - pub fn try_into_memtable(self) -> Result { - MemTable::try_new(self.schema, vec![vec![]]) + pub fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + pub fn try_into_memtable(&self) -> Result> { + MemTable::try_new(self.schema.clone(), vec![vec![]]).map(Arc::new) } } diff --git a/datafusion-postgres/src/pg_catalog/pg_replication_slot.rs b/datafusion-postgres/src/pg_catalog/pg_replication_slot.rs index a1d50c4..0137a11 100644 --- a/datafusion-postgres/src/pg_catalog/pg_replication_slot.rs +++ b/datafusion-postgres/src/pg_catalog/pg_replication_slot.rs @@ -1,10 +1,8 @@ use crate::pg_catalog::empty_table::EmptyTable; use datafusion::arrow::datatypes::{DataType, Field, Schema}; -use datafusion::catalog::TableProvider; -use datafusion::error::Result; use std::sync::Arc; -pub(crate) fn pg_replication_slots() -> Result> { +pub(crate) fn pg_replication_slots() -> EmptyTable { let schema = Arc::new(Schema::new(vec![ Field::new("slot_name", DataType::Utf8, true), Field::new("plugin", DataType::Utf8, true), @@ -24,7 +22,5 @@ pub(crate) fn pg_replication_slots() -> Result> { Field::new("conflicting", DataType::Boolean, false), ])); - let table = EmptyTable::new(schema).try_into_memtable()?; - - Ok(Arc::new(table)) + EmptyTable::new(schema) } diff --git a/datafusion-postgres/src/pg_catalog/pg_settings.rs b/datafusion-postgres/src/pg_catalog/pg_settings.rs index c94cd82..b091ab6 100644 --- a/datafusion-postgres/src/pg_catalog/pg_settings.rs +++ b/datafusion-postgres/src/pg_catalog/pg_settings.rs @@ -2,22 +2,19 @@ use std::sync::Arc; use datafusion::arrow::array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion::catalog::MemTable; use datafusion::error::Result; +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::streaming::PartitionStream; #[derive(Debug, Clone)] pub(crate) struct PgSettingsView { schema: SchemaRef, - data: Vec, } impl PgSettingsView { - pub(crate) fn try_new() -> Result { + pub fn new() -> PgSettingsView { let schema = Arc::new(Schema::new(vec![ - // name | setting | unit | category | short_ - //desc | extra_desc - //| context | vartype | source | min_val | max_val | enumvals | - //boot_val | reset_val | sourcefile | sourceline | pending_restart Field::new("name", DataType::Utf8, true), Field::new("setting", DataType::Utf8, true), Field::new("unit", DataType::Utf8, true), @@ -37,12 +34,10 @@ impl PgSettingsView { Field::new("pending_restart", DataType::Boolean, true), ])); - let data = Self::create_data(schema.clone())?; - - Ok(Self { schema, data }) + Self { schema } } - fn create_data(schema: Arc) -> Result> { + fn create_data(schema: Arc) -> Result { let mut name: Vec> = Vec::new(); let mut setting: Vec> = Vec::new(); let mut unit: Vec> = Vec::new(); @@ -104,12 +99,20 @@ impl PgSettingsView { Arc::new(BooleanArray::from(pending_restart)), ]; - let batch = RecordBatch::try_new(schema.clone(), arrays)?; + RecordBatch::try_new(schema.clone(), arrays).map_err(Into::into) + } +} - Ok(vec![batch]) +impl PartitionStream for PgSettingsView { + fn schema(&self) -> &SchemaRef { + &self.schema } - pub(crate) fn try_into_memtable(self) -> Result { - MemTable::try_new(self.schema, vec![self.data]) + fn execute(&self, _ctx: Arc) -> SendableRecordBatchStream { + let this = self.clone(); + Box::pin(RecordBatchStreamAdapter::new( + this.schema.clone(), + futures::stream::once(async move { Self::create_data(this.schema().clone()) }), + )) } } diff --git a/datafusion-postgres/src/pg_catalog/pg_views.rs b/datafusion-postgres/src/pg_catalog/pg_views.rs index 36286e3..1485770 100644 --- a/datafusion-postgres/src/pg_catalog/pg_views.rs +++ b/datafusion-postgres/src/pg_catalog/pg_views.rs @@ -1,22 +1,20 @@ use std::sync::Arc; use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; -use datafusion::catalog::MemTable; -use datafusion::error::Result; use super::empty_table::EmptyTable; -pub fn pg_views() -> Result { +pub fn pg_views() -> EmptyTable { let schema = Arc::new(Schema::new(vec![ Field::new("schemaname", DataType::Utf8, true), Field::new("viewname", DataType::Utf8, true), Field::new("viewowner", DataType::Utf8, true), Field::new("definition", DataType::Utf8, true), ])); - EmptyTable::new(schema).try_into_memtable() + EmptyTable::new(schema) } -pub fn pg_matviews() -> Result { +pub fn pg_matviews() -> EmptyTable { let schema = Arc::new(Schema::new(vec![ Field::new("schemaname", DataType::Utf8, true), Field::new("matviewname", DataType::Utf8, true), @@ -27,10 +25,10 @@ pub fn pg_matviews() -> Result { Field::new("definition", DataType::Utf8, true), ])); - EmptyTable::new(schema).try_into_memtable() + EmptyTable::new(schema) } -pub fn pg_stat_user_tables() -> Result { +pub fn pg_stat_user_tables() -> EmptyTable { let schema = Arc::new(Schema::new(vec![ Field::new("relid", DataType::Int32, false), Field::new("schemaname", DataType::Utf8, false), @@ -88,5 +86,5 @@ pub fn pg_stat_user_tables() -> Result { Field::new("total_autoanalyze_time", DataType::Float64, false), ])); - EmptyTable::new(schema).try_into_memtable() + EmptyTable::new(schema) }