Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce KeySpace interface for API v2. #353

Open
wants to merge 46 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
1f314ee
fix type checking for new interface
iosmanthus Jun 21, 2022
aa6e4b5
cargo fmt
iosmanthus Jun 21, 2022
222908e
git fire: changing sig for PdRpcClient
iosmanthus Jun 22, 2022
605aed8
fix TxnApiV1 test
iosmanthus Jun 22, 2022
9164cf0
fix all tests for API v1
iosmanthus Jun 23, 2022
7668c37
fix integration tests for API v1
iosmanthus Jun 23, 2022
dd1c7b9
remove specification feature
iosmanthus Jun 23, 2022
60585f6
resolve conflict from master
iosmanthus Jun 24, 2022
3a0df00
reduce type annotation
iosmanthus Jun 24, 2022
7c3de1a
recover impl Trait for resolve_locks*
iosmanthus Jun 24, 2022
02377b7
git fire!: wip api v2
iosmanthus Jun 28, 2022
59002d1
impl KeyspaceCodec, along with RawKeyspaceCodec and TxnKeyspaceCodec
iosmanthus Jul 1, 2022
a440794
fix fmt check
iosmanthus Jul 1, 2022
0a72d31
rename request_codec.rs to codec.rs
iosmanthus Jul 1, 2022
19c3123
git fire!: wip impl RequestCodec for requests
iosmanthus Jul 1, 2022
624c68e
impl KvRequest for raw requests
iosmanthus Jul 4, 2022
71af50d
cargo fmt
iosmanthus Jul 5, 2022
a5f326f
impl KvRequest for txn requests
iosmanthus Jul 5, 2022
5efbc12
make check
iosmanthus Jul 5, 2022
b534cfd
impl Default for KeyspaceId
iosmanthus Jul 5, 2022
434e10d
cargo fmt
iosmanthus Jul 6, 2022
317116b
decode in place
iosmanthus Jul 6, 2022
5c5809a
introduce impl_kv_request to simplify code
iosmanthus Jul 6, 2022
701964d
refactor impl_kv_request macro
iosmanthus Jul 6, 2022
db38d95
fix tests
iosmanthus Jul 6, 2022
3660ca1
move default impl into RequestCodecExt
iosmanthus Jul 7, 2022
7d68247
fix api version context
iosmanthus Jul 7, 2022
81cbddd
move request codec into raw/txn owned module
iosmanthus Jul 8, 2022
5bd0f6e
fix doc test
iosmanthus Jul 8, 2022
1c1ce17
fix wrong TxnCodec mark trait for raw::Keyspace
iosmanthus Jul 9, 2022
051a157
refactor ApiVx with Mode generic para
iosmanthus Jul 11, 2022
159de5c
Merge branch 'master' of github.com:tikv/client-rust into api-v2
iosmanthus Jul 11, 2022
bfd1f28
remove dynamic dispatch while encoding request in Dispatch
iosmanthus Jul 11, 2022
7fc61ea
make IsDefault private in crate
iosmanthus Jul 11, 2022
6baaa94
remove impl Deref for Key
iosmanthus Jul 11, 2022
07c7af7
fix potential index out of range while meets corrupted keyspace prefix
iosmanthus Jul 12, 2022
0ed902d
introduce codec type parameter for Snapshot and Transcation
iosmanthus Jul 12, 2022
182a457
bound Client to right codec
iosmanthus Jul 12, 2022
0e76e10
fix encode range issue with unbound end
iosmanthus Jul 14, 2022
0c3c9e3
Merge branch 'master' of github.com:tikv/client-rust into api-v2
iosmanthus Jul 14, 2022
f46cea8
introduce newer version of proto
iosmanthus Jul 21, 2022
130e1b0
git fire: impl load keyspace
iosmanthus Jul 21, 2022
3819c8a
introduce keyspace mgr from keyspace service
iosmanthus Jul 21, 2022
a2047eb
fix clippy
iosmanthus Jul 22, 2022
45af3c5
refine trait bound of raw::Client
iosmanthus Jul 22, 2022
9de6f50
remove phatom of txn::Client
iosmanthus Jul 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
git fire: changing sig for PdRpcClient
Signed-off-by: iosmanthus <[email protected]>
iosmanthus committed Jun 22, 2022
commit 222908e2b908c1e52aa560911c22cc8b915fae33
14 changes: 5 additions & 9 deletions src/pd/client.rs
Original file line number Diff line number Diff line change
@@ -219,11 +219,9 @@ pub struct PdRpcClient<C, KvC: KvConnect + Send + Sync + 'static = TikvConnect,
pd: Arc<RetryClient<Cl>>,
kv_connect: KvC,
kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
enable_codec: bool,
region_cache: RegionCache<RetryClient<Cl>>,
logger: Logger,
// TODO: change to a real codec.
_phantom: PhantomData<C>,
codec: C,
}

#[async_trait]
@@ -280,7 +278,7 @@ impl<C> PdRpcClient<C, TikvConnect, Cluster> {
pub async fn connect(
pd_endpoints: &[String],
config: Config,
enable_codec: bool,
codec: C,
logger: Logger,
) -> Result<PdRpcClient<C, TikvConnect, Cluster>> {
PdRpcClient::new(
@@ -289,7 +287,7 @@ impl<C> PdRpcClient<C, TikvConnect, Cluster> {
|env, security_mgr| {
RetryClient::connect(env, pd_endpoints, security_mgr, config.timeout)
},
enable_codec,
codec,
logger,
)
.await
@@ -310,7 +308,7 @@ impl<C, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<C, KvC, Cl> {
config: Config,
kv_connect: MakeKvC,
pd: MakePd,
enable_codec: bool,
codec: C,
logger: Logger,
) -> Result<PdRpcClient<C, KvC, Cl>>
where
@@ -340,11 +338,9 @@ impl<C, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<C, KvC, Cl> {
pd: pd.clone(),
kv_client_cache,
kv_connect: kv_connect(env, security_mgr),
enable_codec,
region_cache: RegionCache::new(pd),
logger,
// TODO
_phantom: PhantomData,
codec
})
}

42 changes: 24 additions & 18 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
@@ -4,16 +4,17 @@ use core::ops::Range;
use std::{marker::PhantomData, str::FromStr, sync::Arc, u32};

use slog::{Drain, Logger};

use tikv_client_common::Error;
use tikv_client_proto::metapb;

use crate::{
Backoff,
backoff::DEFAULT_REGION_BACKOFF,
BoundRange,
ColumnFamily,
config::Config,
pd::{PdClient, PdRpcClient},
raw::lowering::*,
request::{request_codec::RequestCodec, Collect, CollectSingle, Plan},
Backoff, BoundRange, ColumnFamily, Key, KvPair, Result, Value,
Key, KvPair, pd::{PdClient, PdRpcClient}, raw::lowering::*, request::{Collect, CollectSingle, Plan, request_codec::RequestCodec}, Result, Value,
};

const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
@@ -53,9 +54,10 @@ impl<C: RequestCodec> Client<C, PdRpcClient<C>> {
/// ```
pub async fn new<S: Into<String>>(
pd_endpoints: Vec<S>,
codec: C,
logger: Option<Logger>,
) -> Result<Self> {
Self::new_with_config(pd_endpoints, Config::default(), logger).await
Self::new_with_config(pd_endpoints, Config::default(), codec, logger).await
}

/// Create a raw [`Client`] with a custom configuration, and connect to the TiKV cluster.
@@ -83,6 +85,7 @@ impl<C: RequestCodec> Client<C, PdRpcClient<C>> {
pub async fn new_with_config<S: Into<String>>(
pd_endpoints: Vec<S>,
config: Config,
codec: C,
optional_logger: Option<Logger>,
) -> Result<Self> {
let logger = optional_logger.unwrap_or_else(|| {
@@ -98,7 +101,7 @@ impl<C: RequestCodec> Client<C, PdRpcClient<C>> {
debug!(logger, "creating new raw client");
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let rpc =
Arc::new(PdRpcClient::connect(&pd_endpoints, config, false, logger.clone()).await?);
Arc::new(PdRpcClient::connect(&pd_endpoints, config, codec, logger.clone()).await?);
Ok(Client {
rpc,
cf: None,
@@ -212,7 +215,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
/// ```
pub async fn batch_get(
&self,
keys: impl IntoIterator<Item = impl Into<Key>>,
keys: impl IntoIterator<Item=impl Into<Key>>,
) -> Result<Vec<KvPair>> {
debug!(self.logger, "invoking raw batch_get request");
let request =
@@ -274,7 +277,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
/// ```
pub async fn batch_put(
&self,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
pairs: impl IntoIterator<Item=impl Into<KvPair>>,
) -> Result<()> {
debug!(self.logger, "invoking raw batch_put request");
let request = new_raw_batch_put_request::<C>(
@@ -336,7 +339,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
/// let result: () = req.await.unwrap();
/// # });
/// ```
pub async fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
pub async fn batch_delete(&self, keys: impl IntoIterator<Item=impl Into<Key>>) -> Result<()> {
debug!(self.logger, "invoking raw batch_delete request");
self.assert_non_atomic()?;
let request =
@@ -462,7 +465,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
/// ```
pub async fn batch_scan(
&self,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
ranges: impl IntoIterator<Item=impl Into<BoundRange>>,
each_limit: u32,
) -> Result<Vec<KvPair>> {
debug!(self.logger, "invoking raw batch_scan request");
@@ -494,7 +497,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
/// ```
pub async fn batch_scan_keys(
&self,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
ranges: impl IntoIterator<Item=impl Into<BoundRange>>,
each_limit: u32,
) -> Result<Vec<Key>> {
debug!(self.logger, "invoking raw batch_scan_keys request");
@@ -544,7 +547,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
&self,
copr_name: impl Into<String>,
copr_version_req: impl Into<String>,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
ranges: impl IntoIterator<Item=impl Into<BoundRange>>,
request_builder: impl Fn(metapb::Region, Vec<Range<Key>>) -> Vec<u8> + Send + Sync + 'static,
) -> Result<Vec<(Vec<u8>, Vec<Range<Key>>)>> {
let copr_version_req = copr_version_req.into();
@@ -590,7 +593,7 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {

async fn batch_scan_inner(
&self,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
ranges: impl IntoIterator<Item=impl Into<BoundRange>>,
each_limit: u32,
key_only: bool,
) -> Result<Vec<KvPair>> {
@@ -625,13 +628,16 @@ impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {

#[cfg(test)]
mod tests {
use super::*;
use std::{any::Any, sync::Arc};

use tikv_client_proto::kvrpcpb;

use crate::{
mock::{MockKvClient, MockPdClient},
Result,
};
use std::{any::Any, sync::Arc};
use tikv_client_proto::kvrpcpb;

use super::*;

#[tokio::test]
async fn test_raw_coprocessor() -> Result<()> {
@@ -687,13 +693,13 @@ mod tests {
"2:[Key(0A)..Key(0F), Key(14)..Key(FAFA)]".to_string(),
vec![
Key::from(vec![10])..Key::from(vec![15]),
Key::from(vec![20])..Key::from(vec![250, 250])
Key::from(vec![20])..Key::from(vec![250, 250]),
]
),
(
"3:[Key(FAFA)..Key()]".to_string(),
vec![Key::from(vec![250, 250])..Key::from(vec![])]
)
),
]
);
Ok(())
14 changes: 8 additions & 6 deletions src/transaction/client.rs
Original file line number Diff line number Diff line change
@@ -10,10 +10,10 @@ use crate::{
backoff::{DEFAULT_REGION_BACKOFF, OPTIMISTIC_BACKOFF},
config::Config,
pd::{PdClient, PdRpcClient},
request::{request_codec::RequestCodec, Plan},
request::{Plan, request_codec::RequestCodec},
Result,
timestamp::TimestampExt,
transaction::{Snapshot, Transaction, TransactionOptions},
Result,
};

use super::{requests::new_scan_lock_request, resolve_locks};
@@ -44,8 +44,8 @@ pub struct Client<C> {
}

impl<C> Client<C>
where
C: RequestCodec,
where
C: RequestCodec,
{
/// Create a transactional [`Client`] and connect to the TiKV cluster.
///
@@ -66,10 +66,11 @@ where
/// ```
pub async fn new<S: Into<String>>(
pd_endpoints: Vec<S>,
codec: C,
logger: Option<Logger>,
) -> Result<Client<C>> {
// debug!(self.logger, "creating transactional client");
Self::new_with_config(pd_endpoints, Config::default(), logger).await
Self::new_with_config(pd_endpoints, Config::default(), codec, logger).await
}

/// Create a transactional [`Client`] with a custom configuration, and connect to the TiKV cluster.
@@ -97,6 +98,7 @@ where
pub async fn new_with_config<S: Into<String>>(
pd_endpoints: Vec<S>,
config: Config,
codec: C,
optional_logger: Option<Logger>,
) -> Result<Client<C>> {
let logger = optional_logger.unwrap_or_else(|| {
@@ -111,7 +113,7 @@ where
});
debug!(logger, "creating new transactional client");
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config, true, logger.clone()).await?);
let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config, codec, logger.clone()).await?);
Ok(Client {
pd,
logger,