Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ def test_invalid_query_on_non_existent_collection(self):

with self.assertRaises(Exception) as context:
with TestPipeline() as p:
p.not_use_test_runner_api = True
_ = (p | beam.Create(test_chunks) | Enrichment(handler))

expect_err_msg_contains = "collection not found"
Expand Down Expand Up @@ -307,6 +308,7 @@ def test_invalid_query_on_non_existent_field(self):

with self.assertRaises(Exception) as context:
with TestPipeline() as p:
p.not_use_test_runner_api = True
_ = (p | beam.Create(test_chunks) | Enrichment(handler))

expect_err_msg_contains = f"fieldName({non_existent_field}) not found"
Expand All @@ -330,6 +332,7 @@ def test_empty_input_chunks(self):
expected_chunks = []

with TestPipeline() as p:
p.not_use_test_runner_api = True
result = (p | beam.Create(test_chunks) | Enrichment(handler))
assert_that(
result, lambda actual: MilvusTestHelpers.assert_chunks_equivalent(
Expand Down Expand Up @@ -458,6 +461,7 @@ def test_filtered_search_with_cosine_similarity_and_batching(self):
]

with TestPipeline() as p:
p.not_use_test_runner_api = True
result = (p | beam.Create(test_chunks) | Enrichment(handler))
assert_that(
result, lambda actual: MilvusTestHelpers.assert_chunks_equivalent(
Expand Down Expand Up @@ -563,6 +567,7 @@ def test_filtered_search_with_bm25_full_text_and_batching(self):
]

with TestPipeline() as p:
p.not_use_test_runner_api = True
result = (p | beam.Create(test_chunks) | Enrichment(handler))
assert_that(
result, lambda actual: MilvusTestHelpers.assert_chunks_equivalent(
Expand Down Expand Up @@ -704,6 +709,7 @@ def test_vector_search_with_euclidean_distance(self):
]

with TestPipeline() as p:
p.not_use_test_runner_api = True
result = (p | beam.Create(test_chunks) | Enrichment(handler))
assert_that(
result, lambda actual: MilvusTestHelpers.assert_chunks_equivalent(
Expand Down Expand Up @@ -844,6 +850,7 @@ def test_vector_search_with_inner_product_similarity(self):
]

with TestPipeline() as p:
p.not_use_test_runner_api = True
result = (p | beam.Create(test_chunks) | Enrichment(handler))
assert_that(
result, lambda actual: MilvusTestHelpers.assert_chunks_equivalent(
Expand Down Expand Up @@ -909,6 +916,7 @@ def test_keyword_search_with_inner_product_sparse_embedding(self):
]

with TestPipeline() as p:
p.not_use_test_runner_api = True
result = (p | beam.Create(test_chunks) | Enrichment(handler))
assert_that(
result, lambda actual: MilvusTestHelpers.assert_chunks_equivalent(
Expand Down Expand Up @@ -982,6 +990,7 @@ def test_hybrid_search(self):
]

with TestPipeline() as p:
p.not_use_test_runner_api = True
result = (p | beam.Create(test_chunks) | Enrichment(handler))
assert_that(
result, lambda actual: MilvusTestHelpers.assert_chunks_equivalent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ def tearDownClass(cls):

def setUp(self):
self.write_test_pipeline = TestPipeline()
self.write_test_pipeline.not_use_test_runner_api = True
self._collection_name = f"test_collection_{self._testMethodName}"
self._partition_name = f"test_partition_{self._testMethodName}"
config = unpack_dataclass_with_kwargs(self._connection_config)
Expand Down Expand Up @@ -232,6 +231,7 @@ def test_invalid_write_on_non_existent_collection(self):
# Write pipeline.
with self.assertRaises(Exception) as context:
with TestPipeline() as p:
p.not_use_test_runner_api = True
_ = (p | beam.Create(test_chunks) | config.create_write_transform())

# Assert on what should happen.
Expand All @@ -252,6 +252,7 @@ def test_invalid_write_on_non_existent_partition(self):
# Write pipeline.
with self.assertRaises(Exception) as context:
with TestPipeline() as p:
p.not_use_test_runner_api = True
_ = (p | beam.Create(test_chunks) | config.create_write_transform())

# Assert on what should happen.
Expand Down Expand Up @@ -287,6 +288,7 @@ def test_invalid_write_on_missing_primary_key_in_entity(self):
# Write pipeline.
with self.assertRaises(Exception) as context:
with TestPipeline() as p:
p.not_use_test_runner_api = True
_ = (p | beam.Create(test_chunks) | config.create_write_transform())

# Assert on what should happen.
Expand Down Expand Up @@ -332,6 +334,7 @@ def test_write_on_auto_id_primary_key(self):
config = MilvusVectorWriterConfig(
connection_params=self._connection_config, write_config=write_config)

self.write_test_pipeline.not_use_test_runner_api = True
with self.write_test_pipeline as p:
_ = (p | beam.Create(test_chunks) | config.create_write_transform())

Expand All @@ -357,6 +360,7 @@ def test_write_on_existent_collection_with_default_schema(self):
config = MilvusVectorWriterConfig(
connection_params=self._connection_config, write_config=write_config)

self.write_test_pipeline.not_use_test_runner_api = True
with self.write_test_pipeline as p:
_ = (p | beam.Create(test_chunks) | config.create_write_transform())

Expand Down Expand Up @@ -422,6 +426,7 @@ def test_write_with_custom_column_specifications(self):
write_config=write_config,
column_specs=custom_column_specs)

self.write_test_pipeline.not_use_test_runner_api = True
with self.write_test_pipeline as p:
_ = (p | beam.Create(test_chunks) | config.create_write_transform())

Expand Down Expand Up @@ -474,6 +479,7 @@ def test_write_with_batching(self):
config = MilvusVectorWriterConfig(
connection_params=self._connection_config, write_config=write_config)

self.write_test_pipeline.not_use_test_runner_api = True
with self.write_test_pipeline as p:
_ = (p | beam.Create(test_chunks) | config.create_write_transform())

Expand Down
54 changes: 44 additions & 10 deletions sdks/python/apache_beam/ml/rag/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ class CustomMilvusContainer(MilvusContainer):
Extends MilvusContainer to provide custom port binding and environment
configuration for testing with standalone Milvus instances.
"""
def __init__(

def __init__( # pylint: disable=bad-super-call
self,
image: str,
service_container_port,
Expand All @@ -96,7 +97,11 @@ def __init__(
) -> None:
# Skip the parent class's constructor and go straight to
# GenericContainer.
super(MilvusContainer, self).__init__(image=image, **kwargs)
super(
MilvusContainer,
self,
).__init__(
image=image, **kwargs)
self.port = service_container_port
self.healthcheck_port = healthcheck_container_port
self.with_exposed_ports(service_container_port, healthcheck_container_port)
Expand Down Expand Up @@ -133,6 +138,27 @@ class MilvusTestHelpers:
# https://milvus.io/docs/release_notes.md or PyPI at
# https://pypi.org/project/pymilvus/ for version compatibility.
# Example: Milvus v2.6.0 requires pymilvus==2.6.0 (exact match required).
@staticmethod
def _wait_for_milvus_grpc(uri: str) -> None:
  """Probe the Milvus endpoint until it accepts an RPC.

  A container that Docker reports as started may not have its gRPC
  endpoint ready yet, so a ``list_collections`` call is issued through
  ``retry_with_backoff`` with the retry settings below.

  Args:
    uri: Milvus endpoint handed to ``MilvusClient``.
  """
  def _probe():
    # Use a fresh client per attempt and always close it, even when the
    # RPC itself raises.
    probe_client = MilvusClient(uri=uri)
    try:
      probe_client.list_collections()
    finally:
      probe_client.close()

  # Only MilvusException is treated as retryable here.
  retry_options = dict(
      max_retries=25,
      retry_delay=2.0,
      retry_backoff_factor=1.2,
      operation_name="Milvus client connection after container start",
      exception_types=(MilvusException, ))
  retry_with_backoff(_probe, **retry_options)

@staticmethod
def start_db_container(
image="milvusdb/milvus:v2.5.10",
Expand All @@ -148,23 +174,31 @@ def start_db_container(
if tc_max_retries is not None:
testcontainers_config.max_tries = tc_max_retries
for i in range(vector_client_max_retries):
vector_db_container: Optional[CustomMilvusContainer] = None
try:
vector_db_container = CustomMilvusContainer(
image=image,
service_container_port=service_container_port,
healthcheck_container_port=healthcheck_container_port)
vector_db_container = vector_db_container.with_volume_mapping(
mapped_container = vector_db_container.with_volume_mapping(
cfg, "/milvus/configs/user.yaml")
vector_db_container.start()
host = vector_db_container.get_container_host_ip()
port = vector_db_container.get_exposed_port(service_container_port)
info = VectorDBContainerInfo(vector_db_container, host, port)
assert mapped_container is not None
running_container: CustomMilvusContainer = mapped_container
vector_db_container = running_container
running_container.start()
host = running_container.get_container_host_ip()
port = running_container.get_exposed_port(service_container_port)
info = VectorDBContainerInfo(running_container, host, port)
MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
_LOGGER.info(
"milvus db container started successfully on %s.", info.uri)
break
except Exception as e:
stdout_logs, stderr_logs = vector_db_container.get_logs()
stdout_logs = stdout_logs.decode("utf-8")
stderr_logs = stderr_logs.decode("utf-8")
stdout_logs = stderr_logs = ""
if vector_db_container is not None:
raw_out, raw_err = vector_db_container.get_logs()
stdout_logs = raw_out.decode("utf-8")
stderr_logs = raw_err.decode("utf-8")
_LOGGER.warning(
"Retry %d/%d: Failed to start Milvus DB container. Reason: %s. "
"STDOUT logs:\n%s\nSTDERR logs:\n%s",
Expand Down
24 changes: 17 additions & 7 deletions sdks/python/apache_beam/yaml/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,13 +719,23 @@ def test(self, providers=providers): # default arg to capture loop value
**yaml_transform.SafeLineLoader.strip_metadata(
fixture.get('config', {}))))
for pipeline_spec in spec['pipelines']:
with beam.Pipeline(options=PipelineOptions(
pickle_library='cloudpickle',
**replace_recursive(yaml_transform.SafeLineLoader.strip_metadata(
pipeline_spec.get('options', {})),
vars))) as p:
yaml_transform.expand_pipeline(
p, replace_recursive(pipeline_spec, vars))
try:
with beam.Pipeline(options=PipelineOptions(
pickle_library='cloudpickle',
**replace_recursive(
yaml_transform.SafeLineLoader.strip_metadata(
pipeline_spec.get('options', {})),
vars))) as p:
yaml_transform.expand_pipeline(
p, replace_recursive(pipeline_spec, vars))
except ValueError as exn:
# FnApiRunner currently does not support this requirement in
# some xlang scenarios (e.g. Iceberg YAML pipelines).
if 'beam:requirement:pardo:on_window_expiration:v1' in str(exn):
self.skipTest(
'Runner does not support '
'beam:requirement:pardo:on_window_expiration:v1')
raise

yield f'test_{suffix}', test

Expand Down
40 changes: 32 additions & 8 deletions sdks/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,16 @@ def cythonize(*args, **kwargs):

milvus_dependency = ['pymilvus>=2.5.10,<3.0.0']

ml_base = [
# google-adk / OpenTelemetry require protobuf>=5; tensorflow-transform in
# ml_test is pinned to versions that require protobuf<5 on Python 3.10. Those
Comment thread
damccorm marked this conversation as resolved.
# cannot be installed together, so ADK deps stay out of ml_test (use ml_base).
ml_base_core = [
'embeddings>=0.0.4', # 0.0.3 crashes setuptools
'google-adk',
'onnxruntime',
# onnx 1.12–1.13 cap protobuf in ways that trigger huge backtracking with
# Beam[gcp]+ml_test; pip can fall back to onnx 1.11 sdist which needs cmake.
# 1.14.1+ matches tf2onnx>=1.16 and ships manylinux wheels for py3.10.
'onnx>=1.14.1,<2',
'langchain',
'sentence-transformers>=2.2.2',
'skl2onnx',
Expand All @@ -174,11 +180,24 @@ def cythonize(*args, **kwargs):
# tensorflow transitive dep, lower versions not compatible with Python3.10+
'absl-py>=0.12.0',
'tensorflow-hub',
'tf2onnx',
'torch',
'transformers',
]

ml_adk_dependency = [
'google-adk==1.28.1',
# proto-plus<1.24 caps protobuf<5; opentelemetry-proto (via ADK) needs
# protobuf>=5. Scoped here so the main dependency list stays broader.
'proto-plus>=1.26.1,<2',
'opentelemetry-api==1.37.0',
'opentelemetry-sdk==1.37.0',
'opentelemetry-exporter-otlp-proto-http==1.37.0',
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we need to pin these opentelemetry dependencies as well?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. ADK pins specific OTel versions, and the unpinned exporters were upgrading and conflicting with ADK’s SDK constraint, so I pinned the trio to match.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pip failed with something like: google-adk 1.17.0 wants opentelemetry-sdk exactly 1.37.x, but pip pulled opentelemetry-exporter-otlp-proto-http 1.40.x, which requires opentelemetry-sdk~=1.40.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know where the opentelemetry requirement was coming from that installed 1.40.x? I would've expected that to be handled by pip

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pip was handling it: it ended in ResolutionImpossible (a hard conflict), not a wrong install. The 1.40.x side was coming from opentelemetry-exporter-otlp-proto-http (e.g. 1.40.0 wants opentelemetry-sdk~=1.40). Meanwhile, google-adk pins opentelemetry-sdk to 1.37.x, so there is no single version of the SDK that satisfies both. Pip can’t merge those; it just fails unless we pin the exporter (and api) to the same minor as ADK’s SDK so that nothing pulls 1.40.x into the graph.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but what requires opentelemetry-exporter-otlp-proto-http?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to pin opentelemetry-exporter-otlp-proto-http because otherwise pip kept grabbing a newer 1.40.x build that wants opentelemetry-sdk~=1.40, while ADK only lines up with 1.37.x, so resolution blew up unless I locked the exporter to the same minor as the SDK. @damccorm

# protobuf>=5 (ADK/OTel); tf2onnx 1.16.x pins protobuf~=3.20 only.
'tf2onnx>=1.17.0,<1.18',
]

ml_base = ml_base_core + ml_adk_dependency


def find_by_ext(root_dir, ext):
for root, _, files in os.walk(root_dir):
Expand Down Expand Up @@ -392,6 +411,9 @@ def get_portability_package_data():
'packaging>=22.0',
'pillow>=12.1.1,<13',
'pymongo>=3.8.0,<5.0.0',
# ADK / OpenTelemetry need proto-plus>=1.26.1 (protobuf>=5); that
# floor is declared on ml_adk_dependency only so core installs stay
# compatible with older proto-plus.
'proto-plus>=1.7.1,<2',
# 1. Use a tighter upper bound in protobuf dependency to make sure
# the minor version at job submission
Expand Down Expand Up @@ -447,9 +469,11 @@ def get_portability_package_data():
'mock>=1.0.1,<6.0.0',
'pandas<2.3.0',
'parameterized>=0.7.1,<0.10.0',
'pydot>=1.2.0,<2',
'pyhamcrest>=1.9,!=1.10.0,<3.0.0',
'requests_mock>=1.7,<2.0',
'tenacity>=8.0.0,<9',
# google-adk 1.28+ requires tenacity>=9,<10 (conflicts with <9).
'tenacity>=8.0.0,<10',
'pytest>=7.1.2,<9.0',
'pytest-xdist>=2.5.0,<4',
'pytest-timeout>=2.1.0,<3',
Expand Down Expand Up @@ -547,14 +571,14 @@ def get_portability_package_data():
# TFT->TFX-BSL require pandas 1.x, which is not compatible
# with numpy 2.x
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we need this anymore?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you mean protobuf<4, I dropped that cap because, with ADK/OTel in the same extra, it fought protobuf>=5. ml_test no longer pulls ADK, so that specific cap isn’t what unblocks resolution anymore; we can revisit a narrower cap separately if [gcp]+[ml_test] still allows it.

'numpy<2',
# To help with dependency resolution in test suite. Revise once
# https://github.com/apache/beam/issues/37854 is fixed
'protobuf<4; python_version<"3.11"'
# Comment out xgboost as it is breaking presubmit python ml
# tests due to tag check introduced since pip 24.2
# https://github.com/apache/beam/issues/31285
# 'xgboost<2.0', # https://github.com/apache/beam/issues/31252
] + ml_base,
# tft needs protobuf<5; tf2onnx 1.17+ allows protobuf 5 on the
# ADK-only path.
'tf2onnx>=1.16.1,<1.17',
] + ml_base_core,
'p310_ml_test': [
'datatable',
] + ml_base,
Expand Down
Loading