diff --git a/paimon-python/pypaimon/common/external_path_provider.py b/paimon-python/pypaimon/common/external_path_provider.py index f553dced521e..e039b1d86ed2 100644 --- a/paimon-python/pypaimon/common/external_path_provider.py +++ b/paimon-python/pypaimon/common/external_path_provider.py @@ -15,29 +15,185 @@ # specific language governing permissions and limitations # under the License. +import bisect +import ctypes import random +import struct +from abc import ABC, abstractmethod from typing import List -class ExternalPathProvider: - def __init__(self, external_table_paths: List[str], relative_bucket_path: str): - self.external_table_paths = external_table_paths - self.relative_bucket_path = relative_bucket_path - self.position = random.randint(0, len(external_table_paths) - 1) if external_table_paths else 0 +class ExternalPathProvider(ABC): + """Provider for external data paths.""" + @abstractmethod def get_next_external_data_path(self, file_name: str) -> str: - """ - Get the next external data path using round-robin strategy. 
- """ - if not self.external_table_paths: - raise ValueError("No external paths available") - - self.position += 1 - if self.position == len(self.external_table_paths): - self.position = 0 - - external_base = self.external_table_paths[self.position] - if self.relative_bucket_path: - return f"{external_base.rstrip('/')}/{self.relative_bucket_path.strip('/')}/{file_name}" + """Get the next external data path for the given file name.""" + + @staticmethod + def create(strategy, external_table_paths, relative_bucket_path="", weights=None): + """Factory method to create the appropriate ExternalPathProvider.""" + from pypaimon.common.options.core_options import ExternalPathStrategy + + if strategy is None: + return None + if isinstance(strategy, str): + strategy = ExternalPathStrategy(strategy) + + if strategy == ExternalPathStrategy.NONE: + return None + elif strategy in (ExternalPathStrategy.ROUND_ROBIN, ExternalPathStrategy.SPECIFIC_FS): + return RoundRobinExternalPathProvider(external_table_paths, relative_bucket_path) + elif strategy == ExternalPathStrategy.ENTROPY_INJECT: + return EntropyInjectExternalPathProvider(external_table_paths, relative_bucket_path) + elif strategy == ExternalPathStrategy.WEIGHTED: + if len(external_table_paths) < 2 or not weights: + return RoundRobinExternalPathProvider(external_table_paths, relative_bucket_path) + return WeightedExternalPathProvider(external_table_paths, relative_bucket_path, weights) + else: + raise ValueError(f"Unsupported external path strategy: {strategy}") + + +class RoundRobinExternalPathProvider(ExternalPathProvider): + """Provider for round-robin external data paths.""" + + def __init__(self, external_table_paths: List[str], relative_bucket_path: str = ""): + if not external_table_paths: + raise ValueError("external_table_paths must not be empty") + self._external_table_paths = external_table_paths + self._relative_bucket_path = relative_bucket_path + self._position = random.randint(0, len(external_table_paths) - 
1) + + def get_next_external_data_path(self, file_name: str) -> str: + self._position += 1 + if self._position == len(self._external_table_paths): + self._position = 0 + + external_base = self._external_table_paths[self._position] + if self._relative_bucket_path: + return f"{external_base.rstrip('/')}/{self._relative_bucket_path.strip('/')}/{file_name}" else: return f"{external_base.rstrip('/')}/{file_name}" + + +class EntropyInjectExternalPathProvider(ExternalPathProvider): + """Provider for entropy-injected external data paths. + + Generates hash-based directory structures from filenames using murmur3_32. + Constants: 20-bit hash, depth=3 dirs of 4 bits each, 8-bit remainder. + """ + + _HASH_BINARY_STRING_BITS = 20 + _ENTROPY_DIR_LENGTH = 4 + _ENTROPY_DIR_DEPTH = 3 + + def __init__(self, external_table_paths: List[str], relative_bucket_path: str = ""): + if not external_table_paths: + raise ValueError("external_table_paths must not be empty") + self._external_table_paths = external_table_paths + self._relative_bucket_path = relative_bucket_path + self._position = 0 + + def get_next_external_data_path(self, file_name: str) -> str: + hash_dirs = self._compute_hash(file_name) + if self._relative_bucket_path: + file_path_with_hash = f"{self._relative_bucket_path.strip('/')}/{hash_dirs}/{file_name}" + else: + file_path_with_hash = f"{hash_dirs}/{file_name}" + + self._position += 1 + if self._position == len(self._external_table_paths): + self._position = 0 + + external_base = self._external_table_paths[self._position] + return f"{external_base.rstrip('/')}/{file_path_with_hash}" + + def _compute_hash(self, file_name: str) -> str: + hash_int = _murmur3_32(file_name.encode('utf-8')) + binary_string = format((hash_int & 0xFFFFFFFF) | 0x80000000, '032b') + hash_str = binary_string[32 - self._HASH_BINARY_STRING_BITS:] + + parts = [] + total_prefix = self._ENTROPY_DIR_DEPTH * self._ENTROPY_DIR_LENGTH + for i in range(0, total_prefix, self._ENTROPY_DIR_LENGTH): + end = min(i 
+ self._ENTROPY_DIR_LENGTH, len(hash_str)) + parts.append(hash_str[i:end]) + if len(hash_str) > total_prefix: + parts.append(hash_str[total_prefix:]) + return "/".join(parts) + + +class WeightedExternalPathProvider(ExternalPathProvider): + """Provider for weighted external data paths. + + Uses cumulative weights with binary search for path selection. + """ + + def __init__(self, external_table_paths: List[str], relative_bucket_path: str, weights: List[int]): + if len(external_table_paths) != len(weights): + raise ValueError( + f"The number of external paths and weights should be the same. " + f"Paths: {len(external_table_paths)}, Weights: {len(weights)}" + ) + self._external_table_paths = external_table_paths + self._relative_bucket_path = relative_bucket_path + self._total_weight = sum(weights) + self._cumulative_weights: List[int] = [] + cumulative = 0 + for w in weights: + cumulative += w + self._cumulative_weights.append(cumulative) + + def get_next_external_data_path(self, file_name: str) -> str: + random_value = random.random() * self._total_weight + index = bisect.bisect_right(self._cumulative_weights, random_value) + if index >= len(self._external_table_paths): + index = len(self._external_table_paths) - 1 + selected_base = self._external_table_paths[index] + if self._relative_bucket_path: + return f"{selected_base.rstrip('/')}/{self._relative_bucket_path.strip('/')}/{file_name}" + else: + return f"{selected_base.rstrip('/')}/{file_name}" + + +def _murmur3_32(data: bytes, seed: int = 0) -> int: + """Pure-Python murmur3_32 hash, compatible with Guava Hashing.murmur3_32(). + + Returns a signed 32-bit integer identical to Java's int representation. 
+ """ + c1 = 0xCC9E2D51 + c2 = 0x1B873593 + length = len(data) + h1 = seed & 0xFFFFFFFF + rounded_end = (length & 0xFFFFFFFC) + + for i in range(0, rounded_end, 4): + k1 = struct.unpack_from('<I', data, i)[0] + k1 = (k1 * c1) & 0xFFFFFFFF + k1 = ((k1 << 15) | (k1 >> 17)) & 0xFFFFFFFF + k1 = (k1 * c2) & 0xFFFFFFFF + h1 ^= k1 + h1 = ((h1 << 13) | (h1 >> 19)) & 0xFFFFFFFF + h1 = (h1 * 5 + 0xE6546B64) & 0xFFFFFFFF + + k1 = 0 + remaining = length & 3 + if remaining >= 3: + k1 ^= data[rounded_end + 2] << 16 + if remaining >= 2: + k1 ^= data[rounded_end + 1] << 8 + if remaining >= 1: + k1 ^= data[rounded_end] + k1 = (k1 * c1) & 0xFFFFFFFF + k1 = ((k1 << 15) | (k1 >> 17)) & 0xFFFFFFFF + k1 = (k1 * c2) & 0xFFFFFFFF + h1 ^= k1 + + h1 ^= length + h1 ^= h1 >> 16 + h1 = (h1 * 0x85EBCA6B) & 0xFFFFFFFF + h1 ^= h1 >> 13 + h1 = (h1 * 0xC2B2AE35) & 0xFFFFFFFF + h1 ^= h1 >> 16 + + return ctypes.c_int32(h1).value diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py index 5a518782944d..254ebd860af0 100644 --- a/paimon-python/pypaimon/common/options/core_options.py +++ b/paimon-python/pypaimon/common/options/core_options.py @@ -23,8 +23,8 @@ from pypaimon.common.memory_size import MemorySize from pypaimon.common.options import Options from pypaimon.common.options.config_option import ConfigOption -from pypaimon.common.options.options_utils import OptionsUtils from pypaimon.common.options.config_options import ConfigOptions +from pypaimon.common.options.options_utils import OptionsUtils class ExternalPathStrategy(str, Enum): @@ -34,6 +34,8 @@ class ExternalPathStrategy(str, Enum): NONE = "none" ROUND_ROBIN = "round-robin" SPECIFIC_FS = "specific-fs" + ENTROPY_INJECT = "entropy-inject" + WEIGHTED = "weight-robin" class ChangelogProducer(str, Enum): @@ -497,7 +499,10 @@ class CoreOptions: ConfigOptions.key("data-file.external-paths.strategy") .string_type() .default_value(ExternalPathStrategy.NONE) - .with_description("Strategy for selecting external paths.
Options: none, round-robin, specific-fs.") + .with_description( + "Strategy for selecting external paths. " + "Options: none, round-robin, specific-fs, entropy-inject, weight-robin." + ) ) DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS: ConfigOption[str] = ( @@ -507,6 +512,16 @@ class CoreOptions: .with_description("Specific filesystem for external paths when using specific-fs strategy.") ) + DATA_FILE_EXTERNAL_PATHS_WEIGHTS: ConfigOption[str] = ( + ConfigOptions.key("data-file.external-paths.weights") + .string_type() + .no_default_value() + .with_description( + "Weights for external paths when strategy is weight-robin. " + "Format: comma-separated positive integers corresponding to paths in order." + ) + ) + # Global Index options GLOBAL_INDEX_ENABLED: ConfigOption[bool] = ( ConfigOptions.key("global-index.enabled") @@ -885,6 +900,23 @@ def data_file_external_paths_strategy(self, default=None): def data_file_external_paths_specific_fs(self, default=None): return self.options.get(CoreOptions.DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS, default) + def data_file_external_paths_weights(self, default=None): + value = self.options.get( + CoreOptions.DATA_FILE_EXTERNAL_PATHS_WEIGHTS, default + ) + if value is None: + return None + parts = value.split(",") + weights = [] + for part in parts: + parsed = int(part.strip()) + if parsed <= 0: + raise ValueError( + f"Weight must be positive, got: {parsed}" + ) + weights.append(parsed) + return weights + def commit_max_retries(self) -> int: return self.options.get(CoreOptions.COMMIT_MAX_RETRIES) diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py index 9d418cbb26bb..1fee68615a76 100644 --- a/paimon-python/pypaimon/table/file_store_table.py +++ b/paimon-python/pypaimon/table/file_store_table.py @@ -118,13 +118,15 @@ def branch_manager(self): # If catalog environment has a catalog loader, use CatalogBranchManager catalog_loader = self.catalog_environment.catalog_loader if catalog_loader 
is not None: - from pypaimon.branch.catalog_branch_manager import CatalogBranchManager + from pypaimon.branch.catalog_branch_manager import \ + CatalogBranchManager return CatalogBranchManager( catalog_loader, self.identifier ) # Otherwise, use FileSystemBranchManager - from pypaimon.branch.filesystem_branch_manager import FileSystemBranchManager + from pypaimon.branch.filesystem_branch_manager import \ + FileSystemBranchManager current_branch = self.current_branch() or "main" return FileSystemBranchManager( self.file_io, @@ -380,6 +382,8 @@ def path_factory(self) -> 'FileStorePathFactory': file_compression=file_compression, data_file_path_directory=None, external_paths=external_paths, + external_path_strategy=self.options.data_file_external_paths_strategy(), + external_path_weights=self.options.data_file_external_paths_weights(), index_file_in_data_file_dir=False, ) @@ -417,11 +421,13 @@ def new_stream_write_builder(self) -> StreamWriteBuilder: return StreamWriteBuilder(self) def new_full_text_search_builder(self) -> 'FullTextSearchBuilder': - from pypaimon.table.source.full_text_search_builder import FullTextSearchBuilderImpl + from pypaimon.table.source.full_text_search_builder import \ + FullTextSearchBuilderImpl return FullTextSearchBuilderImpl(self) def new_vector_search_builder(self) -> 'VectorSearchBuilder': - from pypaimon.table.source.vector_search_builder import VectorSearchBuilderImpl + from pypaimon.table.source.vector_search_builder import \ + VectorSearchBuilderImpl return VectorSearchBuilderImpl(self) def create_row_key_extractor(self) -> RowKeyExtractor: @@ -496,6 +502,7 @@ def _try_time_travel(self, options: Options) -> Optional[TableSchema]: def _create_external_paths(self) -> List[str]: from urllib.parse import urlparse + from pypaimon.common.options.core_options import ExternalPathStrategy external_paths_str = self.options.data_file_external_paths() diff --git a/paimon-python/pypaimon/tests/external_paths_test.py 
b/paimon-python/pypaimon/tests/external_paths_test.py index ba2c07534fe9..aab4fa38a599 100644 --- a/paimon-python/pypaimon/tests/external_paths_test.py +++ b/paimon-python/pypaimon/tests/external_paths_test.py @@ -24,8 +24,11 @@ from pypaimon import CatalogFactory, Schema from pypaimon.catalog.catalog import Identifier -from pypaimon.common.options.core_options import CoreOptions, ExternalPathStrategy -from pypaimon.common.external_path_provider import ExternalPathProvider +from pypaimon.common.external_path_provider import ( + EntropyInjectExternalPathProvider, ExternalPathProvider, + RoundRobinExternalPathProvider, WeightedExternalPathProvider, _murmur3_32) +from pypaimon.common.options.core_options import (CoreOptions, + ExternalPathStrategy) class ExternalPathProviderTest(unittest.TestCase): @@ -40,7 +43,7 @@ def test_path_selection_and_structure(self): "oss://bucket3/external", ] relative_path = "partition=value/bucket-0" - provider = ExternalPathProvider(external_paths, relative_path) + provider = RoundRobinExternalPathProvider(external_paths, relative_path) paths = [provider.get_next_external_data_path("file.parquet") for _ in range(6)] @@ -56,18 +59,247 @@ def test_path_selection_and_structure(self): self.assertIn("file.parquet", paths[0]) # Test single path - single_provider = ExternalPathProvider(["oss://bucket/external"], "bucket-0") + single_provider = RoundRobinExternalPathProvider(["oss://bucket/external"], "bucket-0") single_path = single_provider.get_next_external_data_path("data.parquet") self.assertIn("bucket/external", single_path) self.assertIn("bucket-0", single_path) self.assertIn("data.parquet", single_path) # Test empty relative path - empty_provider = ExternalPathProvider(["oss://bucket/external"], "") + empty_provider = RoundRobinExternalPathProvider(["oss://bucket/external"], "") empty_path = empty_provider.get_next_external_data_path("file.parquet") self.assertIn("bucket/external", empty_path) self.assertIn("file.parquet", empty_path) + 
def test_factory_create_round_robin(self): + """Test ExternalPathProvider.create() with round-robin strategy.""" + provider = ExternalPathProvider.create( + "round-robin", ["oss://a/path", "oss://b/path"], "bucket-0" + ) + self.assertIsInstance(provider, RoundRobinExternalPathProvider) + paths = [provider.get_next_external_data_path("f.parquet") for _ in range(4)] + schemes_used = {p.split("://")[1].split("/")[0] for p in paths} + self.assertEqual(len(schemes_used), 2) + + def test_factory_create_specific_fs(self): + """Test ExternalPathProvider.create() with specific-fs (falls through to round-robin).""" + provider = ExternalPathProvider.create( + "specific-fs", ["oss://bucket/path"] + ) + self.assertIsInstance(provider, RoundRobinExternalPathProvider) + + def test_factory_create_none(self): + """Test ExternalPathProvider.create() with none strategy returns None.""" + provider = ExternalPathProvider.create("none", ["oss://bucket/path"]) + self.assertIsNone(provider) + + def test_factory_create_entropy_inject(self): + """Test ExternalPathProvider.create() with entropy-inject strategy.""" + provider = ExternalPathProvider.create( + "entropy-inject", ["oss://a/path", "oss://b/path"], "bucket-0" + ) + self.assertIsInstance(provider, EntropyInjectExternalPathProvider) + + def test_factory_create_weighted(self): + """Test ExternalPathProvider.create() with weight-robin strategy.""" + provider = ExternalPathProvider.create( + "weight-robin", ["oss://a/path", "oss://b/path"], "bucket-0", [10, 5] + ) + self.assertIsInstance(provider, WeightedExternalPathProvider) + + def test_factory_create_weighted_fallback(self): + """Test weight-robin falls back to round-robin when paths < 2 or no weights.""" + provider = ExternalPathProvider.create( + "weight-robin", ["oss://a/path"], "bucket-0", [10] + ) + self.assertIsInstance(provider, RoundRobinExternalPathProvider) + + provider2 = ExternalPathProvider.create( + "weight-robin", ["oss://a/path", "oss://b/path"], "bucket-0", None + ) 
+ self.assertIsInstance(provider2, RoundRobinExternalPathProvider) + + +class Murmur3HashTest(unittest.TestCase): + """Test murmur3_32 hash implementation for Java Guava compatibility.""" + + def test_empty_string(self): + """Empty string should produce a deterministic hash.""" + result = _murmur3_32(b'') + # Guava: Hashing.murmur3_32().hashString("", UTF_8).asInt() == 0 + self.assertEqual(result, 0) + + def test_deterministic(self): + """Same input always produces same output.""" + for s in [b'test', b'hello world', b'data-0001.blob']: + self.assertEqual(_murmur3_32(s), _murmur3_32(s)) + + def test_known_values(self): + """Verify against Guava Hashing.murmur3_32().hashString(s, UTF_8).asInt(). + + Values confirmed by running Java Guava 32.0.0. + """ + self.assertEqual(_murmur3_32(b''), 0) + self.assertEqual(_murmur3_32(b'a'), 1009084850) + self.assertEqual(_murmur3_32(b'hello'), 613153351) + self.assertEqual(_murmur3_32(b'world'), -74040069) + self.assertEqual(_murmur3_32(b'test'), -1167338989) + self.assertEqual(_murmur3_32(b'data-abc.blob'), 894520562) + self.assertEqual(_murmur3_32(b'data-xyz.blob'), -822867934) + + def test_signed_32bit_range(self): + """Result should be in signed 32-bit integer range.""" + for s in [b'a', b'ab', b'abc', b'abcd', b'abcde']: + result = _murmur3_32(s) + self.assertGreaterEqual(result, -(2 ** 31)) + self.assertLessEqual(result, 2 ** 31 - 1) + + +class EntropyInjectExternalPathProviderTest(unittest.TestCase): + """Test EntropyInjectExternalPathProvider functionality.""" + + def test_hash_directory_structure(self): + """Hash directories should have depth=3 with 4-bit segments + remainder.""" + provider = EntropyInjectExternalPathProvider(["oss://bucket/ext"], "bucket-0") + hash_dirs = provider._compute_hash("test-file.blob") + parts = hash_dirs.split("/") + self.assertEqual(len(parts), 4) + self.assertEqual(len(parts[0]), 4) + self.assertEqual(len(parts[1]), 4) + self.assertEqual(len(parts[2]), 4) + self.assertEqual(len(parts[3]), 
8) + for part in parts: + self.assertTrue(all(c in ('0', '1') for c in part)) + + def test_deterministic_path(self): + """Same filename always produces same hash directories.""" + provider = EntropyInjectExternalPathProvider( + ["oss://bucket/ext"], "dt=20240101/bucket-0" + ) + hash1 = provider._compute_hash("data-001.parquet") + hash2 = provider._compute_hash("data-001.parquet") + self.assertEqual(hash1, hash2) + + def test_path_format(self): + """Full path should include base/relative/hashDirs/fileName.""" + provider = EntropyInjectExternalPathProvider( + ["oss://bucket/ext"], "dt=20240101/bucket-0" + ) + path = provider.get_next_external_data_path("data-001.parquet") + self.assertIn("oss://bucket/ext", path) + self.assertIn("dt=20240101/bucket-0", path) + self.assertIn("data-001.parquet", path) + # Should have hash dirs between relative path and filename + parts_between = path.split("bucket-0/")[1].split("/data-001.parquet")[0] + self.assertEqual(len(parts_between.split("/")), 4) + + def test_multi_path_rotation(self): + """Paths should rotate across external paths.""" + provider = EntropyInjectExternalPathProvider( + ["oss://a/ext", "oss://b/ext", "oss://c/ext"], "" + ) + paths = [provider.get_next_external_data_path(f"file-{i}.parquet") for i in range(6)] + bases = [p.split("://")[1][0] for p in paths] + self.assertEqual(set(bases), {'a', 'b', 'c'}) + + +class WeightedExternalPathProviderTest(unittest.TestCase): + """Test WeightedExternalPathProvider functionality.""" + + def test_weight_distribution(self): + """Paths should be selected roughly proportional to weights.""" + import random as _random + _random.seed(42) + provider = WeightedExternalPathProvider( + ["oss://a/path", "oss://b/path"], "bucket-0", [90, 10] + ) + counts = {"a": 0, "b": 0} + for i in range(10000): + path = provider.get_next_external_data_path(f"file-{i}.parquet") + if "://a/" in path: + counts["a"] += 1 + else: + counts["b"] += 1 + + # With 90:10 weights, "a" should get ~90% (allow 5% 
tolerance) + ratio_a = counts["a"] / 10000 + self.assertGreater(ratio_a, 0.85) + self.assertLess(ratio_a, 0.95) + + def test_equal_weights(self): + """Equal weights should distribute roughly evenly.""" + import random as _random + _random.seed(42) + provider = WeightedExternalPathProvider( + ["oss://a/path", "oss://b/path", "oss://c/path"], "bucket-0", [1, 1, 1] + ) + counts = {"a": 0, "b": 0, "c": 0} + for i in range(9000): + path = provider.get_next_external_data_path(f"file-{i}.parquet") + for key in counts: + if f"://{key}/" in path: + counts[key] += 1 + + for key in counts: + ratio = counts[key] / 9000 + self.assertGreater(ratio, 0.28) + self.assertLess(ratio, 0.39) + + def test_path_format(self): + """Path should include base/relative/fileName.""" + provider = WeightedExternalPathProvider( + ["oss://a/path", "oss://b/path"], "dt=20240101/bucket-0", [5, 5] + ) + path = provider.get_next_external_data_path("data.parquet") + self.assertIn("dt=20240101/bucket-0", path) + self.assertIn("data.parquet", path) + + def test_mismatched_lengths_raises(self): + """Should raise ValueError if paths and weights have different lengths.""" + with self.assertRaises(ValueError): + WeightedExternalPathProvider( + ["oss://a/path", "oss://b/path"], "bucket-0", [10] + ) + + +class WeightsParsingTest(unittest.TestCase): + """Test CoreOptions.data_file_external_paths_weights() parsing and validation.""" + + def test_valid_weights(self): + """Normal comma-separated positive integers.""" + from pypaimon.common.options.core_options import CoreOptions + opts = CoreOptions.from_dict({"data-file.external-paths.weights": "10,5,15"}) + self.assertEqual(opts.data_file_external_paths_weights(), [10, 5, 15]) + + def test_none_when_not_configured(self): + """Returns None when option is not set.""" + from pypaimon.common.options.core_options import CoreOptions + opts = CoreOptions.from_dict({}) + self.assertIsNone(opts.data_file_external_paths_weights()) + + def test_zero_weight_raises(self): + 
"""Zero weight should raise ValueError.""" + from pypaimon.common.options.core_options import CoreOptions + opts = CoreOptions.from_dict({"data-file.external-paths.weights": "10,0,5"}) + with self.assertRaises(ValueError) as ctx: + opts.data_file_external_paths_weights() + self.assertIn("positive", str(ctx.exception)) + + def test_negative_weight_raises(self): + """Negative weight should raise ValueError.""" + from pypaimon.common.options.core_options import CoreOptions + opts = CoreOptions.from_dict({"data-file.external-paths.weights": "10,-5"}) + with self.assertRaises(ValueError) as ctx: + opts.data_file_external_paths_weights() + self.assertIn("positive", str(ctx.exception)) + + def test_empty_element_raises(self): + """Empty element like '10,,5' should raise ValueError (align with Java NumberFormatException).""" + from pypaimon.common.options.core_options import CoreOptions + opts = CoreOptions.from_dict({"data-file.external-paths.weights": "10,,5"}) + with self.assertRaises(ValueError): + opts.data_file_external_paths_weights() + class ExternalPathsConfigTest(unittest.TestCase): """Test external paths configuration parsing through FileStoreTable._create_external_paths().""" @@ -188,6 +420,7 @@ def test_create_external_path_provider(self): # Test with external paths configured provider = path_factory.create_external_path_provider(("value1",), 0) self.assertIsNotNone(provider) + self.assertIsInstance(provider, RoundRobinExternalPathProvider) path = provider.get_next_external_data_path("file.parquet") self.assertTrue("bucket1" in str(path) or "bucket2" in str(path)) self.assertIn("dt=value1", str(path)) @@ -234,6 +467,55 @@ def test_create_external_path_provider(self): provider3 = table3.path_factory().create_external_path_provider((), 0) self.assertIsNone(provider3) + def test_create_entropy_inject_provider(self): + """Test creating EntropyInject provider from path factory.""" + table_name = "test_db.entropy_test" + # Manually delete table directory if it 
exists + try: + table_path = self.catalog.get_table_path(Identifier.from_string(table_name)) + if self.catalog.file_io.exists(table_path): + self.catalog.file_io.delete(table_path, recursive=True) + except Exception: + pass # Table may not exist, ignore + options = { + CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(): "oss://bucket1/path1,oss://bucket2/path2", + CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(): ExternalPathStrategy.ENTROPY_INJECT, + } + pa_schema = pa.schema([("id", pa.int32()), ("name", pa.string())]) + schema = Schema.from_pyarrow_schema(pa_schema, options=options) + self.catalog.create_table(table_name, schema, True) + table = self.catalog.get_table(table_name) + path_factory = table.path_factory() + + provider = path_factory.create_external_path_provider((), 0) + self.assertIsNotNone(provider) + self.assertIsInstance(provider, EntropyInjectExternalPathProvider) + + def test_create_weighted_provider(self): + """Test creating Weighted provider from path factory.""" + table_name = "test_db.weighted_test" + # Manually delete table directory if it exists + try: + table_path = self.catalog.get_table_path(Identifier.from_string(table_name)) + if self.catalog.file_io.exists(table_path): + self.catalog.file_io.delete(table_path, recursive=True) + except Exception: + pass # Table may not exist, ignore + options = { + CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(): "oss://bucket1/path1,oss://bucket2/path2", + CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(): ExternalPathStrategy.WEIGHTED, + CoreOptions.DATA_FILE_EXTERNAL_PATHS_WEIGHTS.key(): "10,5", + } + pa_schema = pa.schema([("id", pa.int32()), ("name", pa.string())]) + schema = Schema.from_pyarrow_schema(pa_schema, options=options) + self.catalog.create_table(table_name, schema, True) + table = self.catalog.get_table(table_name) + path_factory = table.path_factory() + + provider = path_factory.create_external_path_provider((), 0) + self.assertIsNotNone(provider) + self.assertIsInstance(provider, 
WeightedExternalPathProvider) + class ExternalPathsIntegrationTest(unittest.TestCase): """Integration tests for external paths feature.""" diff --git a/paimon-python/pypaimon/utils/file_store_path_factory.py b/paimon-python/pypaimon/utils/file_store_path_factory.py index d959ae4ffa55..3816a2ce29a9 100644 --- a/paimon-python/pypaimon/utils/file_store_path_factory.py +++ b/paimon-python/pypaimon/utils/file_store_path_factory.py @@ -55,6 +55,8 @@ def __init__( file_compression: str, data_file_path_directory: Optional[str] = None, external_paths: Optional[List[str]] = None, + external_path_strategy: str = "round-robin", + external_path_weights: Optional[List[int]] = None, index_file_in_data_file_dir: bool = False, ): self._root = root.rstrip('/') @@ -67,6 +69,8 @@ def __init__( self.file_compression = file_compression self.data_file_path_directory = data_file_path_directory self.external_paths = external_paths or [] + self.external_path_strategy = external_path_strategy + self.external_path_weights = external_path_weights self.index_file_in_data_file_dir = index_file_in_data_file_dir self.legacy_partition_name = legacy_partition_name @@ -124,7 +128,12 @@ def create_external_path_provider( return None relative_bucket_path = self.relative_bucket_path(partition, bucket) - return ExternalPathProvider(self.external_paths, relative_bucket_path) + return ExternalPathProvider.create( + self.external_path_strategy, + self.external_paths, + relative_bucket_path, + self.external_path_weights, + ) def global_index_path_factory(self) -> 'IndexPathFactory': return IndexPathFactory(self.index_path())