diff --git a/.changeset/includes-child-where-clauses.md b/.changeset/includes-child-where-clauses.md new file mode 100644 index 000000000..566dface5 --- /dev/null +++ b/.changeset/includes-child-where-clauses.md @@ -0,0 +1,7 @@ +--- +'@tanstack/db': patch +--- + +fix: pass child where clauses to loadSubset in includes + +Pure-child WHERE clauses on includes subqueries (e.g., `.where(({ item }) => eq(item.status, 'active'))`) are now passed through to the child collection's `loadSubset`/`queryFn`, enabling server-side filtering. Previously only the correlation filter reached the sync layer; additional child filters were applied client-side only. diff --git a/.changeset/includes-lazy-loading.md b/.changeset/includes-lazy-loading.md new file mode 100644 index 000000000..ff32adb00 --- /dev/null +++ b/.changeset/includes-lazy-loading.md @@ -0,0 +1,7 @@ +--- +'@tanstack/db': patch +--- + +fix: lazy load includes child collections in on-demand sync mode + +Includes child collections now use the same lazy loading mechanism as regular joins. When a query uses includes with a correlation WHERE clause (e.g., `.where(({ item }) => eq(item.rootId, r.id))`), only matching child rows are loaded on-demand via `requestSnapshot({ where: inArray(field, keys) })` instead of loading all data upfront. This ensures the sync layer's `queryFn` receives the correlation filter in `loadSubsetOptions`, enabling efficient server-side filtering. diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index 70786ca8d..f4cff548d 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -4,6 +4,7 @@ import { join as joinOperator, map, reduce, + tap, } from '@tanstack/db-ivm' import { optimizeQuery } from '../optimizer.js' import { @@ -22,6 +23,8 @@ import { Value as ValClass, getWhereExpression, } from '../ir.js' +import { ensureIndexForField } from '../../indexes/auto-index.js' +import { inArray } from '../builder/functions.js' import { compileExpression, toBooleanPredicate } from './evaluators.js' import { processJoins } from './joins.js' import { containsAggregate, processGroupBy } from './group-by.js' @@ -379,6 +382,72 @@ export function compileQuery( ), ) + // --- Includes lazy loading (mirrors join lazy loading in joins.ts) --- + // Resolve the child correlation field to its underlying collection + field path + // so we can set up an index and targeted requestSnapshot calls. + const childCorrelationAlias = subquery.childCorrelationField.path[0]! + const childFromCollection = + subquery.query.from.type === `collectionRef` + ? subquery.query.from.collection + : (null as unknown as Collection) + const followRefResult = followRef( + subquery.query, + subquery.childCorrelationField, + childFromCollection, + ) + + if (followRefResult) { + const followRefCollection = followRefResult.collection + const fieldPath = followRefResult.path + const fieldName = fieldPath[0] + + // 1. Mark child source as lazy so CollectionSubscriber skips initial full load + lazySources.add(childCorrelationAlias) + + // 2. Ensure an index on the correlation field for efficient lookups + if (fieldName) { + ensureIndexForField(fieldName, fieldPath, followRefCollection) + } + + // 3. Tap parent keys to intercept correlation values and request + // matching child rows on-demand via the child's subscription + parentKeys = parentKeys.pipe( + tap((data: any) => { + const resolvedAlias = + aliasRemapping[childCorrelationAlias] || childCorrelationAlias + const lazySourceSubscription = subscriptions[resolvedAlias] + + if (!lazySourceSubscription) { + return + } + + if (lazySourceSubscription.hasLoadedInitialState()) { + return + } + + const joinKeys = [ + ...new Set( + data + .getInner() + .map( + ([[correlationValue]]: any) => correlationValue as unknown, + ) + .filter((key: unknown) => key != null), + ), + ] + + if (joinKeys.length === 0) { + return + } + + const lazyJoinRef = new PropRef(fieldPath) + lazySourceSubscription.requestSnapshot({ + where: inArray(lazyJoinRef, joinKeys), + }) + }), + ) + } + // If parent filters exist, append them to the child query's WHERE const childQuery = subquery.parentFilters && subquery.parentFilters.length > 0 @@ -410,6 +479,9 @@ export function compileQuery( // Merge child's alias metadata into parent's Object.assign(aliasToCollectionId, childResult.aliasToCollectionId) Object.assign(aliasRemapping, childResult.aliasRemapping) + for (const [alias, whereClause] of childResult.sourceWhereClauses) { + sourceWhereClauses.set(alias, whereClause) + } includesResults.push({ pipeline: childResult.pipeline, diff --git a/packages/db/tests/query/includes-lazy-loading.test.ts b/packages/db/tests/query/includes-lazy-loading.test.ts new file mode 100644 index 000000000..8e8eccace --- /dev/null +++ b/packages/db/tests/query/includes-lazy-loading.test.ts @@ -0,0 +1,678 @@ +import { describe, expect, it, vi } from 'vitest' +import { + and, + createLiveQueryCollection, + eq, + gte, + toArray, +} from '../../src/query/index.js' +import { createCollection } from '../../src/collection/index.js' +import { extractSimpleComparisons } from '../../src/query/expression-helpers.js' +import { flushPromises, stripVirtualProps } from '../utils.js' +import type { LoadSubsetOptions } from '../../src/types.js' + +/** + * Tests that includes subqueries use lazy loading for child collections, + * analogous to how regular joins use lazy loading. + */ + +type Root = { + id: number + name: string +} + +type Item = { + id: number + rootId: number + title: string +} + +const sampleRoots: Array = [ + { id: 1, name: `Root A` }, + { id: 2, name: `Root B` }, + { id: 3, name: `Root C` }, +] + +const sampleItems: Array = [ + { id: 10, rootId: 1, title: `Item A1` }, + { id: 11, rootId: 1, title: `Item A2` }, + { id: 20, rootId: 2, title: `Item B1` }, + // No items for Root C +] + +describe(`includes lazy loading`, () => { + function createRootsCollection() { + return createCollection({ + id: `includes-lazy-roots`, + getKey: (r) => r.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + for (const root of sampleRoots) { + write({ type: `insert`, value: root }) + } + commit() + markReady() + }, + }, + }) + } + + function createItemsCollectionWithTracking() { + const loadSubsetCalls: Array = [] + + const collection = createCollection({ + id: `includes-lazy-items`, + getKey: (item) => item.id, + syncMode: `on-demand`, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + for (const item of sampleItems) { + write({ type: `insert`, value: item }) + } + commit() + markReady() + return { + loadSubset: vi.fn((options: LoadSubsetOptions) => { + loadSubsetCalls.push(options) + return Promise.resolve() + }), + } + }, + }, + }) + + return { collection, loadSubsetCalls } + } + + it(`should pass correlation filter to child collection loadSubset`, async () => { + const roots = createRootsCollection() + const { collection: items, loadSubsetCalls } = + createItemsCollectionWithTracking() + + const liveQuery = createLiveQueryCollection((q) => + q.from({ r: roots }).select(({ r }) => ({ + id: r.id, + name: r.name, + children: toArray( + q + .from({ item: items }) + .where(({ item }) => eq(item.rootId, r.id)) + .select(({ item }) => ({ + id: item.id, + title: item.title, + })), + ), + })), + ) + + await liveQuery.preload() + + // The child collection should have received a loadSubset call with + // an inArray filter containing the parent root IDs + expect(loadSubsetCalls.length).toBeGreaterThan(0) + + const lastCall = loadSubsetCalls[loadSubsetCalls.length - 1]! + expect(lastCall.where).toBeDefined() + + // The filter should be an `in` expression on rootId with the parent key values + const filters = extractSimpleComparisons(lastCall.where) + expect(filters).toEqual([ + { + field: [`rootId`], + operator: `in`, + value: expect.arrayContaining([1, 2, 3]), + }, + ]) + }) + + it(`should produce correct query results with lazy-loaded includes`, async () => { + const roots = createRootsCollection() + const { collection: items } = createItemsCollectionWithTracking() + + const liveQuery = createLiveQueryCollection((q) => + q.from({ r: roots }).select(({ r }) => ({ + id: r.id, + name: r.name, + children: toArray( + q + .from({ item: items }) + .where(({ item }) => eq(item.rootId, r.id)) + .select(({ item }) => ({ + id: item.id, + title: item.title, + })), + ), + })), + ) + + await liveQuery.preload() + + // Verify the query results are correct + expect(liveQuery.size).toBe(3) + + const rootA = stripVirtualProps(liveQuery.get(1)) + expect(rootA).toBeDefined() + expect(rootA!.name).toBe(`Root A`) + expect((rootA as any).children).toHaveLength(2) + + const rootB = stripVirtualProps(liveQuery.get(2)) + expect(rootB).toBeDefined() + expect(rootB!.name).toBe(`Root B`) + expect((rootB as any).children).toHaveLength(1) + + const rootC = stripVirtualProps(liveQuery.get(3)) + expect(rootC).toBeDefined() + expect(rootC!.name).toBe(`Root C`) + expect((rootC as any).children).toHaveLength(0) + }) + + it(`should mark child source as lazy (not load initial state eagerly)`, async () => { + const roots = createRootsCollection() + + let initialLoadTriggered = false + const loadSubsetCalls: Array = [] + + const items = createCollection({ + id: `includes-lazy-items-eager-check`, + getKey: (item) => item.id, + syncMode: `on-demand`, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + for (const item of sampleItems) { + write({ type: `insert`, value: item }) + } + commit() + markReady() + return { + loadSubset: vi.fn((options: LoadSubsetOptions) => { + loadSubsetCalls.push(options) + // Check if this is a full load (no where clause) vs targeted load + if (!options.where) { + initialLoadTriggered = true + } + return Promise.resolve() + }), + } + }, + }, + }) + + const liveQuery = createLiveQueryCollection((q) => + q.from({ r: roots }).select(({ r }) => ({ + id: r.id, + children: toArray( + q + .from({ item: items }) + .where(({ item }) => eq(item.rootId, r.id)) + .select(({ item }) => ({ + id: item.id, + title: item.title, + })), + ), + })), + ) + + await liveQuery.preload() + + // The child collection should NOT have triggered a full initial load + // (without any where clause). It should only load via targeted + // requestSnapshot calls with correlation key filters. + expect(initialLoadTriggered).toBe(false) + + // But it should have loaded data via targeted loadSubset calls + expect(loadSubsetCalls.length).toBeGreaterThan(0) + // Every loadSubset call should have a where clause + for (const call of loadSubsetCalls) { + expect(call.where).toBeDefined() + } + }) + + it(`should reactively load new child data when parent rows are added`, async () => { + let syncMethods: any + + const roots = createCollection({ + id: `includes-lazy-roots-reactive`, + getKey: (r) => r.id, + sync: { + sync: (methods) => { + syncMethods = methods + methods.begin() + methods.write({ type: `insert`, value: { id: 1, name: `Root A` } }) + methods.commit() + methods.markReady() + }, + }, + }) + + const loadSubsetCalls: Array = [] + + const items = createCollection({ + id: `includes-lazy-items-reactive`, + getKey: (item) => item.id, + syncMode: `on-demand`, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + // Pre-load items for roots 1 and 2 + write({ type: `insert`, value: { id: 10, rootId: 1, title: `A1` } }) + write({ type: `insert`, value: { id: 20, rootId: 2, title: `B1` } }) + commit() + markReady() + return { + loadSubset: vi.fn((options: LoadSubsetOptions) => { + loadSubsetCalls.push(options) + return Promise.resolve() + }), + } + }, + }, + }) + + const liveQuery = createLiveQueryCollection((q) => + q.from({ r: roots }).select(({ r }) => ({ + id: r.id, + name: r.name, + children: toArray( + q + .from({ item: items }) + .where(({ item }) => eq(item.rootId, r.id)) + .select(({ item }) => ({ + id: item.id, + title: item.title, + })), + ), + })), + ) + + await liveQuery.preload() + + // Clear previous calls + const callsBefore = loadSubsetCalls.length + + // Add a new parent row — this should trigger a loadSubset call + // for the new correlation key (id: 2) + syncMethods.begin() + syncMethods.write({ type: `insert`, value: { id: 2, name: `Root B` } }) + syncMethods.commit() + + // Wait for the reactive pipeline to process + await flushPromises() + await new Promise((resolve) => setTimeout(resolve, 50)) + + // A new loadSubset call should have been made that includes the new key + const newCalls = loadSubsetCalls.slice(callsBefore) + expect(newCalls.length).toBeGreaterThan(0) + + // At least one of the new calls should include the new parent key (2) + const hasNewKey = newCalls.some((call) => { + if (!call.where) return false + const filters = extractSimpleComparisons(call.where) + return filters.some( + (f) => + f.operator === `in` && Array.isArray(f.value) && f.value.includes(2), + ) + }) + expect(hasNewKey).toBe(true) + }) + + it(`should not trigger loadSubset without where for toArray includes`, async () => { + // Same test as the lazy check but using toArray explicitly + // to verify the materialization mode doesn't affect lazy loading + const roots = createRootsCollection() + const loadSubsetCalls: Array = [] + + const items = createCollection({ + id: `includes-lazy-items-toarray`, + getKey: (item) => item.id, + syncMode: `on-demand`, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + for (const item of sampleItems) { + write({ type: `insert`, value: item }) + } + commit() + markReady() + return { + loadSubset: vi.fn((options: LoadSubsetOptions) => { + loadSubsetCalls.push(options) + return Promise.resolve() + }), + } + }, + }, + }) + + const liveQuery = createLiveQueryCollection((q) => + q.from({ r: roots }).select(({ r }) => ({ + id: r.id, + children: toArray( + q + .from({ item: items }) + .where(({ item }) => eq(item.rootId, r.id)) + .select(({ item }) => ({ + id: item.id, + })), + ), + })), + ) + + await liveQuery.preload() + + // Every loadSubset call should have a where clause (no unfiltered loads) + for (const call of loadSubsetCalls) { + expect(call.where).toBeDefined() + } + }) + + it(`should work with Collection materialization (not just toArray)`, async () => { + const roots = createRootsCollection() + const loadSubsetCalls: Array = [] + + const items = createCollection({ + id: `includes-lazy-items-collection-mat`, + getKey: (item) => item.id, + syncMode: `on-demand`, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + for (const item of sampleItems) { + write({ type: `insert`, value: item }) + } + commit() + markReady() + return { + loadSubset: vi.fn((options: LoadSubsetOptions) => { + loadSubsetCalls.push(options) + return Promise.resolve() + }), + } + }, + }, + }) + + // Use Collection materialization (no toArray wrapper) + const liveQuery = createLiveQueryCollection((q) => + q.from({ r: roots }).select(({ r }) => ({ + id: r.id, + name: r.name, + children: q + .from({ item: items }) + .where(({ item }) => eq(item.rootId, r.id)) + .select(({ item }) => ({ + id: item.id, + title: item.title, + })), + })), + ) + + await liveQuery.preload() + + // Should use lazy loading with filters for Collection materialization too + expect(loadSubsetCalls.length).toBeGreaterThan(0) + + const lastCall = loadSubsetCalls[loadSubsetCalls.length - 1]! + expect(lastCall.where).toBeDefined() + + const filters = extractSimpleComparisons(lastCall.where) + expect(filters).toEqual([ + { + field: [`rootId`], + operator: `in`, + value: expect.arrayContaining([1, 2, 3]), + }, + ]) + }) +}) + +describe(`includes child where clauses in loadSubset`, () => { + /** + * Tests that pure-child WHERE clauses (not the correlation) are passed + * through to the child collection's loadSubset/queryFn. + */ + + type Root = { + id: number + name: string + } + + type Item = { + id: number + rootId: number + status: string + priority: number + title: string + } + + const sampleRoots: Array = [ + { id: 1, name: `Root A` }, + { id: 2, name: `Root B` }, + ] + + const sampleItems: Array = [ + { id: 10, rootId: 1, status: `active`, priority: 3, title: `A1 active` }, + { + id: 11, + rootId: 1, + status: `archived`, + priority: 1, + title: `A1 archived`, + }, + { id: 20, rootId: 2, status: `active`, priority: 5, title: `B1 active` }, + { id: 21, rootId: 2, status: `active`, priority: 2, title: `B1 active2` }, + ] + + function createRootsCollection() { + return createCollection({ + id: `child-where-roots`, + getKey: (r) => r.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + for (const root of sampleRoots) { + write({ type: `insert`, value: root }) + } + commit() + markReady() + }, + }, + }) + } + + function createItemsCollectionWithTracking() { + const loadSubsetCalls: Array = [] + + const collection = createCollection({ + id: `child-where-items`, + getKey: (item) => item.id, + syncMode: `on-demand`, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + for (const item of sampleItems) { + write({ type: `insert`, value: item }) + } + commit() + markReady() + return { + loadSubset: vi.fn((options: LoadSubsetOptions) => { + loadSubsetCalls.push(options) + return Promise.resolve() + }), + } + }, + }, + }) + + return { collection, loadSubsetCalls } + } + + it(`should include pure-child where clause in loadSubset along with correlation filter`, async () => { + const roots = createRootsCollection() + const { collection: items, loadSubsetCalls } = + createItemsCollectionWithTracking() + + const liveQuery = createLiveQueryCollection((q) => + q.from({ r: roots }).select(({ r }) => ({ + id: r.id, + children: toArray( + q + .from({ item: items }) + .where(({ item }) => eq(item.rootId, r.id)) + .where(({ item }) => eq(item.status, `active`)) + .select(({ item }) => ({ + id: item.id, + title: item.title, + })), + ), + })), + ) + + await liveQuery.preload() + + expect(loadSubsetCalls.length).toBeGreaterThan(0) + + // The loadSubset call should contain BOTH the correlation filter (inArray) + // AND the pure-child filter (eq status 'active') + const lastCall = loadSubsetCalls[loadSubsetCalls.length - 1]! + expect(lastCall.where).toBeDefined() + + const filters = extractSimpleComparisons(lastCall.where) + const hasCorrelationFilter = filters.some( + (f) => f.operator === `in` && f.field[0] === `rootId`, + ) + const hasStatusFilter = filters.some( + (f) => + f.operator === `eq` && f.field[0] === `status` && f.value === `active`, + ) + + expect(hasCorrelationFilter).toBe(true) + expect(hasStatusFilter).toBe(true) + }) + + it(`should include multiple pure-child where clauses in loadSubset`, async () => { + const roots = createRootsCollection() + const { collection: items, loadSubsetCalls } = + createItemsCollectionWithTracking() + + const liveQuery = createLiveQueryCollection((q) => + q.from({ r: roots }).select(({ r }) => ({ + id: r.id, + children: toArray( + q + .from({ item: items }) + .where(({ item }) => eq(item.rootId, r.id)) + .where(({ item }) => eq(item.status, `active`)) + .where(({ item }) => gte(item.priority, 3)) + .select(({ item }) => ({ + id: item.id, + title: item.title, + })), + ), + })), + ) + + await liveQuery.preload() + + expect(loadSubsetCalls.length).toBeGreaterThan(0) + + const lastCall = loadSubsetCalls[loadSubsetCalls.length - 1]! + expect(lastCall.where).toBeDefined() + + const filters = extractSimpleComparisons(lastCall.where) + const hasCorrelationFilter = filters.some( + (f) => f.operator === `in` && f.field[0] === `rootId`, + ) + const hasStatusFilter = filters.some( + (f) => + f.operator === `eq` && f.field[0] === `status` && f.value === `active`, + ) + const hasPriorityFilter = filters.some( + (f) => f.operator === `gte` && f.field[0] === `priority` && f.value === 3, + ) + + expect(hasCorrelationFilter).toBe(true) + expect(hasStatusFilter).toBe(true) + expect(hasPriorityFilter).toBe(true) + }) + + it(`should produce correct filtered results with child where clause`, async () => { + const roots = createRootsCollection() + const { collection: items } = createItemsCollectionWithTracking() + + const liveQuery = createLiveQueryCollection((q) => + q.from({ r: roots }).select(({ r }) => ({ + id: r.id, + children: toArray( + q + .from({ item: items }) + .where(({ item }) => eq(item.rootId, r.id)) + .where(({ item }) => eq(item.status, `active`)) + .select(({ item }) => ({ + id: item.id, + title: item.title, + })), + ), + })), + ) + + await liveQuery.preload() + + // Root A: only 1 active item (id 10), the archived one (id 11) should be filtered + const rootA = stripVirtualProps(liveQuery.get(1)) + expect(rootA).toBeDefined() + expect((rootA as any).children).toHaveLength(1) + expect((rootA as any).children[0].id).toBe(10) + + // Root B: 2 active items + const rootB = stripVirtualProps(liveQuery.get(2)) + expect(rootB).toBeDefined() + expect((rootB as any).children).toHaveLength(2) + }) + + it(`should include child where clause combined with correlation in and() syntax`, async () => { + const roots = createRootsCollection() + const { collection: items, loadSubsetCalls } = + createItemsCollectionWithTracking() + + // Use a single where with and() combining correlation + child filter + const liveQuery = createLiveQueryCollection((q) => + q.from({ r: roots }).select(({ r }) => ({ + id: r.id, + children: toArray( + q + .from({ item: items }) + .where(({ item }) => + and(eq(item.rootId, r.id), eq(item.status, `active`)), + ) + .select(({ item }) => ({ + id: item.id, + title: item.title, + })), + ), + })), + ) + + await liveQuery.preload() + + expect(loadSubsetCalls.length).toBeGreaterThan(0) + + const lastCall = loadSubsetCalls[loadSubsetCalls.length - 1]! + expect(lastCall.where).toBeDefined() + + const filters = extractSimpleComparisons(lastCall.where) + const hasCorrelationFilter = filters.some( + (f) => f.operator === `in` && f.field[0] === `rootId`, + ) + const hasStatusFilter = filters.some( + (f) => + f.operator === `eq` && f.field[0] === `status` && f.value === `active`, + ) + + expect(hasCorrelationFilter).toBe(true) + expect(hasStatusFilter).toBe(true) + }) +})