From 7bc5927955ff6eb4532d82bae3f1b22ed6d2b30f Mon Sep 17 00:00:00 2001 From: wilmerdooley Date: Sat, 30 May 2026 07:47:28 -0700 Subject: [PATCH] Make GCS filesystem lookup lazy to match S3 behavior FileSystems.get_filesystem("gs://...") raised immediately when the gcp extra was not installed, because gcsfilesystem.py imported gcsio (and its google-cloud-storage dependency) at module load time. When that import failed, GCSFileSystem was never registered, unlike S3FileSystem whose s3io imports boto3 lazily. Import gcsio lazily so GCSFileSystem can still be looked up without the gcp extra, deferring the dependency error to usage time (matching S3). A single _get_gcsio_module() helper raises a clear ImportError when the module is unavailable; CHUNK_SIZE, _gcsIO and report_lineage go through it. Add a regression test that simulates the missing extra in a subprocess. Fixes #37445 Signed-off-by: wilmerdooley --- .../apache_beam/io/gcp/gcsfilesystem.py | 55 ++++++++++++++-- .../apache_beam/io/gcp/gcsfilesystem_test.py | 64 ++++++++++++++++++- 2 files changed, 113 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 3763e21abc9f..003947d84473 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -34,22 +34,65 @@ from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.filesystem import FileMetadata from apache_beam.io.filesystem import FileSystem -from apache_beam.io.gcp import gcsio + +try: + from apache_beam.io.gcp import gcsio +except ImportError: + # The gcp extra (google-cloud-storage and friends, imported by gcsio) may not + # be installed. Import it lazily so GCSFileSystem can still be looked up via + # FileSystems.get_filesystem() without the gcp extra installed; the dependency + # is only required when the filesystem is actually used, matching the behavior + # of S3FileSystem. See https://github.com/apache/beam/issues/37445. + gcsio = None # type: ignore[assignment] __all__ = ['GCSFileSystem'] +class _classproperty: + """A read-only property that resolves on the class as well as instances. + + Lets ``GCSFileSystem.CHUNK_SIZE`` remain accessible as a class attribute (as + in ``S3FileSystem``) while computing its value lazily, so that importing this + module does not require the gcp extra. See + https://github.com/apache/beam/issues/37445. + """ + def __init__(self, fget): + self._fget = fget + + def __get__(self, obj, owner): + return self._fget(owner) + + class GCSFileSystem(FileSystem): """A GCS ``FileSystem`` implementation for accessing files on GCS. """ - CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE # Chuck size in batch operations GCS_PREFIX = 'gs://' def __init__(self, pipeline_options): super().__init__(pipeline_options) self._pipeline_options = pipeline_options + @staticmethod + def _get_gcsio_module(): + """Return the ``gcsio`` module, raising ImportError if it is unavailable. + + ``gcsio`` is imported lazily (see the module-level import) so that this + filesystem can be looked up without the gcp extra installed. The dependency + is only required when the filesystem is actually used. + """ + if gcsio is None: + raise ImportError( + 'Could not import apache_beam.io.gcp.gcsio. This usually means the ' + 'gcp dependencies are not installed. Install them with: ' + 'pip install apache-beam[gcp]') + return gcsio + + @_classproperty + def CHUNK_SIZE(cls): + """Chunk size in batch operations.""" + return cls._get_gcsio_module().MAX_BATCH_OPERATION_SIZE + @classmethod def scheme(cls): """URI scheme for the FileSystem @@ -139,7 +182,8 @@ def _list(self, dir_or_prefix): raise BeamIOError("List operation failed", {dir_or_prefix: e}) def _gcsIO(self): - return gcsio.GcsIO(pipeline_options=self._pipeline_options) + return self._get_gcsio_module().GcsIO( + pipeline_options=self._pipeline_options) def _path_open( self, @@ -370,8 +414,9 @@ def delete(self, paths): def report_lineage(self, path, lineage): try: - components = gcsio.parse_gcs_path(path, object_optional=True) - except ValueError: + components = self._get_gcsio_module().parse_gcs_path( + path, object_optional=True) + except (ImportError, ValueError): # report lineage is fail-safe traceback.print_exc() return diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py index 08fdd6302887..f96d883ca9c7 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py @@ -21,6 +21,9 @@ # pytype: skip-file import logging +import subprocess +import sys +import textwrap import unittest import mock @@ -38,7 +41,58 @@ # pylint: enable=wrong-import-order, wrong-import-position -@unittest.skipIf(gcsfilesystem is None, 'GCP dependencies are not installed') +@unittest.skipIf(gcsfilesystem is None, 'GCSFileSystem could not be imported') +class GCSFileSystemLazyImportTest(unittest.TestCase): + def test_get_filesystem_without_gcp_extra(self): + # Regression test for https://github.com/apache/beam/issues/37445. + # + # Without the gcp extra installed, gcsio cannot be imported. GCSFileSystem + # must still import and register so FileSystems.get_filesystem('gs://...') + # returns it (like S3FileSystem), deferring the dependency error to usage. + # + # This runs in a subprocess because the behavior is decided at import time + # and this test environment has the gcp extra installed; we simulate its + # absence by blocking the gcsio import in a fresh interpreter (reloading in + # process would leave a second GCSFileSystem registered for the gs scheme). + script = textwrap.dedent( + """ + import sys + # Simulate the gcp extra not being installed. + sys.modules['apache_beam.io.gcp.gcsio'] = None + + from apache_beam.io.gcp import gcsfilesystem + assert gcsfilesystem.gcsio is None, 'expected gcsio to be unavailable' + + from apache_beam.io.filesystems import FileSystems + fs = FileSystems.get_filesystem('gs://bucket/object') + assert type(fs).__name__ == 'GCSFileSystem', type(fs).__name__ + assert fs.scheme() == 'gs' + + # Using the filesystem raises a clear ImportError (deferred validation). + for use in (lambda: fs.CHUNK_SIZE, fs._gcsIO): + try: + use() + except ImportError: + pass + else: + raise AssertionError('expected ImportError using GCS without gcp') + print('OK') + """) + result = subprocess.run([sys.executable, '-c', script], + capture_output=True, + text=True, + check=False) + self.assertEqual( + result.returncode, + 0, + msg='subprocess failed:\nstdout=%s\nstderr=%s' % + (result.stdout, result.stderr)) + self.assertIn('OK', result.stdout) + + +@unittest.skipIf( + gcsfilesystem is None or gcsfilesystem.gcsio is None, + 'GCP dependencies are not installed') class GCSFileSystemTest(unittest.TestCase): def setUp(self): pipeline_options = PipelineOptions() @@ -48,6 +102,14 @@ def test_scheme(self): self.assertEqual(self.fs.scheme(), 'gs') self.assertEqual(gcsfilesystem.GCSFileSystem.scheme(), 'gs') + def test_chunk_size_on_class_and_instance(self): + # CHUNK_SIZE must resolve both on the class and on instances (see the + # review on #37445): the lazy class-property must preserve the class-level + # access that S3FileSystem provides as a plain class attribute. + expected = gcsfilesystem.gcsio.MAX_BATCH_OPERATION_SIZE + self.assertEqual(gcsfilesystem.GCSFileSystem.CHUNK_SIZE, expected) + self.assertEqual(self.fs.CHUNK_SIZE, expected) + def test_join(self): self.assertEqual( 'gs://bucket/path/to/file',