From 2055aba350190398bb757002aa432baa887794d4 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Tue, 14 Apr 2026 00:54:16 +0900 Subject: [PATCH 1/3] feat: support hdfs using hdfs native --- crates/paimon/Cargo.toml | 3 +- crates/paimon/src/io/mod.rs | 5 + crates/paimon/src/io/storage.rs | 107 ++++++++++++++++- crates/paimon/src/io/storage_hdfs.rs | 165 +++++++++++++++++++++++++++ 4 files changed, 278 insertions(+), 2 deletions(-) create mode 100644 crates/paimon/src/io/storage_hdfs.rs diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index 7477a3b8..c3b18b12 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -30,13 +30,14 @@ version.workspace = true [features] default = ["storage-memory", "storage-fs", "storage-oss"] -storage-all = ["storage-memory", "storage-fs", "storage-oss", "storage-s3"] +storage-all = ["storage-memory", "storage-fs", "storage-oss", "storage-s3", "storage-hdfs"] fulltext = ["tantivy", "tempfile"] storage-memory = ["opendal/services-memory"] storage-fs = ["opendal/services-fs"] storage-oss = ["opendal/services-oss"] storage-s3 = ["opendal/services-s3"] +storage-hdfs = ["opendal/services-hdfs-native"] [dependencies] url = "2.5.2" diff --git a/crates/paimon/src/io/mod.rs b/crates/paimon/src/io/mod.rs index 92a909a7..7e49c3c8 100644 --- a/crates/paimon/src/io/mod.rs +++ b/crates/paimon/src/io/mod.rs @@ -40,3 +40,8 @@ use storage_oss::*; mod storage_s3; #[cfg(feature = "storage-s3")] use storage_s3::*; + +#[cfg(feature = "storage-hdfs")] +mod storage_hdfs; +#[cfg(feature = "storage-hdfs")] +use storage_hdfs::*; diff --git a/crates/paimon/src/io/storage.rs b/crates/paimon/src/io/storage.rs index ccef9828..d6c71a61 100644 --- a/crates/paimon/src/io/storage.rs +++ b/crates/paimon/src/io/storage.rs @@ -16,9 +16,13 @@ // under the License. use std::collections::HashMap; +#[cfg(any(feature = "storage-oss", feature = "storage-s3", feature = "storage-hdfs"))] +use std::sync::Mutex; #[cfg(any(feature = "storage-oss", feature = "storage-s3"))] -use std::sync::{Mutex, MutexGuard}; +use std::sync::MutexGuard; +#[cfg(feature = "storage-hdfs")] +use opendal::services::HdfsNativeConfig; #[cfg(feature = "storage-oss")] use opendal::services::OssConfig; #[cfg(feature = "storage-s3")] @@ -48,6 +52,11 @@ pub enum Storage { config: Box, operators: Mutex>, }, + #[cfg(feature = "storage-hdfs")] + Hdfs { + config: Box, + op: Mutex>, + }, } impl Storage { @@ -80,6 +89,14 @@ impl Storage { operators: Mutex::new(HashMap::new()), }) } + #[cfg(feature = "storage-hdfs")] + Scheme::HdfsNative => { + let config = super::hdfs_config_parse(props)?; + Ok(Self::Hdfs { + config: Box::new(config), + op: Mutex::new(None), + }) + } _ => Err(error::Error::IoUnsupported { message: "Unsupported storage feature".to_string(), }), @@ -104,6 +121,22 @@ impl Storage { let op = Self::cached_s3_operator(config, operators, path, &bucket)?; Ok((op, relative_path)) } + #[cfg(feature = "storage-hdfs")] + Storage::Hdfs { config, op } => { + let relative_path = Self::hdfs_relative_path(path)?; + let mut guard = op.lock().map_err(|_| error::Error::UnexpectedError { + message: "Failed to lock HDFS operator".to_string(), + source: None, + })?; + // HDFS uses a single operator per Storage instance (unlike S3/OSS + // which cache per bucket). The operator is lazily initialized from + // the first path's NameNode if not set in config. One FileIO + // instance should target exactly one HDFS cluster. + if guard.is_none() { + *guard = Some(super::hdfs_config_build(config, path)?); + } + Ok((guard.as_ref().unwrap().clone(), relative_path)) + } } } @@ -226,12 +259,84 @@ impl Storage { Ok(op) } + #[cfg(feature = "storage-hdfs")] + fn hdfs_relative_path(path: &str) -> crate::Result<&str> { + let after_scheme = path.strip_prefix("hdfs://").ok_or_else(|| { + error::Error::ConfigInvalid { + message: format!("Invalid HDFS path: {path}, should start with hdfs://"), + } + })?; + match after_scheme.find('/') { + Some(pos) => Ok(&after_scheme[pos + 1..]), + None => Err(error::Error::ConfigInvalid { + message: format!("Invalid HDFS path: {path}, missing path component"), + }), + } + } + fn parse_scheme(scheme: &str) -> crate::Result { match scheme { "memory" => Ok(Scheme::Memory), "file" | "" => Ok(Scheme::Fs), "s3" | "s3a" => Ok(Scheme::S3), + "hdfs" => Ok(Scheme::HdfsNative), s => Ok(s.parse::()?), } } } + +#[cfg(all(test, feature = "storage-hdfs"))] +mod hdfs_tests { + use super::Storage; + use crate::io::FileIOBuilder; + + #[test] + fn test_hdfs_relative_path_normal() { + let result = Storage::hdfs_relative_path("hdfs://namenode:8020/warehouse/db/table"); + assert_eq!(result.unwrap(), "warehouse/db/table"); + } + + #[test] + fn test_hdfs_relative_path_root_slash() { + let result = Storage::hdfs_relative_path("hdfs://namenode:8020/"); + assert_eq!(result.unwrap(), ""); + } + + #[test] + fn test_hdfs_relative_path_no_port() { + let result = Storage::hdfs_relative_path("hdfs://nameservice1/warehouse/data"); + assert_eq!(result.unwrap(), "warehouse/data"); + } + + #[test] + fn test_hdfs_relative_path_missing_path_component() { + let result = Storage::hdfs_relative_path("hdfs://namenode:8020"); + assert!(result.is_err()); + } + + #[test] + fn test_hdfs_relative_path_wrong_scheme() { + let result = Storage::hdfs_relative_path("s3://bucket/key"); + assert!(result.is_err()); + } + + #[test] + fn test_parse_scheme_hdfs() { + let scheme = Storage::parse_scheme("hdfs").unwrap(); + assert_eq!(scheme, opendal::Scheme::HdfsNative); + } + + #[test] + fn test_file_io_builder_hdfs() { + let file_io = FileIOBuilder::new("hdfs") + .with_prop("hdfs.name-node", "hdfs://namenode:8020") + .build(); + assert!(file_io.is_ok()); + } + + #[test] + fn test_file_io_from_url_hdfs() { + let builder = crate::io::FileIO::from_url("hdfs://namenode:8020/warehouse"); + assert!(builder.is_ok()); + } +} diff --git a/crates/paimon/src/io/storage_hdfs.rs b/crates/paimon/src/io/storage_hdfs.rs new file mode 100644 index 00000000..248492ba --- /dev/null +++ b/crates/paimon/src/io/storage_hdfs.rs @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +use opendal::services::HdfsNativeConfig; +use opendal::Operator; +use url::Url; + +use crate::error::Error; +use crate::Result; + +/// Configuration key for HDFS name node URL. +/// +/// Example: "hdfs://namenode:8020" or "hdfs://nameservice1" (HA). +const HDFS_NAME_NODE: &str = "hdfs.name-node"; + +/// Configuration key to enable HDFS append capability. +const HDFS_ENABLE_APPEND: &str = "hdfs.enable-append"; + +/// Parse paimon catalog options into an [`HdfsNativeConfig`]. +/// +/// Extracts HDFS-related configuration from the properties map. +/// The `hdfs.name-node` key is optional — if omitted, the name node +/// will be extracted from the file path URL at operator build time. +pub(crate) fn hdfs_config_parse(props: HashMap) -> Result { + let mut cfg = HdfsNativeConfig::default(); + + cfg.name_node = props.get(HDFS_NAME_NODE).cloned(); + + if let Some(v) = props.get(HDFS_ENABLE_APPEND) { + if v.eq_ignore_ascii_case("true") { + cfg.enable_append = true; + } + } + + Ok(cfg) +} + +/// Build an [`Operator`] for the given HDFS path. +/// +/// If the config has no `name_node` set, it will be extracted from the path URL. +/// The root is set to "/" so that relative paths work correctly. +/// +/// Example path: "hdfs://namenode:8020/warehouse/db/table" +pub(crate) fn hdfs_config_build(cfg: &HdfsNativeConfig, path: &str) -> Result { + let url = Url::parse(path).map_err(|_| Error::ConfigInvalid { + message: format!("Invalid HDFS url: {path}"), + })?; + + let mut cfg = cfg.clone(); + + if cfg.name_node.is_none() { + let host = url.host_str().ok_or_else(|| Error::ConfigInvalid { + message: format!("Invalid HDFS url: {path}, missing name node host"), + })?; + let port_part = url.port().map(|p| format!(":{p}")).unwrap_or_default(); + cfg.name_node = Some(format!("hdfs://{host}{port_part}")); + } + + cfg.root = Some("/".to_string()); + + Ok(Operator::from_config(cfg)?.finish()) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_props(pairs: &[(&str, &str)]) -> HashMap { + pairs + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() + } + + #[test] + fn test_hdfs_config_parse_with_name_node() { + let props = make_props(&[("hdfs.name-node", "hdfs://namenode:8020")]); + let cfg = hdfs_config_parse(props).unwrap(); + assert_eq!(cfg.name_node.as_deref(), Some("hdfs://namenode:8020")); + assert!(!cfg.enable_append); + } + + #[test] + fn test_hdfs_config_parse_with_enable_append() { + let props = make_props(&[ + ("hdfs.name-node", "hdfs://namenode:8020"), + ("hdfs.enable-append", "true"), + ]); + let cfg = hdfs_config_parse(props).unwrap(); + assert!(cfg.enable_append); + } + + #[test] + fn test_hdfs_config_parse_empty_props() { + let cfg = hdfs_config_parse(HashMap::new()).unwrap(); + assert!(cfg.name_node.is_none()); + assert!(!cfg.enable_append); + } + + #[test] + fn test_hdfs_config_build_extracts_name_node_from_path() { + let cfg = HdfsNativeConfig::default(); + let op = hdfs_config_build(&cfg, "hdfs://namenode:8020/warehouse/db").unwrap(); + assert_eq!(op.info().scheme().to_string(), "hdfs-native"); + } + + #[test] + fn test_hdfs_config_build_uses_config_name_node() { + let mut cfg = HdfsNativeConfig::default(); + cfg.name_node = Some("hdfs://my-cluster:9000".to_string()); + let op = hdfs_config_build(&cfg, "hdfs://my-cluster:9000/warehouse").unwrap(); + assert_eq!(op.info().scheme().to_string(), "hdfs-native"); + } + + #[test] + fn test_hdfs_config_build_invalid_url() { + let cfg = HdfsNativeConfig::default(); + let result = hdfs_config_build(&cfg, "not-a-valid-url"); + assert!(result.is_err()); + } + + #[test] + fn test_hdfs_config_build_missing_host() { + let cfg = HdfsNativeConfig::default(); + let result = hdfs_config_build(&cfg, "hdfs:///path/without/host"); + assert!(result.is_err()); + } + + #[test] + fn test_hdfs_config_parse_enable_append_false() { + let props = make_props(&[ + ("hdfs.name-node", "hdfs://namenode:8020"), + ("hdfs.enable-append", "false"), + ]); + let cfg = hdfs_config_parse(props).unwrap(); + assert!(!cfg.enable_append); + } + + #[test] + fn test_hdfs_config_parse_unrelated_keys_ignored() { + let props = make_props(&[ + ("s3.endpoint", "https://s3.amazonaws.com"), + ("fs.oss.endpoint", "https://oss.aliyuncs.com"), + ("hdfs.name-node", "hdfs://namenode:8020"), + ]); + let cfg = hdfs_config_parse(props).unwrap(); + assert_eq!(cfg.name_node.as_deref(), Some("hdfs://namenode:8020")); + } +} From 9914ac0b103c35601b07f1fa867b434378c5dc71 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Tue, 14 Apr 2026 01:04:42 +0900 Subject: [PATCH 2/3] refactor: move hdfs specific logic and test to storage_hdfs --- crates/paimon/src/io/storage.rs | 73 +--------------------------- crates/paimon/src/io/storage_hdfs.rs | 47 ++++++++++++++++++ 2 files changed, 48 insertions(+), 72 deletions(-) diff --git a/crates/paimon/src/io/storage.rs b/crates/paimon/src/io/storage.rs index d6c71a61..9b08b02b 100644 --- a/crates/paimon/src/io/storage.rs +++ b/crates/paimon/src/io/storage.rs @@ -123,7 +123,7 @@ impl Storage { } #[cfg(feature = "storage-hdfs")] Storage::Hdfs { config, op } => { - let relative_path = Self::hdfs_relative_path(path)?; + let relative_path = super::hdfs_relative_path(path)?; let mut guard = op.lock().map_err(|_| error::Error::UnexpectedError { message: "Failed to lock HDFS operator".to_string(), source: None, @@ -259,21 +259,6 @@ impl Storage { Ok(op) } - #[cfg(feature = "storage-hdfs")] - fn hdfs_relative_path(path: &str) -> crate::Result<&str> { - let after_scheme = path.strip_prefix("hdfs://").ok_or_else(|| { - error::Error::ConfigInvalid { - message: format!("Invalid HDFS path: {path}, should start with hdfs://"), - } - })?; - match after_scheme.find('/') { - Some(pos) => Ok(&after_scheme[pos + 1..]), - None => Err(error::Error::ConfigInvalid { - message: format!("Invalid HDFS path: {path}, missing path component"), - }), - } - } - fn parse_scheme(scheme: &str) -> crate::Result { match scheme { "memory" => Ok(Scheme::Memory), @@ -284,59 +269,3 @@ impl Storage { } } } - -#[cfg(all(test, feature = "storage-hdfs"))] -mod hdfs_tests { - use super::Storage; - use crate::io::FileIOBuilder; - - #[test] - fn test_hdfs_relative_path_normal() { - let result = Storage::hdfs_relative_path("hdfs://namenode:8020/warehouse/db/table"); - assert_eq!(result.unwrap(), "warehouse/db/table"); - } - - #[test] - fn test_hdfs_relative_path_root_slash() { - let result = Storage::hdfs_relative_path("hdfs://namenode:8020/"); - assert_eq!(result.unwrap(), ""); - } - - #[test] - fn test_hdfs_relative_path_no_port() { - let result = Storage::hdfs_relative_path("hdfs://nameservice1/warehouse/data"); - assert_eq!(result.unwrap(), "warehouse/data"); - } - - #[test] - fn test_hdfs_relative_path_missing_path_component() { - let result = Storage::hdfs_relative_path("hdfs://namenode:8020"); - assert!(result.is_err()); - } - - #[test] - fn test_hdfs_relative_path_wrong_scheme() { - let result = Storage::hdfs_relative_path("s3://bucket/key"); - assert!(result.is_err()); - } - - #[test] - fn test_parse_scheme_hdfs() { - let scheme = Storage::parse_scheme("hdfs").unwrap(); - assert_eq!(scheme, opendal::Scheme::HdfsNative); - } - - #[test] - fn test_file_io_builder_hdfs() { - let file_io = FileIOBuilder::new("hdfs") - .with_prop("hdfs.name-node", "hdfs://namenode:8020") - .build(); - assert!(file_io.is_ok()); - } - - #[test] - fn test_file_io_from_url_hdfs() { - let builder = crate::io::FileIO::from_url("hdfs://namenode:8020/warehouse"); - assert!(builder.is_ok()); - } -} diff --git a/crates/paimon/src/io/storage_hdfs.rs b/crates/paimon/src/io/storage_hdfs.rs index 248492ba..40891427 100644 --- a/crates/paimon/src/io/storage_hdfs.rs +++ b/crates/paimon/src/io/storage_hdfs.rs @@ -24,6 +24,23 @@ use url::Url; use crate::error::Error; use crate::Result; +/// Parse HDFS path to get relative path from root. +/// +/// Example: "hdfs://namenode:8020/warehouse/db/table" -> "warehouse/db/table" +pub(crate) fn hdfs_relative_path(path: &str) -> Result<&str> { + let after_scheme = path.strip_prefix("hdfs://").ok_or_else(|| { + Error::ConfigInvalid { + message: format!("Invalid HDFS path: {path}, should start with hdfs://"), + } + })?; + match after_scheme.find('/') { + Some(pos) => Ok(&after_scheme[pos + 1..]), + None => Err(Error::ConfigInvalid { + message: format!("Invalid HDFS path: {path}, missing path component"), + }), + } +} + /// Configuration key for HDFS name node URL. /// /// Example: "hdfs://namenode:8020" or "hdfs://nameservice1" (HA). @@ -162,4 +179,34 @@ mod tests { let cfg = hdfs_config_parse(props).unwrap(); assert_eq!(cfg.name_node.as_deref(), Some("hdfs://namenode:8020")); } + + #[test] + fn test_hdfs_relative_path_normal() { + let result = hdfs_relative_path("hdfs://namenode:8020/warehouse/db/table"); + assert_eq!(result.unwrap(), "warehouse/db/table"); + } + + #[test] + fn test_hdfs_relative_path_root_slash() { + let result = hdfs_relative_path("hdfs://namenode:8020/"); + assert_eq!(result.unwrap(), ""); + } + + #[test] + fn test_hdfs_relative_path_no_port() { + let result = hdfs_relative_path("hdfs://nameservice1/warehouse/data"); + assert_eq!(result.unwrap(), "warehouse/data"); + } + + #[test] + fn test_hdfs_relative_path_missing_path_component() { + let result = hdfs_relative_path("hdfs://namenode:8020"); + assert!(result.is_err()); + } + + #[test] + fn test_hdfs_relative_path_wrong_scheme() { + let result = hdfs_relative_path("s3://bucket/key"); + assert!(result.is_err()); + } } From 423e64b2341dc7db168f3e473b4f1559607c6800 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Tue, 14 Apr 2026 01:06:59 +0900 Subject: [PATCH 3/3] refactor: apply cargo fmt --- crates/paimon/src/io/storage.rs | 6 +++++- crates/paimon/src/io/storage_hdfs.rs | 8 ++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/crates/paimon/src/io/storage.rs b/crates/paimon/src/io/storage.rs index 9b08b02b..a57fcfc2 100644 --- a/crates/paimon/src/io/storage.rs +++ b/crates/paimon/src/io/storage.rs @@ -16,7 +16,11 @@ // under the License. use std::collections::HashMap; -#[cfg(any(feature = "storage-oss", feature = "storage-s3", feature = "storage-hdfs"))] +#[cfg(any( + feature = "storage-oss", + feature = "storage-s3", + feature = "storage-hdfs" +))] use std::sync::Mutex; #[cfg(any(feature = "storage-oss", feature = "storage-s3"))] use std::sync::MutexGuard; diff --git a/crates/paimon/src/io/storage_hdfs.rs b/crates/paimon/src/io/storage_hdfs.rs index 40891427..6486f775 100644 --- a/crates/paimon/src/io/storage_hdfs.rs +++ b/crates/paimon/src/io/storage_hdfs.rs @@ -28,11 +28,11 @@ use crate::Result; /// /// Example: "hdfs://namenode:8020/warehouse/db/table" -> "warehouse/db/table" pub(crate) fn hdfs_relative_path(path: &str) -> Result<&str> { - let after_scheme = path.strip_prefix("hdfs://").ok_or_else(|| { - Error::ConfigInvalid { + let after_scheme = path + .strip_prefix("hdfs://") + .ok_or_else(|| Error::ConfigInvalid { message: format!("Invalid HDFS path: {path}, should start with hdfs://"), - } - })?; + })?; match after_scheme.find('/') { Some(pos) => Ok(&after_scheme[pos + 1..]), None => Err(Error::ConfigInvalid {