Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,9 @@ def get_default_system_parameters(
"enable_mcp_observatory",
"mcp_max_response_size",
"user_id_pool_batch_size",
"storage_shard_pool_enabled",
"storage_shard_pool_target_size",
"storage_shard_pool_replenish_interval",
]


Expand Down
8 changes: 8 additions & 0 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1572,6 +1572,13 @@ def __init__(
]
self.flags_with_values["enable_case_literal_transform"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["enable_cast_elimination"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["storage_shard_pool_enabled"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["storage_shard_pool_target_size"] = [
"0",
"5",
"10",
"20",
]

# If you are adding a new config flag in Materialize, consider using it
# here instead of just marking it as uninteresting to silence the
Expand Down Expand Up @@ -1783,6 +1790,7 @@ def __init__(
"oidc_authentication_claim",
"console_oidc_client_id",
"console_oidc_scopes",
"storage_shard_pool_replenish_interval",
]

def run(self, exe: Executor) -> bool:
Expand Down
13 changes: 10 additions & 3 deletions src/adapter/src/catalog/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ impl CatalogState {
StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
}
StateUpdateKind::PreAllocatedShard(_) => {
// Pre-allocated shards are managed entirely by the storage controller.
// No in-memory catalog state to update.
}
}

Ok(())
Expand Down Expand Up @@ -1458,7 +1462,8 @@ impl CatalogState {
| StateUpdateKind::Schema(_)
| StateUpdateKind::NetworkPolicy(_)
| StateUpdateKind::StorageCollectionMetadata(_)
| StateUpdateKind::UnfinalizedShard(_) => Vec::new(),
| StateUpdateKind::UnfinalizedShard(_)
| StateUpdateKind::PreAllocatedShard(_) => Vec::new(),
}
}

Expand Down Expand Up @@ -2064,7 +2069,8 @@ fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
| StateUpdateKind::SourceReferences(_)
| StateUpdateKind::AuditLog(_)
| StateUpdateKind::StorageCollectionMetadata(_)
| StateUpdateKind::UnfinalizedShard(_) => push_update(
| StateUpdateKind::UnfinalizedShard(_)
| StateUpdateKind::PreAllocatedShard(_) => push_update(
update,
diff,
&mut post_item_retractions,
Expand Down Expand Up @@ -2402,7 +2408,8 @@ impl ApplyState {
| Comment(_)
| AuditLog(_)
| StorageCollectionMetadata(_)
| UnfinalizedShard(_) => Self::Updates(vec![update]),
| UnfinalizedShard(_)
| PreAllocatedShard(_) => Self::Updates(vec![update]),
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ impl Catalog {
BootstrapStateUpdateKind::Comment(_)
| BootstrapStateUpdateKind::StorageCollectionMetadata(_)
| BootstrapStateUpdateKind::SourceReferences(_)
| BootstrapStateUpdateKind::UnfinalizedShard(_) => {
| BootstrapStateUpdateKind::UnfinalizedShard(_)
| BootstrapStateUpdateKind::PreAllocatedShard(_) => {
post_item_updates.push((kind, ts, diff));
}
BootstrapStateUpdateKind::AuditLog(_) => {
Expand Down
17 changes: 14 additions & 3 deletions src/catalog-debug/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ use mz_catalog::durable::debug::{
AuditLogCollection, ClusterCollection, ClusterIntrospectionSourceIndexCollection,
ClusterReplicaCollection, Collection, CollectionTrace, CollectionType, CommentCollection,
ConfigCollection, DatabaseCollection, DebugCatalogState, DefaultPrivilegeCollection,
IdAllocatorCollection, ItemCollection, NetworkPolicyCollection, RoleAuthCollection,
RoleCollection, SchemaCollection, SettingCollection, SourceReferencesCollection,
StorageCollectionMetadataCollection, SystemConfigurationCollection,
IdAllocatorCollection, ItemCollection, NetworkPolicyCollection, PreAllocatedShardsCollection,
RoleAuthCollection, RoleCollection, SchemaCollection, SettingCollection,
SourceReferencesCollection, StorageCollectionMetadataCollection, SystemConfigurationCollection,
SystemItemMappingCollection, SystemPrivilegeCollection, Trace, TxnWalShardCollection,
UnfinalizedShardsCollection,
};
Expand Down Expand Up @@ -322,6 +322,9 @@ macro_rules! for_collection {
$fn::<UnfinalizedShardsCollection>($($arg),*).await?
}
CollectionType::TxnWalShard => $fn::<TxnWalShardCollection>($($arg),*).await?,
CollectionType::PreAllocatedShard => {
$fn::<PreAllocatedShardsCollection>($($arg),*).await?
}
}
};
}
Expand Down Expand Up @@ -468,6 +471,7 @@ async fn dump(
storage_collection_metadata,
unfinalized_shards,
txn_wal_shard,
pre_allocated_shards,
} = if consolidate {
openable_state.trace_consolidated().await?
} else {
Expand Down Expand Up @@ -558,6 +562,13 @@ async fn dump(
consolidate,
);
dump_col(&mut data, txn_wal_shard, &ignore, stats_only, consolidate);
dump_col(
&mut data,
pre_allocated_shards,
&ignore,
stats_only,
consolidate,
);

writeln!(&mut target, "{data:#?}")?;
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions src/catalog-protos/objects_hashes.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[
{
"name": "objects.rs",
"md5": "42585baa1b4b5e1b6da4e361a35ec546"
"md5": "a4ef93cb057d5919b2cb3a5478b2431d"
},
{
"name": "objects_v74.rs",
Expand Down Expand Up @@ -33,6 +33,6 @@
},
{
"name": "objects_v81.rs",
"md5": "42585baa1b4b5e1b6da4e361a35ec546"
"md5": "a4ef93cb057d5919b2cb3a5478b2431d"
}
]
32 changes: 32 additions & 0 deletions src/catalog-protos/src/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,22 @@ pub struct UnfinalizedShardKey {
pub shard: String,
}

#[derive(
Clone,
Debug,
PartialEq,
Eq,
PartialOrd,
Ord,
Hash,
Serialize,
Deserialize,
Arbitrary
)]
pub struct PreAllocatedShardKey {
pub shard: String,
}

#[derive(
Clone,
Debug,
Expand Down Expand Up @@ -2533,6 +2549,7 @@ pub enum StateUpdateKind {
SystemPrivileges(SystemPrivileges),
TxnWalShard(TxnWalShard),
UnfinalizedShard(UnfinalizedShard),
PreAllocatedShard(PreAllocatedShard),
}

#[derive(
Expand Down Expand Up @@ -2888,6 +2905,21 @@ pub struct UnfinalizedShard {
pub key: UnfinalizedShardKey,
}

#[derive(
Clone,
Debug,
PartialEq,
Eq,
PartialOrd,
Ord,
Serialize,
Deserialize,
Arbitrary
)]
pub struct PreAllocatedShard {
pub key: PreAllocatedShardKey,
}

#[derive(
Clone,
Debug,
Expand Down
32 changes: 32 additions & 0 deletions src/catalog-protos/src/objects_v81.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,22 @@ pub struct UnfinalizedShardKey {
pub shard: String,
}

#[derive(
Clone,
Debug,
PartialEq,
Eq,
PartialOrd,
Ord,
Hash,
Serialize,
Deserialize,
Arbitrary
)]
pub struct PreAllocatedShardKey {
pub shard: String,
}

#[derive(
Clone,
Debug,
Expand Down Expand Up @@ -2533,6 +2549,7 @@ pub enum StateUpdateKind {
SystemPrivileges(SystemPrivileges),
TxnWalShard(TxnWalShard),
UnfinalizedShard(UnfinalizedShard),
PreAllocatedShard(PreAllocatedShard),
}

#[derive(
Expand Down Expand Up @@ -2888,6 +2905,21 @@ pub struct UnfinalizedShard {
pub key: UnfinalizedShardKey,
}

#[derive(
Clone,
Debug,
PartialEq,
Eq,
PartialOrd,
Ord,
Serialize,
Deserialize,
Arbitrary
)]
pub struct PreAllocatedShard {
pub key: PreAllocatedShardKey,
}

#[derive(
Clone,
Debug,
Expand Down
4 changes: 2 additions & 2 deletions src/catalog/src/durable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ pub use crate::durable::objects::state_update::StateUpdate;
use crate::durable::objects::state_update::{StateUpdateKindJson, TryIntoStateUpdateKind};
pub use crate::durable::objects::{
Cluster, ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged, Comment,
Database, DefaultPrivilege, IntrospectionSourceIndex, Item, NetworkPolicy, ReplicaConfig,
ReplicaLocation, Role, RoleAuth, Schema, SourceReference, SourceReferences,
Database, DefaultPrivilege, IntrospectionSourceIndex, Item, NetworkPolicy, PreAllocatedShard,
ReplicaConfig, ReplicaLocation, Role, RoleAuth, Schema, SourceReference, SourceReferences,
StorageCollectionMetadata, SystemConfiguration, SystemObjectDescription, SystemObjectMapping,
UnfinalizedShard,
};
Expand Down
13 changes: 13 additions & 0 deletions src/catalog/src/durable/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub enum CollectionType {
SystemPrivileges,
StorageCollectionMetadata,
UnfinalizedShard,
PreAllocatedShard,
TxnWalShard,
}

Expand Down Expand Up @@ -289,6 +290,14 @@ collection_impl!({
trace_field: unfinalized_shards,
update: StateUpdateKind::UnfinalizedShard,
});
collection_impl!({
name: PreAllocatedShardsCollection,
key: proto::PreAllocatedShardKey,
value: (),
collection_type: CollectionType::PreAllocatedShard,
trace_field: pre_allocated_shards,
update: StateUpdateKind::PreAllocatedShard,
});
collection_impl!({
name: TxnWalShardCollection,
key: (),
Expand Down Expand Up @@ -349,6 +358,7 @@ pub struct Trace {
pub system_privileges: CollectionTrace<SystemPrivilegeCollection>,
pub storage_collection_metadata: CollectionTrace<StorageCollectionMetadataCollection>,
pub unfinalized_shards: CollectionTrace<UnfinalizedShardsCollection>,
pub pre_allocated_shards: CollectionTrace<PreAllocatedShardsCollection>,
pub txn_wal_shard: CollectionTrace<TxnWalShardCollection>,
}

Expand Down Expand Up @@ -376,6 +386,7 @@ impl Trace {
system_privileges: CollectionTrace::new(),
storage_collection_metadata: CollectionTrace::new(),
unfinalized_shards: CollectionTrace::new(),
pre_allocated_shards: CollectionTrace::new(),
txn_wal_shard: CollectionTrace::new(),
}
}
Expand Down Expand Up @@ -403,6 +414,7 @@ impl Trace {
system_privileges,
storage_collection_metadata,
unfinalized_shards,
pre_allocated_shards,
txn_wal_shard,
} = self;
audit_log.sort();
Expand All @@ -426,6 +438,7 @@ impl Trace {
system_privileges.sort();
storage_collection_metadata.sort();
unfinalized_shards.sort();
pre_allocated_shards.sort();
txn_wal_shard.sort();
}
}
Expand Down
32 changes: 32 additions & 0 deletions src/catalog/src/durable/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1129,6 +1129,30 @@ impl DurableType for UnfinalizedShard {
}
}

#[derive(Debug, Clone, Ord, PartialOrd, PartialEq, Eq)]
pub struct PreAllocatedShard {
pub shard: ShardId,
}

impl DurableType for PreAllocatedShard {
type Key = PreAllocatedShardKey;
type Value = ();

fn into_key_value(self) -> (Self::Key, Self::Value) {
(PreAllocatedShardKey { shard: self.shard }, ())
}

fn from_key_value(key: Self::Key, _value: Self::Value) -> Self {
Self { shard: key.shard }
}

fn key(&self) -> Self::Key {
PreAllocatedShardKey {
shard: self.shard.clone(),
}
}
}

// Structs used internally to represent on-disk state.

/// A snapshot of the current on-disk state.
Expand Down Expand Up @@ -1159,6 +1183,7 @@ pub struct Snapshot {
pub storage_collection_metadata:
BTreeMap<proto::StorageCollectionMetadataKey, proto::StorageCollectionMetadataValue>,
pub unfinalized_shards: BTreeMap<proto::UnfinalizedShardKey, ()>,
pub pre_allocated_shards: BTreeMap<proto::PreAllocatedShardKey, ()>,
pub txn_wal_shard: BTreeMap<(), proto::TxnWalShardValue>,
}

Expand Down Expand Up @@ -1434,6 +1459,13 @@ pub struct UnfinalizedShardKey {
pub(crate) shard: ShardId,
}

/// This value is stored transparently, however, it should only ever be
/// manipulated by the storage controller.
#[derive(Debug, Clone, PartialOrd, PartialEq, Eq, Ord)]
pub struct PreAllocatedShardKey {
pub(crate) shard: ShardId,
}

/// This value is stored transparently, however, it should only ever be
/// manipulated by the storage controller.
#[derive(Debug, Clone, PartialOrd, PartialEq, Eq, Ord)]
Expand Down
Loading
Loading