Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 174 additions & 18 deletions paimon-python/pypaimon/common/external_path_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,185 @@
# specific language governing permissions and limitations
# under the License.

import bisect
import ctypes
import random
import struct
from abc import ABC, abstractmethod
from typing import List


class ExternalPathProvider:
def __init__(self, external_table_paths: List[str], relative_bucket_path: str):
self.external_table_paths = external_table_paths
self.relative_bucket_path = relative_bucket_path
self.position = random.randint(0, len(external_table_paths) - 1) if external_table_paths else 0
class ExternalPathProvider(ABC):
"""Provider for external data paths."""

@abstractmethod
def get_next_external_data_path(self, file_name: str) -> str:
"""
Get the next external data path using round-robin strategy.
"""
if not self.external_table_paths:
raise ValueError("No external paths available")

self.position += 1
if self.position == len(self.external_table_paths):
self.position = 0

external_base = self.external_table_paths[self.position]
if self.relative_bucket_path:
return f"{external_base.rstrip('/')}/{self.relative_bucket_path.strip('/')}/{file_name}"
"""Get the next external data path for the given file name."""

@staticmethod
def create(strategy, external_table_paths, relative_bucket_path="", weights=None):
"""Factory method to create the appropriate ExternalPathProvider."""
from pypaimon.common.options.core_options import ExternalPathStrategy

if strategy is None:
return None
if isinstance(strategy, str):
strategy = ExternalPathStrategy(strategy)

if strategy == ExternalPathStrategy.NONE:
return None
elif strategy in (ExternalPathStrategy.ROUND_ROBIN, ExternalPathStrategy.SPECIFIC_FS):
return RoundRobinExternalPathProvider(external_table_paths, relative_bucket_path)
elif strategy == ExternalPathStrategy.ENTROPY_INJECT:
return EntropyInjectExternalPathProvider(external_table_paths, relative_bucket_path)
elif strategy == ExternalPathStrategy.WEIGHTED:
if len(external_table_paths) < 2 or not weights:
return RoundRobinExternalPathProvider(external_table_paths, relative_bucket_path)
return WeightedExternalPathProvider(external_table_paths, relative_bucket_path, weights)
else:
raise ValueError(f"Unsupported external path strategy: {strategy}")


class RoundRobinExternalPathProvider(ExternalPathProvider):
"""Provider for round-robin external data paths."""

def __init__(self, external_table_paths: List[str], relative_bucket_path: str = ""):
if not external_table_paths:
raise ValueError("external_table_paths must not be empty")
self._external_table_paths = external_table_paths
self._relative_bucket_path = relative_bucket_path
self._position = random.randint(0, len(external_table_paths) - 1)

def get_next_external_data_path(self, file_name: str) -> str:
self._position += 1
if self._position == len(self._external_table_paths):
self._position = 0

external_base = self._external_table_paths[self._position]
if self._relative_bucket_path:
return f"{external_base.rstrip('/')}/{self._relative_bucket_path.strip('/')}/{file_name}"
else:
return f"{external_base.rstrip('/')}/{file_name}"


class EntropyInjectExternalPathProvider(ExternalPathProvider):
"""Provider for entropy-injected external data paths.

Generates hash-based directory structures from filenames using murmur3_32.
Constants: 20-bit hash, depth=3 dirs of 4 bits each, 8-bit remainder.
"""

_HASH_BINARY_STRING_BITS = 20
_ENTROPY_DIR_LENGTH = 4
_ENTROPY_DIR_DEPTH = 3

def __init__(self, external_table_paths: List[str], relative_bucket_path: str = ""):
if not external_table_paths:
raise ValueError("external_table_paths must not be empty")
self._external_table_paths = external_table_paths
self._relative_bucket_path = relative_bucket_path
self._position = 0

def get_next_external_data_path(self, file_name: str) -> str:
hash_dirs = self._compute_hash(file_name)
if self._relative_bucket_path:
file_path_with_hash = f"{self._relative_bucket_path.strip('/')}/{hash_dirs}/{file_name}"
else:
file_path_with_hash = f"{hash_dirs}/{file_name}"

self._position += 1
if self._position == len(self._external_table_paths):
self._position = 0

external_base = self._external_table_paths[self._position]
return f"{external_base.rstrip('/')}/{file_path_with_hash}"

def _compute_hash(self, file_name: str) -> str:
hash_int = _murmur3_32(file_name.encode('utf-8'))
binary_string = format((hash_int & 0xFFFFFFFF) | 0x80000000, '032b')
hash_str = binary_string[32 - self._HASH_BINARY_STRING_BITS:]

parts = []
total_prefix = self._ENTROPY_DIR_DEPTH * self._ENTROPY_DIR_LENGTH
for i in range(0, total_prefix, self._ENTROPY_DIR_LENGTH):
end = min(i + self._ENTROPY_DIR_LENGTH, len(hash_str))
parts.append(hash_str[i:end])
if len(hash_str) > total_prefix:
parts.append(hash_str[total_prefix:])
return "/".join(parts)


class WeightedExternalPathProvider(ExternalPathProvider):
"""Provider for weighted external data paths.

Uses cumulative weights with binary search for path selection.
"""

def __init__(self, external_table_paths: List[str], relative_bucket_path: str, weights: List[int]):
if len(external_table_paths) != len(weights):
raise ValueError(
f"The number of external paths and weights should be the same. "
f"Paths: {len(external_table_paths)}, Weights: {len(weights)}"
)
self._external_table_paths = external_table_paths
self._relative_bucket_path = relative_bucket_path
self._total_weight = sum(weights)
self._cumulative_weights: List[int] = []
cumulative = 0
for w in weights:
cumulative += w
self._cumulative_weights.append(cumulative)

def get_next_external_data_path(self, file_name: str) -> str:
random_value = random.random() * self._total_weight
index = bisect.bisect_right(self._cumulative_weights, random_value)
if index >= len(self._external_table_paths):
index = len(self._external_table_paths) - 1
selected_base = self._external_table_paths[index]
if self._relative_bucket_path:
return f"{selected_base.rstrip('/')}/{self._relative_bucket_path.strip('/')}/{file_name}"
else:
return f"{selected_base.rstrip('/')}/{file_name}"


def _murmur3_32(data: bytes, seed: int = 0) -> int:
"""Pure-Python murmur3_32 hash, compatible with Guava Hashing.murmur3_32().

Returns a signed 32-bit integer identical to Java's int representation.
"""
c1 = 0xCC9E2D51
c2 = 0x1B873593
length = len(data)
h1 = seed & 0xFFFFFFFF
rounded_end = (length & 0xFFFFFFFC)

for i in range(0, rounded_end, 4):
k1 = struct.unpack_from('<I', data, i)[0]
k1 = (k1 * c1) & 0xFFFFFFFF
k1 = ((k1 << 15) | (k1 >> 17)) & 0xFFFFFFFF
k1 = (k1 * c2) & 0xFFFFFFFF
h1 ^= k1
h1 = ((h1 << 13) | (h1 >> 19)) & 0xFFFFFFFF
h1 = (h1 * 5 + 0xE6546B64) & 0xFFFFFFFF

k1 = 0
remaining = length & 3
if remaining >= 3:
k1 ^= data[rounded_end + 2] << 16
if remaining >= 2:
k1 ^= data[rounded_end + 1] << 8
if remaining >= 1:
k1 ^= data[rounded_end]
k1 = (k1 * c1) & 0xFFFFFFFF
k1 = ((k1 << 15) | (k1 >> 17)) & 0xFFFFFFFF
k1 = (k1 * c2) & 0xFFFFFFFF
h1 ^= k1

h1 ^= length
h1 ^= h1 >> 16
h1 = (h1 * 0x85EBCA6B) & 0xFFFFFFFF
h1 ^= h1 >> 13
h1 = (h1 * 0xC2B2AE35) & 0xFFFFFFFF
h1 ^= h1 >> 16

return ctypes.c_int32(h1).value
25 changes: 23 additions & 2 deletions paimon-python/pypaimon/common/options/core_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
from pypaimon.common.memory_size import MemorySize
from pypaimon.common.options import Options
from pypaimon.common.options.config_option import ConfigOption
from pypaimon.common.options.options_utils import OptionsUtils
from pypaimon.common.options.config_options import ConfigOptions
from pypaimon.common.options.options_utils import OptionsUtils


class ExternalPathStrategy(str, Enum):
Expand All @@ -34,6 +34,8 @@ class ExternalPathStrategy(str, Enum):
NONE = "none"
ROUND_ROBIN = "round-robin"
SPECIFIC_FS = "specific-fs"
ENTROPY_INJECT = "entropy-inject"
WEIGHTED = "weight-robin"


class ChangelogProducer(str, Enum):
Expand Down Expand Up @@ -497,7 +499,10 @@ class CoreOptions:
ConfigOptions.key("data-file.external-paths.strategy")
.string_type()
.default_value(ExternalPathStrategy.NONE)
.with_description("Strategy for selecting external paths. Options: none, round-robin, specific-fs.")
.with_description(
"Strategy for selecting external paths. "
"Options: none, round-robin, specific-fs, entropy-inject, weight-robin."
)
)

DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS: ConfigOption[str] = (
Expand All @@ -507,6 +512,16 @@ class CoreOptions:
.with_description("Specific filesystem for external paths when using specific-fs strategy.")
)

DATA_FILE_EXTERNAL_PATHS_WEIGHTS: ConfigOption[str] = (
ConfigOptions.key("data-file.external-paths.weights")
.string_type()
.no_default_value()
.with_description(
"Weights for external paths when strategy is weight-robin. "
"Format: comma-separated positive integers corresponding to paths in order."
)
)

# Global Index options
GLOBAL_INDEX_ENABLED: ConfigOption[bool] = (
ConfigOptions.key("global-index.enabled")
Expand Down Expand Up @@ -885,6 +900,12 @@ def data_file_external_paths_strategy(self, default=None):
def data_file_external_paths_specific_fs(self, default=None):
return self.options.get(CoreOptions.DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS, default)

def data_file_external_paths_weights(self, default=None):
value = self.options.get(CoreOptions.DATA_FILE_EXTERNAL_PATHS_WEIGHTS, default)
if not value:
return None
return [int(w.strip()) for w in value.split(",") if w.strip()]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please validate the parsed weights here, matching Java CoreOptions.externalPathWeights(). Currently zero or negative weights are accepted and then WeightedExternalPathProvider builds invalid cumulative weights: for example 0,0 makes total_weight zero so every random value maps past the last cumulative entry and all files go to the last path, while 10,-5 makes the second path unreachable. Java rejects non-positive weights, so Python should raise for <= 0 too.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, fixed


def commit_max_retries(self) -> int:
return self.options.get(CoreOptions.COMMIT_MAX_RETRIES)

Expand Down
15 changes: 11 additions & 4 deletions paimon-python/pypaimon/table/file_store_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,15 @@ def branch_manager(self):
# If catalog environment has a catalog loader, use CatalogBranchManager
catalog_loader = self.catalog_environment.catalog_loader
if catalog_loader is not None:
from pypaimon.branch.catalog_branch_manager import CatalogBranchManager
from pypaimon.branch.catalog_branch_manager import \
CatalogBranchManager
return CatalogBranchManager(
catalog_loader,
self.identifier
)
# Otherwise, use FileSystemBranchManager
from pypaimon.branch.filesystem_branch_manager import FileSystemBranchManager
from pypaimon.branch.filesystem_branch_manager import \
FileSystemBranchManager
current_branch = self.current_branch() or "main"
return FileSystemBranchManager(
self.file_io,
Expand Down Expand Up @@ -380,6 +382,8 @@ def path_factory(self) -> 'FileStorePathFactory':
file_compression=file_compression,
data_file_path_directory=None,
external_paths=external_paths,
external_path_strategy=self.options.data_file_external_paths_strategy(),
external_path_weights=self.options.data_file_external_paths_weights(),
index_file_in_data_file_dir=False,
)

Expand Down Expand Up @@ -417,11 +421,13 @@ def new_stream_write_builder(self) -> StreamWriteBuilder:
return StreamWriteBuilder(self)

def new_full_text_search_builder(self) -> 'FullTextSearchBuilder':
from pypaimon.table.source.full_text_search_builder import FullTextSearchBuilderImpl
from pypaimon.table.source.full_text_search_builder import \
FullTextSearchBuilderImpl
return FullTextSearchBuilderImpl(self)

def new_vector_search_builder(self) -> 'VectorSearchBuilder':
from pypaimon.table.source.vector_search_builder import VectorSearchBuilderImpl
from pypaimon.table.source.vector_search_builder import \
VectorSearchBuilderImpl
return VectorSearchBuilderImpl(self)

def create_row_key_extractor(self) -> RowKeyExtractor:
Expand Down Expand Up @@ -496,6 +502,7 @@ def _try_time_travel(self, options: Options) -> Optional[TableSchema]:

def _create_external_paths(self) -> List[str]:
from urllib.parse import urlparse

from pypaimon.common.options.core_options import ExternalPathStrategy

external_paths_str = self.options.data_file_external_paths()
Expand Down
Loading
Loading