diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index 3763e21abc9f..003947d84473 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -34,22 +34,67 @@
 from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.filesystem import FileMetadata
 from apache_beam.io.filesystem import FileSystem
-from apache_beam.io.gcp import gcsio
+
+try:
+  from apache_beam.io.gcp import gcsio
+except ImportError:
+  # The gcp extra (google-cloud-storage and friends, imported by gcsio) may not
+  # be installed. Import it lazily so GCSFileSystem can still be looked up via
+  # FileSystems.get_filesystem() without the gcp extra installed; the dependency
+  # is only required when the filesystem is actually used, matching the behavior
+  # of S3FileSystem. See https://github.com/apache/beam/issues/37445.
+  gcsio = None  # type: ignore[assignment]
 
 __all__ = ['GCSFileSystem']
 
 
+class _classproperty:
+  """A read-only property that resolves on the class as well as instances.
+
+  Lets ``GCSFileSystem.CHUNK_SIZE`` remain accessible as a class attribute (as
+  in ``S3FileSystem``) while computing its value lazily, so that importing this
+  module does not require the gcp extra. See
+  https://github.com/apache/beam/issues/37445.
+  """
+  def __init__(self, fget):
+    self._fget = fget
+
+  def __get__(self, obj, owner=None):
+    # ``owner`` is optional in the descriptor protocol; fall back to the
+    # instance's type when it is omitted.
+    return self._fget(type(obj) if owner is None else owner)
+
+
 class GCSFileSystem(FileSystem):
   """A GCS ``FileSystem`` implementation for accessing files on GCS.
   """
 
-  CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE  # Chuck size in batch operations
   GCS_PREFIX = 'gs://'
 
   def __init__(self, pipeline_options):
     super().__init__(pipeline_options)
     self._pipeline_options = pipeline_options
 
+  @staticmethod
+  def _get_gcsio_module():
+    """Return the ``gcsio`` module, raising ImportError if it is unavailable.
+
+    ``gcsio`` is imported lazily (see the module-level import) so that this
+    filesystem can be looked up without the gcp extra installed. The dependency
+    is only required when the filesystem is actually used.
+    """
+    if gcsio is None:
+      raise ImportError(
+          'Could not import apache_beam.io.gcp.gcsio. This usually means the '
+          'gcp dependencies are not installed. Install them with: '
+          'pip install apache-beam[gcp]')
+    return gcsio
+
+  @_classproperty
+  def CHUNK_SIZE(cls):
+    """Chunk size in batch operations."""
+    return cls._get_gcsio_module().MAX_BATCH_OPERATION_SIZE
+
   @classmethod
   def scheme(cls):
     """URI scheme for the FileSystem
@@ -139,7 +184,8 @@ def _list(self, dir_or_prefix):
       raise BeamIOError("List operation failed", {dir_or_prefix: e})
 
   def _gcsIO(self):
-    return gcsio.GcsIO(pipeline_options=self._pipeline_options)
+    return self._get_gcsio_module().GcsIO(
+        pipeline_options=self._pipeline_options)
 
   def _path_open(
       self,
@@ -370,8 +416,9 @@ def delete(self, paths):
 
   def report_lineage(self, path, lineage):
     try:
-      components = gcsio.parse_gcs_path(path, object_optional=True)
-    except ValueError:
+      components = self._get_gcsio_module().parse_gcs_path(
+          path, object_optional=True)
+    except (ImportError, ValueError):
       # report lineage is fail-safe
       traceback.print_exc()
       return
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index 08fdd6302887..f96d883ca9c7 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -21,6 +21,9 @@
 # pytype: skip-file
 
 import logging
+import subprocess
+import sys
+import textwrap
 import unittest
 
 import mock
@@ -38,7 +41,58 @@
 # pylint: enable=wrong-import-order, wrong-import-position
 
 
-@unittest.skipIf(gcsfilesystem is None, 'GCP dependencies are not installed')
+@unittest.skipIf(gcsfilesystem is None, 'GCSFileSystem could not be imported')
+class GCSFileSystemLazyImportTest(unittest.TestCase):
+  def test_get_filesystem_without_gcp_extra(self):
+    # Regression test for https://github.com/apache/beam/issues/37445.
+    #
+    # Without the gcp extra installed, gcsio cannot be imported. GCSFileSystem
+    # must still import and register so FileSystems.get_filesystem('gs://...')
+    # returns it (like S3FileSystem), deferring the dependency error to usage.
+    #
+    # This runs in a subprocess because the behavior is decided at import time
+    # and this test environment has the gcp extra installed; we simulate its
+    # absence by blocking the gcsio import in a fresh interpreter (reloading in
+    # process would leave a second GCSFileSystem registered for the gs scheme).
+    script = textwrap.dedent(
+        """
+        import sys
+        # Simulate the gcp extra not being installed.
+        sys.modules['apache_beam.io.gcp.gcsio'] = None
+
+        from apache_beam.io.gcp import gcsfilesystem
+        assert gcsfilesystem.gcsio is None, 'expected gcsio to be unavailable'
+
+        from apache_beam.io.filesystems import FileSystems
+        fs = FileSystems.get_filesystem('gs://bucket/object')
+        assert type(fs).__name__ == 'GCSFileSystem', type(fs).__name__
+        assert fs.scheme() == 'gs'
+
+        # Using the filesystem raises a clear ImportError (deferred validation).
+        for use in (lambda: fs.CHUNK_SIZE, fs._gcsIO):
+          try:
+            use()
+          except ImportError:
+            pass
+          else:
+            raise AssertionError('expected ImportError using GCS without gcp')
+        print('OK')
+        """)
+    result = subprocess.run([sys.executable, '-c', script],
+                            capture_output=True,
+                            text=True,
+                            check=False)
+    self.assertEqual(
+        result.returncode,
+        0,
+        msg='subprocess failed:\nstdout=%s\nstderr=%s' %
+        (result.stdout, result.stderr))
+    self.assertIn('OK', result.stdout)
+
+
+@unittest.skipIf(
+    gcsfilesystem is None or gcsfilesystem.gcsio is None,
+    'GCP dependencies are not installed')
 class GCSFileSystemTest(unittest.TestCase):
   def setUp(self):
     pipeline_options = PipelineOptions()
@@ -48,6 +102,14 @@ def test_scheme(self):
     self.assertEqual(self.fs.scheme(), 'gs')
     self.assertEqual(gcsfilesystem.GCSFileSystem.scheme(), 'gs')
 
+  def test_chunk_size_on_class_and_instance(self):
+    # CHUNK_SIZE must resolve both on the class and on instances (see the
+    # review on #37445): the lazy class-property must preserve the class-level
+    # access that S3FileSystem provides as a plain class attribute.
+    expected = gcsfilesystem.gcsio.MAX_BATCH_OPERATION_SIZE
+    self.assertEqual(gcsfilesystem.GCSFileSystem.CHUNK_SIZE, expected)
+    self.assertEqual(self.fs.CHUNK_SIZE, expected)
+
   def test_join(self):
     self.assertEqual(
         'gs://bucket/path/to/file',