|
| 1 | +use async_trait::async_trait; |
| 2 | +use chrono::DateTime; |
| 3 | +use chrono::Utc; |
| 4 | +use redis::Client; |
| 5 | +use redis::Value; |
| 6 | + |
| 7 | +use crate::core::LockConfig; |
| 8 | +use crate::core::Lockable; |
| 9 | +use crate::error::LockResult; |
| 10 | + |
| 11 | +const KEY_PREFIX: &str = "dist_lock"; |
| 12 | + |
| 13 | +#[derive(Debug)] |
| 14 | +pub struct RedisLock<'a, T> { |
| 15 | + pub key: String, |
| 16 | + pub config: LockConfig, |
| 17 | + pub driver: &'a T, |
| 18 | + pub create_at: DateTime<Utc>, |
| 19 | + pub locked_at: Option<DateTime<Utc>>, |
| 20 | +} |
| 21 | + |
| 22 | +impl<'a, T> RedisLock<'a, T> { |
| 23 | + pub fn new(config: LockConfig, driver: &'a T) -> Self { |
| 24 | + RedisLock { |
| 25 | + key: format!("{}:{}", KEY_PREFIX, &config.name), |
| 26 | + config, |
| 27 | + driver, |
| 28 | + create_at: Utc::now(), |
| 29 | + locked_at: None, |
| 30 | + } |
| 31 | + } |
| 32 | +} |
| 33 | + |
| 34 | +#[async_trait] |
| 35 | +impl<'a> Lockable<Client> for RedisLock<'a, Client> { |
| 36 | + async fn acquire(&mut self) -> LockResult<bool> { |
| 37 | + let mut conn = self.driver.get_async_connection().await?; |
| 38 | + let value: Value = redis::cmd("SET") |
| 39 | + .arg(&self.key) |
| 40 | + .arg(Utc::now().timestamp_millis()) |
| 41 | + .arg("NX") |
| 42 | + .arg("PX") |
| 43 | + .arg(self.config.max_lock.num_milliseconds() as usize) |
| 44 | + .query_async(&mut conn) |
| 45 | + .await?; |
| 46 | + |
| 47 | + match value { |
| 48 | + Value::Okay => { |
| 49 | + self.locked_at = Some(Utc::now()); |
| 50 | + Ok(true) |
| 51 | + } |
| 52 | + _ => Ok(false), |
| 53 | + } |
| 54 | + } |
| 55 | + |
| 56 | + async fn release(&self) -> LockResult<bool> { |
| 57 | + let now = Utc::now(); |
| 58 | + let elapsed = now - self.locked_at.unwrap_or_default(); |
| 59 | + let remaining = self.config.min_lock - elapsed; |
| 60 | + let mut conn = self.driver.get_async_connection().await?; |
| 61 | + let value: Value = if remaining.num_milliseconds() > 0 { |
| 62 | + redis::cmd("SET") |
| 63 | + .arg(&self.key) |
| 64 | + .arg(Utc::now().timestamp_millis()) |
| 65 | + .arg("XX") |
| 66 | + .arg("PX") |
| 67 | + .arg((self.config.min_lock - remaining).num_milliseconds()) |
| 68 | + .query_async(&mut conn) |
| 69 | + .await? |
| 70 | + } else { |
| 71 | + redis::cmd("DEL").arg(&self.key).query_async(&mut conn).await? |
| 72 | + }; |
| 73 | + |
| 74 | + match value { |
| 75 | + Value::Okay => Ok(true), |
| 76 | + _ => Ok(false), |
| 77 | + } |
| 78 | + } |
| 79 | + |
| 80 | + async fn extend(&mut self) -> LockResult<bool> { |
| 81 | + todo!() |
| 82 | + } |
| 83 | +} |
0 commit comments