Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 50 additions & 5 deletions sdks/python/apache_beam/io/gcp/gcsfilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,65 @@
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.io.gcp import gcsio

try:
from apache_beam.io.gcp import gcsio
except ImportError:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There have been several proposed fixes, including this one, with unncessarily complex workarounds. I have proposed a proper fix #37445 (comment)

# The gcp extra (google-cloud-storage and friends, imported by gcsio) may not
# be installed. Import it lazily so GCSFileSystem can still be looked up via
# FileSystems.get_filesystem() without the gcp extra installed; the dependency
# is only required when the filesystem is actually used, matching the behavior
# of S3FileSystem. See https://github.com/apache/beam/issues/37445.
gcsio = None # type: ignore[assignment]

__all__ = ['GCSFileSystem']


class _classproperty:
"""A read-only property that resolves on the class as well as instances.

Lets ``GCSFileSystem.CHUNK_SIZE`` remain accessible as a class attribute (as
in ``S3FileSystem``) while computing its value lazily, so that importing this
module does not require the gcp extra. See
https://github.com/apache/beam/issues/37445.
"""
def __init__(self, fget):
self._fget = fget

def __get__(self, obj, owner):
return self._fget(owner)


class GCSFileSystem(FileSystem):
"""A GCS ``FileSystem`` implementation for accessing files on GCS.
"""

CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE # Chuck size in batch operations
GCS_PREFIX = 'gs://'

def __init__(self, pipeline_options):
super().__init__(pipeline_options)
self._pipeline_options = pipeline_options

@staticmethod
def _get_gcsio_module():
"""Return the ``gcsio`` module, raising ImportError if it is unavailable.

``gcsio`` is imported lazily (see the module-level import) so that this
filesystem can be looked up without the gcp extra installed. The dependency
is only required when the filesystem is actually used.
"""
if gcsio is None:
raise ImportError(
'Could not import apache_beam.io.gcp.gcsio. This usually means the '
'gcp dependencies are not installed. Install them with: '
'pip install apache-beam[gcp]')
return gcsio

@_classproperty
def CHUNK_SIZE(cls):
"""Chunk size in batch operations."""
return cls._get_gcsio_module().MAX_BATCH_OPERATION_SIZE

@classmethod
def scheme(cls):
"""URI scheme for the FileSystem
Expand Down Expand Up @@ -139,7 +182,8 @@ def _list(self, dir_or_prefix):
raise BeamIOError("List operation failed", {dir_or_prefix: e})

def _gcsIO(self):
return gcsio.GcsIO(pipeline_options=self._pipeline_options)
return self._get_gcsio_module().GcsIO(
pipeline_options=self._pipeline_options)

def _path_open(
self,
Expand Down Expand Up @@ -370,8 +414,9 @@ def delete(self, paths):

def report_lineage(self, path, lineage):
try:
components = gcsio.parse_gcs_path(path, object_optional=True)
except ValueError:
components = self._get_gcsio_module().parse_gcs_path(
path, object_optional=True)
except (ImportError, ValueError):
# report lineage is fail-safe
traceback.print_exc()
return
Expand Down
64 changes: 63 additions & 1 deletion sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
# pytype: skip-file

import logging
import subprocess
import sys
import textwrap
import unittest

import mock
Expand All @@ -38,7 +41,58 @@
# pylint: enable=wrong-import-order, wrong-import-position


@unittest.skipIf(gcsfilesystem is None, 'GCP dependencies are not installed')
@unittest.skipIf(gcsfilesystem is None, 'GCSFileSystem could not be imported')
class GCSFileSystemLazyImportTest(unittest.TestCase):
def test_get_filesystem_without_gcp_extra(self):
# Regression test for https://github.com/apache/beam/issues/37445.
#
# Without the gcp extra installed, gcsio cannot be imported. GCSFileSystem
# must still import and register so FileSystems.get_filesystem('gs://...')
# returns it (like S3FileSystem), deferring the dependency error to usage.
#
# This runs in a subprocess because the behavior is decided at import time
# and this test environment has the gcp extra installed; we simulate its
# absence by blocking the gcsio import in a fresh interpreter (reloading in
# process would leave a second GCSFileSystem registered for the gs scheme).
script = textwrap.dedent(
"""
import sys
# Simulate the gcp extra not being installed.
sys.modules['apache_beam.io.gcp.gcsio'] = None

from apache_beam.io.gcp import gcsfilesystem
assert gcsfilesystem.gcsio is None, 'expected gcsio to be unavailable'

from apache_beam.io.filesystems import FileSystems
fs = FileSystems.get_filesystem('gs://bucket/object')
assert type(fs).__name__ == 'GCSFileSystem', type(fs).__name__
assert fs.scheme() == 'gs'

# Using the filesystem raises a clear ImportError (deferred validation).
for use in (lambda: fs.CHUNK_SIZE, fs._gcsIO):
try:
use()
except ImportError:
pass
else:
raise AssertionError('expected ImportError using GCS without gcp')
print('OK')
""")
result = subprocess.run([sys.executable, '-c', script],
capture_output=True,
text=True,
check=False)
self.assertEqual(
result.returncode,
0,
msg='subprocess failed:\nstdout=%s\nstderr=%s' %
(result.stdout, result.stderr))
self.assertIn('OK', result.stdout)


@unittest.skipIf(
gcsfilesystem is None or gcsfilesystem.gcsio is None,
'GCP dependencies are not installed')
class GCSFileSystemTest(unittest.TestCase):
def setUp(self):
pipeline_options = PipelineOptions()
Expand All @@ -48,6 +102,14 @@ def test_scheme(self):
self.assertEqual(self.fs.scheme(), 'gs')
self.assertEqual(gcsfilesystem.GCSFileSystem.scheme(), 'gs')

def test_chunk_size_on_class_and_instance(self):
# CHUNK_SIZE must resolve both on the class and on instances (see the
# review on #37445): the lazy class-property must preserve the class-level
# access that S3FileSystem provides as a plain class attribute.
expected = gcsfilesystem.gcsio.MAX_BATCH_OPERATION_SIZE
self.assertEqual(gcsfilesystem.GCSFileSystem.CHUNK_SIZE, expected)
self.assertEqual(self.fs.CHUNK_SIZE, expected)

def test_join(self):
self.assertEqual(
'gs://bucket/path/to/file',
Expand Down
Loading