From 8928a705f59a4f041083e214b6ac04a0c4545efb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Sat, 16 Nov 2024 18:40:12 +0000 Subject: [PATCH] remove object store registry (#1114) ... as users can plug in their own. --- ballista-cli/Cargo.toml | 2 +- ballista/client/Cargo.toml | 2 - ballista/core/Cargo.toml | 5 +- ballista/core/src/lib.rs | 1 - .../core/src/object_store_registry/mod.rs | 136 ------------------ ballista/core/src/utils.rs | 11 +- ballista/executor/src/standalone.rs | 5 +- 7 files changed, 5 insertions(+), 157 deletions(-) delete mode 100644 ballista/core/src/object_store_registry/mod.rs diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml index ec1cb5c5e..f8fc3694d 100644 --- a/ballista-cli/Cargo.toml +++ b/ballista-cli/Cargo.toml @@ -41,4 +41,4 @@ rustyline = "14.0.0" tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] } [features] -s3 = ["ballista/s3"] + diff --git a/ballista/client/Cargo.toml b/ballista/client/Cargo.toml index 7a63dcefc..219974d0d 100644 --- a/ballista/client/Cargo.toml +++ b/ballista/client/Cargo.toml @@ -50,8 +50,6 @@ object_store = { workspace = true, features = ["aws"] } testcontainers-modules = { version = "0.11", features = ["minio"] } [features] -azure = ["ballista-core/azure"] default = [] -s3 = ["ballista-core/s3"] standalone = ["ballista-executor", "ballista-scheduler"] testcontainers = [] diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml index 8e5cb608e..80a3d1028 100644 --- a/ballista/core/Cargo.toml +++ b/ballista/core/Cargo.toml @@ -34,12 +34,10 @@ exclude = ["*.proto"] rustc-args = ["--cfg", "docsrs"] [features] -azure = ["object_store/azure"] docsrs = [] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = ["datafusion/force_hash_collisions"] -gcs = ["object_store/gcp"] -s3 = ["object_store/aws"] + [dependencies] arrow-flight = { workspace = true } @@ -54,7 +52,6 @@ futures = { workspace = true } itertools = "0.13" log = { workspace = true } md-5 = { version = "^0.10.0" } -object_store = { workspace = true } parse_arg = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } diff --git a/ballista/core/src/lib.rs b/ballista/core/src/lib.rs index 8ae5dfb59..4341f443a 100644 --- a/ballista/core/src/lib.rs +++ b/ballista/core/src/lib.rs @@ -32,7 +32,6 @@ pub mod consistent_hash; pub mod error; pub mod event_loop; pub mod execution_plans; -pub mod object_store_registry; pub mod utils; #[macro_use] diff --git a/ballista/core/src/object_store_registry/mod.rs b/ballista/core/src/object_store_registry/mod.rs deleted file mode 100644 index e7fbee216..000000000 --- a/ballista/core/src/object_store_registry/mod.rs +++ /dev/null @@ -1,136 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use datafusion::common::DataFusionError; -use datafusion::datasource::object_store::{ - DefaultObjectStoreRegistry, ObjectStoreRegistry, -}; -use datafusion::execution::runtime_env::RuntimeConfig; -#[cfg(feature = "s3")] -use object_store::aws::AmazonS3Builder; -#[cfg(feature = "azure")] -use object_store::azure::MicrosoftAzureBuilder; -#[cfg(feature = "gcs")] -use object_store::gcp::GoogleCloudStorageBuilder; -use object_store::ObjectStore; -use std::sync::Arc; -use url::Url; - -/// Get a RuntimeConfig with specific ObjectStoreRegistry -// TODO: #[deprecated] this method -pub fn with_object_store_registry(config: RuntimeConfig) -> RuntimeConfig { - let registry = Arc::new(BallistaObjectStoreRegistry::default()); - config.with_object_store_registry(registry) -} - -/// An object store detector based on which features are enable for different kinds of object stores -#[derive(Debug, Default)] -pub struct BallistaObjectStoreRegistry { - inner: DefaultObjectStoreRegistry, -} - -impl BallistaObjectStoreRegistry { - pub fn new() -> Self { - Default::default() - } - - /// Find a suitable object store based on its url and enabled features if possible - fn get_feature_store( - &self, - url: &Url, - ) -> datafusion::error::Result> { - #[cfg(feature = "s3")] - { - if url.as_str().starts_with("s3://") { - if let Some(bucket_name) = url.host_str() { - let store = Arc::new( - AmazonS3Builder::from_env() - .with_bucket_name(bucket_name) - .build()?, - ); - return Ok(store); - } - // Support Alibaba Cloud OSS - // Use S3 compatibility mode to access Alibaba Cloud OSS - // The `AWS_ENDPOINT` should have bucket name included - } else if url.as_str().starts_with("oss://") { - if let Some(bucket_name) = url.host_str() { - let store = Arc::new( - AmazonS3Builder::from_env() - .with_virtual_hosted_style_request(true) - .with_bucket_name(bucket_name) - .build()?, - ); - return Ok(store); - } - } - } - - #[cfg(feature = "azure")] - { - if url.to_string().starts_with("azure://") { - if let Some(bucket_name) = url.host_str() { - let store = Arc::new( - MicrosoftAzureBuilder::from_env() - .with_container_name(bucket_name) - .build()?, - ); - return Ok(store); - } - } - } - - #[cfg(feature = "gcs")] - { - if url.to_string().starts_with("gs://") - || url.to_string().starts_with("gcs://") - { - if let Some(bucket_name) = url.host_str() { - let store = Arc::new( - GoogleCloudStorageBuilder::from_env() - .with_bucket_name(bucket_name) - .build()?, - ); - return Ok(store); - } - } - } - - Err(DataFusionError::Execution(format!( - "No object store available for: {url}" - ))) - } -} - -impl ObjectStoreRegistry for BallistaObjectStoreRegistry { - fn register_store( - &self, - url: &Url, - store: Arc, - ) -> Option> { - self.inner.register_store(url, store) - } - - fn get_store(&self, url: &Url) -> datafusion::error::Result> { - self.inner.get_store(url).or_else(|_| { - let store = self.get_feature_store(url)?; - self.inner.register_store(url, store.clone()); - - Ok(store) - }) - } -} diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs index c3f040b96..58057733e 100644 --- a/ballista/core/src/utils.rs +++ b/ballista/core/src/utils.rs @@ -23,7 +23,6 @@ use crate::error::{BallistaError, Result}; use crate::execution_plans::{ DistributedQueryExec, ShuffleWriterExec, UnresolvedShuffleExec, }; -use crate::object_store_registry::with_object_store_registry; use crate::serde::protobuf::KeyValuePair; use crate::serde::scheduler::PartitionStats; use crate::serde::{BallistaLogicalExtensionCodec, BallistaPhysicalExtensionCodec}; @@ -73,10 +72,7 @@ pub fn default_session_builder(config: SessionConfig) -> SessionState { SessionStateBuilder::new() .with_default_features() .with_config(config) - .with_runtime_env(Arc::new( - RuntimeEnv::new(with_object_store_registry(RuntimeConfig::default())) - .unwrap(), - )) + .with_runtime_env(Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap())) .build() } @@ -273,10 +269,7 @@ pub fn create_df_ctx_with_ballista_query_planner( let session_state = SessionStateBuilder::new() .with_default_features() .with_config(session_config) - .with_runtime_env(Arc::new( - RuntimeEnv::new(with_object_store_registry(RuntimeConfig::default())) - .unwrap(), - )) + .with_runtime_env(Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap())) .with_query_planner(planner) .with_session_id(session_id) .build(); diff --git a/ballista/executor/src/standalone.rs b/ballista/executor/src/standalone.rs index 8ce2390b4..ac67a5a2b 100644 --- a/ballista/executor/src/standalone.rs +++ b/ballista/executor/src/standalone.rs @@ -23,7 +23,6 @@ use ballista_core::serde::scheduler::BallistaFunctionRegistry; use ballista_core::utils::{default_config_producer, SessionConfigExt}; use ballista_core::{ error::Result, - object_store_registry::with_object_store_registry, serde::protobuf::{scheduler_grpc_client::SchedulerGrpcClient, ExecutorRegistration}, serde::scheduler::ExecutorSpecification, serde::BallistaCodec, @@ -187,9 +186,7 @@ pub async fn new_standalone_executor< let config_producer = Arc::new(default_config_producer); let wd = work_dir.clone(); let runtime_producer: RuntimeProducer = Arc::new(move |_: &SessionConfig| { - let config = with_object_store_registry( - RuntimeConfig::new().with_temp_file_path(wd.clone()), - ); + let config = RuntimeConfig::new().with_temp_file_path(wd.clone()); Ok(Arc::new(RuntimeEnv::new(config)?)) });