diff --git a/docs/user_guides/fs/feature_group/create.md b/docs/user_guides/fs/feature_group/create.md index d05d967fc8..f985c96472 100644 --- a/docs/user_guides/fs/feature_group/create.md +++ b/docs/user_guides/fs/feature_group/create.md @@ -102,6 +102,116 @@ MaxDirectoryItemsExceededException - The directory item limit is exceeded: limit By using partitioning the system will write the feature data in different subdirectories, thus allowing you to write 10240 files per partition. +##### Time-grain partitioning with `partitioned_by` + +Most time-series feature groups want to partition by a time grain derived from `event_time`. +Instead of decomposing the timestamp into `year` / `month` / `day` columns yourself and passing them as `partition_key`, declare the grains with `partitioned_by` and let Hopsworks derive the partition columns for you. +Pass one or more grains drawn from `hour`, `day`, `week`, `month`, and `year`. +Supported on `time_travel_format="DELTA"`, `"ICEBERG"`, and `"HUDI"` for non-stream feature groups (see [Hudi](#hudi) and [Stream feature groups](#stream-feature-groups) below). + +```python +fg = fs.get_or_create_feature_group( + name="transactions", + version=1, + primary_key=["tx_id"], + event_time="tx_ts", + partitioned_by=["year", "month", "day"], + time_travel_format="DELTA", +) +fg.insert(df) # df does not need year/month/day; they derive from tx_ts +``` + +The example above is equivalent to manually decomposing `tx_ts` into three columns and passing `partition_key=["year", "month", "day"]`, but you never write the grain columns yourself. +The grain columns are ordinary materialized partition columns: the client computes them from `event_time` on each write, and the backend registers them as partition columns through the normal table-creation path (no Delta generated columns, no extra job). +The source DataFrame must contain only your real features plus `event_time`; it must not carry the grain columns. + +On disk the data lands in the standard Hive layout, one directory level per grain in the order you listed them: + +```text +.../transactions_1/year=2026/month=06/day=11/ +``` + +The grains become real features on the feature group, so they show up in the schema and in `fg.partition_key`, and you can filter on them directly. +By default they are written only to the offline store (see [Online feature store](#online-feature-store) below). + +###### Parameters + +- `partitioned_by`: ordered, non-empty list of grains from `{"hour", "day", "week", "month", "year"}`, no duplicates. + Mutually exclusive with `partition_key`, and requires `event_time` to be set. + A grain must not collide with `event_time` or an existing feature name. +- `online_partition_columns` (default `False`): when `True`, the derived grain columns are also written to the online store; when `False` they are offline-only. + Online serving with `partitioned_by` is not supported yet, so this is effectively always `False` today (see below). + +###### Persistence across sessions + +`partitioned_by` is stored on the feature group, so it round-trips without re-passing it: + +```python +fg = fs.get_feature_group("transactions", version=1) +fg.partitioned_by # ["year", "month", "day"] +fg.partition_key # ["year", "month", "day"] +``` + +###### Reading and partition pruning + +Read the whole group, or a time slice; the grain columns appear as normal feature columns, populated from `event_time`: + +```python +recent = fg.read(start_time="2026-06-01", end_time="2026-06-11") +``` + +The grain columns are real partition columns, so a filter on a grain column (for example `fg.filter(fg.year == 2026)`) prunes partitions natively. +A filter on an `event_time` range is rewritten into equivalent grain-column predicates by the query layer, so `fg.read(start_time=..., end_time=...)` prunes too on hierarchical specs (and tightens to the finest grain the range allows, so a within-one-month window also bounds `day`): + +| `partitioned_by` | Prunes on `event_time` range? | Prunes on `year` / `month` / `day` filter? | +| --- | --- | --- | +| `["year"]` | ✅ | ✅ | +| `["year", "month"]` | ✅ | ✅ | +| `["year", "month", "day"]` | ✅ | ✅ | +| `["year", "month", "day", "hour"]` | ✅ | ✅ | +| `["month"]` (no year) | ⚠️ no, month alone is ambiguous across years | ✅ filter on month works | +| `["year", "week"]` | ⚠️ year only, week is not directly derivable from a date range | ✅ both columns prune | +| `["day"]` (no year/month) | ⚠️ no, day-of-month is ambiguous | ✅ filter on day works | + +Prefer hierarchical specs: `["year"]`, `["year", "month"]`, `["year", "month", "day"]`, `["year", "month", "day", "hour"]`. +They line up with the typical batch-pipeline access pattern and prune naturally on both grain-column and `event_time`-range filters. +Non-hierarchical specs are still valid; they just do not prune on an `event_time` range, only on a direct filter of the derived columns. + +###### Example: clickstream partitioned by the hour + +A high-volume event stream partitioned down to the hour, so a query for a few hours reads only those partitions: + +```python +fg = fs.get_or_create_feature_group( + name="clickstream", + version=1, + primary_key=["event_id"], + event_time="event_time", + partitioned_by=["year", "month", "day", "hour"], + online_enabled=False, + time_travel_format="DELTA", +) +fg.insert(clickstream_df) # only event_id / event_time / event fields +``` + +###### Online feature store + +Online-enabled feature groups do not yet support `partitioned_by`. +The online ingestion path does not exclude the offline-only grain columns from the Kafka/Avro schema, nor materialize them for the online write, so the backend rejects `partitioned_by` together with `online_enabled=True`, both at creation and when enabling online on an existing group. +Keep the feature group offline-only to use `partitioned_by`. + +###### Hudi + +`partitioned_by` works on Hudi feature groups written directly by Spark (a non-stream feature group): the client materializes the grain columns and Hudi partitions on them. +On the Python (non-Spark) engine a Hudi feature group is created as a stream feature group, which is not yet supported (see below); use `time_travel_format="DELTA"` or `"ICEBERG"` there. +`time_travel_format="NONE"` (plain Hive/parquet) is rejected because it has no grain-materialization step. + +###### Stream feature groups + +`partitioned_by` is not yet supported on stream feature groups (`stream=True`). +Stream feature groups materialize through the DeltaStreamer job, which does not derive the grain columns yet, so the backend rejects `partitioned_by` on them at creation. +Create a non-stream feature group to use `partitioned_by`. + ##### Table format When you create a feature group, you can specify the table format you want to use to store the data in your feature group by setting the `time_travel_format` parameter.