class _QuickJsCallable:
  """Picklable handle to a JavaScript function evaluated with quickjs.

  Only the JavaScript source and the optional function name are stored and
  pickled. The compiled function is created lazily on first call, so every
  process that unpickles this object compiles its own copy.

  Args:
    source: JavaScript source code to evaluate.
    name: Name of a global function defined by ``source``. When falsy, the
      value that ``source`` itself evaluates to is used as the function.
  """
  def __init__(self, source, name=None):
    self.source = source
    self.name = name
    # Compiled lazily by _get_func(); never pickled.
    self._func = None

  def _get_func(self):
    """Returns the compiled JavaScript function, compiling it on first use.

    Raises:
      ValueError: If the quickjs package is not installed.
    """
    if self._func is None:
      if quickjs is None:
        raise ValueError("quickjs is not installed.")
      if self.name:
        # Per the quickjs documentation, a bare Context may only ever be
        # accessed by the thread that created it, whereas quickjs.Function
        # funnels compilation and every call through one dedicated thread.
        # This object can be invoked from several worker threads, so use the
        # thread-safe wrapper.
        self._func = quickjs.Function(self.name, self.source)
      else:
        # NOTE(review): an anonymous function cannot be wrapped by
        # quickjs.Function (it needs a name), so this path keeps the bare
        # Context and is only safe when called from a single thread.
        context = quickjs.Context()
        self._func = context.eval(self.source)
    return self._func

  def __call__(self, *args, **kwargs):
    """Invokes the JavaScript function with the given arguments."""
    return self._get_func()(*args, **kwargs)

  def __getstate__(self):
    # The compiled function wraps native interpreter state and is left out;
    # it is rebuilt from the source after unpickling.
    return {'source': self.source, 'name': self.name}

  def __setstate__(self, state):
    self.source = state['source']
    self.name = state['name']
    self._func = None
def _expand_javascript_mapping_func(
    original_fields, expression=None, callable=None, path=None, name=None):
  """Builds a Python callable that applies a JavaScript UDF to a row.

  Rows cross the Python/JavaScript boundary as JSON text in both directions:
  the row is serialized with ``json.dumps``, parsed inside JavaScript with
  ``JSON.parse``, and the result is returned via ``JSON.stringify`` and
  decoded with ``json.loads``.

  Exactly one of the three UDF forms is used, checked in this order:

  Args:
    original_fields: Names of the input fields. Each name that occurs in
      ``expression`` is exposed to the expression as a local constant.
    expression: A JavaScript expression evaluated once per row.
    callable: Source of a JavaScript function taking the row as its argument.
    path: Location of a ``.js`` file, used when neither ``expression`` nor
      ``callable`` is given.
    name: Name of the function inside the file at ``path`` to call.

  Returns:
    A function taking a row and returning the UDF result converted with
    ``dicts_to_rows``. It raises ``RuntimeError`` if evaluating the
    JavaScript or decoding its result fails.

  Raises:
    ValueError: If the quickjs package is not installed, or if ``path`` does
      not end in ``.js``.
  """
  # Check for installed quickjs package
  if quickjs is None:
    raise ValueError(
        "Javascript mapping functions require the 'quickjs' package.")

  import json

  if expression:
    # The wrapper's own identifiers are dunder-named so that a row field
    # called e.g. "json_row" does not clash with the parameter when it is
    # redeclared as a const below (that would be a JavaScript SyntaxError).
    field_bindings = [
        f'  const {field} = __row__.{field};' for field in original_fields
        if field in expression
    ]
    source = '\n'.join([
        'function fn(__json_row__) {',
        '  const __row__ = JSON.parse(__json_row__);',
        *field_bindings,
        '  return JSON.stringify(' + expression + ');',
        '}',
    ])
    js_func = _QuickJsCallable(source, "fn")

  elif callable:
    # Wrap the callable in a named function so that it can be looked up by
    # name after the source has been evaluated.
    source = (
        "function fn(json_row) { "
        "const row = JSON.parse(json_row); "
        f"return JSON.stringify(({callable})(row)); }}")
    js_func = _QuickJsCallable(source, "fn")

  else:
    if not path.endswith('.js'):
      raise ValueError(f'File "{path}" is not a valid .js file.')
    # Close the handle deterministically instead of leaking it.
    with FileSystems.open(path) as js_file:
      udf_code = js_file.read().decode()
    # Append a bridge that adapts the user's function to the JSON-in/JSON-out
    # calling convention used by js_wrapper.
    bridge_source = (
        udf_code + "\nfunction bridge_fn(json_row) { "
        f"return JSON.stringify({name}(JSON.parse(json_row))); }}")
    js_func = _QuickJsCallable(bridge_source, "bridge_fn")

  def js_wrapper(row):
    row_json = json.dumps(py_value_to_js_dict(row))
    try:
      js_result_json = js_func(row_json)
      # JSON.stringify(undefined) is itself undefined, which arrives here as
      # None rather than as JSON text; treat it as a null result instead of
      # failing inside json.loads.
      if js_result_json is None:
        js_result = None
      else:
        js_result = json.loads(js_result_json)
    except Exception as exn:
      raise RuntimeError(
          f"Error evaluating javascript expression: {exn}") from exn
    return dicts_to_rows(js_result)

  return js_wrapper
setUp(self): def tearDown(self): shutil.rmtree(self.tmpdir) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(quickjs is None, 'quickjs not installed.') def test_map_to_fields_filter_inline_js(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle', yaml_experimental_features=['javascript' @@ -197,7 +197,7 @@ def test_map_to_fields_sql_reserved_keyword_append(): beam.Row(label='389a', timestamp=2, label_copy="389a"), ])) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(quickjs is None, 'quickjs not installed.') def test_filter_inline_js(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle', yaml_experimental_features=['javascript' @@ -252,7 +252,7 @@ def test_filter_inline_py(self): row=beam.Row(rank=2, values=[7, 8, 9])), ])) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(quickjs is None, 'quickjs not installed.') def test_filter_expression_js(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle', yaml_experimental_features=['javascript' @@ -296,7 +296,7 @@ def test_filter_expression_py(self): row=beam.Row(rank=0, values=[1, 2, 3])), ])) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(quickjs is None, 'quickjs not installed.') def test_filter_inline_js_file(self): data = ''' function f(x) { diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 5d4f86ae4d97..0dafb2f1fcd7 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -616,7 +616,7 @@ def get_portability_package_data(): 'jinja2>=3.0,<3.2', 'virtualenv-clone>=0.5,<1.0', # https://github.com/PiotrDabkowski/Js2Py/issues/317 - 'js2py>=0.74,<1; python_version<"3.12"', + 'quickjs; python_version < "3.13" or platform_system != "Windows"', 'jsonschema>=4.0.0,<5.0.0', ] + dataframe_dependency, # Keep the following 
dependencies in line with what we test against