Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions docs/user_guides/fs/feature_group/create.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,116 @@ MaxDirectoryItemsExceededException - The directory item limit is exceeded: limit

By using partitioning the system will write the feature data in different subdirectories, thus allowing you to write 10240 files per partition.

##### Time-grain partitioning with `partitioned_by`

Most time-series feature groups want to partition by a time grain derived from `event_time`.
Instead of decomposing the timestamp into `year` / `month` / `day` columns yourself and passing them as `partition_key`, declare the grains with `partitioned_by` and let Hopsworks derive the partition columns for you.
Pass one or more grains drawn from `hour`, `day`, `week`, `month`, and `year`.
Supported on `time_travel_format="DELTA"`, `"ICEBERG"`, and `"HUDI"` for non-stream feature groups (see [Hudi](#hudi) and [Stream feature groups](#stream-feature-groups) below).

```python
fg = fs.get_or_create_feature_group(
name="transactions",
version=1,
primary_key=["tx_id"],
event_time="tx_ts",
partitioned_by=["year", "month", "day"],
time_travel_format="DELTA",
)
fg.insert(df) # df does not need year/month/day; they derive from tx_ts
```

The example above is equivalent to manually decomposing `tx_ts` into three columns and passing `partition_key=["year", "month", "day"]`, but you never write the grain columns yourself.
The grain columns are ordinary materialized partition columns: the client computes them from `event_time` on each write, and the backend registers them as partition columns through the normal table-creation path (no Delta generated columns, no extra job).
The source DataFrame must contain only your real features plus `event_time`; it must not carry the grain columns.
Comment on lines +124 to +126

On disk the data lands in the standard Hive layout, one directory level per grain in the order you listed them:

```text
.../transactions_1/year=2026/month=06/day=11/<parquet files>
```

The grains become real features on the feature group, so they show up in the schema and in `fg.partition_key`, and you can filter on them directly.
By default they are written only to the offline store (see [Online feature store](#online-feature-store) below).

###### Parameters

- `partitioned_by`: ordered, non-empty list of grains from `{"hour", "day", "week", "month", "year"}`, no duplicates.
Mutually exclusive with `partition_key`, and requires `event_time` to be set.
A grain must not collide with `event_time` or an existing feature name.
- `online_partition_columns` (default `False`): when `True`, the derived grain columns are also written to the online store; when `False` they are offline-only.
Online serving with `partitioned_by` is not supported yet, so this is effectively always `False` today (see below).
Comment on lines +142 to +143

###### Persistence across sessions

`partitioned_by` is stored on the feature group, so it round-trips without re-passing it:

```python
fg = fs.get_feature_group("transactions", version=1)
fg.partitioned_by # ["year", "month", "day"]
fg.partition_key # ["year", "month", "day"]
```

###### Reading and partition pruning

Read the whole group, or a time slice; the grain columns appear as normal feature columns, populated from `event_time`:

```python
recent = fg.read(start_time="2026-06-01", end_time="2026-06-11")
```

The grain columns are real partition columns, so a filter on a grain column (for example `fg.filter(fg.year == 2026)`) prunes partitions natively.
A filter on an `event_time` range is rewritten into equivalent grain-column predicates by the query layer, so `fg.read(start_time=..., end_time=...)` prunes too on hierarchical specs (and tightens to the finest grain the range allows, so a within-one-month window also bounds `day`):

| `partitioned_by` | Prunes on `event_time` range? | Prunes on `year` / `month` / `day` filter? |
| --- | --- | --- |
| `["year"]` | ✅ | ✅ |
| `["year", "month"]` | ✅ | ✅ |
| `["year", "month", "day"]` | ✅ | ✅ |
| `["year", "month", "day", "hour"]` | ✅ | ✅ |
| `["month"]` (no year) | ⚠️ no, month alone is ambiguous across years | ✅ filter on month works |
| `["year", "week"]` | ⚠️ year only, week is not directly derivable from a date range | ✅ both columns prune |
| `["day"]` (no year/month) | ⚠️ no, day-of-month is ambiguous | ✅ filter on day works |

Prefer hierarchical specs: `["year"]`, `["year", "month"]`, `["year", "month", "day"]`, `["year", "month", "day", "hour"]`.
They line up with the typical batch-pipeline access pattern and prune naturally on both grain-column and `event_time`-range filters.
Non-hierarchical specs are still valid; they just do not prune on an `event_time` range, only on a direct filter of the derived columns.

###### Example: clickstream partitioned by the hour

A high-volume event stream partitioned down to the hour, so a query for a few hours reads only those partitions:

```python
fg = fs.get_or_create_feature_group(
name="clickstream",
version=1,
primary_key=["event_id"],
event_time="event_time",
partitioned_by=["year", "month", "day", "hour"],
online_enabled=False,
time_travel_format="DELTA",
)
fg.insert(clickstream_df) # only event_id / event_time / event fields
```

###### Online feature store

Online-enabled feature groups do not yet support `partitioned_by`.
The online ingestion path does not exclude the offline-only grain columns from the Kafka/Avro schema, nor materialize them for the online write, so the backend rejects `partitioned_by` together with `online_enabled=True`, both at creation and when enabling online on an existing group.
Keep the feature group offline-only to use `partitioned_by`.

###### Hudi

`partitioned_by` works on Hudi feature groups written directly by Spark (a non-stream feature group): the client materializes the grain columns and Hudi partitions on them.
On the Python (non-Spark) engine a Hudi feature group is created as a stream feature group, which is not yet supported (see below); use `time_travel_format="DELTA"` or `"ICEBERG"` there.
`time_travel_format="NONE"` (plain Hive/parquet) is rejected because it has no grain-materialization step.

###### Stream feature groups

`partitioned_by` is not yet supported on stream feature groups (`stream=True`).
Stream feature groups materialize through the DeltaStreamer job, which does not derive the grain columns yet, so the backend rejects `partitioned_by` on them at creation.
Create a non-stream feature group to use `partitioned_by`.

##### Table format

When you create a feature group, you can specify the table format you want to use to store the data in your feature group by setting the `time_travel_format` parameter.
Expand Down
Loading