diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index 0f0753f0f36..b9c329dda53 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -111,7 +111,7 @@ impl Options { pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed"); pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = true; pub const DEFAULT_PREALLOCATE_SEGMENTS: bool = false; - pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 8 * 1024; + pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 128 * 1024; pub const DEFAULT: Self = Self { log_format_version: DEFAULT_LOG_FORMAT_VERSION, diff --git a/crates/core/src/db/persistence.rs b/crates/core/src/db/persistence.rs index 5b0daa5145c..af12b5b45cc 100644 --- a/crates/core/src/db/persistence.rs +++ b/crates/core/src/db/persistence.rs @@ -1,4 +1,8 @@ -use std::{io, sync::Arc}; +use std::{ + io, + num::{NonZeroU64, NonZeroUsize}, + sync::Arc, +}; use async_trait::async_trait; use spacetimedb_commitlog::SizeOnDisk; @@ -13,6 +17,45 @@ use super::{ snapshot::{self, SnapshotDatabaseState, SnapshotWorker}, }; +/// Local durability configuration exposed through server config. +#[derive(Clone, Copy, Debug, Default, serde::Deserialize, PartialEq, Eq)] +#[serde(rename_all = "kebab-case")] +pub struct DurabilityConfig { + #[serde(default)] + pub commitlog: CommitlogConfig, +} + +/// Commitlog configuration exposed through server config. +#[derive(Clone, Copy, Debug, Default, serde::Deserialize, PartialEq, Eq)] +#[serde(rename_all = "kebab-case")] +pub struct CommitlogConfig { + pub max_segment_size: Option, + pub preallocate_segments: Option, + pub write_buffer_size: Option, +} + +impl DurabilityConfig { + fn into_options(self) -> spacetimedb_durability::local::Options { + let mut opts = spacetimedb_durability::local::Options::default(); + self.commitlog.apply_to(&mut opts.commitlog); + opts + } +} + +impl CommitlogConfig { + fn apply_to(self, opts: &mut spacetimedb_commitlog::Options) { + if let Some(max_segment_size) = self.max_segment_size { + opts.max_segment_size = max_segment_size.get(); + } + if let Some(preallocate_segments) = self.preallocate_segments { + opts.preallocate_segments = preallocate_segments; + } + if let Some(write_buffer_size) = self.write_buffer_size { + opts.write_buffer_size = write_buffer_size.get(); + } + } +} + /// [spacetimedb_durability::Durability] impls with a [`Txdata`] transaction /// payload, suitable for use in the [`relational_db::RelationalDB`]. pub type Durability = dyn spacetimedb_durability::Durability; @@ -128,14 +171,21 @@ pub trait PersistenceProvider: Send + Sync { /// [compresses]: relational_db::snapshot_watching_commitlog_compressor pub struct LocalPersistenceProvider { data_dir: Arc, + durability: DurabilityConfig, } impl LocalPersistenceProvider { pub fn new(data_dir: impl Into>) -> Self { Self { data_dir: data_dir.into(), + durability: DurabilityConfig::default(), } } + + pub fn with_durability_config(mut self, durability: DurabilityConfig) -> Self { + self.durability = durability; + self + } } #[async_trait] @@ -149,7 +199,12 @@ impl PersistenceProvider for LocalPersistenceProvider { asyncify(move || relational_db::open_snapshot_repo(snapshot_dir, database_identity, replica_id)) .await .map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Enabled))?; - let (durability, disk_size) = relational_db::local_durability(replica_dir, Some(&snapshot_worker)).await?; + let (durability, disk_size) = relational_db::local_durability_with_options( + replica_dir, + Some(&snapshot_worker), + self.durability.into_options(), + ) + .await?; tokio::spawn(relational_db::snapshot_watching_commitlog_compressor( snapshot_worker.subscribe(), diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 9f041c92ccb..a2b3a28db4c 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -1670,6 +1670,20 @@ pub type LocalDurability = Arc>; pub async fn local_durability( replica_dir: ReplicaDir, snapshot_worker: Option<&SnapshotWorker>, +) -> Result<(LocalDurability, DiskSizeFn), DBError> { + local_durability_with_options(replica_dir, snapshot_worker, <_>::default()).await +} + +/// Initialize local durability with explicit parameters. +/// +/// Also returned is a [`DiskSizeFn`] as required by [`RelationalDB::open`]. +/// +/// Note that this operation can be expensive, as it needs to traverse a suffix +/// of the commitlog. +pub async fn local_durability_with_options( + replica_dir: ReplicaDir, + snapshot_worker: Option<&SnapshotWorker>, + opts: durability::local::Options, ) -> Result<(LocalDurability, DiskSizeFn), DBError> { let rt = tokio::runtime::Handle::current(); let on_new_segment = snapshot_worker.map(|snapshot_worker| { @@ -1684,7 +1698,7 @@ pub async fn local_durability( durability::Local::open( replica_dir.clone(), rt, - <_>::default(), + opts, // Give the durability a handle to request a new snapshot run, // which it will send down whenever we rotate commitlog segments. on_new_segment, diff --git a/crates/standalone/config.toml b/crates/standalone/config.toml index 29a4bf3556e..18369e6c781 100644 --- a/crates/standalone/config.toml +++ b/crates/standalone/config.toml @@ -44,4 +44,15 @@ directives = [ # Apply a V8 heap limit in MiB. Set to 0 to use V8's default limit. # heap-limit-mb = 0 +[durability.commitlog] +# Maximum size in bytes for each commitlog segment. +# max-segment-size = 1073741824 + +# Preallocate disk space for commitlog segments up to max-segment-size. +# Has no effect unless commitlog fallocate support is enabled. +# preallocate-segments = false + +# Size in bytes of the memory buffer holding commit data before flushing to storage. +# write-buffer-size = 131072 + # vim: set nowritebackup: << otherwise triggers cargo-watch diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index de4b80ce78c..189bafa11bd 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -12,7 +12,7 @@ use http::StatusCode; use spacetimedb::client::ClientActorIndex; use spacetimedb::config::{CertificateAuthority, MetadataFile, V8Config, WasmConfig}; use spacetimedb::db; -use spacetimedb::db::persistence::LocalPersistenceProvider; +use spacetimedb::db::persistence::{DurabilityConfig, LocalPersistenceProvider}; use spacetimedb::energy::{EnergyBalance, EnergyQuanta, NullEnergyMonitor}; use spacetimedb::host::{DiskStorage, HostController, HostRuntimeConfig, MigratePlanResult, UpdateDatabaseResult}; use spacetimedb::identity::{AuthCtx, Identity}; @@ -41,6 +41,7 @@ pub use spacetimedb_client_api::routes::subscribe::{BIN_PROTOCOL, TEXT_PROTOCOL} #[derive(Clone, Copy)] pub struct StandaloneOptions { pub db_config: db::Config, + pub durability: DurabilityConfig, pub websocket: WebSocketOptions, pub wasm: WasmConfig, pub v8: V8Config, @@ -76,7 +77,8 @@ impl StandaloneEnv { let energy_monitor = Arc::new(NullEnergyMonitor); let program_store = Arc::new(DiskStorage::new(data_dir.program_bytes().0).await?); - let persistence_provider = Arc::new(LocalPersistenceProvider::new(data_dir.clone())); + let persistence_provider = + Arc::new(LocalPersistenceProvider::new(data_dir.clone()).with_durability_config(config.durability)); let host_controller = HostController::new( data_dir, config.db_config, @@ -650,6 +652,7 @@ mod tests { storage: Storage::Memory, page_pool_max_size: None, }, + durability: DurabilityConfig::default(), websocket: WebSocketOptions::default(), wasm: WasmConfig::default(), v8: V8Config::default(), diff --git a/crates/standalone/src/subcommands/start.rs b/crates/standalone/src/subcommands/start.rs index 50f6db19257..eb319f38c75 100644 --- a/crates/standalone/src/subcommands/start.rs +++ b/crates/standalone/src/subcommands/start.rs @@ -11,6 +11,7 @@ use axum::extract::DefaultBodyLimit; use clap::ArgAction::SetTrue; use clap::{Arg, ArgMatches}; use spacetimedb::config::{parse_config, CertificateAuthority}; +use spacetimedb::db::persistence::DurabilityConfig; use spacetimedb::db::{self, Storage}; use spacetimedb::startup::{self, TracingOptions}; use spacetimedb::util::jobs::JobCores; @@ -99,6 +100,8 @@ struct ConfigFile { #[serde(flatten)] common: spacetimedb::config::ConfigFile, #[serde(default)] + durability: DurabilityConfig, + #[serde(default)] websocket: WebSocketOptions, } @@ -181,6 +184,7 @@ pub async fn exec(args: &ArgMatches, db_cores: JobCores) -> anyhow::Result<()> { let ctx = StandaloneEnv::init( StandaloneOptions { db_config, + durability: config.durability, websocket: config.websocket, wasm: config.common.wasm, v8: config.common.v8, @@ -525,6 +529,11 @@ mod tests { heap-gc-trigger-fraction = 0.6 heap-retire-fraction = 0.8 heap-limit-mb = 128 + + [durability.commitlog] + max-segment-size = 1048576 + preallocate-segments = true + write-buffer-size = 131072 "#; let config: ConfigFile = toml::from_str(toml).unwrap(); @@ -543,6 +552,15 @@ mod tests { assert_eq!(config.common.v8.heap_policy.heap_gc_trigger_fraction, 0.6); assert_eq!(config.common.v8.heap_policy.heap_retire_fraction, 0.8); assert_eq!(config.common.v8.heap_policy.heap_limit_bytes, 128 * 1024 * 1024); + assert_eq!( + config.durability.commitlog.max_segment_size.map(|val| val.get()), + Some(1024 * 1024) + ); + assert_eq!(config.durability.commitlog.preallocate_segments, Some(true)); + assert_eq!( + config.durability.commitlog.write_buffer_size.map(|val| val.get()), + Some(128 * 1024) + ); assert_eq!( config.websocket, diff --git a/crates/testing/src/modules.rs b/crates/testing/src/modules.rs index 7556b7f692e..21fea57fe96 100644 --- a/crates/testing/src/modules.rs +++ b/crates/testing/src/modules.rs @@ -245,6 +245,7 @@ impl CompiledModule { let env = spacetimedb_standalone::StandaloneEnv::init( spacetimedb_standalone::StandaloneOptions { db_config: config, + durability: Default::default(), websocket: WebSocketOptions::default(), wasm: Default::default(), v8: Default::default(),