Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl Options {
pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed");
pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = true;
pub const DEFAULT_PREALLOCATE_SEGMENTS: bool = false;
pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 8 * 1024;
pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 128 * 1024;

pub const DEFAULT: Self = Self {
log_format_version: DEFAULT_LOG_FORMAT_VERSION,
Expand Down
59 changes: 57 additions & 2 deletions crates/core/src/db/persistence.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::{io, sync::Arc};
use std::{
io,
num::{NonZeroU64, NonZeroUsize},
sync::Arc,
};

use async_trait::async_trait;
use spacetimedb_commitlog::SizeOnDisk;
Expand All @@ -13,6 +17,45 @@ use super::{
snapshot::{self, SnapshotDatabaseState, SnapshotWorker},
};

/// Local durability configuration exposed through server config.
#[derive(Clone, Copy, Debug, Default, serde::Deserialize, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
pub struct DurabilityConfig {
#[serde(default)]
pub commitlog: CommitlogConfig,
}

/// Commitlog configuration exposed through server config.
#[derive(Clone, Copy, Debug, Default, serde::Deserialize, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
pub struct CommitlogConfig {
pub max_segment_size: Option<NonZeroU64>,
pub preallocate_segments: Option<bool>,
pub write_buffer_size: Option<NonZeroUsize>,
}

impl DurabilityConfig {
fn into_options(self) -> spacetimedb_durability::local::Options {
let mut opts = spacetimedb_durability::local::Options::default();
self.commitlog.apply_to(&mut opts.commitlog);
opts
}
}

impl CommitlogConfig {
fn apply_to(self, opts: &mut spacetimedb_commitlog::Options) {
if let Some(max_segment_size) = self.max_segment_size {
opts.max_segment_size = max_segment_size.get();
}
if let Some(preallocate_segments) = self.preallocate_segments {
opts.preallocate_segments = preallocate_segments;
}
if let Some(write_buffer_size) = self.write_buffer_size {
opts.write_buffer_size = write_buffer_size.get();
}
}
}

/// [spacetimedb_durability::Durability] impls with a [`Txdata`] transaction
/// payload, suitable for use in the [`relational_db::RelationalDB`].
pub type Durability = dyn spacetimedb_durability::Durability<TxData = Txdata>;
Expand Down Expand Up @@ -128,14 +171,21 @@ pub trait PersistenceProvider: Send + Sync {
/// [compresses]: relational_db::snapshot_watching_commitlog_compressor
pub struct LocalPersistenceProvider {
data_dir: Arc<ServerDataDir>,
durability: DurabilityConfig,
}

impl LocalPersistenceProvider {
pub fn new(data_dir: impl Into<Arc<ServerDataDir>>) -> Self {
Self {
data_dir: data_dir.into(),
durability: DurabilityConfig::default(),
}
}

pub fn with_durability_config(mut self, durability: DurabilityConfig) -> Self {
self.durability = durability;
self
}
}

#[async_trait]
Expand All @@ -149,7 +199,12 @@ impl PersistenceProvider for LocalPersistenceProvider {
asyncify(move || relational_db::open_snapshot_repo(snapshot_dir, database_identity, replica_id))
.await
.map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Enabled))?;
let (durability, disk_size) = relational_db::local_durability(replica_dir, Some(&snapshot_worker)).await?;
let (durability, disk_size) = relational_db::local_durability_with_options(
replica_dir,
Some(&snapshot_worker),
self.durability.into_options(),
)
.await?;

tokio::spawn(relational_db::snapshot_watching_commitlog_compressor(
snapshot_worker.subscribe(),
Expand Down
16 changes: 15 additions & 1 deletion crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1670,6 +1670,20 @@ pub type LocalDurability = Arc<durability::Local<ProductValue>>;
pub async fn local_durability(
replica_dir: ReplicaDir,
snapshot_worker: Option<&SnapshotWorker>,
) -> Result<(LocalDurability, DiskSizeFn), DBError> {
local_durability_with_options(replica_dir, snapshot_worker, <_>::default()).await
}

/// Initialize local durability with explicit parameters.
///
/// Also returned is a [`DiskSizeFn`] as required by [`RelationalDB::open`].
///
/// Note that this operation can be expensive, as it needs to traverse a suffix
/// of the commitlog.
pub async fn local_durability_with_options(
replica_dir: ReplicaDir,
snapshot_worker: Option<&SnapshotWorker>,
opts: durability::local::Options,
) -> Result<(LocalDurability, DiskSizeFn), DBError> {
let rt = tokio::runtime::Handle::current();
let on_new_segment = snapshot_worker.map(|snapshot_worker| {
Expand All @@ -1684,7 +1698,7 @@ pub async fn local_durability(
durability::Local::open(
replica_dir.clone(),
rt,
<_>::default(),
opts,
// Give the durability a handle to request a new snapshot run,
// which it will send down whenever we rotate commitlog segments.
on_new_segment,
Expand Down
11 changes: 11 additions & 0 deletions crates/standalone/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,15 @@ directives = [
# Apply a V8 heap limit in MiB. Set to 0 to use V8's default limit.
# heap-limit-mb = 0

[durability.commitlog]
# Maximum size in bytes for each commitlog segment.
# max-segment-size = 1073741824

# Preallocate disk space for commitlog segments up to max-segment-size.
# Has no effect unless commitlog fallocate support is enabled.
# preallocate-segments = false

# Size in bytes of the memory buffer holding commit data before flushing to storage.
# write-buffer-size = 131072

# vim: set nowritebackup: << otherwise triggers cargo-watch
7 changes: 5 additions & 2 deletions crates/standalone/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use http::StatusCode;
use spacetimedb::client::ClientActorIndex;
use spacetimedb::config::{CertificateAuthority, MetadataFile, V8Config, WasmConfig};
use spacetimedb::db;
use spacetimedb::db::persistence::LocalPersistenceProvider;
use spacetimedb::db::persistence::{DurabilityConfig, LocalPersistenceProvider};
use spacetimedb::energy::{EnergyBalance, EnergyQuanta, NullEnergyMonitor};
use spacetimedb::host::{DiskStorage, HostController, HostRuntimeConfig, MigratePlanResult, UpdateDatabaseResult};
use spacetimedb::identity::{AuthCtx, Identity};
Expand Down Expand Up @@ -41,6 +41,7 @@ pub use spacetimedb_client_api::routes::subscribe::{BIN_PROTOCOL, TEXT_PROTOCOL}
#[derive(Clone, Copy)]
pub struct StandaloneOptions {
pub db_config: db::Config,
pub durability: DurabilityConfig,
pub websocket: WebSocketOptions,
pub wasm: WasmConfig,
pub v8: V8Config,
Expand Down Expand Up @@ -76,7 +77,8 @@ impl StandaloneEnv {
let energy_monitor = Arc::new(NullEnergyMonitor);
let program_store = Arc::new(DiskStorage::new(data_dir.program_bytes().0).await?);

let persistence_provider = Arc::new(LocalPersistenceProvider::new(data_dir.clone()));
let persistence_provider =
Arc::new(LocalPersistenceProvider::new(data_dir.clone()).with_durability_config(config.durability));
let host_controller = HostController::new(
data_dir,
config.db_config,
Expand Down Expand Up @@ -650,6 +652,7 @@ mod tests {
storage: Storage::Memory,
page_pool_max_size: None,
},
durability: DurabilityConfig::default(),
websocket: WebSocketOptions::default(),
wasm: WasmConfig::default(),
v8: V8Config::default(),
Expand Down
18 changes: 18 additions & 0 deletions crates/standalone/src/subcommands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use axum::extract::DefaultBodyLimit;
use clap::ArgAction::SetTrue;
use clap::{Arg, ArgMatches};
use spacetimedb::config::{parse_config, CertificateAuthority};
use spacetimedb::db::persistence::DurabilityConfig;
use spacetimedb::db::{self, Storage};
use spacetimedb::startup::{self, TracingOptions};
use spacetimedb::util::jobs::JobCores;
Expand Down Expand Up @@ -99,6 +100,8 @@ struct ConfigFile {
#[serde(flatten)]
common: spacetimedb::config::ConfigFile,
#[serde(default)]
durability: DurabilityConfig,
#[serde(default)]
websocket: WebSocketOptions,
}

Expand Down Expand Up @@ -181,6 +184,7 @@ pub async fn exec(args: &ArgMatches, db_cores: JobCores) -> anyhow::Result<()> {
let ctx = StandaloneEnv::init(
StandaloneOptions {
db_config,
durability: config.durability,
websocket: config.websocket,
wasm: config.common.wasm,
v8: config.common.v8,
Expand Down Expand Up @@ -525,6 +529,11 @@ mod tests {
heap-gc-trigger-fraction = 0.6
heap-retire-fraction = 0.8
heap-limit-mb = 128

[durability.commitlog]
max-segment-size = 1048576
preallocate-segments = true
write-buffer-size = 131072
"#;

let config: ConfigFile = toml::from_str(toml).unwrap();
Expand All @@ -543,6 +552,15 @@ mod tests {
assert_eq!(config.common.v8.heap_policy.heap_gc_trigger_fraction, 0.6);
assert_eq!(config.common.v8.heap_policy.heap_retire_fraction, 0.8);
assert_eq!(config.common.v8.heap_policy.heap_limit_bytes, 128 * 1024 * 1024);
assert_eq!(
config.durability.commitlog.max_segment_size.map(|val| val.get()),
Some(1024 * 1024)
);
assert_eq!(config.durability.commitlog.preallocate_segments, Some(true));
assert_eq!(
config.durability.commitlog.write_buffer_size.map(|val| val.get()),
Some(128 * 1024)
);

assert_eq!(
config.websocket,
Expand Down
1 change: 1 addition & 0 deletions crates/testing/src/modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ impl CompiledModule {
let env = spacetimedb_standalone::StandaloneEnv::init(
spacetimedb_standalone::StandaloneOptions {
db_config: config,
durability: Default::default(),
websocket: WebSocketOptions::default(),
wasm: Default::default(),
v8: Default::default(),
Expand Down
Loading