Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/pd/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ pub trait PdClient: Send + Sync + 'static {

async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp>;

/// Fetches a timestamp on behalf of an optional keyspace.
///
/// This default implementation discards `keyspace_id` and forwards to
/// `get_timestamp`; implementors may override it to make use of the keyspace.
async fn get_timestamp_with_keyspace_id(
self: Arc<Self>,
keyspace_id: Option<u32>,
) -> Result<Timestamp> {
// Explicitly discard the argument: the default path has no use for it.
let _ = keyspace_id;
self.get_timestamp().await
}

async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool>;

async fn load_keyspace(&self, keyspace: &str) -> Result<keyspacepb::KeyspaceMeta>;
Expand Down Expand Up @@ -261,6 +269,16 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
self.pd.clone().get_timestamp().await
}

/// Fetches a timestamp for the given optional keyspace by forwarding the
/// call, including `keyspace_id`, to the wrapped `pd` client.
async fn get_timestamp_with_keyspace_id(
    self: Arc<Self>,
    keyspace_id: Option<u32>,
) -> Result<Timestamp> {
    // The callee consumes its receiver, so hand it a clone of the `pd` handle.
    let pd = self.pd.clone();
    pd.get_timestamp_with_keyspace_id(keyspace_id).await
}

/// Forwards the safepoint update to the wrapped `pd` client and returns its
/// result unchanged.
async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool> {
    // The callee consumes its receiver, so hand it a clone of the `pd` handle.
    let pd = self.pd.clone();
    pd.update_safepoint(safepoint).await
}
Expand Down
264 changes: 257 additions & 7 deletions src/pd/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -9,26 +10,198 @@ use async_trait::async_trait;
use log::error;
use log::info;
use log::warn;
use tokio::sync::Mutex;
use tonic::transport::Channel;
use tonic::Code;
use tonic::IntoRequest;
use tonic::Request;

use super::timestamp::TimestampOracle;
use crate::internal_err;
use crate::proto::keyspacepb;
use crate::proto::pdpb;
use crate::proto::tsopb;
use crate::Error;
use crate::Result;
use crate::SecurityManager;
use crate::Timestamp;

/// Keyspace id used by the TSO service discovery when the caller passes
/// `None` for the keyspace.
const DEFAULT_KEYSPACE_ID: u32 = 0;
/// Keyspace group id placed in the request header when asking a TSO service
/// which group serves a keyspace.
const DEFAULT_KEYSPACE_GROUP_ID: u32 = 0;

/// A PD cluster.
///
/// Holds the cluster id, the gRPC clients used to talk to PD, the last
/// members response, and the source that timestamps are fetched from.
pub struct Cluster {
/// Cluster id, read from the header of the members response on connect.
id: u64,
/// gRPC client for the PD service.
client: pdpb::pd_client::PdClient<Channel>,
/// gRPC client for the keyspace service.
keyspace_client: keyspacepb::keyspace_client::KeyspaceClient<Channel>,
/// The `GetMembers` response this cluster was built from; it is passed back
/// to `try_connect_leader` when reconnecting.
members: pdpb::GetMembersResponse,
/// Timestamp oracle.
// NOTE(review): `get_timestamp_with_keyspace_id` only consults
// `tso_provider`; confirm whether this field is still needed.
tso: TimestampOracle,
/// Selected timestamp source; `get_timestamp_with_keyspace_id` dispatches on it.
tso_provider: TsoProvider,
}

/// Where a `Cluster` obtains timestamps from.
enum TsoProvider {
/// A timestamp oracle built directly on the PD client.
Pd(TimestampOracle),
/// Per-keyspace oracles reached through the TSO service URLs reported by PD.
Microservice(Arc<TsoServiceDiscovery>),
}

/// Locates the TSO service member for a keyspace and caches one timestamp
/// oracle per keyspace id.
struct TsoServiceDiscovery {
/// Cluster id sent in TSO request headers and handed to each new oracle.
cluster_id: u64,
/// TSO service URLs, tried in order when looking up a keyspace group.
tso_urls: Vec<String>,
/// Used to open every connection to a TSO service address.
security_mgr: Arc<SecurityManager>,
/// Cache of oracles keyed by keyspace id.
oracles: Mutex<HashMap<u32, TimestampOracle>>,
}

impl TsoServiceDiscovery {
/// Builds a discovery handle for `cluster_id` over the given TSO service
/// URLs. The oracle cache starts out empty.
fn new(cluster_id: u64, tso_urls: Vec<String>, security_mgr: Arc<SecurityManager>) -> Self {
    let oracles = Mutex::new(HashMap::new());
    Self {
        cluster_id,
        tso_urls,
        security_mgr,
        oracles,
    }
}

/// Fetches a timestamp for `keyspace_id`, falling back to
/// `DEFAULT_KEYSPACE_ID` when it is `None`.
///
/// The cached oracle for the keyspace is tried first. If it fails, the cache
/// entry is evicted, a new oracle is created via discovery, cached, and asked
/// once more; the outcome of that second attempt is returned as-is.
///
/// # Errors
///
/// Returns an error if the oracle cannot be obtained or re-created, or if the
/// second timestamp request fails.
async fn get_timestamp(
    &self,
    keyspace_id: Option<u32>,
    timeout: Duration,
) -> Result<Timestamp> {
    let keyspace_id = keyspace_id.unwrap_or(DEFAULT_KEYSPACE_ID);
    let cached = self.get_or_create_oracle(keyspace_id, timeout).await?;

    // Fast path: the cached oracle answers.
    let err = match cached.clone().get_timestamp().await {
        Ok(ts) => return Ok(ts),
        Err(err) => err,
    };

    // Slow path: evict the failing oracle before rediscovering, so it is not
    // handed out again if re-creation fails.
    self.oracles.lock().await.remove(&keyspace_id);
    warn!(
        "TSO service oracle failed, rediscovering keyspace {}: {:?}",
        keyspace_id, err
    );
    let fresh = self.create_oracle(keyspace_id, timeout).await?;
    self.oracles
        .lock()
        .await
        .insert(keyspace_id, fresh.clone());
    fresh.get_timestamp().await
}

/// Returns the cached oracle for `keyspace_id`, creating and caching one if
/// none exists yet.
///
/// The cache lock is not held while the oracle is being created, so several
/// tasks may race to create an oracle for the same keyspace. The cache is
/// therefore checked again under the lock before inserting: the first oracle
/// installed wins, every caller receives that same oracle, and a redundant
/// one built by a losing task is dropped instead of overwriting the entry.
///
/// # Errors
///
/// Returns whatever error `create_oracle` reports.
async fn get_or_create_oracle(
    &self,
    keyspace_id: u32,
    timeout: Duration,
) -> Result<TimestampOracle> {
    // Bind the lookup result first so the guard is released before any
    // discovery work starts.
    let cached = self.oracles.lock().await.get(&keyspace_id).cloned();
    if let Some(oracle) = cached {
        return Ok(oracle);
    }

    let created = self.create_oracle(keyspace_id, timeout).await?;

    // Keep an oracle that a concurrent task installed in the meantime.
    let mut oracles = self.oracles.lock().await;
    Ok(oracles.entry(keyspace_id).or_insert(created).clone())
}

/// Builds a new timestamp oracle for `keyspace_id`.
///
/// Looks up the keyspace group serving the keyspace, picks the member marked
/// as primary (or the first listed member when none is marked), connects to
/// its address, and wraps the connection in a TSO-service oracle.
///
/// # Errors
///
/// Fails if the group lookup fails, the group has no members, the chosen
/// member has an empty address, the connection cannot be established, or the
/// oracle constructor reports an error.
async fn create_oracle(&self, keyspace_id: u32, timeout: Duration) -> Result<TimestampOracle> {
    let group = self.find_group_by_keyspace_id(keyspace_id, timeout).await?;
    let group_id = group.id;
    let members = &group.members;

    let chosen = match members
        .iter()
        .find(|member| member.is_primary)
        .or_else(|| members.first())
    {
        Some(member) => member,
        None => {
            return Err(internal_err!(
                "no TSO service member found for keyspace {} group {}",
                keyspace_id,
                group_id
            ))
        }
    };

    let address = chosen.address.clone();
    if address.is_empty() {
        return Err(internal_err!(
            "empty TSO service primary for keyspace {} group {}",
            keyspace_id,
            group_id
        ));
    }

    let tso_client = self
        .security_mgr
        .connect(&address, tsopb::tso_client::TsoClient::<Channel>::new)
        .await?;
    TimestampOracle::new_tso_service(self.cluster_id, keyspace_id, group_id, tso_client)
}

/// Asks each configured TSO service URL, in order, which keyspace group
/// serves `keyspace_id` and returns the first successful answer.
///
/// # Errors
///
/// Every failed attempt is logged. If all attempts fail, the error from the
/// last URL is returned; if no URL is configured at all, an internal error
/// saying so is returned.
async fn find_group_by_keyspace_id(
    &self,
    keyspace_id: u32,
    timeout: Duration,
) -> Result<tsopb::KeyspaceGroup> {
    let mut last_err = None;
    for tso_url in &self.tso_urls {
        let attempt = self
            .find_group_by_keyspace_id_at(tso_url, keyspace_id, timeout)
            .await;
        let err = match attempt {
            Ok(group) => return Ok(group),
            Err(err) => err,
        };
        warn!(
            "failed to find TSO keyspace group from {} for keyspace {}: {:?}",
            tso_url, keyspace_id, err
        );
        last_err = Some(err);
    }

    match last_err {
        Some(err) => Err(err),
        // Only reachable when the URL list was empty.
        None => Err(internal_err!(
            "no TSO service URL is available for keyspace {}",
            keyspace_id
        )),
    }
}

/// Sends a `FindGroupByKeyspaceId` request to the TSO service at `tso_url`
/// and returns the keyspace group it reports for `keyspace_id`.
///
/// `timeout` is applied to the request itself.
///
/// # Errors
///
/// Fails if the connection or the RPC fails, if the response header carries
/// an error, or if the response contains no keyspace group.
async fn find_group_by_keyspace_id_at(
    &self,
    tso_url: &str,
    keyspace_id: u32,
    timeout: Duration,
) -> Result<tsopb::KeyspaceGroup> {
    let mut tso_client = self
        .security_mgr
        .connect(tso_url, tsopb::tso_client::TsoClient::<Channel>::new)
        .await?;

    let header = tsopb::RequestHeader {
        cluster_id: self.cluster_id,
        sender_id: 0,
        keyspace_id,
        keyspace_group_id: DEFAULT_KEYSPACE_GROUP_ID,
    };
    let mut request = tsopb::FindGroupByKeyspaceIdRequest {
        header: Some(header),
        keyspace_id,
    }
    .into_request();
    request.set_timeout(timeout);

    let response = tso_client
        .find_group_by_keyspace_id(request)
        .await?
        .into_inner();

    // An error reported in the response header takes precedence over the payload.
    if let Some(header) = &response.header {
        if let Some(err) = &header.error {
            return Err(internal_err!(
                "failed to find TSO keyspace group for keyspace {}, err {}",
                keyspace_id,
                err.message
            ));
        }
    }

    match response.keyspace_group {
        Some(group) => Ok(group),
        None => Err(internal_err!(
            "TSO service {} returned no keyspace group for keyspace {}",
            tso_url,
            keyspace_id
        )),
    }
}
}

macro_rules! pd_request {
Expand Down Expand Up @@ -81,8 +254,21 @@ impl Cluster {
req.send(&mut self.client, timeout).await
}

pub async fn get_timestamp(&self) -> Result<Timestamp> {
self.tso.clone().get_timestamp().await
pub async fn get_timestamp(&self, timeout: Duration) -> Result<Timestamp> {
self.get_timestamp_with_keyspace_id(None, timeout).await
}

/// Fetches a timestamp from whichever provider this cluster was set up with.
///
/// With the microservice provider, `keyspace_id` and `timeout` are handed to
/// the TSO service discovery. With the PD provider, both are unused and the
/// PD oracle is asked directly.
pub async fn get_timestamp_with_keyspace_id(
    &self,
    keyspace_id: Option<u32>,
    timeout: Duration,
) -> Result<Timestamp> {
    match &self.tso_provider {
        TsoProvider::Microservice(discovery) => {
            discovery.get_timestamp(keyspace_id, timeout).await
        }
        // The oracle's `get_timestamp` consumes its receiver, hence the clone.
        TsoProvider::Pd(oracle) => oracle.clone().get_timestamp().await,
    }
}

pub async fn update_safepoint(
Expand Down Expand Up @@ -128,13 +314,13 @@ impl Connection {
let members = self.validate_endpoints(endpoints, timeout).await?;
let (client, keyspace_client, members) = self.try_connect_leader(&members, timeout).await?;
let id = members.header.as_ref().unwrap().cluster_id;
let tso = TimestampOracle::new(id, &client)?;
let tso_provider = self.create_tso_provider(id, &client, timeout).await?;
let cluster = Cluster {
id,
client,
keyspace_client,
members,
tso,
tso_provider,
};
Ok(cluster)
}
Expand All @@ -145,19 +331,83 @@ impl Connection {
let start = Instant::now();
let (client, keyspace_client, members) =
self.try_connect_leader(&cluster.members, timeout).await?;
let tso = TimestampOracle::new(cluster.id, &client)?;
let tso_provider = self
.create_tso_provider(cluster.id, &client, timeout)
.await?;
*cluster = Cluster {
id: cluster.id,
client,
keyspace_client,
members,
tso,
tso_provider,
};

info!("updating PD client done, spent {:?}", start.elapsed());
Ok(())
}

/// Decides where timestamps come from for the cluster behind `client`.
///
/// PD is asked for its cluster info. If the first reported service mode is
/// the API service mode, a microservice provider is built over the reported
/// TSO URLs. In every other case, including PD answering `Unimplemented` to
/// the cluster-info call, a PD-backed oracle is used.
///
/// # Errors
///
/// Fails if the cluster-info call fails for any reason other than
/// `Unimplemented`, if API service mode is reported with no TSO URLs, or if
/// the PD oracle cannot be created.
async fn create_tso_provider(
    &self,
    cluster_id: u64,
    client: &pdpb::pd_client::PdClient<Channel>,
    timeout: Duration,
) -> Result<TsoProvider> {
    let cluster_info = match Self::get_cluster_info(client.clone(), timeout).await {
        Ok(cluster_info) => Some(cluster_info),
        Err(Error::GrpcAPI(status)) if status.code() == Code::Unimplemented => {
            warn!("PD GetClusterInfo is not implemented, falling back to PD TSO");
            None
        }
        Err(err) => return Err(err),
    };

    if let Some(cluster_info) = cluster_info {
        // Only the first reported mode is consulted; a missing or unknown
        // value is treated like the plain PD mode.
        let api_mode = cluster_info
            .service_modes
            .first()
            .and_then(|mode| pdpb::ServiceMode::try_from(*mode).ok())
            .map_or(false, |mode| mode == pdpb::ServiceMode::ApiSvcMode);
        if api_mode {
            if cluster_info.tso_urls.is_empty() {
                return Err(internal_err!(
                    "PD reports API service mode but no TSO service URLs"
                ));
            }
            info!(
                "using TSO microservice provider with URLs {:?}",
                cluster_info.tso_urls
            );
            let discovery = TsoServiceDiscovery::new(
                cluster_id,
                cluster_info.tso_urls,
                self.security_mgr.clone(),
            );
            return Ok(TsoProvider::Microservice(Arc::new(discovery)));
        }
    }

    info!("using PD TSO provider");
    let oracle = TimestampOracle::new(cluster_id, client)?;
    Ok(TsoProvider::Pd(oracle))
}

/// Issues a `GetClusterInfo` request on `client`, with `timeout` applied to
/// the request, and returns the response.
///
/// # Errors
///
/// Fails if the RPC itself fails or if the response header carries an error.
async fn get_cluster_info(
    mut client: pdpb::pd_client::PdClient<Channel>,
    timeout: Duration,
) -> Result<pdpb::GetClusterInfoResponse> {
    let mut request = pdpb::GetClusterInfoRequest::default().into_request();
    request.set_timeout(timeout);

    let response = client.get_cluster_info(request).await?.into_inner();

    // A header-level error turns an otherwise successful RPC into a failure.
    if let Some(header) = &response.header {
        if let Some(err) = &header.error {
            return Err(internal_err!(
                "failed to get PD cluster info, err {:?}",
                err
            ));
        }
    }
    Ok(response)
}

async fn validate_endpoints(
&self,
endpoints: &[String],
Expand Down
21 changes: 20 additions & 1 deletion src/pd/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ pub trait RetryClientTrait {

async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp>;

/// Fetches a timestamp on behalf of an optional keyspace.
///
/// This default implementation discards `keyspace_id` and forwards to
/// `get_timestamp`; implementors may override it to make use of the keyspace.
async fn get_timestamp_with_keyspace_id(
self: Arc<Self>,
keyspace_id: Option<u32>,
) -> Result<Timestamp> {
// Explicitly discard the argument: the default path has no use for it.
let _ = keyspace_id;
self.get_timestamp().await
}

async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool>;

async fn load_keyspace(&self, keyspace: &str) -> Result<keyspacepb::KeyspaceMeta>;
Expand Down Expand Up @@ -189,7 +197,18 @@ impl RetryClientTrait for RetryClient<Cluster> {
}

/// Fetches a timestamp from the cluster through the `retry!` macro, passing
/// this client's `timeout` to the cluster call.
async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
    retry!(self, "get_timestamp", |cluster| {
        cluster.get_timestamp(self.timeout)
    })
}

/// Fetches a timestamp for an optional keyspace from the cluster through the
/// `retry!` macro, passing `keyspace_id` and this client's `timeout` to the
/// cluster call.
async fn get_timestamp_with_keyspace_id(
self: Arc<Self>,
keyspace_id: Option<u32>,
) -> Result<Timestamp> {
retry!(self, "get_timestamp_with_keyspace_id", |cluster| {
cluster.get_timestamp_with_keyspace_id(keyspace_id, self.timeout)
})
}

async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool> {
Expand Down
Loading
Loading