Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 103 additions & 5 deletions syncstorage-postgres/src/db/db_impl.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![allow(unused_variables)]
// XXX:
use async_trait::async_trait;
use chrono::NaiveDateTime;
use chrono::{NaiveDateTime, TimeDelta};
use diesel::{
delete,
dsl::{count, max, sql},
Expand All @@ -11,15 +11,17 @@ use diesel::{
};
use diesel_async::{AsyncConnection, RunQueryDsl, TransactionManager};
use futures::TryStreamExt;
use std::collections::HashMap;
use syncstorage_db_common::{
error::DbErrorIntrospect, params, results, util::SyncTimestamp, Db, Sorting,
error::DbErrorIntrospect, params, results, util::SyncTimestamp, Db, Sorting, DEFAULT_BSO_TTL,
};
use syncstorage_settings::Quota;

use super::PgDb;
use crate::{
bsos_query,
db::CollectionLock,
orm_models::BsoChangeset,
pool::Conn,
schema::{bsos, user_collections},
DbError, DbResult,
Expand Down Expand Up @@ -356,7 +358,28 @@ impl Db for PgDb {
}

async fn post_bsos(&mut self, params: params::PostBsos) -> DbResult<results::PostBsos> {
todo!()
let collection_id = self.get_or_create_collection_id(&params.collection).await?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have the collection id here, let's pass it to put_bso instead of repeatedly call get_or_create_collection there.

let modified = self.timestamp();

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why couldn't we have a quota check here and error out early?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think since it uses put_bso checks the quotas immediately as it iterates though the param that's passed to it, it removes the step of having to check?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put_bso checks the quotas immediately as it iterates

Precisely. That's the issue. It repeatedly queries the db to check quotas. Why not do it here, once?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be in favor of this, but we should look at filing another issue for this one, since it requires modifying the shared params struct for PutBso that MySQL and Postgres use, so making a change would alter both and require a little extra thought, and might warrant changing the params object for both since . I know we've got some things down to alter across the two implementations and it'd be better to address that in one separate issue imo.

I was also thinking, can't put_bso just be called on its own outside of the context of being called by post_bsos? In which case, we'd not be passed it. Or do you mean something else?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know we've got some things down to alter across the two implementations

Are those in an epic? I want to take a quick look to just get some context.

since it requires modifying the shared params struct for PutBso
can't put_bso just be called on its own outside of the context of being called by post_bsos?

Temporarily set pgdb.quota.enabled to false for put_bso during the execution of post_bsos solves both. We don't need to alter put_bso.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@taddes and I chatted about this. The complicating factor for calculating the new usage is the updates. So we'll table this for now.

for pbso in params.bsos {
self.put_bso(params::PutBso {
user_id: params.user_id.clone(),
collection: params.collection.clone(),
id: pbso.id.clone(),
payload: pbso.payload,
sortindex: pbso.sortindex,
ttl: pbso.ttl,
})
.await?;
}
self.update_collection(params::UpdateCollection {
user_id: params.user_id,
collection_id,
collection: params.collection,
})
.await?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already called in put_bso.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need the update_collection call here since put_bso already call it.


Ok(modified)
}

async fn delete_bso(&mut self, params: params::DeleteBso) -> DbResult<results::DeleteBso> {
Expand Down Expand Up @@ -416,8 +439,83 @@ impl Db for PgDb {
Ok(modified)
}

async fn put_bso(&mut self, params: params::PutBso) -> DbResult<results::PutBso> {
todo!()
async fn put_bso(&mut self, bso: params::PutBso) -> DbResult<results::PutBso> {
let collection_id = self.get_or_create_collection_id(&bso.collection).await?;
let user_id: u64 = bso.user_id.legacy_id;
if self.quota.enabled {
let usage = self
.get_quota_usage(params::GetQuotaUsage {
user_id: bso.user_id.clone(),
collection: bso.collection.clone(),
collection_id,
})
.await?;
if usage.total_bytes >= self.quota.size {
let mut tags = HashMap::default();
tags.insert("collection".to_owned(), bso.collection.clone());
self.metrics.incr_with_tags("storage.quota.at_limit", tags);
if self.quota.enforced {
return Err(DbError::quota());
} else {
warn!("Quota at limit for user's collection ({} bytes)", usage.total_bytes; "collection"=>bso.collection.clone());
}
}
}

let payload = bso.payload.as_deref().unwrap_or_default();
let sortindex = bso.sortindex;
let ttl = bso.ttl.map_or(DEFAULT_BSO_TTL, |ttl| ttl);

let modified = self.timestamp().as_naive_datetime()?;
// Expiry originally required millisecond conversion
let expiry = modified + TimeDelta::seconds(ttl as i64);

// The changeset utilizes Diesel's `AsChangeset` trait.
// This allows selective updates of fields if and only if they are `Some(<T>)`
let changeset = BsoChangeset {
sortindex: if bso.sortindex.is_some() {
sortindex // sortindex is already an Option of type `Option<i32>`
} else {
None
},
payload: if bso.payload.is_some() {
Some(payload)
} else {
None
},
Comment on lines +481 to +485
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
payload: if bso.payload.is_some() {
Some(payload)
} else {
None
},
payload: bso.payload.as_deref(),

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Little confused by this one. A few lines up there's
let payload = bso.payload.as_deref().unwrap_or_default(); which I wrap in Some and return. If we don't call unwrap_or_default, we get a compiler error since it expects the &str. Speaking of: I see in the SQL version we use a string slice, opposed to a String. Might we just want to use a String here too? Curious why it's a &str. I see in the Spanner impl it's using a String type for the option. Any reason not to change to String?

expiry: if bso.ttl.is_some() {
Some(expiry)
} else {
None
},
modified: if bso.payload.is_some() || bso.sortindex.is_some() {
Some(modified)
} else {
None
},
};
diesel::insert_into(bsos::table)
.values((
bsos::user_id.eq(user_id as i64),
bsos::collection_id.eq(&collection_id),
bsos::bso_id.eq(&bso.id),
bsos::sortindex.eq(sortindex),
bsos::payload.eq(payload),
bsos::modified.eq(modified),
bsos::expiry.eq(expiry),
))
.on_conflict((bsos::user_id, bsos::collection_id, bsos::bso_id))
.do_update()
.set(changeset)
.execute(&mut self.conn)
.await?;

self.update_collection(params::UpdateCollection {
user_id: bso.user_id,
collection_id,
collection: bso.collection,
})
.await
}

async fn get_collection_id(&mut self, name: &str) -> DbResult<results::GetCollectionId> {
Expand Down
1 change: 1 addition & 0 deletions syncstorage-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#[macro_use]
extern crate diesel;
extern crate diesel_migrations;
#[macro_use]
extern crate slog_scope;

mod db;
Expand Down
11 changes: 10 additions & 1 deletion syncstorage-postgres/src/orm_models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use chrono::NaiveDateTime;
use uuid::Uuid;

use crate::schema::{batch_bsos, batches, bsos, collections, user_collections};
use diesel::{Identifiable, Queryable};
use diesel::{AsChangeset, Identifiable, Queryable};

#[allow(clippy::all)]
#[derive(Queryable, Debug, Identifiable)]
Expand Down Expand Up @@ -39,6 +39,15 @@ pub struct Bso {
pub expiry: NaiveDateTime,
}

/// Partial-update changeset for a `bsos` row.
///
/// Diesel's `AsChangeset` derive skips `None` fields when generating the
/// `UPDATE` (or `ON CONFLICT ... DO UPDATE`) statement, so callers set only
/// the columns they intend to change; everything left as `None` keeps its
/// existing value in the database.
#[derive(AsChangeset)]
#[diesel(table_name = bsos)]
pub struct BsoChangeset<'a> {
    pub sortindex: Option<i32>,
    // Borrowed to avoid copying potentially large payloads into the changeset.
    pub payload: Option<&'a str>,
    pub modified: Option<NaiveDateTime>,
    pub expiry: Option<NaiveDateTime>,
}

#[derive(Queryable, Debug, Identifiable)]
#[diesel(primary_key(collection_id))]
pub struct Collection {
Expand Down