Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ test-asan:
cargo +nightly test -Z build-std --target x86_64-unknown-linux-gnu --features $(RUST_FEATURES) $$packages -- \
--skip reftest_ \
--skip proptest_ \
--skip fork_test \
--skip sequential_read_large

.PHONY: fmt
Expand Down
2 changes: 1 addition & 1 deletion s3-file-connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ supports-color = "1.3.0"
thiserror = "1.0.34"
tracing = { version = "0.1.35", default-features = false, features = ["std", "log"] }
tracing-subscriber = { version = "0.3.14", features = ["fmt", "env-filter"] }
nix = { version = "0.26.1", default-features = false, features = ["user"] }
nix = "0.26.2"
time = "0.3.17"
const_format = "0.2.30"

Expand Down
197 changes: 156 additions & 41 deletions s3-file-connector/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
use std::fs::File;
use std::io::{Read, Write};
use std::os::unix::prelude::FromRawFd;
use std::path::PathBuf;
use std::thread;
use std::time::Duration;

use anyhow::{anyhow, Context as _};
use aws_crt_s3::common::rust_log_adapter::RustLogAdapter;
use clap::{ArgGroup, Parser};
use fuser::{BackgroundSession, MountOption, Session};
use nix::sys::signal::Signal;
use nix::unistd::ForkResult;
use s3_client::{AddressingStyle, Endpoint, HeadBucketError, ObjectClientError, S3ClientConfig, S3CrtClient};
use s3_file_connector::fs::S3FilesystemConfig;
use s3_file_connector::fuse::S3FuseFilesystem;
Expand Down Expand Up @@ -90,6 +97,9 @@ struct CliArgs {

#[clap(long, help = "File permissions [default: 0644]", value_parser = parse_perm_bits)]
pub file_mode: Option<u16>,

#[clap(short, long, help = "Run as foreground process")]
pub foreground: bool,
}

impl CliArgs {
Expand All @@ -105,8 +115,6 @@ impl CliArgs {
}

fn main() -> anyhow::Result<()> {
init_tracing_subscriber();

let args = CliArgs::parse();

// validate mount point
Expand All @@ -117,36 +125,123 @@ fn main() -> anyhow::Result<()> {
));
}

let mut options = vec![
MountOption::RO,
MountOption::DefaultPermissions,
MountOption::FSName("fuse_sync".to_string()),
MountOption::NoAtime,
];
if args.auto_unmount {
options.push(MountOption::AutoUnmount);
}
if args.allow_root {
options.push(MountOption::AllowRoot);
}
if args.allow_other {
options.push(MountOption::AllowOther);
}
if args.foreground {
// mount file system as a foreground process
init_tracing_subscriber();
let session = mount(args)?;

let mut filesystem_config = S3FilesystemConfig::default();
if let Some(uid) = args.uid {
filesystem_config.uid = uid;
}
if let Some(gid) = args.gid {
filesystem_config.gid = gid;
}
if let Some(dir_mode) = args.dir_mode {
filesystem_config.dir_mode = dir_mode;
}
if let Some(file_mode) = args.file_mode {
filesystem_config.file_mode = file_mode;
let (sender, receiver) = std::sync::mpsc::sync_channel(0);

ctrlc::set_handler(move || {
let _ = sender.send(());
})
.context("Failed to install signal handler")
.unwrap();

let _ = receiver.recv();

drop(session);
} else {
// mount file system as a background process

// create a pipe for interprocess communication.
// child process will report its status via this pipe.
let (read_fd, write_fd) = nix::unistd::pipe().context("Failed to create a pipe")?;

// SAFETY: Child process has full ownership of its resources.
// There is no shared data between parent and child processes.
let pid = unsafe { nix::unistd::fork() };
match pid.expect("Failed to fork mount process") {
ForkResult::Child => {
init_tracing_subscriber();

let child_args = CliArgs::parse();
let session = mount(child_args);

// close unused file descriptor, we only write from this end.
nix::unistd::close(read_fd).context("Failed to close unused file descriptor")?;

// SAFETY: `write_fd` is a valid file descriptor.
let mut pipe_file = unsafe { File::from_raw_fd(write_fd) };

let status_success = [b'0'];
let status_failure = [b'1'];

match session {
Ok(_session) => {
pipe_file
.write(&status_success)
.context("Failed to write data to the pipe")?;
drop(pipe_file);

// the session stays running because its lifetime is bound to the match statement.
// it won't be dropped until after the park.
// also `park()` does not guarantee to remain parked forever. so, we put it inside a loop.
loop {
thread::park();
}
Comment on lines +179 to +182
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need some kind of stopping condition.

When we receive SIGINT, we should exit. Maybe another thread waits for SIGINT, and sets an atomic bool?

Does this sound right, @jamesbornholt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the standard library already handle that. I have tested this by running kill -2 {pid} which send SIGINT to the running child process and it works just fine.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to figure something different out here, otherwise the process runs forever even if the user unmounts it. But we can do that as a followup, I think, because we'll probably have to make fuser changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that is existing problem, it keep running forever even for foreground process. It's just less visible when run in the background. Let's do it as a follow up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's open an issue and then we can resolve this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened #93.

}
Err(e) => {
pipe_file
.write(&status_failure)
.context("Failed to write data to the pipe")?;
return Err(anyhow!(e));
}
}
}
ForkResult::Parent { child } => {
init_tracing_subscriber();

// close unused file descriptor, we only read from this end.
nix::unistd::close(write_fd).context("Failed to close unused file descriptor")?;

// SAFETY: `read_fd` is a valid file descriptor.
let mut pipe_file = unsafe { File::from_raw_fd(read_fd) };

let (sender, receiver) = std::sync::mpsc::channel();

// create a thread that read from the pipe so that we can enforce a time out.
std::thread::spawn(move || {
let mut buf = [0];
match pipe_file
.read_exact(&mut buf)
.context("Failed to read data from the pipe")
{
Ok(_) => {
let status = buf[0] as char;
sender.send(status).unwrap();
}
Err(_) => sender.send('1').unwrap(),
}
});

let timeout = Duration::from_secs(30);
let status = receiver.recv_timeout(timeout);
match status {
Ok('0') => tracing::debug!("success status flag received from child process"),
Ok(_) => {
nix::sys::wait::waitpid(child, None).context("Failed to wait for child process to exit")?;
return Err(anyhow!("Failed to create mount process"));
}
Err(_timeout_err) => {
// kill child process before returning error.
if let Err(e) = nix::sys::signal::kill(child, Signal::SIGTERM) {
tracing::error!("Unable to kill hanging child process with SIGTERM: {:?}", e);
}
return Err(anyhow!(
"Timeout after {} seconds while waiting for mount process to be ready",
timeout.as_secs()
));
}
};
}
}
}

Ok(())
}

fn mount(args: CliArgs) -> anyhow::Result<BackgroundSession> {
let throughput_target_gbps = args.throughput_target_gbps.map(|t| t as f64);

let addressing_style = args.addressing_style();
Expand Down Expand Up @@ -174,6 +269,20 @@ fn main() -> anyhow::Result<()> {
.context("Failed to create S3 client")?;
let runtime = client.event_loop_group();

let mut filesystem_config = S3FilesystemConfig::default();
if let Some(uid) = args.uid {
filesystem_config.uid = uid;
}
if let Some(gid) = args.gid {
filesystem_config.gid = gid;
}
if let Some(dir_mode) = args.dir_mode {
filesystem_config.dir_mode = dir_mode;
}
if let Some(file_mode) = args.file_mode {
filesystem_config.file_mode = file_mode;
}

let fs = S3FuseFilesystem::new(
client,
runtime,
Expand All @@ -182,6 +291,23 @@ fn main() -> anyhow::Result<()> {
filesystem_config,
);

let fs_name = String::from("s3-file-connector");
let mut options = vec![
MountOption::RO,
MountOption::DefaultPermissions,
MountOption::FSName(fs_name),
MountOption::NoAtime,
];
if args.auto_unmount {
options.push(MountOption::AutoUnmount);
}
if args.allow_root {
options.push(MountOption::AllowRoot);
}
if args.allow_other {
options.push(MountOption::AllowOther);
}

let session = Session::new(fs, &args.mount_point, &options).context("Failed to create FUSE session")?;

let session = if let Some(thread_count) = args.thread_count {
Expand All @@ -193,18 +319,7 @@ fn main() -> anyhow::Result<()> {

tracing::info!("successfully mounted {:?}", args.mount_point);

let (sender, receiver) = std::sync::mpsc::sync_channel(0);

ctrlc::set_handler(move || {
let _ = sender.send(());
})
.context("Failed to install signal handler")?;

let _ = receiver.recv();

drop(session);

Ok(())
Ok(session)
}

/// Create a client for a bucket in the given region and send a HeadBucket request to validate it's
Expand Down
48 changes: 48 additions & 0 deletions s3-file-connector/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use aws_crt_s3::common::rust_log_adapter::RustLogAdapter;
use aws_sdk_s3::types::ByteStream;
use aws_sdk_s3::Region;
use fuser::{FileAttr, FileType};
use futures::executor::ThreadPool;
use rand::rngs::OsRng;
use rand::RngCore;
use s3_client::mock_client::{MockClient, MockClientConfig};
use s3_file_connector::fs::{DirectoryReplier, ReadReplier};
use s3_file_connector::{S3Filesystem, S3FilesystemConfig};
Expand All @@ -27,6 +31,50 @@ pub fn make_test_filesystem(
(client, fs)
}

pub fn get_test_bucket_and_prefix(test_name: &str) -> (String, String) {
let bucket = std::env::var("S3_BUCKET_NAME").expect("Set S3_BUCKET_NAME to run integration tests");

// Generate a random nonce to make sure this prefix is truly unique
let nonce = OsRng.next_u64();

// Prefix always has a trailing "/" to keep meaning in sync with the S3 API.
let prefix = std::env::var("S3_BUCKET_TEST_PREFIX").expect("Set S3_BUCKET_TEST_PREFIX to run integration tests");
assert!(prefix.ends_with('/'), "S3_BUCKET_TEST_PREFIX should end in '/'");

let prefix = format!("{prefix}{test_name}/{nonce}/");

(bucket, prefix)
}

pub fn get_test_bucket_forbidden() -> String {
std::env::var("S3_FORBIDDEN_BUCKET_NAME").expect("Set S3_FORBIDDEN_BUCKET_NAME to run integration tests")
}

pub fn get_test_region() -> String {
std::env::var("S3_REGION").expect("Set S3_REGION to run integration tests")
}

pub fn create_objects(bucket: &str, prefix: &str, region: &str, key: &str, value: &[u8]) {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
.unwrap();

let config = runtime.block_on(aws_config::from_env().region(Region::new(region.to_string())).load());
let sdk_client = aws_sdk_s3::Client::new(&config);
// runtime.block_on(client.list_buckets());
let full_key = format!("{prefix}{key}");
let _ = runtime.block_on(
sdk_client
.put_object()
.bucket(bucket)
.key(full_key)
.body(ByteStream::from(value.to_vec()))
.send(),
);
}

#[track_caller]
pub fn assert_attr(attr: FileAttr, ftype: FileType, size: u64, uid: u32, gid: u32, perm: u16) {
assert_eq!(attr.kind, ftype);
Expand Down
Loading