From b23558b4c229feb2b373a3f33432d2fb2e4f7ffa Mon Sep 17 00:00:00 2001 From: Yaniv Michael Kaul Date: Sat, 24 Jan 2026 13:21:19 +0200 Subject: [PATCH 1/2] (improvement) unit tests for benchmarking query planning. Not a very scientific one, but reasonable to get some measurements in terms of how different optimizations work. Example run (on https://github.com/scylladb/python-driver/pull/650 branch): ykaul@ykaul:~/github/python-driver$ pytest -s tests/unit/test_policy_performance.py /usr/lib/python3.14/site-packages/pytest_asyncio/plugin.py:211: PytestDeprecationWarning: The configuration option "asyncio_default_fixture_loop_scope" is unset. The event loop scope for asynchronous fixtures will default to the fixture caching scope. Future versions of pytest-asyncio will default the loop scope for asynchronous fixtures to function scope. Set the default fixture loop scope explicitly in order to avoid unexpected behavior in the future. Valid fixture loop scopes are: "function", "class", "module", "package", "session" warnings.warn(PytestDeprecationWarning(_DEFAULT_FIXTURE_LOOP_SCOPE_UNSET)) ============================================================================================================ test session starts ============================================================================================================= platform linux -- Python 3.14.2, pytest-8.3.5, pluggy-1.6.0 rootdir: /home/ykaul/github/python-driver configfile: pyproject.toml plugins: asyncio-1.1.0, anyio-4.12.1 asyncio: mode=Mode.STRICT, asyncio_default_fixture_loop_scope=None, asyncio_default_test_loop_scope=function collected 4 items tests/unit/test_policy_performance.py Pinned to CPU 0 .... === Performance Benchmarks === Policy | Ops | Time (s) | Kops/s ---------------------------------------------------------------------- DCAware | 100000 | 0.2328 | 429 RackAware | 100000 | 0.3637 | 274 TokenAware(DCAware) | 100000 | 1.5884 | 62 TokenAware(RackAware) | 100000 | 1.6816 | 59 ---------------------------------------------------------------------- Signed-off-by: Yaniv Kaul --- tests/unit/test_policy_performance.py | 214 ++++++++++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100644 tests/unit/test_policy_performance.py diff --git a/tests/unit/test_policy_performance.py b/tests/unit/test_policy_performance.py new file mode 100644 index 0000000000..ecb31ca939 --- /dev/null +++ b/tests/unit/test_policy_performance.py @@ -0,0 +1,214 @@ +import unittest +import time +import uuid +import struct +import os +import statistics +from unittest.mock import Mock + +from cassandra.policies import ( + DCAwareRoundRobinPolicy, + RackAwareRoundRobinPolicy, + TokenAwarePolicy, + DefaultLoadBalancingPolicy, + HostFilterPolicy +) +from cassandra.pool import Host +from cassandra.cluster import SimpleConvictionPolicy + +# Mock for Connection/EndPoint since Host expects it +class MockEndPoint(object): + __slots__ = ('address',) + + def __init__(self, address): + self.address = address + def __str__(self): + return self.address + +class MockStatement(object): + __slots__ = ('routing_key', 'keyspace', 'table') + + def __init__(self, routing_key, keyspace="ks", table="tbl"): + self.routing_key = routing_key + self.keyspace = keyspace + self.table = table + + def is_lwt(self): + return False + +class MockTokenMap(object): + __slots__ = ('token_class', 'get_replicas_func') + def __init__(self, get_replicas_func): + self.token_class = Mock() + self.token_class.from_key = lambda k: k + self.get_replicas_func = get_replicas_func + + def get_replicas(self, keyspace, token): + return self.get_replicas_func(keyspace, token) + +class MockTablets(object): + __slots__ = () + def get_tablet_for_key(self, keyspace, table, key): + return None + +class MockMetadata(object): + __slots__ = ('_tablets', 'token_map', 'get_replicas_func', 'hosts_by_address') + def __init__(self, get_replicas_func, hosts_by_address): + self._tablets = MockTablets() + self.token_map = MockTokenMap(get_replicas_func) + self.get_replicas_func = get_replicas_func + self.hosts_by_address = hosts_by_address + + def can_support_partitioner(self): + return True + + def get_replicas(self, keyspace, key): + return self.get_replicas_func(keyspace, key) + + def get_host(self, addr): + return self.hosts_by_address.get(addr) + +class MockCluster(object): + __slots__ = ('metadata',) + def __init__(self, metadata): + self.metadata = metadata + +class TestPolicyPerformance(unittest.TestCase): + @classmethod + def setUpClass(cls): + if hasattr(os, 'sched_setaffinity'): + try: + # Pin to the first available CPU + cpu = list(os.sched_getaffinity(0))[0] + os.sched_setaffinity(0, {cpu}) + print(f"Pinned to CPU {cpu}") + except Exception as e: + print(f"Could not pin CPU: {e}") + + # 1. Topology: 5 DCs, 3 Racks/DC, 3 Nodes/Rack = 45 Nodes + cls.hosts = [] + cls.hosts_map = {} # host_id -> Host + cls.replicas_map = {} # routing_key -> list of replica hosts + + # Deterministic generation + dcs = ['dc{}'.format(i) for i in range(5)] + racks = ['rack{}'.format(i) for i in range(3)] + nodes_per_rack = 3 + + ip_counter = 0 + subnet_counter = 0 + for dc in dcs: + for rack in racks: + subnet_counter += 1 + for node_idx in range(nodes_per_rack): + ip_counter += 1 + address = "127.0.{}.{}".format(subnet_counter, node_idx + 1) + h_id = uuid.UUID(int=ip_counter) + h = Host(MockEndPoint(address), SimpleConvictionPolicy, host_id=h_id) + h.set_location_info(dc, rack) + cls.hosts.append(h) + cls.hosts_map[h_id] = h + + # 2. Queries: 100,000 deterministic queries + cls.query_count = 100000 + cls.queries = [] + cls.results = [] + # We'll use simple packed integers as routing keys + for i in range(cls.query_count): + key = struct.pack('>I', i) + cls.queries.append(MockStatement(routing_key=key)) + + # Pre-calculate replicas for TokenAware: + # Deterministically pick 3 replicas based on the key index + # This simulates the metadata.get_replicas behavior + # We pick index i, i+1, i+2 mod 45 + replicas = [] + for r in range(3): + idx = (i + r) % len(cls.hosts) + replicas.append(cls.hosts[idx]) + cls.replicas_map[key] = replicas + + def _get_replicas_side_effect(self, keyspace, key): + return self.replicas_map.get(key, []) + + def _setup_cluster_mock(self): + hosts_by_address = {} + for host in self.hosts: + addr = getattr(host, 'address', None) + if addr is None and getattr(host, 'endpoint', None) is not None: + addr = getattr(host.endpoint, 'address', None) + if addr is not None: + hosts_by_address[addr] = host + metadata = MockMetadata(self._get_replicas_side_effect, hosts_by_address) + return MockCluster(metadata) + + def _run_benchmark(self, name, policy): + # Setup + cluster = self._setup_cluster_mock() + policy.populate(cluster, self.hosts) + + # Warmup + for _ in range(100): + list(policy.make_query_plan(working_keyspace="ks", query=self.queries[0])) + + # Run multiple iterations to reduce noise + iterations = 5 + timings = [] + + for _ in range(iterations): + start_time = time.perf_counter() + for q in self.queries: + # We consume the iterator to ensure full plan generation cost is paid + for _ in policy.make_query_plan(working_keyspace="ks", query=q): + pass + end_time = time.perf_counter() + timings.append(end_time - start_time) + + # Use median to filter outliers + duration = statistics.median(timings) + + count = len(self.queries) + ops_per_sec = count / duration + kops = int(ops_per_sec / 1000) + + self.results.append((name, count, duration, kops)) + return ops_per_sec + + @classmethod + def tearDownClass(cls): + print("\n\n=== Performance Benchmarks ===") + print(f"{'Policy':<30} | {'Ops':<10} | {'Time (s)':<10} | {'Kops/s':<10}") + print("-" * 70) + for name, count, duration, kops in cls.results: + print(f"{name:<30} | {count:<10} | {duration:<10.4f} | {kops:<10}") + print("-" * 70) + + def test_dc_aware(self): + # Local DC = dc0, 1 remote host per DC + policy = DCAwareRoundRobinPolicy(local_dc='dc0', used_hosts_per_remote_dc=1) + self._run_benchmark("DCAware", policy) + + def test_rack_aware(self): + # Local DC = dc0, Local Rack = rack0, 1 remote host per DC + policy = RackAwareRoundRobinPolicy(local_dc='dc0', local_rack='rack0', used_hosts_per_remote_dc=1) + self._run_benchmark("RackAware", policy) + + def test_token_aware_wrapping_dc_aware(self): + child = DCAwareRoundRobinPolicy(local_dc='dc0', used_hosts_per_remote_dc=1) + policy = TokenAwarePolicy(child, shuffle_replicas=False) # False for strict determinism in test if needed + self._run_benchmark("TokenAware(DCAware)", policy) + + def test_token_aware_wrapping_rack_aware(self): + child = RackAwareRoundRobinPolicy(local_dc='dc0', local_rack='rack0', used_hosts_per_remote_dc=1) + policy = TokenAwarePolicy(child, shuffle_replicas=False) + self._run_benchmark("TokenAware(RackAware)", policy) + + def test_default_wrapping_dc_aware(self): + child = DCAwareRoundRobinPolicy(local_dc='dc0', used_hosts_per_remote_dc=1) + policy = DefaultLoadBalancingPolicy(child) + self._run_benchmark("Default(DCAware)", policy) + + def test_host_filter_wrapping_dc_aware(self): + child = DCAwareRoundRobinPolicy(local_dc='dc0', used_hosts_per_remote_dc=1) + policy = HostFilterPolicy(child_policy=child, predicate=lambda host: host.rack != 'rack2') + self._run_benchmark("HostFilter(DCAware)", policy) From a797262e6695d4c49d10e85894f27b8ea0860a1b Mon Sep 17 00:00:00 2001 From: Yaniv Michael Kaul Date: Sun, 25 Jan 2026 16:01:26 +0200 Subject: [PATCH 2/2] (improvement) add HostPolicy to the benchmark suite. Signed-off-by: Yaniv Kaul --- tests/unit/test_policy_performance.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit/test_policy_performance.py b/tests/unit/test_policy_performance.py index ecb31ca939..fbe80cfb26 100644 --- a/tests/unit/test_policy_performance.py +++ b/tests/unit/test_policy_performance.py @@ -6,6 +6,8 @@ import statistics from unittest.mock import Mock +"A micro-bechmark for performance of policies" + from cassandra.policies import ( DCAwareRoundRobinPolicy, RackAwareRoundRobinPolicy,