Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
"attrs>=21.2.0",
"frozendict>=1.2",
"google-auth>=1.31.0",
"grpcio>=1.50.0",
"grpcio-tools>=1.50.0",
"mock>=4.0.3",
"Pillow>=8.3.1",
"protobuf>=3.19.0",
Expand Down
171 changes: 145 additions & 26 deletions src/google/appengine/api/images/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,72 @@


import json
import logging
import os
import struct

import google.auth
import google.auth.transport.grpc
import google.auth.transport.requests
from google.oauth2 import id_token
import grpc
from google.appengine.api import apiproxy_stub_map
from google.appengine.api import blobstore
from google.appengine.api import datastore_types
from google.appengine.api.images import images_service_pb2
from google.appengine.ext import blobstore as ext_blobstore
from . import images_service_pb2
from . import images_service_rpc_pb2_grpc
from google.appengine.runtime import apiproxy_errors

import six
from six.moves import range


_USE_GRPC = bool(os.environ.get('USE_CUSTOM_IMAGES_GRPC_SERVICE'))


class _FakeRpc(object):
"""A fake RPC object that returns a pre-computed result."""

def __init__(self, result):
self._result = result

def get_result(self):
return self._result


def _make_grpc_call(method_name, request):
"""Creates an authenticated gRPC channel and makes an RPC call."""
endpoint = os.environ.get('IMAGES_SERVICE_ENDPOINT')
if not endpoint:
raise ValueError("IMAGES_SERVICE_ENDPOINT environment variable not set")

parsed = six.moves.urllib.parse.urlparse(endpoint)
target_host = parsed.netloc + ':443'
target_audience = endpoint

credentials, project_id = google.auth.default()
auth_req = google.auth.transport.requests.Request()
credentials.refresh(auth_req)
token = id_token.fetch_id_token(auth_req, target_audience)

channel_credentials = grpc.ssl_channel_credentials()
call_credentials = grpc.access_token_call_credentials(token)
composite_credentials = grpc.composite_channel_credentials(
channel_credentials,
call_credentials,
)

# Increase max receive message size to 32MB to handle large images
options = [('grpc.max_receive_message_length', 32 * 1024 * 1024)]

authed_channel = grpc.secure_channel(target_host, composite_credentials, options=options)
stub = images_service_rpc_pb2_grpc.ImagesServiceStub(authed_channel)

grpc_method = getattr(stub, method_name)
return grpc_method(request)


BlobKey = datastore_types.BlobKey


Expand Down Expand Up @@ -215,7 +270,10 @@ def __init__(self, image_data=None, blob_key=None, filename=None):

self._image_data = image_data
if filename:
self._blob_key = blobstore.create_gs_key(filename)
if _USE_GRPC:
self._blob_key = filename
else:
self._blob_key = blobstore.create_gs_key(filename)
else:
self._blob_key = _extract_blob_key(blob_key)
self._transforms = []
Expand Down Expand Up @@ -763,8 +821,29 @@ def _set_imagedata(self, imagedata):
imagedata: An `ImageData` protocol buffer instance.
"""
if self._blob_key:
imagedata.content = b""
imagedata.blob_key = self._blob_key
if _USE_GRPC and not self._blob_key.startswith('/gs/'):
# If we are using the custom gRPC service and have a legacy BlobKey,
# we must fetch the data locally because the external service cannot read it.
if not self._image_data:
try:
# Use high-level ext.blobstore to fetch content
blob_info = ext_blobstore.BlobInfo.get(self._blob_key)
if not blob_info:
raise ObjectNotFoundError()

with blob_info.open() as reader:
self._image_data = reader.read()
except Exception as e:
if isinstance(e, ObjectNotFoundError):
raise
raise Error("Failed to fetch blob data for gRPC: %s" % e)

imagedata.content = self._image_data
else:
# For GCS paths (starting with /gs/) or standard App Engine calls,
# we maintain the reference-based approach.
imagedata.content = b""
imagedata.blob_key = self._blob_key
else:
imagedata.content = self._image_data

Expand Down Expand Up @@ -888,7 +967,36 @@ def execute_transforms_async(self,
if transparent_substitution_rgb:
input_settings.transparent_substitution_rgb = transparent_substitution_rgb


if _USE_GRPC:
logging.info('Using custom gRPC image service.')
grpc_response = _make_grpc_call("Transform", request)
# Simulate side effects of execute_transforms_hook
# Note: We can't easily populate self._width/height/format/metadata here
# without parsing the response immediately, which is what _FakeRpc expects.
# The legacy hook does this on 'get_result'.
# For now, we return the content.
# If side effects are needed (like updating self._width), they should happens
# when get_result is called or we need a smarter FakeRpc.
# But standard usage is rpc.get_result().
# Let's align with the legacy behavior where self.* properties are updated
# only after the RPC completes (which happens synchronously for gRPC here).

self._image_data = grpc_response.image.content
self._blob_key = None
self._transforms = []
if grpc_response.image.HasField("width"):
self._width = grpc_response.image.width
else:
self._width = None
if grpc_response.image.HasField("height"):
self._height = grpc_response.image.height
else:
self._height = None
self._format = None
if grpc_response.source_metadata:
self._original_metadata = json.loads(grpc_response.source_metadata)

return _FakeRpc(self._image_data)

def execute_transforms_hook(rpc):
"""Checks success, handles exceptions, returns the converted RPC result.
Expand Down Expand Up @@ -988,6 +1096,12 @@ def histogram_async(self, rpc=None):

self._set_imagedata(request.image)

if _USE_GRPC:
logging.info('Using custom gRPC image service for histogram.')
grpc_response = _make_grpc_call("Histogram", request)
histogram_ = grpc_response.histogram
return _FakeRpc([histogram_.red, histogram_.green, histogram_.blue])

def get_histogram_hook(rpc):
"""Checks success, handles exceptions, returns the converted RPC result.

Expand Down Expand Up @@ -1733,7 +1847,6 @@ def composite_async(inputs,
image_map = {}

request = images_service_pb2.ImagesCompositeRequest()
response = images_service_pb2.ImagesTransformResponse()
for (image, x, y, opacity, anchor) in inputs:
if not image:
raise BadRequestError("Each input must include an image")
Expand Down Expand Up @@ -1775,26 +1888,32 @@ def composite_async(inputs,
(quality is not None)):
request.canvas.output.quality = quality

def composite_hook(rpc):
"""Checks success, handles exceptions, and returns the converted RPC result.

Args:
rpc: A UserRPC object.

Returns:
Images bytes of the composite image.

Raises:
See `composite_async` for more details.
"""
try:
rpc.check_success()
except apiproxy_errors.ApplicationError as e:
raise _ToImagesError(e)
return rpc.response.image.content

return _make_async_call(rpc, "Composite", request, response, composite_hook,
None)
if _USE_GRPC:
logging.info('Using custom gRPC image service for composite.')
grpc_response = _make_grpc_call("Composite", request)
return _FakeRpc(grpc_response.image.content)
else:
response = images_service_pb2.ImagesTransformResponse()
def composite_hook(rpc):
"""Checks success, handles exceptions, and returns the converted RPC result.

Args:
rpc: A UserRPC object.

Returns:
Images bytes of the composite image.

Raises:
See `composite_async` for more details.
"""
try:
rpc.check_success()
except apiproxy_errors.ApplicationError as e:
raise _ToImagesError(e)
return rpc.response.image.content

return _make_async_call(rpc, "Composite", request, response, composite_hook,
None)


def histogram(image_data, rpc=None):
Expand Down
Loading