Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions vortex-session/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::ops::Deref;
use std::sync::Arc;

use arcref::ArcRef;
use parking_lot::Mutex;
use parking_lot::{Mutex, RwLock};
use vortex_error::VortexExpect;
use vortex_utils::aliases::dash_map::DashMap;

Expand Down Expand Up @@ -71,24 +71,24 @@ impl<T: Clone> Registry<T> {
///
/// ## Upcoming Changes
///
/// 1. This object holds an Arc of Mutex internally because we need concurrent access from the
/// 1. This object holds an Arc of RwLock internally because we need concurrent access from the
/// layout writer code path. We should update SegmentSink to take an Array rather than
/// ByteBuffer such that serializing arrays is done sequentially.
/// 2. The name is terrible. `Interner<T>` is better, but I want to minimize breakage for now.
#[derive(Clone, Debug)]
pub struct Context<T> {
// TODO(ngates): it's a long story, but if we make SegmentSink and SegmentSource take an
// enum of Segment { Array, DType, Buffer } then we don't actually need a mutable context
// in the LayoutWriter, therefore we don't need a Mutex here and everyone is happier.
ids: Arc<Mutex<Vec<Id>>>,
// in the LayoutWriter, therefore we don't need a RwLock here and everyone is happier.
ids: Arc<RwLock<Vec<Id>>>,
// Optional registry used to filter the permissible interned items.
registry: Option<Registry<T>>,
}

impl<T> Default for Context<T> {
fn default() -> Self {
Self {
ids: Arc::new(Mutex::new(Vec::new())),
ids: Arc::new(RwLock::new(Vec::new())),
registry: None,
}
}
Expand All @@ -98,7 +98,7 @@ impl<T: Clone> Context<T> {
/// Create a context with the given initial IDs.
pub fn new(ids: Vec<Id>) -> Self {
Self {
ids: Arc::new(Mutex::new(ids)),
ids: Arc::new(RwLock::new(ids)),
registry: None,
}
}
Expand All @@ -123,7 +123,7 @@ impl<T: Clone> Context<T> {
return None;
}

let mut ids = self.ids.lock();
let mut ids = self.ids.write();
if let Some(idx) = ids.iter().position(|e| e == id) {
return Some(u16::try_from(idx).vortex_expect("Cannot have more than u16::MAX items"));
}
Expand All @@ -139,11 +139,11 @@ impl<T: Clone> Context<T> {

/// Resolve an interned ID by its index.
pub fn resolve(&self, idx: u16) -> Option<Id> {
self.ids.lock().get(idx as usize).cloned()
self.ids.read().get(idx as usize).cloned()
}

/// Get the list of interned IDs.
pub fn to_ids(&self) -> Vec<Id> {
self.ids.lock().clone()
self.ids.read().clone()
}
}
Loading