diff --git a/crates/memory-usage/src/lib.rs b/crates/memory-usage/src/lib.rs index 1ea3755810c..a89762cb5cb 100644 --- a/crates/memory-usage/src/lib.rs +++ b/crates/memory-usage/src/lib.rs @@ -126,6 +126,13 @@ impl MemoryUsage for std::collections::BTreeMap< } } +impl MemoryUsage for std::collections::BTreeSet { + fn heap_usage(&self) -> usize { + // NB: this is best-effort, since we don't have a `capacity()` method on `BTreeMap`. + self.len() * mem::size_of::() + self.iter().map(|t| t.heap_usage()).sum::() + } +} + #[cfg(feature = "smallvec")] impl MemoryUsage for smallvec::SmallVec where diff --git a/crates/table/src/page.rs b/crates/table/src/page.rs index dc6acacf82f..29c58ad95c4 100644 --- a/crates/table/src/page.rs +++ b/crates/table/src/page.rs @@ -382,6 +382,13 @@ impl PageHeader { pub(super) fn present_rows_storage_ptr_for_test(&self) -> *const () { self.fixed.present_rows.storage().as_ptr().cast() } + + /// Returns the number of var-len granules available for allocation, + /// including those in the "gap" between the fixed-len and var-len part of the page. + fn available_var_len_granules(&self) -> usize { + self.var.freelist_len as usize + + VarLenGranule::space_to_granules(gap_remaining_size(self.var.first, self.fixed.last)) + } } /// Fixed-length row portions must be at least large enough to store a `FreeCellRef`. @@ -1195,6 +1202,12 @@ impl Page { self.header.var.num_granules as usize } + /// Returns the number of var-len granules free to store data, + /// including those in the "gap" between the fixed-len and var-len part of the page. + pub fn available_var_len_granules(&self) -> usize { + self.header.available_var_len_granules() + } + #[cfg(test)] /// # Safety /// @@ -1367,12 +1380,6 @@ impl Page { gap_in_granules >= needed_granules_after_freelist } - /// Returns whether the row is full with respect to storing a fixed row with `fixed_row_size` - /// and no variable component. - pub fn is_full(&self, fixed_row_size: Size) -> bool { - !self.has_space_for_row(fixed_row_size, 0) - } - /// Will leave partially-allocated chunks if fails prematurely, /// so always check `Self::has_space_for_row` before calling. /// diff --git a/crates/table/src/pages.rs b/crates/table/src/pages.rs index 14bc46345db..d1f9dbcfbce 100644 --- a/crates/table/src/pages.rs +++ b/crates/table/src/pages.rs @@ -9,6 +9,7 @@ use super::var_len::VarLenMembers; use core::ops::{ControlFlow, Deref, Index, IndexMut}; use spacetimedb_sats::layout::Size; use spacetimedb_sats::memory_usage::MemoryUsage; +use std::collections::BTreeSet; use std::ops::DerefMut; use thiserror::Error; @@ -39,8 +40,21 @@ impl IndexMut for Pages { pub struct Pages { /// The collection of pages under management. pages: Vec>, - /// The set of pages that aren't yet full. - non_full_pages: Vec, + /// The set of pages that aren't yet full, + /// sorted by the number of var-len granules available in each page. + /// + /// Used during insertion to locate a page with enough space to store a given row. + /// + /// The first value in the tuple is [`Page::available_var_len_granules`], and the second value is the page index. + /// + /// Pages for which `available_var_len_granules` is 0 are not stored. + /// + /// If multiple pages have the same number of granules available, they are then sorted by `PageIndex`. + /// This maintains a deterministic sort order, + /// so that replaying the same set of operations on multiple datastores + /// will always result in the same layout of rows in pages, + /// regardless of when those datastores were (re)started prior to or during the sequence of operations. + non_full_pages: BTreeSet<(usize, PageIndex)>, } impl MemoryUsage for Pages { @@ -51,6 +65,47 @@ impl MemoryUsage for Pages { } impl Pages { + #[cfg(test)] + pub(crate) fn assert_num_full_pages_consistent(&self) { + let mut seen_page_indexes = BTreeSet::new(); + for &(_, page_index) in &self.non_full_pages { + assert!( + seen_page_indexes.insert(page_index), + "page {:?} appears multiple times in non_full_pages", + page_index + ); + } + + for (idx, page) in self.pages.iter().enumerate() { + let page_index = PageIndex(idx as u64); + let available_granules = page.available_var_len_granules(); + let entries_for_page: Vec<_> = self + .non_full_pages + .iter() + .copied() + .filter(|&(_, idx)| idx == page_index) + .collect(); + + if available_granules == 0 { + assert!( + entries_for_page.is_empty(), + "page {:?} has 0 available var-len granules but appears in non_full_pages as {:?}", + page_index, + entries_for_page + ); + } else { + assert_eq!( + entries_for_page, + vec![(available_granules, page_index)], + "page {:?} has {} available var-len granules but non_full_pages has {:?}", + page_index, + available_granules, + entries_for_page + ); + } + } + } + /// Is there space to allocate another page? pub fn can_allocate_new_page(&self) -> Result { let new_idx = self.len(); @@ -78,7 +133,14 @@ impl Pages { page.clear(); } // Mark every page non-full. - self.non_full_pages = (0..self.pages.len()).map(|idx| PageIndex(idx as u64)).collect(); + self.non_full_pages = (0..self.pages.len()) + // We could probably compute the number of available granules once and use it for all pages, + // rather than calling the method on each page, + // but we'd have to do some amount of reasoning to demonstrate it was correct + // based on the definition of `Page::clear`, + // and why bother? + .map(|idx| (self.pages[idx].available_var_len_granules(), PageIndex(idx as u64))) + .collect(); } /// Get a reference to fixed-len row data. @@ -94,7 +156,7 @@ impl Pages { /// returning an error if the new number of pages would overflow `PageIndex::MAX`. /// /// The new page is initially empty, but is not added to the non-full set. - /// Callers should call [`Pages::maybe_mark_page_non_full`] after operating on the new page. + /// Callers should call [`Pages::record_page_available_granules`] after operating on the new page. fn allocate_new_page(&mut self, pool: &PagePool, fixed_row_size: Size) -> Result { let new_idx = self.can_allocate_new_page()?; @@ -107,23 +169,10 @@ impl Pages { /// Reserve a new, initially empty page. pub fn reserve_empty_page(&mut self, pool: &PagePool, fixed_row_size: Size) -> Result { let idx = self.allocate_new_page(pool, fixed_row_size)?; - self.mark_page_non_full(idx); + self.record_page_available_granules(idx); Ok(idx) } - /// Mark the page at `idx` as non-full. - pub fn mark_page_non_full(&mut self, idx: PageIndex) { - self.non_full_pages.push(idx); - } - - /// If the page at `page_index` is not full, - /// add it to the non-full set so that later insertions can access it. - pub fn maybe_mark_page_non_full(&mut self, page_index: PageIndex, fixed_row_size: Size) { - if !self[page_index].is_full(fixed_row_size) { - self.non_full_pages.push(page_index); - } - } - /// Call `f` with a reference to a page which satisfies /// `page.has_space_for_row(fixed_row_size, num_var_len_granules)`. pub fn with_page_to_insert_row( @@ -135,7 +184,7 @@ impl Pages { ) -> Result<(PageIndex, Res), Error> { let page_index = self.find_page_with_space_for_row(pool, fixed_row_size, num_var_len_granules)?; let res = f(&mut self[page_index]); - self.maybe_mark_page_non_full(page_index, fixed_row_size); + self.record_page_available_granules(page_index); Ok((page_index, res)) } @@ -143,7 +192,7 @@ impl Pages { /// containing `num_var_len_granules` granules of var-len data. /// /// Retrieving a page in this way will remove it from the non-full set. - /// After performing an insertion, the caller should use [`Pages::maybe_mark_page_non_full`] + /// After performing an insertion, the caller should use [`Pages::record_page_available_granules`] /// to restore the page to the non-full set. fn find_page_with_space_for_row( &mut self, @@ -151,14 +200,13 @@ impl Pages { fixed_row_size: Size, num_var_len_granules: usize, ) -> Result { - if let Some((page_idx_idx, page_idx)) = self + if let Some((page_num_free_granules, page_idx)) = self .non_full_pages - .iter() + .range((num_var_len_granules, PageIndex(0))..) .copied() - .enumerate() .find(|(_, page_idx)| self[*page_idx].has_space_for_row(fixed_row_size, num_var_len_granules)) { - self.non_full_pages.swap_remove(page_idx_idx); + self.non_full_pages.remove(&(page_num_free_granules, page_idx)); return Ok(page_idx); } @@ -232,23 +280,69 @@ impl Pages { row_ptr: RowPointer, blob_store: &mut dyn BlobStore, ) -> BlobNumBytes { - let page = &mut self[row_ptr.page_index()]; - let full_before = page.is_full(fixed_row_size); - // SAFETY: - // - `row_ptr.page_offset()` does point to a valid row in this page - // as the caller promised that `row_ptr` points to a valid row in `self`. - // - // - `fixed_row_size` is consistent with the size in bytes of the fixed part of the row. - // The size is also conistent with `var_len_visitor`. - let blob_store_deleted_bytes = - unsafe { page.delete_row(row_ptr.page_offset(), fixed_row_size, var_len_visitor, blob_store) }; - - // If the page was previously full, mark it as non-full now, - // since we just opened a space in it. - if full_before { - self.mark_page_non_full(row_ptr.page_index()); + let page_index = row_ptr.page_index(); + + self.with_updating_non_full_pages(page_index, |this| { + let page = &mut this[page_index]; + + // SAFETY: + // - `row_ptr.page_offset()` does point to a valid row in this page + // as the caller promised that `row_ptr` points to a valid row in `self`. + // + // - `fixed_row_size` is consistent with the size in bytes of the fixed part of the row. + // The size is also conistent with `var_len_visitor`. + unsafe { page.delete_row(row_ptr.page_offset(), fixed_row_size, var_len_visitor, blob_store) } + }) + } + + /// Collect information about the page `self[page_index]` sufficient to update [`Self::non_full_pages`], + /// then run `body` to update the page, and finally update [`Self::non_full_pages`] for its new fullness and capacity. + /// + /// `body` should not update any pages other than the one identified by `page_index`. + fn with_updating_non_full_pages(&mut self, page_index: PageIndex, body: impl FnOnce(&mut Self) -> Ret) -> Ret { + let page = &self[page_index]; + + let available_granules_before = page.available_var_len_granules(); + + let ret = body(self); + + self.update_page_available_granules(available_granules_before, page_index); + + ret + } + + /// Update [`Self::non_full_pages`] to change the number of var-len granules available in the page at `self[page_index]`, + /// first deleting any old entry and then re-inserting the new entry. + /// + /// The entry for `page` in `self.non_full_granules` should not have been deleted prior to calling this method. + /// If the entry has already been deleted or was never present, instead use [`Self::record_page_available_ganules`]. + /// + /// `available_granules_before` should be the previous count from [`Page::available_var_len_granules`], + /// prior to whatever operation made space available in the page. + /// This is necessary because `non_full_pages` is a `BTreeSet` sorted by `(available_granules, page_index)`, + /// so locating the `page_index` without the `available_granules` would be slow. + fn update_page_available_granules(&mut self, available_granules_before: usize, page_index: PageIndex) { + if available_granules_before != 0 { + let _prev = self.non_full_pages.remove(&(available_granules_before, page_index)); + debug_assert!(_prev); + } else { + debug_assert!(!self.non_full_pages.remove(&(available_granules_before, page_index))); + } + + self.record_page_available_granules(page_index); + } + + /// Record the number of available var-len granules in the page at `self[page_index]` into [`Self::non_full_pages`]. + /// + /// Prior to calling this function, there must not be an entry for `page_index` in [`Self::non_full_pages`]. + fn record_page_available_granules(&mut self, page_index: PageIndex) { + debug_assert!(!self.non_full_pages.iter().any(|(_, idx)| *idx == page_index)); + + let available_granules = self[page_index].available_var_len_granules(); + + if available_granules != 0 { + self.non_full_pages.insert((available_granules, page_index)); } - blob_store_deleted_bytes } /// Materialize a view of rows in `self` for which the `filter` returns `true`. @@ -353,12 +447,15 @@ impl Pages { /// Should only ever be called when `self.is_empty()`. /// /// Also populates `self.non_full_pages`. - pub fn set_contents(&mut self, pages: Vec>, fixed_row_size: Size) { + pub fn set_contents(&mut self, pages: Vec>) { debug_assert!(self.is_empty()); self.non_full_pages = pages .iter() .enumerate() - .filter_map(|(idx, page)| (!page.is_full(fixed_row_size)).then_some(PageIndex(idx as _))) + .filter_map(|(idx, page)| { + let num_granules = page.available_var_len_granules(); + (num_granules != 0).then_some((page.available_var_len_granules(), PageIndex(idx as _))) + }) .collect(); self.pages = pages; } diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs index 1d406b42eaf..6b1c032aff3 100644 --- a/crates/table/src/table.rs +++ b/crates/table/src/table.rs @@ -1605,7 +1605,7 @@ impl Table { /// /// The schema of rows stored in the `pages` must exactly match `self.schema` and `self.inner.row_layout`. pub unsafe fn set_pages(&mut self, pages: Vec>, blob_store: &dyn BlobStore) { - self.inner.pages.set_contents(pages, self.inner.row_layout.size()); + self.inner.pages.set_contents(pages); // Recompute table metadata based on the new pages. // Compute the row count first, in case later computations want to use it as a capacity to pre-allocate. @@ -2746,6 +2746,39 @@ pub(crate) mod test { // which is already what the actual implementation does. } + /// Test that the recording of non-full pages and counts of var-len granules available + /// by the page manager are correct after insertions and deletes. + /// + /// Tested here rather than in pages.rs because it's easier to test with typed rows than raw byte buffers. + #[test] + fn non_full_pages_consistent((ty, vals) in generate_typed_row_vec(0..SIZE, 128, 2048)) { + let pool = PagePool::new_for_test(); + let mut blob_store = HashMapBlobStore::default(); + let mut table = table(ty); + let mut inserted_row_ptrs = Vec::new(); + + table.inner.pages.assert_num_full_pages_consistent(); + + // Insert 3 rows at a time, then delete the last 1. + // This keeps the page usage growing towards fullness, but also includes some deletes. + for rows in vals.chunks(3) { + for row in rows { + let row_ptr = match table.insert(&pool, &mut blob_store, row) { + Ok((_, row_ref)) => row_ref.pointer(), + Err(InsertError::Duplicate(_)) => continue, + Err(e) => return Err(TestCaseError::fail(format!("unexpected insert error: {e:?}"))), + }; + inserted_row_ptrs.push(row_ptr); + table.inner.pages.assert_num_full_pages_consistent(); + } + + if let Some(row_ptr) = inserted_row_ptrs.pop() { + table.delete(&mut blob_store, row_ptr, |_| ()); + table.inner.pages.assert_num_full_pages_consistent(); + } + } + } + #[test] fn index_size_reporting_matches_slow_implementations_single_column( (ty, vals) in generate_typed_row_vec(1..SIZE, 128, 2048),