Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 118 additions & 19 deletions src/btreemap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use allocator::Allocator;
pub use iter::Iter;
use node::{DerivedPageSize, Entry, Node, NodeType, PageSize, Version};
use std::borrow::Cow;
use std::cell::RefCell;
use std::marker::PhantomData;
use std::ops::{Bound, RangeBounds};

Expand All @@ -81,6 +82,57 @@ const DEFAULT_PAGE_SIZE: u32 = 1024;
// A marker to indicate that the `PageSize` stored in the header is a `PageSize::Value`.
const PAGE_SIZE_VALUE_MARKER: u32 = u32::MAX;

/// Number of slots in the direct-mapped node cache below.
const NODE_CACHE_NUM_SLOTS: usize = 32;

/// A direct-mapped node cache modeled after CPU caches.
///
/// Each slot is indexed by `(node_address / page_size) % NUM_SLOTS`. Lookup
/// is O(1) with ~5 instructions overhead. Collision = eviction (no LRU
/// tracking needed).
///
/// Upper tree levels (root, depth-1) naturally stay cached because their
/// addresses are stable and map to distinct slots.
struct NodeCache<K: Storable + Ord + Clone> {
    // One `(address, node)` pair per slot. An empty slot is tagged with
    // `NULL` and holds `None`.
    slots: Vec<(Address, Option<Node<K>>)>,
    // Page size used to map a node address to a slot index.
    page_size: u32,
}

impl<K: Storable + Ord + Clone> NodeCache<K> {
    /// Creates an empty cache whose slot mapping is derived from `page_size`.
    fn new(page_size: u32) -> Self {
        Self {
            // Every slot starts out empty: `NULL` tag, no node.
            slots: (0..NODE_CACHE_NUM_SLOTS).map(|_| (NULL, None)).collect(),
            page_size,
        }
    }

    /// Maps a node address to its unique slot: `(addr / page_size) % NUM_SLOTS`.
    fn slot_index(&self, addr: Address) -> usize {
        let page_number = addr.get() / u64::from(self.page_size);
        page_number as usize % NODE_CACHE_NUM_SLOTS
    }

    /// Removes and returns the node cached under `addr`, if present.
    fn take(&mut self, addr: Address) -> Option<Node<K>> {
        let idx = self.slot_index(addr);
        let slot = &mut self.slots[idx];
        if slot.0 != addr {
            // Slot is empty or occupied by a different node.
            return None;
        }
        // Mark the slot empty and hand out the node.
        slot.0 = NULL;
        slot.1.take()
    }

    /// Caches `node` under `addr`, evicting whatever occupied its slot.
    fn put(&mut self, addr: Address, node: Node<K>) {
        let idx = self.slot_index(addr);
        self.slots[idx] = (addr, Some(node));
    }

    /// Drops the cached copy of `addr`, if any. A slot holding a different
    /// address is left untouched.
    fn invalidate(&mut self, addr: Address) {
        let idx = self.slot_index(addr);
        let slot = &mut self.slots[idx];
        if slot.0 == addr {
            *slot = (NULL, None);
        }
    }
}

/// A B-Tree map implementation that stores its data into a designated memory.
///
/// # Memory Implementations
Expand Down Expand Up @@ -248,6 +300,9 @@ where
// The number of elements in the map.
length: u64,

// Direct-mapped node cache to avoid re-loading hot nodes from stable memory.
cache: RefCell<NodeCache<K>>,

// A marker to communicate to the Rust compiler that we own these types.
_phantom: PhantomData<(K, V)>,
}
Expand Down Expand Up @@ -358,6 +413,7 @@ where
),
version: Version::V2(page_size),
length: 0,
cache: RefCell::new(NodeCache::new(page_size.get())),
_phantom: PhantomData,
};

Expand All @@ -373,18 +429,21 @@ where
let max_key_size = K::BOUND.max_size();
let max_value_size = V::BOUND.max_size();

let version = Version::V1(DerivedPageSize {
max_key_size,
max_value_size,
});

let btree = Self {
root_addr: NULL,
allocator: Allocator::new(
memory,
Address::from(ALLOCATOR_OFFSET as u64),
Node::<K>::max_size(max_key_size, max_value_size),
),
version: Version::V1(DerivedPageSize {
max_key_size,
max_value_size,
}),
version,
length: 0,
cache: RefCell::new(NodeCache::new(version.page_size().get())),
_phantom: PhantomData,
};

Expand Down Expand Up @@ -434,6 +493,7 @@ where
allocator: Allocator::load(memory, allocator_addr),
version,
length: header.length,
cache: RefCell::new(NodeCache::new(version.page_size().get())),
_phantom: PhantomData,
}
}
Expand Down Expand Up @@ -654,31 +714,42 @@ where
return None;
}
self.traverse(self.root_addr, key, |node, idx| {
node.extract_entry_at(idx, self.memory()).1 // Extract value.
node.value(idx, self.memory()).to_vec()
})
.map(Cow::Owned)
.map(V::from_bytes)
}

/// Returns true if the key exists.
pub fn contains_key(&self, key: &K) -> bool {
    if self.root_addr == NULL {
        // An empty map contains nothing.
        return false;
    }
    // `traverse` with a unit closure yields `Some(())` exactly when the key
    // is present.
    self.traverse(self.root_addr, key, |_, _| ()).is_some()
}

/// Recursively traverses from `node_addr`, invoking `f` if `key` is found. Stops at a leaf if not.
///
/// Uses the node cache: nodes are taken out before use and returned after.
/// Recursively traverses from `node_addr`, invoking `f` if `key` is found.
/// Stops at a leaf if the key is not present.
///
/// Uses the node cache: each visited node is taken out of the cache (or
/// loaded from memory on a miss) and must be returned via `return_node`
/// before descending, so a node is never held while its child is visited.
fn traverse<F, R>(&self, node_addr: Address, key: &K, f: F) -> Option<R>
where
    F: Fn(&Node<K>, usize) -> R,
{
    let node = self.take_or_load_node(node_addr);
    match node.search(key, self.memory()) {
        Ok(idx) => {
            // Key found: apply `f`, then hand the node back to the cache.
            let result = f(&node, idx);
            self.return_node(node);
            Some(result)
        }
        Err(idx) => {
            // Compute where to go next *before* returning the node, since
            // `return_node` consumes it.
            let child_addr = match node.node_type() {
                NodeType::Leaf => {
                    // At a leaf: key not present.
                    self.return_node(node);
                    return None;
                }
                NodeType::Internal => node.child(idx),
            };
            self.return_node(node);
            self.traverse(child_addr, key, f)
        }
    }
}

Expand Down Expand Up @@ -713,6 +784,7 @@ where
self.root_addr = NULL;
self.length = 0;
self.allocator.clear();
*self.cache.get_mut() = NodeCache::new(self.version.page_size().get());
self.save_header();
}

Expand All @@ -722,8 +794,9 @@ where
if self.root_addr == NULL {
return None;
}
let root = self.load_node(self.root_addr);
let root = self.take_or_load_node(self.root_addr);
let (k, encoded_v) = root.get_min(self.memory());
self.return_node(root);
Some((k, V::from_bytes(Cow::Owned(encoded_v))))
}

Expand All @@ -733,8 +806,9 @@ where
if self.root_addr == NULL {
return None;
}
let root = self.load_node(self.root_addr);
let root = self.take_or_load_node(self.root_addr);
let (k, encoded_v) = root.get_max(self.memory());
self.return_node(root);
Some((k, V::from_bytes(Cow::Owned(encoded_v))))
}

Expand Down Expand Up @@ -1273,7 +1347,13 @@ where
/// [1, 2, 3, 4, 5, 6, 7] (stored in the `into` node)
/// `source` is deallocated.
fn merge(&mut self, source: Node<K>, mut into: Node<K>, median: Entry<K>) -> Node<K> {
    // `source` is consumed below, so remember its address up front.
    let freed_addr = source.address();
    into.merge(source, median, &mut self.allocator);
    // `Node::merge` saves `into` and deallocates `source` directly through
    // the allocator — bypassing `save_node`/`deallocate_node` — so both
    // cache slots must be invalidated here.
    let cache = self.cache.get_mut();
    cache.invalidate(into.address());
    cache.invalidate(freed_addr);
    into
}

Expand All @@ -1285,22 +1365,41 @@ where
}
}

/// Deallocates a node and drops its cache slot, if occupied.
#[inline]
fn deallocate_node(&mut self, node: Node<K>) {
    // `node` is consumed by `deallocate`, so capture its address first.
    let freed_addr = node.address();
    node.deallocate(self.allocator_mut());
    // Invalidate so a future load cannot resurrect the freed node.
    self.cache.get_mut().invalidate(freed_addr);
}

/// Takes a node from the cache, falling back to a load from memory on a
/// cache miss.
///
/// Used by read paths (`&self`). The caller must hand the node back via
/// `return_node` when done so it lands in the cache again.
fn take_or_load_node(&self, address: Address) -> Node<K> {
    // Bind the result first so the `RefCell` borrow is released before the
    // (potentially slow) load from memory.
    let cached = self.cache.borrow_mut().take(address);
    match cached {
        Some(node) => node,
        None => Node::load(address, self.version.page_size(), self.memory()),
    }
}

/// Puts a node back into the cache after a read-path use.
fn return_node(&self, node: Node<K>) {
    let addr = node.address();
    self.cache.borrow_mut().put(addr, node);
}

/// Loads a node straight from memory, bypassing the cache.
#[inline]
fn load_node(&self, address: Address) -> Node<K> {
    let page_size = self.version.page_size();
    Node::load(address, page_size, self.memory())
}

/// Saves the node to memory and invalidates its cache slot.
#[inline]
fn save_node(&mut self, node: &mut Node<K>) {
    node.save(self.allocator_mut());
    // The cache may hold a pre-save copy of this node; drop it so the next
    // load observes the saved state.
    let saved_addr = node.address();
    self.cache.get_mut().invalidate(saved_addr);
}

/// Replaces the value at `idx` in the node, saves the node, and returns the old value.
Expand Down
9 changes: 0 additions & 9 deletions src/btreemap/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,15 +318,6 @@ impl<K: Storable + Ord + Clone> Node<K> {
.insert(idx, (LazyKey::by_value(key), LazyValue::by_value(value)));
}

/// Removes and returns the entry at `idx`, materializing its lazy key and
/// value from `memory`.
///
/// NOTE(review): uses `swap_remove`, which moves the *last* entry into
/// position `idx` and therefore does NOT preserve the sorted order of
/// `entries` — callers must not rely on the node's ordering afterwards.
pub fn extract_entry_at<M: Memory>(&mut self, idx: usize, memory: &M) -> Entry<K> {
    let (key, value) = self.entries.swap_remove(idx);
    (
        self.extract_key(key, memory),
        self.extract_value(value, memory),
    )
}

/// Removes the entry at the specified index.
pub fn remove_entry<M: Memory>(&mut self, idx: usize, memory: &M) -> Entry<K> {
let (key, value) = self.entries.remove(idx);
Expand Down
Loading