Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
435 changes: 209 additions & 226 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "object-store"
version = "1.0.0-alpha.2"
version = "1.0.0-alpha.4"
authors = ["Stephen Cirner <scirner@figure.com>"]
edition = "2024"

Expand Down
5 changes: 3 additions & 2 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.build_server(true)
.compile_protos(
&[
"proto/public_key.proto",
"proto/object.proto",
"proto/admin.proto",
"proto/dime.proto",
"proto/mailbox.proto",
"proto/object.proto",
"proto/public_key.proto",
],
&["proto"],
)?;
Expand Down
21 changes: 21 additions & 0 deletions docs/FEATURES.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Features
## Maintenance Mode
Introduced: `v1.0.0`
Puts `object-store` in read-only mode. Objects can be retrieved and replication, if enabled, will run.
Useful if 1) a migration to another instance is needed or 2) you need a read-only replica

### Usage
Enable maintenance mode
```bash
grpcurl -proto ./proto/admin.proto -plaintext -d '{"maintenance_state": true}' localhost:5000 objectstore.AdminService/SetConfig
```

Disable maintenance mode
```bash
grpcurl -proto ./proto/admin.proto -plaintext -d '{"maintenance_state": false}' localhost:5000 objectstore.AdminService/SetConfig
```

Get current config
```bash
grpcurl -proto ./proto/admin.proto -plaintext localhost:5000 objectstore.AdminService/GetConfig
```
21 changes: 21 additions & 0 deletions proto/admin.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
syntax = "proto3";

package objectstore;

option java_package = "io.provenance.objectstore.proto";
option java_outer_classname = "Admin";

service AdminService {
rpc SetConfig(SetConfigRequest) returns (ConfigResponse);
rpc GetConfig(GetConfigRequest) returns (ConfigResponse);
}

message SetConfigRequest {
optional bool maintenance_state = 1;
Comment thread
rpatel-figure marked this conversation as resolved.
}

message GetConfigRequest {}

message ConfigResponse {
bool maintenance_state = 1;
}
48 changes: 48 additions & 0 deletions src/admin/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::sync::Arc;

use tonic::{Request, Response};

use crate::{
config::Config,
pb::{ConfigResponse, GetConfigRequest, SetConfigRequest, admin_service_server::AdminService},
types::GrpcResult,
};

#[derive(Debug)]
pub struct AdminGrpc {
config: Arc<Config>,
}

impl AdminGrpc {
pub fn new(config: Arc<Config>) -> Self {
Self { config }
}
}

#[tonic::async_trait]
impl AdminService for AdminGrpc {
async fn set_config(
&self,
request: Request<SetConfigRequest>,
) -> GrpcResult<Response<ConfigResponse>> {
let request = request.into_inner();

if let Some(maintenance_state) = request.maintenance_state {
log::info!("Setting maintenance_state to {}", maintenance_state);
self.config.set_maintenance_state(maintenance_state);
}

Ok(Response::new(ConfigResponse {
maintenance_state: self.config.is_maintenance_state(),
}))
}

async fn get_config(
&self,
_request: Request<GetConfigRequest>,
) -> GrpcResult<Response<ConfigResponse>> {
Ok(Response::new(ConfigResponse {
maintenance_state: self.config.is_maintenance_state(),
}))
}
}
16 changes: 16 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::env;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use chrono::TimeDelta;
Expand Down Expand Up @@ -70,6 +71,8 @@ pub struct Config {
pub trace_header: String,
pub user_auth_enabled: bool,
pub health_service_enabled: bool,
/// Runtime maintenance mode state. When true, write operations are rejected.
pub maintenance_state: AtomicBool,
}

const BASE_SPAN_TAGS: [(&str, &str); 3] = [
Expand Down Expand Up @@ -189,6 +192,10 @@ impl Config {
.unwrap_or("true".to_owned())
.parse()
.expect("HEALTH_SERVICE_ENABLED could not be parsed into a bool");
let maintenance_state: bool = env::var("MAINTENANCE_STATE")
.unwrap_or("false".to_owned())
.parse()
.expect("MAINTENANCE_STATE could not be parsed into a bool");

let replication_config = ReplicationConfig {
replication_enabled,
Expand Down Expand Up @@ -224,6 +231,7 @@ impl Config {
trace_header,
user_auth_enabled,
health_service_enabled,
maintenance_state: AtomicBool::new(maintenance_state),
})
}

Expand All @@ -236,4 +244,12 @@ impl Config {
self.db_user, password, self.db_host, self.db_port, self.db_database,
)
}

pub fn is_maintenance_state(&self) -> bool {
self.maintenance_state.load(Ordering::Relaxed)
}

pub fn set_maintenance_state(&self, enabled: bool) {
self.maintenance_state.store(enabled, Ordering::Relaxed);
}
}
7 changes: 6 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use sqlx::PgPool;
use tonic_health::pb::health_server::{Health, HealthServer};

use crate::{
admin::AdminGrpc,
cache::Cache,
config::Config,
db::connect_and_migrate,
Expand All @@ -16,6 +17,7 @@ use crate::{
types::OsError,
};

pub mod admin;
pub mod authorization;
pub mod cache;
pub mod config;
Expand Down Expand Up @@ -44,6 +46,7 @@ pub struct AppContext {
pub cache: Arc<Mutex<Cache>>,
pub db_pool: Arc<PgPool>,
pub storage: Arc<Box<dyn Storage>>,
pub admin_service: AdminGrpc,
pub public_key_service: PublicKeyGrpc,
pub mailbox_service: MailboxGrpc,
pub object_service: ObjectGrpc,
Expand All @@ -59,7 +62,8 @@ impl AppContext {
let cache = Cache::new(db_pool.clone()).await?;
let storage = new_storage(&config).await?;

let public_key_service = PublicKeyGrpc::new(cache.clone(), db_pool.clone());
let admin_service = AdminGrpc::new(config.clone());
let public_key_service = PublicKeyGrpc::new(cache.clone(), config.clone(), db_pool.clone());
let mailbox_service = MailboxGrpc::new(cache.clone(), config.clone(), db_pool.clone());
let object_service = ObjectGrpc::new(
cache.clone(),
Expand All @@ -84,6 +88,7 @@ impl AppContext {
cache,
db_pool,
storage,
admin_service,
public_key_service,
mailbox_service,
object_service,
Expand Down
4 changes: 4 additions & 0 deletions src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ impl MailboxService for MailboxGrpc {

#[trace(name = "mailbox::ack")]
async fn ack(&self, request: Request<AckRequest>) -> GrpcResult<Response<()>> {
if self.config.is_maintenance_state() {
return Err(Status::unavailable("Service is in maintenance mode"));
}

let metadata = request.metadata().clone();
let request = request.into_inner();

Expand Down
4 changes: 4 additions & 0 deletions src/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ impl ObjectService for ObjectGrpc {
&self,
request: Request<Streaming<ChunkBidi>>,
) -> GrpcResult<Response<ObjectResponse>> {
if self.config.is_maintenance_state() {
return Err(Status::unavailable("Service is in maintenance mode"));
}

let metadata = request.metadata().clone();
let mut stream = request.into_inner();

Expand Down
20 changes: 15 additions & 5 deletions src/public_key.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::cache::Cache;
use crate::config::Config;
use crate::datastore;
use crate::domain::PublicKeyApiResponse;
use crate::pb::public_key_request::Impl::HeaderAuth as HeaderAuthEnumRequest;
Expand All @@ -16,16 +17,21 @@ use url::Url;

#[derive(Debug)]
pub struct PublicKeyGrpc {
// This cache is using a std::sync::Mutex because the tokio docs mention that this is often
// preferrable to the tokio Mutex when you are strictly locking data. In cases where you
// are locking over a database connection, or io resource, the tokio Mutex is required.
/// This cache is using a std::sync::Mutex because the tokio docs mention that this is often
/// preferrable to the tokio Mutex when you are strictly locking data. In cases where you
/// are locking over a database connection, or io resource, the tokio Mutex is required.
cache: Arc<Mutex<Cache>>,
config: Arc<Config>,
db_pool: Arc<PgPool>,
}

impl PublicKeyGrpc {
pub fn new(cache: Arc<Mutex<Cache>>, db_pool: Arc<PgPool>) -> Self {
Self { cache, db_pool }
pub fn new(cache: Arc<Mutex<Cache>>, config: Arc<Config>, db_pool: Arc<PgPool>) -> Self {
Self {
cache,
config,
db_pool,
}
}
}

Expand All @@ -35,6 +41,10 @@ impl PublicKeyService for PublicKeyGrpc {
&self,
request: Request<PublicKeyRequest>,
) -> GrpcResult<Response<PublicKeyResponse>> {
if self.config.is_maintenance_state() {
return Err(Status::unavailable("Service is in maintenance mode"));
}

let request = request.into_inner();

// validate public_key
Expand Down
4 changes: 3 additions & 1 deletion src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use crate::{
AppContext,
middleware::{LoggingMiddlewareLayer, MinitraceGrpcMiddlewareLayer},
pb::{
mailbox_service_server::MailboxServiceServer, object_service_server::ObjectServiceServer,
admin_service_server::AdminServiceServer, mailbox_service_server::MailboxServiceServer,
object_service_server::ObjectServiceServer,
public_key_service_server::PublicKeyServiceServer,
},
server::trace::start_trace_reporter,
Expand Down Expand Up @@ -39,6 +40,7 @@ pub async fn configure_and_start_server(mut context: AppContext) -> Result<(), E
.into_inner(),
)
.add_optional_service(health_service)
.add_service(AdminServiceServer::new(context.admin_service))
.add_service(PublicKeyServiceServer::new(context.public_key_service))
.add_service(MailboxServiceServer::new(context.mailbox_service))
.add_service(ObjectServiceServer::new(context.object_service))
Expand Down
Loading