Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
Changelog
=========

Unreleased
----------
This upcoming version includes:
- **Breaking**: Remove ``max_workers`` and ``alive_nodes_checking_frequency`` parameters from
``Configure.Replication.async_config()`` and ``Reconfigure.Replication.async_config()``.
Both fields have been no-ops on the server since Weaviate 1.37.3. Code referencing these
parameters will need to be updated; no behavioral change results from the removal.

Version 4.21.1
--------------
This patch version includes:
Expand Down
28 changes: 13 additions & 15 deletions integration/test_collection_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1598,8 +1598,8 @@ def test_replication_config_with_async_config(collection_factory: CollectionFact
factor=1,
async_enabled=True,
async_config=Configure.Replication.async_config(
max_workers=8,
hashtree_height=20,
propagation_concurrency=4,
),
),
)
Expand All @@ -1608,8 +1608,8 @@ def test_replication_config_with_async_config(collection_factory: CollectionFact
assert config.replication_config.async_enabled is True
assert config.replication_config.async_config is not None
ac = config.replication_config.async_config
assert ac.max_workers == 8
assert ac.hashtree_height == 20
assert ac.propagation_concurrency == 4


def test_replication_config_remove_async_config_by_disabling_async_replication(
Expand All @@ -1624,14 +1624,13 @@ def test_replication_config_remove_async_config_by_disabling_async_replication(
factor=1,
async_enabled=True,
async_config=Configure.Replication.async_config(
max_workers=8,
hashtree_height=20,
propagation_concurrency=4,
),
),
)
config = collection.config.get()
assert config.replication_config.async_config is not None
assert config.replication_config.async_config.max_workers == 8
assert config.replication_config.async_config.propagation_concurrency == 4

collection.config.update(
replication_config=Reconfigure.replication(
Expand All @@ -1653,14 +1652,13 @@ def test_replication_config_remove_async_config(collection_factory: CollectionFa
factor=1,
async_enabled=True,
async_config=Configure.Replication.async_config(
max_workers=8,
hashtree_height=20,
propagation_concurrency=4,
),
),
)
config = collection.config.get()
assert config.replication_config.async_config is not None
assert config.replication_config.async_config.max_workers == 8
assert config.replication_config.async_config.propagation_concurrency == 4

collection.config.update(
replication_config=Reconfigure.replication(
Expand All @@ -1685,29 +1683,29 @@ def test_replication_config_unset_single_async_field(
factor=1,
async_enabled=True,
async_config=Configure.Replication.async_config(
max_workers=8,
hashtree_height=20,
propagation_concurrency=4,
),
),
)
config = collection.config.get()
ac = config.replication_config.async_config
assert ac is not None
assert ac.max_workers == 8
assert ac.hashtree_height == 20
assert ac.propagation_concurrency == 4

# Update with only max_workers — hashtree_height reverts to server default
# Update with only propagation_concurrency — hashtree_height reverts to server default
collection.config.update(
replication_config=Reconfigure.replication(
async_config=Reconfigure.Replication.async_config(
max_workers=8,
propagation_concurrency=4,
),
),
)
config = collection.config.get()
ac = config.replication_config.async_config
assert ac is not None
assert ac.max_workers == 8
assert ac.propagation_concurrency == 4
assert ac.hashtree_height != 20


Expand All @@ -1734,15 +1732,15 @@ def test_replication_config_add_async_config_to_existing_collection(
collection.config.update(
replication_config=Reconfigure.replication(
async_config=Reconfigure.Replication.async_config(
max_workers=8,
hashtree_height=20,
propagation_concurrency=4,
),
),
)
config = collection.config.get()
assert config.replication_config.async_config is not None
ac = config.replication_config.async_config
assert ac.max_workers == 8
assert ac.hashtree_height == 20
assert ac.propagation_concurrency == 4


Expand Down
19 changes: 4 additions & 15 deletions test/collection/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2853,11 +2853,9 @@ def test_config_with_vectors(vector_config: List[_VectorConfigCreate], expected:
(
Configure.replication(
async_config=Configure.Replication.async_config(
max_workers=10,
hashtree_height=5,
frequency=60,
frequency_while_propagating=30,
alive_nodes_checking_frequency=120,
logging_frequency=15,
diff_batch_size=100,
diff_per_node_timeout=10,
Expand All @@ -2871,11 +2869,9 @@ def test_config_with_vectors(vector_config: List[_VectorConfigCreate], expected:
),
{
"asyncConfig": {
"maxWorkers": 10,
"hashtreeHeight": 5,
"frequency": 60,
"frequencyWhilePropagating": 30,
"aliveNodesCheckingFrequency": 120,
"loggingFrequency": 15,
"diffBatchSize": 100,
"diffPerNodeTimeout": 10,
Expand Down Expand Up @@ -2923,11 +2919,9 @@ def test_configure_with_replication(config: _ReplicationConfigCreate, expected:
(
Reconfigure.replication(
async_config=Reconfigure.Replication.async_config(
max_workers=10,
hashtree_height=5,
frequency=60,
frequency_while_propagating=30,
alive_nodes_checking_frequency=120,
logging_frequency=15,
diff_batch_size=100,
diff_per_node_timeout=10,
Expand All @@ -2944,11 +2938,9 @@ def test_configure_with_replication(config: _ReplicationConfigCreate, expected:
"asyncEnabled": None,
"deletionStrategy": None,
"asyncConfig": {
"maxWorkers": 10,
"hashtreeHeight": 5,
"frequency": 60,
"frequencyWhilePropagating": 30,
"aliveNodesCheckingFrequency": 120,
"loggingFrequency": 15,
"diffBatchSize": 100,
"diffPerNodeTimeout": 10,
Expand Down Expand Up @@ -2977,29 +2969,26 @@ def test_replication_config_to_dict_with_async_config() -> None:
async_enabled=True,
deletion_strategy=ReplicationDeletionStrategy.TIME_BASED_RESOLUTION,
async_config=_AsyncReplicationConfig(
max_workers=8,
hashtree_height=20,
frequency=None,
frequency_while_propagating=None,
alive_nodes_checking_frequency=3,
logging_frequency=None,
diff_batch_size=None,
diff_per_node_timeout=None,
pre_propagation_timeout=None,
propagation_timeout=None,
propagation_limit=None,
propagation_delay=None,
propagation_concurrency=None,
propagation_concurrency=4,
propagation_batch_size=None,
),
)
d = config.to_dict()
assert d["factor"] == 3
assert d["asyncEnabled"] is True
assert d["deletionStrategy"] == "TimeBasedResolution"
assert d["asyncConfig"]["maxWorkers"] == 8
assert d["asyncConfig"]["hashtreeHeight"] == 20
assert d["asyncConfig"]["aliveNodesCheckingFrequency"] == 3
assert d["asyncConfig"]["propagationConcurrency"] == 4


def test_replication_config_to_dict_without_async_config() -> None:
Expand All @@ -3025,7 +3014,7 @@ def test_replication_config_update_merge_with_missing_async_config() -> None:
"""
update = Reconfigure.replication(
async_config=Reconfigure.Replication.async_config(
max_workers=12,
hashtree_height=5,
propagation_concurrency=4,
),
)
Expand All @@ -3036,7 +3025,7 @@ def test_replication_config_update_merge_with_missing_async_config() -> None:
"deletionStrategy": "NoAutomatedResolution",
}
result = update.merge_with_existing(existing_schema)
assert result["asyncConfig"]["maxWorkers"] == 12
assert result["asyncConfig"]["hashtreeHeight"] == 5
assert result["asyncConfig"]["propagationConcurrency"] == 4
assert result["factor"] == 3

Expand Down
6 changes: 3 additions & 3 deletions test/collection/test_config_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,13 @@ def test_replication_async_config_replace_on_update() -> None:
schema = {
"factor": 1,
"asyncEnabled": True,
"asyncConfig": {"maxWorkers": 8, "hashtreeHeight": 20},
"asyncConfig": {"hashtreeHeight": 20, "propagationConcurrency": 4},
}
update = Reconfigure.replication(
async_config=Reconfigure.Replication.async_config(max_workers=16),
async_config=Reconfigure.Replication.async_config(propagation_concurrency=8),
)
result = update.merge_with_existing(schema)
assert result["asyncConfig"] == {"maxWorkers": 16}
assert result["asyncConfig"] == {"propagationConcurrency": 8}
assert "hashtreeHeight" not in result["asyncConfig"]


Expand Down
14 changes: 0 additions & 14 deletions weaviate/collections/classes/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,9 @@ class _ShardingConfigCreate(_ConfigCreateModel):


class _AsyncReplicationConfigCreate(_ConfigCreateModel):
maxWorkers: Optional[int]
hashtreeHeight: Optional[int]
frequency: Optional[int]
frequencyWhilePropagating: Optional[int]
aliveNodesCheckingFrequency: Optional[int]
loggingFrequency: Optional[int]
diffBatchSize: Optional[int]
diffPerNodeTimeout: Optional[int]
Expand All @@ -315,11 +313,9 @@ class _AsyncReplicationConfigCreate(_ConfigCreateModel):


class _AsyncReplicationConfigUpdate(_ConfigUpdateModel):
maxWorkers: Optional[int]
hashtreeHeight: Optional[int]
frequency: Optional[int]
frequencyWhilePropagating: Optional[int]
aliveNodesCheckingFrequency: Optional[int]
loggingFrequency: Optional[int]
diffBatchSize: Optional[int]
diffPerNodeTimeout: Optional[int]
Expand Down Expand Up @@ -1809,11 +1805,9 @@ def to_dict(self) -> Dict[str, Any]:

@dataclass
class _AsyncReplicationConfig(_ConfigBase):
max_workers: Optional[int]
hashtree_height: Optional[int]
frequency: Optional[int]
frequency_while_propagating: Optional[int]
alive_nodes_checking_frequency: Optional[int]
logging_frequency: Optional[int]
diff_batch_size: Optional[int]
diff_per_node_timeout: Optional[int]
Expand Down Expand Up @@ -2565,11 +2559,9 @@ class _Replication:
@staticmethod
def async_config(
*,
max_workers: Optional[int] = None,
hashtree_height: Optional[int] = None,
frequency: Optional[int] = None,
frequency_while_propagating: Optional[int] = None,
alive_nodes_checking_frequency: Optional[int] = None,
logging_frequency: Optional[int] = None,
diff_batch_size: Optional[int] = None,
diff_per_node_timeout: Optional[int] = None,
Expand All @@ -2585,11 +2577,9 @@ def async_config(
This is only available with WeaviateDB `>=v1.36.0`.
"""
return _AsyncReplicationConfigCreate(
maxWorkers=max_workers,
hashtreeHeight=hashtree_height,
frequency=frequency,
frequencyWhilePropagating=frequency_while_propagating,
aliveNodesCheckingFrequency=alive_nodes_checking_frequency,
loggingFrequency=logging_frequency,
diffBatchSize=diff_batch_size,
diffPerNodeTimeout=diff_per_node_timeout,
Expand All @@ -2606,11 +2596,9 @@ class _ReplicationUpdate:
@staticmethod
def async_config(
*,
max_workers: Optional[int] = None,
hashtree_height: Optional[int] = None,
frequency: Optional[int] = None,
frequency_while_propagating: Optional[int] = None,
alive_nodes_checking_frequency: Optional[int] = None,
logging_frequency: Optional[int] = None,
diff_batch_size: Optional[int] = None,
diff_per_node_timeout: Optional[int] = None,
Expand All @@ -2626,11 +2614,9 @@ def async_config(
This is only available with WeaviateDB `>=v1.36.0`.
"""
return _AsyncReplicationConfigUpdate(
maxWorkers=max_workers,
hashtreeHeight=hashtree_height,
frequency=frequency,
frequencyWhilePropagating=frequency_while_propagating,
aliveNodesCheckingFrequency=alive_nodes_checking_frequency,
loggingFrequency=logging_frequency,
diffBatchSize=diff_batch_size,
diffPerNodeTimeout=diff_per_node_timeout,
Expand Down
2 changes: 0 additions & 2 deletions weaviate/collections/classes/config_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,9 @@ def _collection_config_from_json(schema: Dict[str, Any]) -> _CollectionConfig:
),
async_config=(
_AsyncReplicationConfig(
max_workers=async_cfg.get("maxWorkers"),
hashtree_height=async_cfg.get("hashtreeHeight"),
frequency=async_cfg.get("frequency"),
frequency_while_propagating=async_cfg.get("frequencyWhilePropagating"),
alive_nodes_checking_frequency=async_cfg.get("aliveNodesCheckingFrequency"),
logging_frequency=async_cfg.get("loggingFrequency"),
diff_batch_size=async_cfg.get("diffBatchSize"),
diff_per_node_timeout=async_cfg.get("diffPerNodeTimeout"),
Expand Down