From 83a8a2a1d6f1806a78a6c254dea69d96bc49d602 Mon Sep 17 00:00:00 2001 From: manu-sj Date: Wed, 10 Jun 2026 08:11:14 +0200 Subject: [PATCH 1/4] [FSTORE-1938] Support chaining of Transformation Functions using an Execution DAG https://hopsworks.atlassian.net/browse/FSTORE-1938 Document chaining of transformation functions across the user guides: how the output of one function feeds another, how the execution DAG resolves the order, how cycles and duplicate output columns are rejected, and how the DAG is rendered from the UI and from the SDK with visualize_transformations(). A Transformation Functions Performance Tuning subsection in the transformation functions guide covers the node-parallel execution model: the n_processes argument and its defaults per input shape, pool pre-spawning through init_serving and init_batch_scoring, Arrow shared-memory staging, and the HSFS_TF_POOL_START_METHOD override. The model-dependent transformations guide notes that statistics for chained functions are fit in dependency order on the data each function sees. The on-demand transformations guide covers chains whose intermediate output is dropped from the feature group. No migration entry is included since the changes are backwards compatible. 
Signed-off-by: Manu Sathyarajan Joseph Co-Authored-By: Claude Opus 4.8 (1M context) Co-Authored-By: Claude Fable 5 --- .../on_demand_transformations.md | 37 ++++++++ .../model-dependent-transformations.md | 38 ++++++++ .../fs/transformation_functions.md | 95 +++++++++++++++++++ 3 files changed, 170 insertions(+) diff --git a/docs/user_guides/fs/feature_group/on_demand_transformations.md b/docs/user_guides/fs/feature_group/on_demand_transformations.md index 9eadc8c45c..103e50f615 100644 --- a/docs/user_guides/fs/feature_group/on_demand_transformations.md +++ b/docs/user_guides/fs/feature_group/on_demand_transformations.md @@ -270,3 +270,40 @@ On-demand transformation functions can also be accessed and executed as normal f "on_demand_feature1" ](feature_vector["transaction_time"], datetime.now()) ``` + +## Chaining On-Demand Transformations + +On-demand transformations (ODTs) attached to the same feature group can be chained: one ODT's output column can serve as another ODT's input. +The execution order is resolved automatically; the DAG is visible from the feature group overview page in the Hopsworks UI. +An intermediate output consumed only by a downstream ODT can be dropped from the feature group; the full chain still executes during online serving, and the dropped column never becomes a stored feature. + +An ODT's output column becomes a regular feature in the feature group, which a downstream feature view can consume and pass into a model-dependent transformation. +This is the implicit cross-DAG path between on-demand and model-dependent transformation chains: nothing extra to configure on either side. + +!!! 
example "ODT that consumes an upstream ODT's output" + === "Python" + + ```python + from hopsworks import udf + + + @udf(int) + def add_one(col): + return col + 1 + + + @udf(int) + def double(col): + return col * 2 + + + fg = fs.create_feature_group( + name="chained_odt_fg", + version=1, + primary_key=["id"], + transformation_functions=[ + add_one("raw").alias("raw_plus_one"), + double("raw_plus_one").alias("raw_plus_one_doubled"), + ], + ) + ``` diff --git a/docs/user_guides/fs/feature_view/model-dependent-transformations.md b/docs/user_guides/fs/feature_view/model-dependent-transformations.md index bed6b1137a..2b1ef90414 100644 --- a/docs/user_guides/fs/feature_view/model-dependent-transformations.md +++ b/docs/user_guides/fs/feature_view/model-dependent-transformations.md @@ -175,3 +175,41 @@ To achieve this, set the `transform` parameter to False. # Fetching untransformed batch data. untransformed_batch_data = feature_view.get_batch_data(transform=False) ``` + +## Chaining Model-Dependent Transformations + +A model-dependent transformation (MDT) can consume another MDT's output as its input. +The DAG is resolved automatically at execution time, so producers always run before consumers. + +!!! example "Chaining two increments and a sum" + === "Python" + + ```python + from hopsworks import udf + + + @udf(int) + def add_one(col): + return col + 1 + + + @udf(int) + def add(a, b): + return a + b + + + fv = fs.create_feature_view( + name="chained_mdt_fv", + query=fg.select_all(), + transformation_functions=[ + add_one("data1").alias("data1_plus_one"), + add_one("data2").alias("data2_plus_one"), + add("data1_plus_one", "data2_plus_one").alias("sum_plus_two"), + ], + version=1, + ) + ``` + +Training dataset statistics for chained MDTs are computed in dependency order, so a statistics-based transformation such as a min-max scaler that consumes another MDT's output is fit on that intermediate output, not on the raw feature. 
+ +See [Transformation Functions Performance Tuning][transformation-functions-performance-tuning] for `n_processes` semantics on chained DAGs. diff --git a/docs/user_guides/fs/transformation_functions.md b/docs/user_guides/fs/transformation_functions.md index 4e2487f3dd..fbe488420b 100644 --- a/docs/user_guides/fs/transformation_functions.md +++ b/docs/user_guides/fs/transformation_functions.md @@ -345,3 +345,98 @@ If only the `name` is provided, then the version will default to 1. ## Using transformation functions Transformation functions can be used by attaching it to a feature view to [create model-dependent transformations](./feature_view/model-dependent-transformations.md) or attached to feature groups to [create on-demand transformations](./feature_group/on_demand_transformations.md) + +## Chained Transformation Functions + +Transformation functions can be chained: the output column of one transformation function can serve as the input to another. +Hopsworks resolves the execution order automatically using a topological sort of the resulting DAG, so dependencies always run before their consumers. +Chaining works for both on-demand transformations attached to a feature group and model-dependent transformations attached to a feature view. + +!!! example "Chained model-dependent transformations on a feature view" + === "Python" + + ```python + from hopsworks import udf + + + @udf(int) + def add_one(col): + return col + 1 + + + @udf(int) + def add(a, b): + return a + b + + + fv = fs.create_feature_view( + name="chained_mdts_fv", + query=fg.select_all(), + transformation_functions=[ + add_one("data1").alias("data1_plus_one"), + add_one("data2").alias("data2_plus_one"), + add("data1_plus_one", "data2_plus_one").alias("sum_plus_two"), + ], + version=1, + ) + ``` + +The same DAG drives offline training data generation and online feature vector retrieval, so chains apply uniformly across both paths. 
+A configuration with no valid execution order is rejected: a duplicate output column or a cycle between transformation functions raises an error naming the offending functions, which can be fixed by renaming outputs with `.alias()`. + +Cross-DAG chaining is implicit: an on-demand transformation's output column becomes a feature in its feature group, which a feature view can consume and feed into a model-dependent transformation. +No additional setup is required. + +### Visualizing the execution DAG + +The execution DAG is shown in the Hopsworks UI on the feature view and feature group overview pages under "Transformation execution DAG." +The same graph can be rendered from the SDK with `visualize_transformations()`, available on both feature views and feature groups. +It renders as a Mermaid flowchart in Jupyter and as text elsewhere. + +!!! example "Visualizing transformation DAGs" + === "Python" + + ```python + # Render both the model-dependent and on-demand DAGs. + fv.visualize_transformations() + + # Render only the model-dependent DAG, top-to-bottom layout. + fv.visualize_transformations(kind="model_dependent", orient="TB") + + # Render the on-demand DAG of a feature group. + fg.visualize_transformations() + ``` + +### Transformation Functions Performance Tuning + +Transformation function execution is sequential by default. +Independent transformation functions in the DAG are the unit of parallelism: with more than one worker process, transformations that do not depend on each other run concurrently, while a chained sequence always runs in dependency order. +A strictly linear chain has nothing to overlap, so worker processes only add overhead there. + +The number of worker processes is controlled by the `n_processes` argument, accepted by the feature view and feature group entry points that execute transformations, such as `get_feature_vector`, `get_feature_vectors`, `get_batch_data`, `training_data`, and `transform`. 
+Passing `n_processes=1` always forces sequential execution. + +When `n_processes` is not provided, execution is sequential: parallelism is strictly opt-in, because whether the worker-pool overhead pays off depends on the cost of your transformation functions, which only you can judge. +A value above the DAG's maximum parallelism is capped to it, with a warning, because no more transformation functions can ever run concurrently than the DAG has independent branches. +On the Spark engine `n_processes` is ignored because the whole DAG is pushed down to Spark, which distributes the work itself. + +For online serving, spawning the worker pool during the first request would add the pool startup cost to that request's latency. +Passing `n_processes` to `init_serving` or `init_batch_scoring` pre-spawns the pool at initialization time and makes that value the default for subsequent retrieval calls; an explicit `n_processes` on an individual call still takes precedence. + +!!! example "Pre-spawning the worker pool for online serving" + === "Python" + + ```python + fv.init_serving(training_dataset_version=1, n_processes=2) + + # Served using the pool of two workers spawned at init time. + vector = fv.get_feature_vector(entry={"id": 1}) + ``` + +Parallel DataFrame execution stages the input once in Arrow shared memory, and each worker reads only the columns its transformation function needs. +The worker pool start method defaults to `fork` on Linux and `spawn` on macOS and Windows, where forking after threaded native libraries have been loaded can deadlock the worker; set the `HSFS_TF_POOL_START_METHOD` environment variable to `fork`, `forkserver`, or `spawn` to override it. + +Two patterns hold across deployments: + +- For vectorized Pandas UDFs on small inputs, sequential execution is at least as fast as parallel because the pool overhead dominates. +- For CPU-heavy UDF chains with independent branches, `n_processes >= 2` overlaps the branches and reduces tail latency. 
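The ordering and rejection rules described in the Chained Transformation Functions section above can be sketched with the standard library's `graphlib` module (Python 3.9+). This is a minimal illustration, not the hsfs implementation: the `(output_column, input_columns)` tuple layout and the helper names `build_graph` and `execution_waves` are hypothetical, and the error messages are placeholders rather than the messages Hopsworks raises.

```python
from graphlib import CycleError, TopologicalSorter


def build_graph(functions):
    """Map each output column to the upstream outputs it depends on.

    `functions` is a list of (output_column, input_columns) pairs. This
    layout is hypothetical and is not how hsfs stores transformations.
    """
    producers = {}
    for output, inputs in functions:
        if output in producers:
            raise ValueError(f"duplicate output column: {output!r}")
        producers[output] = list(inputs)
    # Inputs that no function produces are raw features, not DAG nodes.
    return {
        output: {col for col in inputs if col in producers}
        for output, inputs in producers.items()
    }


def execution_waves(functions):
    """Group outputs into waves; each wave depends only on earlier waves."""
    sorter = TopologicalSorter(build_graph(functions))
    try:
        sorter.prepare()  # raises CycleError when no valid order exists
    except CycleError as err:
        raise ValueError(f"cycle between outputs: {err.args[1]}") from err
    waves = []
    while sorter.is_active():
        ready = sorter.get_ready()  # nodes whose producers have all finished
        waves.append(sorted(ready))
        sorter.done(*ready)
    return waves


# Same shape as the documented example: two increments feeding a sum.
chain = [
    ("sum_plus_two", ["data1_plus_one", "data2_plus_one"]),
    ("data1_plus_one", ["data1"]),
    ("data2_plus_one", ["data2"]),
]
print(execution_waves(chain))
# [['data1_plus_one', 'data2_plus_one'], ['sum_plus_two']]
```

Reading the waves in order gives a valid sequential execution order. Functions within one wave do not depend on each other, which is the situation in which more than one worker process can overlap work. A strictly linear chain yields waves of width one. Whether hsfs derives its "maximum parallelism" cap from wave width in this way is an assumption, not something the patch states.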
From 0531d6ed50da9d7684b3a2bf7a25c80a5909a61b Mon Sep 17 00:00:00 2001 From: manu-sj Date: Fri, 12 Jun 2026 08:05:09 +0200 Subject: [PATCH 2/4] [FSTORE-1938] Support chaining of Transformation Functions using an Execution DAG https://hopsworks.atlassian.net/browse/FSTORE-1938 Restructure the performance tuning section so it reads in order: what the n_processes argument is, how parallelism maps to the DAG, when it pays off, online serving specifics, implementation notes. The previous version stated the sequential default three times across the first three paragraphs and placed the practical guidance after the implementation internals. Content changes: a call-shape distinction in the guidance (batch and offline calls benefit from worker processes, single feature vectors rarely do because the per-call dispatch cost usually exceeds the work), and a note that pre-spawning the pool removes the startup cost but not the per-call dispatch cost. Both reflect the measured behavior of the online batch chaining benchmark in the loadtest repository. Signed-off-by: Manu Sathyarajan Joseph Co-Authored-By: Claude Opus 4.7 (1M context) --- .../fs/transformation_functions.md | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/docs/user_guides/fs/transformation_functions.md b/docs/user_guides/fs/transformation_functions.md index fbe488420b..daca3579fc 100644 --- a/docs/user_guides/fs/transformation_functions.md +++ b/docs/user_guides/fs/transformation_functions.md @@ -409,19 +409,23 @@ It renders as a Mermaid flowchart in Jupyter and as text elsewhere. ### Transformation Functions Performance Tuning -Transformation function execution is sequential by default. -Independent transformation functions in the DAG are the unit of parallelism: with more than one worker process, transformations that do not depend on each other run concurrently, while a chained sequence always runs in dependency order. 
-A strictly linear chain has nothing to overlap, so worker processes only add overhead there. +Transformation functions execute sequentially unless the `n_processes` argument requests worker processes. +The argument is accepted by the feature view and feature group entry points that execute transformations, such as `get_feature_vector`, `get_feature_vectors`, `get_batch_data`, `training_data`, and `transform`. +Parallelism is strictly opt-in because whether the worker-pool overhead pays off depends on the cost of your transformation functions, which only you can judge. -The number of worker processes is controlled by the `n_processes` argument, accepted by the feature view and feature group entry points that execute transformations, such as `get_feature_vector`, `get_feature_vectors`, `get_batch_data`, `training_data`, and `transform`. -Passing `n_processes=1` always forces sequential execution. - -When `n_processes` is not provided, execution is sequential: parallelism is strictly opt-in, because whether the worker-pool overhead pays off depends on the cost of your transformation functions, which only you can judge. -A value above the DAG's maximum parallelism is capped to it, with a warning, because no more transformation functions can ever run concurrently than the DAG has independent branches. +With more than one worker process, independent transformation functions in the DAG run concurrently, while a chained sequence always runs in dependency order. +The DAG therefore bounds the useful value: a value above the DAG's maximum parallelism is capped to it, with a warning, and a strictly linear chain has nothing to overlap, so worker processes only add overhead there. On the Spark engine `n_processes` is ignored because the whole DAG is pushed down to Spark, which distributes the work itself. 
+Whether parallelism pays off depends on the call shape and the cost of the functions: + +- For batch and offline calls such as `get_feature_vectors`, `get_batch_data`, and `training_data`, CPU-heavy functions on independent branches benefit from `n_processes >= 2`: per-row work accumulates over the batch and the branches overlap. +- For vectorized Pandas UDFs on small inputs, sequential execution is at least as fast because the pool overhead dominates. +- For single feature vectors, sequential execution wins unless one transformation function alone costs more than the worker round trip, because dispatching the function and its inputs to a worker process is paid on every call. + For online serving, spawning the worker pool during the first request would add the pool startup cost to that request's latency. Passing `n_processes` to `init_serving` or `init_batch_scoring` pre-spawns the pool at initialization time and makes that value the default for subsequent retrieval calls; an explicit `n_processes` on an individual call still takes precedence. +Pre-spawning removes the startup cost but not the per-call dispatch cost described above. !!! example "Pre-spawning the worker pool for online serving" === "Python" @@ -433,10 +437,8 @@ Passing `n_processes` to `init_serving` or `init_batch_scoring` pre-spawns the p vector = fv.get_feature_vector(entry={"id": 1}) ``` -Parallel DataFrame execution stages the input once in Arrow shared memory, and each worker reads only the columns its transformation function needs. -The worker pool start method defaults to `fork` on Linux and `spawn` on macOS and Windows, where forking after threaded native libraries have been loaded can deadlock the worker; set the `HSFS_TF_POOL_START_METHOD` environment variable to `fork`, `forkserver`, or `spawn` to override it. 
- -Two patterns hold across deployments: +Two implementation details are relevant when measuring: -- For vectorized Pandas UDFs on small inputs, sequential execution is at least as fast as parallel because the pool overhead dominates. -- For CPU-heavy UDF chains with independent branches, `n_processes >= 2` overlaps the branches and reduces tail latency. +- Parallel DataFrame execution stages the input once in Arrow shared memory, and each worker reads only the columns its transformation function needs, so the input is not copied per worker. +- The worker pool start method defaults to `fork` on Linux and `spawn` on macOS and Windows, where forking after threaded native libraries have been loaded can deadlock the worker. + Set the `HSFS_TF_POOL_START_METHOD` environment variable to `fork`, `forkserver`, or `spawn` to override it. From e444f247cc82a7ab1bf22a0a070e16281c915bb2 Mon Sep 17 00:00:00 2001 From: manu-sj Date: Fri, 12 Jun 2026 08:22:13 +0200 Subject: [PATCH 3/4] [FSTORE-1938] Support chaining of Transformation Functions using an Execution DAG https://hopsworks.atlassian.net/browse/FSTORE-1938 Rework the chaining documentation for reading order on all three pages. The hub page now reads in this order: what chaining is, an example, uniform offline and online behavior, statistics over chains with a link to the model-dependent page, cross-type chaining, and invalid configurations, which now come last instead of being interleaved. The model-dependent page gives the statistics-over-chains behavior its own subsection instead of a single dangling sentence after the example, and states that statistics are fit on the train split, each transformation executes once, and the fitted values are persisted for serving. The on-demand page leads with the example like the other pages, and the example now demonstrates the dropped-column claims it previously only stated: both the raw input and the intermediate are dropped, leaving one stored output. 
Signed-off-by: Manu Sathyarajan Joseph Co-Authored-By: Claude Opus 4.7 (1M context) --- .../on_demand_transformations.md | 24 ++++++++++--------- .../model-dependent-transformations.md | 6 ++++- .../fs/transformation_functions.md | 7 +++--- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/docs/user_guides/fs/feature_group/on_demand_transformations.md b/docs/user_guides/fs/feature_group/on_demand_transformations.md index 103e50f615..63dde49cc7 100644 --- a/docs/user_guides/fs/feature_group/on_demand_transformations.md +++ b/docs/user_guides/fs/feature_group/on_demand_transformations.md @@ -273,26 +273,22 @@ On-demand transformation functions can also be accessed and executed as normal f ## Chaining On-Demand Transformations -On-demand transformations (ODTs) attached to the same feature group can be chained: one ODT's output column can serve as another ODT's input. -The execution order is resolved automatically; the DAG is visible from the feature group overview page in the Hopsworks UI. -An intermediate output consumed only by a downstream ODT can be dropped from the feature group; the full chain still executes during online serving, and the dropped column never becomes a stored feature. +On-demand transformations attached to the same feature group can be chained: one transformation's output column can serve as another transformation's input. +The execution order is resolved automatically, and the resulting DAG is visible from the feature group overview page in the Hopsworks UI. -An ODT's output column becomes a regular feature in the feature group, which a downstream feature view can consume and pass into a model-dependent transformation. -This is the implicit cross-DAG path between on-demand and model-dependent transformation chains: nothing extra to configure on either side. - -!!! example "ODT that consumes an upstream ODT's output" +!!! 
example "On-demand transformation that consumes an upstream output" === "Python" ```python from hopsworks import udf - @udf(int) - def add_one(col): - return col + 1 + @udf(int, drop=["raw"]) + def add_one(raw): + return raw + 1 - @udf(int) + @udf(int, drop=["col"]) def double(col): return col * 2 @@ -307,3 +303,9 @@ This is the implicit cross-DAG path between on-demand and model-dependent transf ], ) ``` + +Columns consumed only by the chain can be dropped, as the raw input `raw` and the intermediate `raw_plus_one` are in the example, leaving `raw_plus_one_doubled` as the only stored output. +The full chain still executes during online serving, and dropped columns never become stored features. + +An on-demand transformation's output column becomes a regular feature in the feature group, which a downstream feature view can consume and pass into a model-dependent transformation. +This is the implicit chaining path between on-demand and model-dependent transformations, with no additional setup on either side. diff --git a/docs/user_guides/fs/feature_view/model-dependent-transformations.md b/docs/user_guides/fs/feature_view/model-dependent-transformations.md index 2b1ef90414..61d980c660 100644 --- a/docs/user_guides/fs/feature_view/model-dependent-transformations.md +++ b/docs/user_guides/fs/feature_view/model-dependent-transformations.md @@ -210,6 +210,10 @@ The DAG is resolved automatically at execution time, so producers always run bef ) ``` -Training dataset statistics for chained MDTs are computed in dependency order, so a statistics-based transformation such as a min-max scaler that consumes another MDT's output is fit on that intermediate output, not on the raw feature. +### Statistics over chained transformations + +Statistics-based transformations participate in chains like any other transformation. 
+A transformation that requires statistics on another transformation's output, such as a min-max scaler applied to an imputed column, is fit on that intermediate output rather than on the raw feature. +During training dataset creation the statistics are computed in dependency order on the train split, each transformation executes exactly once, and the fitted statistics are persisted so that online serving applies the same values. See [Transformation Functions Performance Tuning][transformation-functions-performance-tuning] for `n_processes` semantics on chained DAGs. diff --git a/docs/user_guides/fs/transformation_functions.md b/docs/user_guides/fs/transformation_functions.md index daca3579fc..018c881371 100644 --- a/docs/user_guides/fs/transformation_functions.md +++ b/docs/user_guides/fs/transformation_functions.md @@ -382,10 +382,11 @@ Chaining works for both on-demand transformations attached to a feature group an ``` The same DAG drives offline training data generation and online feature vector retrieval, so chains apply uniformly across both paths. -A configuration with no valid execution order is rejected: a duplicate output column or a cycle between transformation functions raises an error naming the offending functions, which can be fixed by renaming outputs with `.alias()`. +Statistics-based transformations participate in chains too: a transformation that requires statistics on another transformation's output is fit on that intermediate output, as described in [model-dependent transformations][chaining-model-dependent-transformations]. + +Chaining also works across the two transformation types without additional setup: an on-demand transformation's output column becomes a feature in its feature group, which a feature view can consume and feed into a model-dependent transformation. 
-Cross-DAG chaining is implicit: an on-demand transformation's output column becomes a feature in its feature group, which a feature view can consume and feed into a model-dependent transformation. -No additional setup is required. +A configuration with no valid execution order is rejected: a duplicate output column or a cycle between transformation functions raises an error naming the offending functions, which can be fixed by renaming outputs with `.alias()`. ### Visualizing the execution DAG From 6c984a9702af50bb0e774e1f299064526a6058cc Mon Sep 17 00:00:00 2001 From: manu-sj Date: Fri, 12 Jun 2026 09:01:32 +0200 Subject: [PATCH 4/4] Improving docs --- .../fs/transformation_functions.md | 81 ++++++++----------- 1 file changed, 33 insertions(+), 48 deletions(-) diff --git a/docs/user_guides/fs/transformation_functions.md b/docs/user_guides/fs/transformation_functions.md index 018c881371..25047b227b 100644 --- a/docs/user_guides/fs/transformation_functions.md +++ b/docs/user_guides/fs/transformation_functions.md @@ -1,4 +1,3 @@ - # Transformation Functions In AI systems, [transformation functions](https://www.hopsworks.ai/dictionary/transformation) transform data to create features, the inputs to machine learning models (in both training and inference). @@ -43,15 +42,15 @@ The decorator accepts three parameters: It can be a single Python type if the function returns one transformed feature, or a list of Python types if it returns multiple transformed features. 
The supported Python types that be used with the `return_type` argument are provided in the table below: - | Supported Python Types | - |:----------------------------------:| - | str | - | int | - | float | - | bool | - | datetime.datetime | - | datetime.date | - | datetime.time | + | Supported Python Types | + | :--------------------: | + | str | + | int | + | float | + | bool | + | datetime.datetime | + | datetime.date | + | datetime.time | - **`drop`** (optional): Identifies input arguments to exclude from the output after transformations are applied. By default, all inputs are retained in the output. @@ -59,7 +58,7 @@ The decorator accepts three parameters: - **`mode`** (optional): Determines the execution mode of the transformation function. The argument accepts three values: `default`, `python`, or `pandas`. - By default, the `mode` is set to `default`. Further details on this argument can be found [below](#specifying-execution-modes). + By default, the `mode` is set to `default`. Further details on this argument can be found [below](#specifying-execution-modes). Hopsworks supports four types of transformation functions across all execution modes: @@ -74,7 +73,7 @@ To create a one-to-one transformation function, the Hopsworks `@udf` decorator m The transformation function should take one argument as input and return a Pandas Series. !!! example "Creation of a one-to-one transformation function in Hopsworks." - === "Python" +=== "Python" ```python from hopsworks import udf @@ -90,7 +89,7 @@ The transformation function should take one argument as input and return a Panda The creation of many-to-one transformation functions is similar to that of a one-to-one transformation function, the only difference being that the transformation function accepts multiple features as input. !!! example "Creation of a many-to-one transformation function in Hopsworks." 
- === "Python" +=== "Python" ```python from hopsworks import udf @@ -107,7 +106,7 @@ To create a one-to-many transformation function, the Hopsworks `@udf` decorato The return types provided to the decorator must match the types of each column in the returned Pandas DataFrame. !!! example "Creation of a one-to-many transformation function in Hopsworks." - === "Python" +=== "Python" ```python from hopsworks import udf @@ -123,7 +122,7 @@ The return types provided to the decorator must match the types of each column i The creation of a many-to-many transformation function is similar to that of a one-to-many transformation function, the only difference being that the transformation function accepts multiple features as input. !!! example "Creation of a many-to-many transformation function in Hopsworks." - === "Python" +=== "Python" ```python from hopsworks import udf @@ -137,7 +136,7 @@ The creation of a many-to-many transformation function is similar to that of a o ### Specifying execution modes The `mode` parameter of the `@udf` decorator can be used to specify the execution mode of the transformation function. -It accepts three possible values `default`, `python` and `pandas`. Each mode is explained in more detail below: +It accepts three possible values `default`, `python` and `pandas`. Each mode is explained in more detail below: #### Default Mode @@ -146,7 +145,7 @@ It serves as the default mode used when the `mode` parameter is not specified. In this mode, the transformation function is executed as a Pandas UDF during training and in the batch inference pipeline, while it operates as a Python UDF during online inference. !!! 
example "Creating a many to many transformations function using the default execution mode" - === "Python" +=== "Python" ```python from hopsworks import udf @@ -168,7 +167,7 @@ In this mode, the transformation function is executed as a Pandas UDF during tra The transformation function can be configured to always execute as a Python UDF by setting the `mode` parameter of the `@udf` decorator to `python`. !!! example "Creating a many to many transformation function as a Python UDF" - === "Python" +=== "Python" ```python from hopsworks import udf @@ -184,7 +183,7 @@ The transformation function can be configured to always execute as a Python UDF The transformation function can be configured to always execute as a Pandas UDF by setting the `mode` parameter of the `@udf` decorator to `pandas`. !!! example "Creating a many to many transformations function as a Pandas UDF" - === "Python" +=== "Python" ```python import pandas as pd @@ -211,11 +210,11 @@ The transformation function can be configured to always execute as a Pandas UDF ### Dropping input features -The `drop` parameter of the `@udf` decorator is used to drop specific columns in the input DataFrame after transformation. If any argument of the transformation function is passed to the `drop` parameter, then the column mapped to the argument is dropped after the transformation functions are applied. +The `drop` parameter of the `@udf` decorator is used to drop specific columns in the input DataFrame after transformation. If any argument of the transformation function is passed to the `drop` parameter, then the column mapped to the argument is dropped after the transformation functions are applied. In the example below, the columns mapped to the arguments `feature1` and `feature3` are dropped after the application of all transformation functions. !!! 
example "Specify arguments to drop after transformation" - === "Python" +=== "Python" ```python from hopsworks import udf @@ -233,7 +232,7 @@ Each name must be uniques and should be at-most 63 characters long. If no name is provided via the `alias` function, Hopsworks generates default output feature names when [on-demand](./feature_group/on_demand_transformations.md) or [model-dependent](./feature_view/model-dependent-transformations.md) transformation functions are created. !!! example "Specifying output column names for transformation functions." - === "Python" +=== "Python" ```python from hopsworks import udf @@ -266,7 +265,7 @@ These objects encapsulate statistics related to the argument as instances of the Upon instantiation, instances of `FeatureTransformationStatistics` contain `None` values and are updated with the required statistics after the creation of a training dataset. !!! example "Creation of a transformation function in Hopsworks that uses training dataset statistics" - === "Python" +=== "Python" ```python from hopsworks import udf @@ -295,7 +294,7 @@ These variables contain common data used across transformation functions. By including the context argument, you can pass the necessary data as a dictionary into the into the `context` argument of the transformation function during [training dataset creation](feature_view/training-data.md#passing-context-variables-to-transformation-functions) or [feature vector retrieval](feature_view/feature-vectors.md#passing-context-variables-to-transformation-functions) or [batch data retrieval](feature_view/batch-data.md#passing-context-variables-to-transformation-functions). !!! 
example "Creation of a transformation function in Hopsworks that accepts context variables" - === "Python" +=== "Python" ```python from hopsworks import udf @@ -312,7 +311,7 @@ To save a transformation function to the feature store, use the function `creat The save function will throw an error if another transformation function with the same name and version is already saved in the feature store. !!! example "Register transformation function `add_one` in the Hopsworks feature store" - === "Python" +=== "Python" ```python plus_one_meta = fs.create_transformation_function( @@ -329,7 +328,7 @@ A specific transformation function can be retrieved using its `name` and `versio If only the `name` is provided, then the version will default to 1. !!! example "Retrieving transformation functions from the feature store" - === "Python" +=== "Python" ```python # get all transformation functions @@ -344,7 +343,7 @@ If only the `name` is provided, then the version will default to 1. ## Using transformation functions -Transformation functions can be used by attaching it to a feature view to [create model-dependent transformations](./feature_view/model-dependent-transformations.md) or attached to feature groups to [create on-demand transformations](./feature_group/on_demand_transformations.md) +Transformation functions can be used by attaching them to a feature view to [create model-dependent transformations](./feature_view/model-dependent-transformations.md) or to feature groups to [create on-demand transformations](./feature_group/on_demand_transformations.md). ## Chained Transformation Functions @@ -353,7 +352,7 @@ Hopsworks resolves the execution order automatically using a topological sort of Chaining works for both on-demand transformations attached to a feature group and model-dependent transformations attached to a feature view. !!! 
example "Chained model-dependent transformations on a feature view" - === "Python" +=== "Python" ```python from hopsworks import udf @@ -395,7 +394,7 @@ The same graph can be rendered from the SDK with `visualize_transformations()`, It renders as a Mermaid flowchart in Jupyter and as text elsewhere. !!! example "Visualizing transformation DAGs" - === "Python" +=== "Python" ```python # Render both the model-dependent and on-demand DAGs. @@ -412,24 +411,14 @@ It renders as a Mermaid flowchart in Jupyter and as text elsewhere. Transformation functions execute sequentially unless the `n_processes` argument requests worker processes. The argument is accepted by the feature view and feature group entry points that execute transformations, such as `get_feature_vector`, `get_feature_vectors`, `get_batch_data`, `training_data`, and `transform`. -Parallelism is strictly opt-in because whether the worker-pool overhead pays off depends on the cost of your transformation functions, which only you can judge. - -With more than one worker process, independent transformation functions in the DAG run concurrently, while a chained sequence always runs in dependency order. -The DAG therefore bounds the useful value: a value above the DAG's maximum parallelism is capped to it, with a warning, and a strictly linear chain has nothing to overlap, so worker processes only add overhead there. -On the Spark engine `n_processes` is ignored because the whole DAG is pushed down to Spark, which distributes the work itself. +Parallelism is strictly opt-in because whether the worker-pool overhead pays off depends on the cost of your transformation functions. -Whether parallelism pays off depends on the call shape and the cost of the functions: +With more than one worker process, independent transformation functions in the DAG run concurrently, while a chained sequence always runs in dependency order. 
On the Spark engine `n_processes` is ignored because the whole DAG is pushed down to Spark, which distributes the work itself. For batch and offline calls such as `get_feature_vectors`, `get_batch_data`, and `training_data`, CPU-heavy functions benefit from `n_processes >= 2`; for vectorized Pandas UDFs on small inputs, sequential execution is at least as fast because the pool overhead dominates. -- For batch and offline calls such as `get_feature_vectors`, `get_batch_data`, and `training_data`, CPU-heavy functions on independent branches benefit from `n_processes >= 2`: per-row work accumulates over the batch and the branches overlap. -- For vectorized Pandas UDFs on small inputs, sequential execution is at least as fast because the pool overhead dominates. -- For single feature vectors, sequential execution wins unless one transformation function alone costs more than the worker round trip, because dispatching the function and its inputs to a worker process is paid on every call. - -For online serving, spawning the worker pool during the first request would add the pool startup cost to that request's latency. -Passing `n_processes` to `init_serving` or `init_batch_scoring` pre-spawns the pool at initialization time and makes that value the default for subsequent retrieval calls; an explicit `n_processes` on an individual call still takes precedence. -Pre-spawning removes the startup cost but not the per-call dispatch cost described above. +For online serving, spawning the worker pool during the first request would add the pool startup cost to that request's latency. Passing `n_processes` to `init_serving` or `init_batch_scoring` pre-spawns the pool at initialization time and makes that value the default for subsequent retrieval calls; an explicit `n_processes` on an individual call still takes precedence. !!! 
example "Pre-spawning the worker pool for online serving" - === "Python" +=== "Python" ```python fv.init_serving(training_dataset_version=1, n_processes=2) @@ -438,8 +427,4 @@ Pre-spawning removes the startup cost but not the per-call dispatch cost describ vector = fv.get_feature_vector(entry={"id": 1}) ``` -Two implementation details are relevant when measuring: - -- Parallel DataFrame execution stages the input once in Arrow shared memory, and each worker reads only the columns its transformation function needs, so the input is not copied per worker. -- The worker pool start method defaults to `fork` on Linux and `spawn` on macOS and Windows, where forking after threaded native libraries have been loaded can deadlock the worker. - Set the `HSFS_TF_POOL_START_METHOD` environment variable to `fork`, `forkserver`, or `spawn` to override it. +The worker pool start method defaults to `fork` on Linux and `spawn` on macOS and Windows. Set the `HOPSWORKS_TF_POOL_START_METHOD` environment variable to `fork`, `forkserver`, or `spawn` to override it.
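
For readers who want to see how an environment-variable override of this kind typically interacts with platform defaults, the following is a minimal sketch built only on Python's standard `multiprocessing` module. It is not the Hopsworks implementation: the helper names `resolve_start_method` and `make_pool` are hypothetical, and the variable name is copied from the sentence above as an assumption.

```python
import multiprocessing as mp
import os
import sys

# Variable name as written in the text above; treated as an assumption here.
START_METHOD_ENV_VAR = "HOPSWORKS_TF_POOL_START_METHOD"
VALID_START_METHODS = ("fork", "forkserver", "spawn")


def resolve_start_method(environ=None, platform=None):
    """Return the override from the environment if set, else the platform default.

    Hypothetical helper: the defaults mirror the ones described in the text
    (fork on Linux, spawn on macOS and Windows).
    """
    environ = os.environ if environ is None else environ
    platform = sys.platform if platform is None else platform

    override = environ.get(START_METHOD_ENV_VAR, "").strip().lower()
    if override:
        if override not in VALID_START_METHODS:
            raise ValueError(
                f"{START_METHOD_ENV_VAR} must be one of {VALID_START_METHODS}, "
                f"got {override!r}"
            )
        return override

    return "fork" if platform.startswith("linux") else "spawn"


def make_pool(n_processes):
    """Create a worker pool using the resolved start method (hypothetical helper)."""
    ctx = mp.get_context(resolve_start_method())
    return ctx.Pool(processes=n_processes)
```

Taking `environ` and `platform` as parameters keeps the resolution logic testable without touching the real process environment. Validating the override up front surfaces a misspelled value as a clear error instead of a failure deep inside pool creation.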