diff --git a/src/pd/client.rs b/src/pd/client.rs index 05b9c07c..fe472b0a 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -65,6 +65,14 @@ pub trait PdClient: Send + Sync + 'static { async fn get_timestamp(self: Arc) -> Result; + async fn get_timestamp_with_keyspace_id( + self: Arc, + keyspace_id: Option, + ) -> Result { + let _ = keyspace_id; + self.get_timestamp().await + } + async fn update_safepoint(self: Arc, safepoint: u64) -> Result; async fn load_keyspace(&self, keyspace: &str) -> Result; @@ -261,6 +269,16 @@ impl PdClient for PdRpcClient { self.pd.clone().get_timestamp().await } + async fn get_timestamp_with_keyspace_id( + self: Arc, + keyspace_id: Option, + ) -> Result { + self.pd + .clone() + .get_timestamp_with_keyspace_id(keyspace_id) + .await + } + async fn update_safepoint(self: Arc, safepoint: u64) -> Result { self.pd.clone().update_safepoint(safepoint).await } diff --git a/src/pd/cluster.rs b/src/pd/cluster.rs index ef6cce2b..5d332446 100644 --- a/src/pd/cluster.rs +++ b/src/pd/cluster.rs @@ -1,5 +1,6 @@ // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0. +use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; @@ -9,7 +10,9 @@ use async_trait::async_trait; use log::error; use log::info; use log::warn; +use tokio::sync::Mutex; use tonic::transport::Channel; +use tonic::Code; use tonic::IntoRequest; use tonic::Request; @@ -17,18 +20,188 @@ use super::timestamp::TimestampOracle; use crate::internal_err; use crate::proto::keyspacepb; use crate::proto::pdpb; +use crate::proto::tsopb; use crate::Error; use crate::Result; use crate::SecurityManager; use crate::Timestamp; +const DEFAULT_KEYSPACE_ID: u32 = 0; +const DEFAULT_KEYSPACE_GROUP_ID: u32 = 0; + /// A PD cluster. pub struct Cluster { id: u64, client: pdpb::pd_client::PdClient, keyspace_client: keyspacepb::keyspace_client::KeyspaceClient, members: pdpb::GetMembersResponse, - tso: TimestampOracle, + tso_provider: TsoProvider, +} + +enum TsoProvider { + Pd(TimestampOracle), + Microservice(Arc), +} + +struct TsoServiceDiscovery { + cluster_id: u64, + tso_urls: Vec, + security_mgr: Arc, + oracles: Mutex>, +} + +impl TsoServiceDiscovery { + fn new(cluster_id: u64, tso_urls: Vec, security_mgr: Arc) -> Self { + Self { + cluster_id, + tso_urls, + security_mgr, + oracles: Mutex::new(HashMap::new()), + } + } + + async fn get_timestamp( + &self, + keyspace_id: Option, + timeout: Duration, + ) -> Result { + let keyspace_id = keyspace_id.unwrap_or(DEFAULT_KEYSPACE_ID); + let oracle = self.get_or_create_oracle(keyspace_id, timeout).await?; + match oracle.clone().get_timestamp().await { + Ok(ts) => Ok(ts), + Err(err) => { + self.oracles.lock().await.remove(&keyspace_id); + warn!( + "TSO service oracle failed, rediscovering keyspace {}: {:?}", + keyspace_id, err + ); + let oracle = self.create_oracle(keyspace_id, timeout).await?; + self.oracles + .lock() + .await + .insert(keyspace_id, oracle.clone()); + oracle.get_timestamp().await + } + } + } + + async fn get_or_create_oracle( + &self, + keyspace_id: u32, + timeout: Duration, + ) -> Result { + if let Some(oracle) = self.oracles.lock().await.get(&keyspace_id).cloned() { + return Ok(oracle); + } + let oracle = self.create_oracle(keyspace_id, timeout).await?; + self.oracles + .lock() + .await + .insert(keyspace_id, oracle.clone()); + Ok(oracle) + } + + async fn create_oracle(&self, keyspace_id: u32, timeout: Duration) -> Result { + let keyspace_group = self.find_group_by_keyspace_id(keyspace_id, timeout).await?; + let keyspace_group_id = keyspace_group.id; + let primary = keyspace_group + .members + .iter() + .find(|member| member.is_primary) + .or_else(|| keyspace_group.members.first()) + .ok_or_else(|| { + internal_err!( + "no TSO service member found for keyspace {} group {}", + keyspace_id, + keyspace_group_id + ) + })? + .address + .clone(); + if primary.is_empty() { + return Err(internal_err!( + "empty TSO service primary for keyspace {} group {}", + keyspace_id, + keyspace_group_id + )); + } + let client = self + .security_mgr + .connect(&primary, tsopb::tso_client::TsoClient::::new) + .await?; + TimestampOracle::new_tso_service(self.cluster_id, keyspace_id, keyspace_group_id, client) + } + + async fn find_group_by_keyspace_id( + &self, + keyspace_id: u32, + timeout: Duration, + ) -> Result { + let mut last_err = None; + for tso_url in &self.tso_urls { + match self + .find_group_by_keyspace_id_at(tso_url, keyspace_id, timeout) + .await + { + Ok(group) => return Ok(group), + Err(err) => { + warn!( + "failed to find TSO keyspace group from {} for keyspace {}: {:?}", + tso_url, keyspace_id, err + ); + last_err = Some(err); + } + } + } + Err(last_err.unwrap_or_else(|| { + internal_err!( + "no TSO service URL is available for keyspace {}", + keyspace_id + ) + })) + } + + async fn find_group_by_keyspace_id_at( + &self, + tso_url: &str, + keyspace_id: u32, + timeout: Duration, + ) -> Result { + let mut client = self + .security_mgr + .connect(tso_url, tsopb::tso_client::TsoClient::::new) + .await?; + let mut req = tsopb::FindGroupByKeyspaceIdRequest { + header: Some(tsopb::RequestHeader { + cluster_id: self.cluster_id, + sender_id: 0, + keyspace_id, + keyspace_group_id: DEFAULT_KEYSPACE_GROUP_ID, + }), + keyspace_id, + } + .into_request(); + req.set_timeout(timeout); + let resp = client.find_group_by_keyspace_id(req).await?.into_inner(); + if let Some(err) = resp + .header + .as_ref() + .and_then(|header| header.error.as_ref()) + { + return Err(internal_err!( + "failed to find TSO keyspace group for keyspace {}, err {}", + keyspace_id, + err.message + )); + } + resp.keyspace_group.ok_or_else(|| { + internal_err!( + "TSO service {} returned no keyspace group for keyspace {}", + tso_url, + keyspace_id + ) + }) + } } macro_rules! pd_request { @@ -81,8 +254,21 @@ impl Cluster { req.send(&mut self.client, timeout).await } - pub async fn get_timestamp(&self) -> Result { - self.tso.clone().get_timestamp().await + pub async fn get_timestamp(&self, timeout: Duration) -> Result { + self.get_timestamp_with_keyspace_id(None, timeout).await + } + + pub async fn get_timestamp_with_keyspace_id( + &self, + keyspace_id: Option, + timeout: Duration, + ) -> Result { + match &self.tso_provider { + TsoProvider::Pd(tso) => tso.clone().get_timestamp().await, + TsoProvider::Microservice(discovery) => { + discovery.get_timestamp(keyspace_id, timeout).await + } + } } pub async fn update_safepoint( @@ -128,13 +314,13 @@ impl Connection { let members = self.validate_endpoints(endpoints, timeout).await?; let (client, keyspace_client, members) = self.try_connect_leader(&members, timeout).await?; let id = members.header.as_ref().unwrap().cluster_id; - let tso = TimestampOracle::new(id, &client)?; + let tso_provider = self.create_tso_provider(id, &client, timeout).await?; let cluster = Cluster { id, client, keyspace_client, members, - tso, + tso_provider, }; Ok(cluster) } @@ -145,19 +331,83 @@ impl Connection { let start = Instant::now(); let (client, keyspace_client, members) = self.try_connect_leader(&cluster.members, timeout).await?; - let tso = TimestampOracle::new(cluster.id, &client)?; + let tso_provider = self + .create_tso_provider(cluster.id, &client, timeout) + .await?; *cluster = Cluster { id: cluster.id, client, keyspace_client, members, - tso, + tso_provider, }; info!("updating PD client done, spent {:?}", start.elapsed()); Ok(()) } + async fn create_tso_provider( + &self, + cluster_id: u64, + client: &pdpb::pd_client::PdClient, + timeout: Duration, + ) -> Result { + match Self::get_cluster_info(client.clone(), timeout).await { + Ok(cluster_info) => { + let service_mode = cluster_info + .service_modes + .first() + .and_then(|mode| pdpb::ServiceMode::try_from(*mode).ok()) + .unwrap_or(pdpb::ServiceMode::PdSvcMode); + if service_mode == pdpb::ServiceMode::ApiSvcMode { + if cluster_info.tso_urls.is_empty() { + return Err(internal_err!( + "PD reports API service mode but no TSO service URLs" + )); + } + info!( + "using TSO microservice provider with URLs {:?}", + cluster_info.tso_urls + ); + return Ok(TsoProvider::Microservice(Arc::new( + TsoServiceDiscovery::new( + cluster_id, + cluster_info.tso_urls, + self.security_mgr.clone(), + ), + ))); + } + } + Err(Error::GrpcAPI(status)) if status.code() == Code::Unimplemented => { + warn!("PD GetClusterInfo is not implemented, falling back to PD TSO"); + } + Err(err) => return Err(err), + } + + info!("using PD TSO provider"); + Ok(TsoProvider::Pd(TimestampOracle::new(cluster_id, client)?)) + } + + async fn get_cluster_info( + mut client: pdpb::pd_client::PdClient, + timeout: Duration, + ) -> Result { + let mut req = pdpb::GetClusterInfoRequest::default().into_request(); + req.set_timeout(timeout); + let resp = client.get_cluster_info(req).await?.into_inner(); + if let Some(err) = resp + .header + .as_ref() + .and_then(|header| header.error.as_ref()) + { + return Err(internal_err!( + "failed to get PD cluster info, err {:?}", + err + )); + } + Ok(resp) + } + async fn validate_endpoints( &self, endpoints: &[String], diff --git a/src/pd/retry.rs b/src/pd/retry.rs index c9ccf1e1..7fdacc96 100644 --- a/src/pd/retry.rs +++ b/src/pd/retry.rs @@ -45,6 +45,14 @@ pub trait RetryClientTrait { async fn get_timestamp(self: Arc) -> Result; + async fn get_timestamp_with_keyspace_id( + self: Arc, + keyspace_id: Option, + ) -> Result { + let _ = keyspace_id; + self.get_timestamp().await + } + async fn update_safepoint(self: Arc, safepoint: u64) -> Result; async fn load_keyspace(&self, keyspace: &str) -> Result; @@ -189,7 +197,18 @@ impl RetryClientTrait for RetryClient { } async fn get_timestamp(self: Arc) -> Result { - retry!(self, "get_timestamp", |cluster| cluster.get_timestamp()) + retry!(self, "get_timestamp", |cluster| { + cluster.get_timestamp(self.timeout) + }) + } + + async fn get_timestamp_with_keyspace_id( + self: Arc, + keyspace_id: Option, + ) -> Result { + retry!(self, "get_timestamp_with_keyspace_id", |cluster| { + cluster.get_timestamp_with_keyspace_id(keyspace_id, self.timeout) + }) } async fn update_safepoint(self: Arc, safepoint: u64) -> Result { diff --git a/src/pd/timestamp.rs b/src/pd/timestamp.rs index a1cc7fbd..9995c750 100644 --- a/src/pd/timestamp.rs +++ b/src/pd/timestamp.rs @@ -29,9 +29,12 @@ use tokio::sync::Mutex; use tonic::transport::Channel; use crate::internal_err; -use crate::proto::pdpb::pd_client::PdClient; -use crate::proto::pdpb::*; +use crate::proto::pdpb; +use crate::proto::pdpb::pd_client::PdClient as PdTsoClient; +use crate::proto::tsopb; +use crate::proto::tsopb::tso_client::TsoClient; use crate::Result; +use crate::Timestamp; /// It is an empirical value. const MAX_BATCH_SIZE: usize = 64; @@ -53,12 +56,34 @@ pub(crate) struct TimestampOracle { } impl TimestampOracle { - pub(crate) fn new(cluster_id: u64, pd_client: &PdClient) -> Result { + pub(crate) fn new( + cluster_id: u64, + pd_client: &PdTsoClient, + ) -> Result { let pd_client = pd_client.clone(); let (request_tx, request_rx) = mpsc::channel(MAX_BATCH_SIZE); // Start a background thread to handle TSO requests and responses - tokio::spawn(run_tso(cluster_id, pd_client, request_rx)); + tokio::spawn(run_pd_tso(cluster_id, pd_client, request_rx)); + + Ok(TimestampOracle { request_tx }) + } + + pub(crate) fn new_tso_service( + cluster_id: u64, + keyspace_id: u32, + keyspace_group_id: u32, + tso_client: TsoClient, + ) -> Result { + let (request_tx, request_rx) = mpsc::channel(MAX_BATCH_SIZE); + + tokio::spawn(run_tso_service( + cluster_id, + keyspace_id, + keyspace_group_id, + tso_client, + request_rx, + )); Ok(TimestampOracle { request_tx }) } @@ -74,9 +99,9 @@ impl TimestampOracle { } } -async fn run_tso( +async fn run_pd_tso( cluster_id: u64, - mut pd_client: PdClient, + mut pd_client: PdTsoClient, request_rx: mpsc::Receiver, ) -> Result<()> { // The `TimestampRequest`s which are waiting for the responses from the PD server @@ -87,7 +112,7 @@ async fn run_tso( // if the queue containing pending requests is no longer full. let sending_future_waker = Arc::new(AtomicWaker::new()); - let request_stream = TsoRequestStream { + let request_stream = PdTsoRequestStream { cluster_id, request_rx, pending_requests: pending_requests.clone(), @@ -100,7 +125,7 @@ async fn run_tso( while let Some(Ok(resp)) = responses.next().await { { let mut pending_requests = pending_requests.lock().await; - allocate_timestamps(&resp, &mut pending_requests)?; + allocate_timestamps(resp.count, resp.timestamp.as_ref(), &mut pending_requests)?; } // Wake up the sending future blocked by too many pending requests or locked. @@ -111,13 +136,48 @@ async fn run_tso( Ok(()) } +async fn run_tso_service( + cluster_id: u64, + keyspace_id: u32, + keyspace_group_id: u32, + mut tso_client: TsoClient, + request_rx: mpsc::Receiver, +) -> Result<()> { + let pending_requests = Arc::new(Mutex::new(VecDeque::with_capacity(MAX_PENDING_COUNT))); + let sending_future_waker = Arc::new(AtomicWaker::new()); + + let request_stream = TsoServiceRequestStream { + cluster_id, + keyspace_id, + keyspace_group_id, + request_rx, + pending_requests: pending_requests.clone(), + self_waker: sending_future_waker.clone(), + }; + + let mut responses = tso_client.tso(request_stream).await?.into_inner(); + + while let Some(Ok(resp)) = responses.next().await { + if let Some(err) = resp.header.as_ref().and_then(|h| h.error.as_ref()) { + return Err(internal_err!("TSO service returned error: {}", err.message)); + } + { + let mut pending_requests = pending_requests.lock().await; + allocate_timestamps(resp.count, resp.timestamp.as_ref(), &mut pending_requests)?; + } + sending_future_waker.wake(); + } + info!("TSO service stream terminated"); + Ok(()) +} + struct RequestGroup { - tso_request: TsoRequest, + count: u32, requests: Vec, } #[pin_project] -struct TsoRequestStream { +struct PdTsoRequestStream { cluster_id: u64, #[pin] request_rx: mpsc::Receiver>, @@ -125,8 +185,8 @@ struct TsoRequestStream { self_waker: Arc, } -impl Stream for TsoRequestStream { - type Item = TsoRequest; +impl Stream for PdTsoRequestStream { + type Item = pdpb::TsoRequest; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut this = self.project(); @@ -153,19 +213,17 @@ impl Stream for TsoRequestStream { } if !requests.is_empty() { - let req = TsoRequest { - header: Some(RequestHeader { + let count = requests.len() as u32; + let req = pdpb::TsoRequest { + header: Some(pdpb::RequestHeader { cluster_id: *this.cluster_id, sender_id: 0, }), - count: requests.len() as u32, + count, dc_location: String::new(), }; - let request_group = RequestGroup { - tso_request: req.clone(), - requests, - }; + let request_group = RequestGroup { count, requests }; pending_requests.push_back(request_group); Poll::Ready(Some(req)) @@ -178,25 +236,84 @@ impl Stream for TsoRequestStream { } } +#[pin_project] +struct TsoServiceRequestStream { + cluster_id: u64, + keyspace_id: u32, + keyspace_group_id: u32, + #[pin] + request_rx: mpsc::Receiver>, + pending_requests: Arc>>, + self_waker: Arc, +} + +impl Stream for TsoServiceRequestStream { + type Item = tsopb::TsoRequest; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let mut this = self.project(); + + let pending_requests = this.pending_requests.lock(); + pin_mut!(pending_requests); + let mut pending_requests = if let Poll::Ready(pending_requests) = pending_requests.poll(cx) + { + pending_requests + } else { + this.self_waker.register(cx.waker()); + return Poll::Pending; + }; + let mut requests = Vec::new(); + + while requests.len() < MAX_BATCH_SIZE && pending_requests.len() < MAX_PENDING_COUNT { + match this.request_rx.poll_recv(cx) { + Poll::Ready(Some(sender)) => { + requests.push(sender); + } + Poll::Ready(None) if requests.is_empty() => return Poll::Ready(None), + _ => break, + } + } + + if !requests.is_empty() { + let count = requests.len() as u32; + let req = tsopb::TsoRequest { + header: Some(tsopb::RequestHeader { + cluster_id: *this.cluster_id, + sender_id: 0, + keyspace_id: *this.keyspace_id, + keyspace_group_id: *this.keyspace_group_id, + }), + count, + dc_location: String::new(), + }; + + pending_requests.push_back(RequestGroup { count, requests }); + + Poll::Ready(Some(req)) + } else { + this.self_waker.register(cx.waker()); + Poll::Pending + } + } +} + fn allocate_timestamps( - resp: &TsoResponse, + count: u32, + timestamp: Option<&Timestamp>, pending_requests: &mut VecDeque, ) -> Result<()> { // PD returns the timestamp with the biggest logical value. We can send back timestamps // whose logical value is from `logical - count + 1` to `logical` using the senders // in `pending`. - let tail_ts = resp - .timestamp - .as_ref() - .ok_or_else(|| internal_err!("No timestamp in TsoResponse"))?; + let tail_ts = timestamp.ok_or_else(|| internal_err!("No timestamp in TsoResponse"))?; - let mut offset = resp.count; + let mut offset = count; if let Some(RequestGroup { - tso_request, + count: request_count, requests, }) = pending_requests.pop_front() { - if tso_request.count != offset { + if request_count != offset { return Err(internal_err!( "PD gives different number of timestamps than expected" )); diff --git a/src/request/keyspace.rs b/src/request/keyspace.rs index 79f7225d..72e2918d 100644 --- a/src/request/keyspace.rs +++ b/src/request/keyspace.rs @@ -34,6 +34,13 @@ pub enum KeyMode { } impl Keyspace { + pub fn tso_keyspace_id(&self) -> Option { + match self { + Keyspace::Enable { keyspace_id } => Some(*keyspace_id), + _ => None, + } + } + pub fn api_version(&self) -> kvrpcpb::ApiVersion { match self { Keyspace::Disable => kvrpcpb::ApiVersion::V1, diff --git a/src/transaction/client.rs b/src/transaction/client.rs index a40044b7..1e108251 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -243,7 +243,10 @@ impl Client { /// # }); /// ``` pub async fn current_timestamp(&self) -> Result { - self.pd.clone().get_timestamp().await + self.pd + .clone() + .get_timestamp_with_keyspace_id(self.keyspace.tso_keyspace_id()) + .await } /// Request garbage collection (GC) of the TiKV cluster. diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index 5c5d2513..8dbb7215 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -55,7 +55,10 @@ pub async fn resolve_locks( keyspace: Keyspace, ) -> Result /* live_locks */> { debug!("resolving locks"); - let ts = pd_client.clone().get_timestamp().await?; + let ts = pd_client + .clone() + .get_timestamp_with_keyspace_id(keyspace.tso_keyspace_id()) + .await?; let caller_start_ts = timestamp.version(); let current_ts = ts.version(); @@ -480,7 +483,10 @@ impl LockResolver { Err(err) => return Err(err), }; - let current = pd_client.clone().get_timestamp().await?; + let current = pd_client + .clone() + .get_timestamp_with_keyspace_id(keyspace.tso_keyspace_id()) + .await?; status.check_ttl(current); let res = Arc::new(status); if res.is_cacheable() { @@ -563,7 +569,10 @@ impl LockResolver { { Ok(status) => return Ok(status), Err(Error::TxnNotFound(txn_not_found)) => { - let current = pd_client.clone().get_timestamp().await?; + let current = pd_client + .clone() + .get_timestamp_with_keyspace_id(keyspace.tso_keyspace_id()) + .await?; if lock_until_expired_ms(lock.lock_version, lock.lock_ttl, current) <= 0 { warn!( "lock txn not found, lock has expired, lock {:?}, caller_start_ts {}, current_ts {}", diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index ed8eb911..88e56a4b 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -847,7 +847,11 @@ impl Transaction { .buffer .get_primary_key() .unwrap_or_else(|| first_key.clone()); - let for_update_ts = self.rpc.clone().get_timestamp().await?; + let for_update_ts = self + .rpc + .clone() + .get_timestamp_with_keyspace_id(self.keyspace.tso_keyspace_id()) + .await?; self.options.push_for_update_ts(for_update_ts.clone()); let request = new_pessimistic_lock_request( keys.clone().into_iter(), @@ -1381,7 +1385,11 @@ impl Committer { async fn commit_primary(&mut self) -> Result { debug!("committing primary"); let primary_key = self.primary_key.clone().into_iter(); - let commit_version = self.rpc.clone().get_timestamp().await?; + let commit_version = self + .rpc + .clone() + .get_timestamp_with_keyspace_id(self.keyspace.tso_keyspace_id()) + .await?; let req = new_commit_request( primary_key, self.start_version.clone(),