From 08bff6e6d92e3ef5622926579d41a81b5843f18f Mon Sep 17 00:00:00 2001 From: JonathanLab Date: Thu, 28 May 2026 16:16:52 +0200 Subject: [PATCH] refactor: port over file watcher --- MIGRATION.md | 23 ++ REFACTOR.md | 105 +++++-- apps/code/package.json | 1 + apps/code/src/main/di/container.ts | 2 - apps/code/src/main/index.ts | 16 +- .../code/src/main/services/archive/service.ts | 4 +- .../src/main/services/file-watcher/bridge.ts | 36 +++ .../src/main/services/file-watcher/schemas.ts | 60 ---- .../services/file-watcher/service.test.ts | 90 ------ .../src/main/services/file-watcher/service.ts | 266 ----------------- apps/code/src/main/services/fs/service.ts | 6 +- .../main/services/suspension/service.test.ts | 4 +- .../src/main/services/suspension/service.ts | 4 +- .../src/main/services/workspace/service.ts | 8 +- .../src/main/trpc/routers/file-watcher.ts | 35 +-- .../task-detail/components/FileTreePanel.tsx | 29 +- .../code/src/renderer/hooks/useFileWatcher.ts | 95 +++---- biome.jsonc | 8 +- packages/core/package.json | 15 +- packages/core/src/placeholder.ts | 1 + .../features/file-watcher/useFileWatcher.ts | 18 ++ packages/workspace-client/src/client.ts | 26 +- packages/workspace-client/src/provider.tsx | 27 +- packages/workspace-client/src/types.ts | 4 + packages/workspace-server/package.json | 1 + packages/workspace-server/src/app.ts | 6 +- packages/workspace-server/src/di/container.ts | 4 + packages/workspace-server/src/di/tokens.ts | 2 + .../src/services/fs/schemas.ts | 12 + .../src/services/fs/service.ts | 29 ++ .../src/services/git/schemas.ts | 11 + .../src/services/watcher/schemas.ts | 58 ++++ .../src/services/watcher/service.ts | 269 ++++++++++++++++++ packages/workspace-server/src/trpc.ts | 53 +++- pnpm-lock.yaml | 19 +- pnpm-workspace.yaml | 1 + 36 files changed, 734 insertions(+), 614 deletions(-) create mode 100644 apps/code/src/main/services/file-watcher/bridge.ts delete mode 100644 apps/code/src/main/services/file-watcher/schemas.ts delete mode 100644 apps/code/src/main/services/file-watcher/service.test.ts delete mode 100644 apps/code/src/main/services/file-watcher/service.ts create mode 100644 packages/core/src/placeholder.ts create mode 100644 packages/ui/src/features/file-watcher/useFileWatcher.ts create mode 100644 packages/workspace-client/src/types.ts create mode 100644 packages/workspace-server/src/services/fs/schemas.ts create mode 100644 packages/workspace-server/src/services/fs/service.ts create mode 100644 packages/workspace-server/src/services/git/schemas.ts create mode 100644 packages/workspace-server/src/services/watcher/schemas.ts create mode 100644 packages/workspace-server/src/services/watcher/service.ts diff --git a/MIGRATION.md b/MIGRATION.md index 9897c59267..55ed78c6d0 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -6,6 +6,29 @@ For the procedure to follow when porting a new feature, see [REFACTOR.md](./REFA --- +## 2026-05-28 — file-watcher (workspace-server owns orchestration, hook is pure useSubscription) + +- Moved: `apps/code/src/main/services/file-watcher/` deleted entirely. Orchestration (debounce, bulk threshold, git event filtering, git-dir resolution) lives in `packages/workspace-server/src/services/watcher/service.ts` as `WatcherService.watchRepo()`. New tRPC subscription procedure `fileWatcher.watch` emits the processed `FileWatcherEvent` discriminated union. Raw `watcher.watch` still available for unprocessed events. +- **Nothing for file-watcher lives in `packages/core/`.** The "orchestration" we thought belonged in core (debounce, bulk threshold, git filtering) turned out to be *source-smoothing* — properties of the event source, not domain logic. Source-smoothing belongs with the source. Core is for business state machines, retries, cross-feature coordination — none of which file-watcher has. +- New transport (still applies): `workspace-client` uses `splitLink` over `httpSubscriptionLink` (SSE) for subscriptions + `httpBatchLink` for queries/mutations. SSE auth via `?secret=` query param since EventSource can't send headers. +- Renderer hook (`packages/ui/src/features/file-watcher/useFileWatcher.ts`) is a 5-line `useSubscription(trpc.fileWatcher.watch.subscriptionOptions(...))` wrapper. No `useEffect`, no `for-await`, no orchestration state — pure react-query idiom. Caller passes a single `onEvent` callback and switches on `event.kind`. +- Main bridge: `apps/code/src/main/services/file-watcher/bridge.ts` is a small `FileWatcherBridge` class (~40 lines) that subscribes to `fileWatcher.watch` via workspace-client and re-emits via `TypedEventEmitter` for the four legacy in-process consumers (`fs`, `archive`, `suspension`, `workspace`). Bound at `MAIN_TOKENS.FileWatcherService` via `container.bind(...).toConstantValue(new FileWatcherBridge(workspaceClient))` in `index.ts` after `workspaceServer.start()`. +- Bridge retirement: delete `FileWatcherBridge`, its router, and the renderer's `start`/`stop` mutation calls when **fs**, **archive**, **suspension**, **workspace** migrate. Those consumers will then use `useFileWatcher` directly (renderer) or subscribe via workspace-client (background work in workspace-server or main). +- Cleaned: `WatcherRegistryService` dep dropped (its `isShutdown` check is unnecessary — subscriptions die naturally when workspace-server child or main process exits). Schemas split out of `trpc.ts` into per-service `schemas.ts`. Router is now strict one-liners. +- Left as-is: two parallel watcher pipelines per repo (the bridge + the renderer each subscribe to workspace-server); workspace-server doesn't dedupe parcel watchers. `FsService` in main still owns its file-cache invalidation. `WatcherRegistryService` still used by focus + app-lifecycle. +- New import paths: `import { useFileWatcher } from "@posthog/ui/features/file-watcher/useFileWatcher"`. For main consumers needing kind constants: `import { FileWatcherEventKind } from "@posthog/workspace-server/services/watcher/schemas"`. Bridge class: `apps/code/src/main/services/file-watcher/bridge.ts`. + +--- + +## 2026-05-28 — api-client (transport only) + +- Moved: `apps/code/src/renderer/api/{fetcher,generated,generated.augment,fetcher.test}.ts` → `packages/api-client/src/`. `generated.augment.d.ts` → `.ts` (side-effect import from `index.ts` so apps/code's tsc picks up the module augmentation through the package's exports). +- Cleaned: `__APP_VERSION__` Vite global removed from fetcher — now an `appVersion` field on `ApiFetcherConfig`. Renderer wrapper passes the global at construction. +- Left as-is: the 2929-line `posthogClient.ts` god-class. Tagged with a `PORT NOTE` — gets sliced into `packages/core//service.ts` per feature, following REFACTOR.md "Coexistence and bridges". +- New import path: `@posthog/api-client` (was `@renderer/api/{fetcher,generated}`). Also updated `scripts/update-openapi-client.ts` to write into the new package. + +--- + ## 2026-05-27 — diff-stats - Moved: `apps/code/src/main/services/git/getDiffStats` → `packages/workspace-server/src/services/git/service.ts` + `packages/ui/src/features/diff-stats/` diff --git a/REFACTOR.md b/REFACTOR.md index 7abd6a0007..0229c9068e 100644 --- a/REFACTOR.md +++ b/REFACTOR.md @@ -46,16 +46,22 @@ apps/ Per-domain folder shape, by package: ``` -core/sessions/ ui/sessions/ workspace-server/git/ -├── index.ts ├── index.ts ├── index.ts -├── service.ts ├── SessionList.tsx ├── procedures.ts -├── types.ts ├── SessionDetail.tsx ├── git-ops.ts -└── service.test.ts ├── useSession.ts └── git-ops.test.ts +core/sessions/ ui/sessions/ workspace-server/services/git/ +├── sessions.ts ├── SessionList.tsx ├── git.ts +├── types.ts ├── SessionDetail.tsx └── schemas.ts +└── sessions.test.ts ├── useSession.ts ├── store.ts (Zustand) └── SessionList.test.tsx ``` -Flat. No `internal/` folder — `index.ts` is the boundary. Split into more files when a single file gets too long to read, grouped by concept. +Naming: +- The main file is named after its domain (`sessions.ts`, `file-watcher.ts`, `git.ts`) — not `service.ts`, not `index.ts`. "Service" is DI culture; in core there's no DI and the suffix is meaningless. The repeated folder/file name is intentional: `file-watcher/file-watcher.ts` reads cleaner than `file-watcher/service.ts` and makes grep-by-filename land on the right file. +- `types.ts` — pure TS types, interfaces, enums, constants. No runtime cost. Use when the domain has internal-only types not crossing a tRPC boundary. +- `schemas.ts` — Zod schemas + types inferred from them (`z.infer`). Use when shapes cross a tRPC boundary (workspace-server procedures, anything validated at runtime). The schema is the source of truth; types are inferred from it, never declared separately. +- A domain can have both `types.ts` and `schemas.ts` when it has internal types AND boundary-validated shapes. Most have one or the other. +- Tests colocate next to the file under test (`sessions.test.ts`, `git.test.ts`). + +Flat. No `internal/` folder. Split into more files when a single file gets too long to read, grouped by concept. **What each package owns:** @@ -82,6 +88,10 @@ The desktop **main process is not the home of business logic anymore.** It does - **Delete, don't deprecate.** When code moves, the old file is removed in the same change. No shims, no re-exports, no "deprecated" comments. - **Banned imports in `packages/core`.** No `electron`, no `node:fs`, no `node:child_process`, no `node:net`, no `node:os`, no `node:path`. Pure JS only. Anything you'd reach for there is either a workspace-server procedure or a `@posthog/platform` interface. - **Don't bundle other work.** Wire-format changes, algorithm rewrites, new features, cosmetic renames — keep them out of the move. They double review surface and obscure what's actually being relocated. +- **Not every feature needs a core module.** `core/` is for domain logic — state machines, retries, dedup, cross-feature coordination, business rules. Features that are pure data-piping (server → useQuery → component) skip `core/` entirely. Don't invent a core file for symmetry; let core stay empty for that feature. +- **Source-smoothing belongs with the source, not in core.** Debouncing a noisy event stream, dedup, bulk-threshold throttling, filtering source-specific noise (irrelevant git dir events, etc.) — these are properties of the *event source*, not domain decisions. They live in the workspace-server procedure that owns the source, so every client gets the smoothed stream for free. Don't put them in core just because they look like "orchestration." +- **Hooks are pure react-query idioms.** `useQuery`, `useMutation`, `useSubscription` over a tRPC procedure — that's the whole hook. No `useEffect` constructing services. No `for-await` over async iterables in a hook body. No imperative subscribe/unsubscribe ceremony with a wrappers map. If you find yourself reaching for those, the orchestration is in the wrong place — push it to wherever the tRPC procedure lives (typically workspace-server) and the hook collapses to 5 lines. +- **`useState`, `useRef`, `useEffect` in a hook are usually a smell.** They mean the hook is holding application state or subscription bookkeeping that should live elsewhere — react-query's cache, a Zustand store, a workspace-server procedure, or just derivation from existing query data. The legitimate uses are narrow: `useRef` for DOM refs (focus, scroll, measurement), `useEffect` for synchronizing imperative browser APIs (event listeners on `window`, `ResizeObserver`, etc.). Anything else — caching a previous value, holding a subscription handle, stashing a callback ref to avoid re-renders, building a wrappers map — means the hook is doing work that belongs upstream. ## Comment markers @@ -97,47 +107,79 @@ Use these consistently. Grep targets matter — follow-up passes hunt for each m | Today | New home | |---|---| -| `apps/code/src/main/services//service.ts` — orchestration, retries, state machines, parsing, OAuth dances | `packages/core//service.ts` | -| Same file — the bits that touch git CLI / fs / spawn | `packages/workspace-server//` (git → `git/`, fs → `fs/`, spawn → `process/`, etc.) | -| `apps/code/src/main/trpc/routers/.ts` | Dumb procedures → registered from the relevant `workspace-server//`. Orchestrating procedures **disappear** — core calls the clients directly. | +| `apps/code/src/main/services//service.ts` — *business* orchestration: state machines, retries, OAuth flows, cross-feature coordination, business rules | `packages/core//.ts` | +| Same file — *source smoothing*: debounce, dedup, throttle, batch, source-noise filtering | `packages/workspace-server/services//` — alongside whatever procedure produces the noisy events. Don't route through core. | +| Same file — host syscalls (git CLI, fs, spawn, native modules) | `packages/workspace-server/services//` (git → `git/`, fs → `fs/`, spawn → `process/`, etc.) | +| `apps/code/src/main/trpc/routers/.ts` | Strict one-liner procedures, registered alongside their service in `workspace-server/services//`. Orchestrating procedures **disappear** — core (or the workspace-server procedure itself) does that work. | | `apps/code/src/api/` (Django) | `packages/api-client/` | | `apps/code/src/renderer/features//` (UI) | `packages/ui//` | | `apps/code/src/renderer/stores/.ts` (thin UI state) | `packages/ui//store.ts` (still Zustand, still thin) | | `apps/code/src/main/platform-adapters/.ts` | `apps/desktop/platform-adapters/.ts` | +If the migrated feature is pure data-piping (server → useQuery → component), there's no row to core — that's expected, not a missed step. + --- ## Per-feature procedure Do these in order. One feature at a time. -1. **Audit.** Grep for the feature. List every file: main service, schemas, router, store, components, hooks, subscriptions, tests. If the audit doesn't fit in one paragraph, split the feature (see [Splitting a mega-feature](#splitting-a-mega-feature)). -2. **Identify host calls.** Anything touching git CLI, fs, child-process spawn, native modules. Those become workspace-server procedures. -3. **Identify orchestration.** Retries, polling, dedup, state machines, multi-step flows, error normalization. That's core. -4. **Define the workspace-server router first.** Dumb procedures only, Zod input + output. Add it to `appRouter` in `packages/workspace-server/src/trpc.ts`. -5. **Port orchestration to `packages/core//`.** Pure JS. Inject `workspace-client` and `api-client` via constructor params — **no Inversify in core.** Unit test it. -6. **Wire the UI.** Lift to `packages/ui/features//` if shareable, or keep in the app. The component imports core; core imports the clients. -7. **Delete the old main service and router.** No shims, no compatibility re-exports. +1. **Audit.** Grep for the feature. List every file: main service, schemas, router, store, components, hooks, subscriptions, tests. **Also list fan-in**: which other main services consume this one's events or call its methods. The audit is for *you*, not a gate — most features in this codebase have fan-in, and that's not a reason to abandon the slice. See [Coexistence and bridges](#coexistence-and-bridges) for how to handle it. +2. **Identify host calls.** Anything touching git CLI, fs, child-process spawn, native modules, OS APIs. Those become workspace-server procedures. +3. **Sort the rest into one of three buckets:** + - **Source-smoothing** — debounce, dedup, throttle, batch, noise-filter events from a host source. Goes alongside the source's procedure in `workspace-server/services//`. Don't route through core. + - **Business orchestration** — state machines, retries, OAuth flows, cross-feature coordination, business rules, error normalization. Goes in `packages/core//`. + - **Neither** — pure data-piping from a server query to a component. There's no core module to write. Skip ahead to step 6. +4. **Define the workspace-server procedures first.** Strict one-liners over service methods. Zod input + output, schemas in `workspace-server/services//schemas.ts`. +5. **(If core applies)** Port business orchestration to `packages/core//.ts`. Pure JS. Constructor injection of `workspace-client` / `api-client` — **no Inversify in core.** Unit test the pure parts directly (extract pure functions for debouncing, drainage, predicates) — don't try to test the async iterable wiring. +6. **Wire the UI.** Hook in `packages/ui/features//` is a thin `useQuery` / `useMutation` / `useSubscription` over the tRPC procedure. No `useEffect` / `useRef` / `useState` ceremony. If you reach for those, the orchestration is in the wrong place — push it upstream and try again. +7. **Delete the old main service and router.** No shims, no re-exports — unless coexistence is genuinely needed for fan-in consumers ([Coexistence and bridges](#coexistence-and-bridges)). 8. **Apply in-slice cleanups.** See below. -9. **Add a MIGRATION.md entry.** What moved, what was cleaned, what was deliberately left. +9. **Add a MIGRATION.md entry.** What moved, what was cleaned, what was deliberately left, what the retirement condition is for any bridge. --- -## Splitting a mega-feature +## Coexistence and bridges + +This codebase is heavily inter-coupled — most main-process services consume events from, or call methods on, other main-process services. A pure "one feature, one slice, delete the old" port is the exception, not the rule. Expect coexistence; design for it. + +**The default pattern: bridge the old module.** When you move a feature's guts into `packages/core//` + `packages/workspace-server//` + `packages/ui//`, the old `apps/code/src/main/services//service.ts` doesn't have to die in the same change. Keep it as a thin shim that: -Some features are too large to move in one pass — the canonical example is the renderer-side `sessions` module (thousands of lines, owns its own state machines, holds subscriptions, reaches into other stores). Trying to port that in one go is how a refactor stalls for a week. +- constructs the new core module (or the workspace-server-backed client) at boot, +- forwards the events and methods its in-process consumers already depend on, +- holds no logic of its own — just delegation. -When the audit blows past one paragraph, **carve the feature into slices and migrate slice-by-slice**, not file-by-file. A slice is the smallest user-visible capability that can stand on its own: "list sessions," "create session," "session detail view," "session permissions stream." Each slice is its own pass through the per-feature procedure above, with its own MIGRATION.md entry. +The shim is the seam. Other main services keep depending on it unchanged. As each of *those* services migrates later, they drop their dependency on the shim. When the last one is gone, delete the shim. -Rules for slicing: +Mark the shim file with a one-liner at the top: `// PORT NOTE: shim — delegates to . Delete when migrate.` That tells the next reader (or agent) exactly what's keeping it alive and what unblocks its removal. -- **Pick the most read-only slice first.** Lists and detail views before mutations. Mutations before subscriptions. Subscriptions before anything that coordinates across other features. -- **The old module stays alive until the last slice lands.** New `packages/core//` and old `apps/code/src/...` coexist during the migration. That's fine — but the coexistence is the cost you're paying to land slices safely, not a permanent state. Don't add new code to the old module. -- **No shared helpers across the seam.** If a slice in `core/` needs a helper that still lives in the old module, copy it (mark with `// PORT NOTE: duplicated from , removed when lands`). Importing across the seam glues the two halves together and defeats the point. -- **Track the slices explicitly.** Open a tracking issue or a checklist at the top of MIGRATION.md for the feature. Each landed slice ticks a box. The feature isn't "migrated" until every box is ticked and the old module is deleted. -- **Stop and re-plan if a slice doesn't fit the model.** If you carve off "session detail view" and discover it can't be expressed without dragging half the state machine with it, that's a signal the slice boundary is wrong — not a signal to widen the slice. Re-slice. +**Skip the shim when the new class is signature-compatible with the old DI binding.** If the new `core//service.ts` already exposes the same methods and event API the old service did, you don't need a shim at all — just late-bind the new class to the existing DI token at bootstrap. The pattern (taken from the file-watcher migration): + +```ts +// In main bootstrap, after the new class's async prereqs are ready: +const connection = await wsServer.start(); +const workspaceClient = createWorkspaceClient(connection); +container + .bind(MAIN_TOKENS.FileWatcherService) + .toConstantValue(new CoreFileWatcherService({ workspace: workspaceClient })); + +await initializeServices(); // existing consumers resolve here, unchanged +``` -If you can't find a clean first slice at all, the feature probably has a layering problem that needs to be named before the move starts. Raise it. +Remove the static `container.bind(...).to(OldClass)` from `container.ts`. Consumers keep `@inject(MAIN_TOKENS.X) private x: X` — only the *type import path* changes (from `../X/service` to `@posthog/core/X/service`). The DI token now points at the core class; no delegation layer, no event re-emission, no shim file to delete later. + +This works when (a) the core class's public API is a strict superset of the old one, (b) there's a clean bootstrap point where the async prereqs are known to be ready, and (c) nothing tries to resolve the token earlier than that point. Verify (c) by grepping the token — `services/index.ts` side-effect imports, top-level `register*Handlers()` calls, etc. should not transitively `container.get` it before your bind runs. + +**When the feature itself is too big to port in one slice** (the renderer-side `sessions` module is the canonical example — thousands of lines, owns state machines, holds subscriptions, reaches into other stores), carve it into smaller user-visible slices: "list sessions," "create session," "session detail view," "session permissions stream." Each slice is its own pass through the per-feature procedure, with its own MIGRATION.md entry. Pick read-only slices before mutations, mutations before subscriptions. + +Rules that hold for both bridging and slicing: + +- **Don't add new code to the old module.** New logic goes in the new home. The old code is in maintenance mode for the duration. +- **Don't import across the seam in the wrong direction.** New `core/` code never imports from the old `apps/code/...` module — the dependency goes old → new, not new → old. If `core/` needs a helper that still only lives in the old module, copy it (mark with `// PORT NOTE: duplicated from , removed when lands`). +- **Track open coexistence in MIGRATION.md.** Each entry says what's still in the old location and what triggers its removal — fan-in waiting to migrate, shims keeping the boot path stable, helpers temporarily duplicated. +- **Coexistence is the cost, not the goal.** Every shim and duplicate is a debt with a known retirement condition. If you find one without a retirement condition, that's the layering problem — name it. + +If you genuinely can't find any tractable slice (the feature is so entangled that even a shim doesn't isolate the new code), that's a layering problem, not a porting problem. Raise it before starting. --- @@ -180,9 +222,12 @@ If you find debt that isn't a forbidden pattern and isn't a layering fix, **leav ## Recommended order 1. **Read-only, no subscriptions.** Done — diff-stats. -2. **Read-only, subscription-based** (file-watcher, sync-status). Proves the streaming transport. -3. **Write paths** (focus mode, worktree ops). -4. **Terminal / pty proxying.** Most ambitious. Tests the full pipeline including binary data. +2. **Read-only, subscription-based** — done. file-watcher proved the SSE streaming transport (workspace-client `splitLink` + `httpSubscriptionLink`, hono server accepting `?secret=` query). +3. **Auth / api-client-adjacent.** Exercises the api-client path end-to-end. Next. +4. **Write paths** (focus mode, worktree ops). +5. **Terminal / pty proxying.** Most ambitious. Tests the full pipeline including binary data. + +The first two slices also surfaced two recurring patterns now baked into the ground rules: source-smoothing belongs with the source (not core), and hooks are pure react-query idioms (not useEffect wrappers). Apply them on every slice going forward. --- diff --git a/apps/code/package.json b/apps/code/package.json index 048c475b4e..2a3af5f702 100644 --- a/apps/code/package.json +++ b/apps/code/package.json @@ -132,6 +132,7 @@ "@pierre/diffs": "^1.1.21", "@posthog/agent": "workspace:*", "@posthog/api-client": "workspace:*", + "@posthog/core": "workspace:*", "@posthog/electron-trpc": "workspace:*", "@posthog/enricher": "workspace:*", "@posthog/git": "workspace:*", diff --git a/apps/code/src/main/di/container.ts b/apps/code/src/main/di/container.ts index 0cf7eb8296..661b2dd62b 100644 --- a/apps/code/src/main/di/container.ts +++ b/apps/code/src/main/di/container.ts @@ -38,7 +38,6 @@ import { DeepLinkService } from "../services/deep-link/service"; import { EnrichmentService } from "../services/enrichment/service"; import { EnvironmentService } from "../services/environment/service"; import { ExternalAppsService } from "../services/external-apps/service"; -import { FileWatcherService } from "../services/file-watcher/service"; import { FocusService } from "../services/focus/service"; import { FocusSyncService } from "../services/focus/sync-service"; import { FoldersService } from "../services/folders/service"; @@ -125,7 +124,6 @@ container.bind(MAIN_TOKENS.ProvisioningService).to(ProvisioningService); container.bind(MAIN_TOKENS.ExternalAppsService).to(ExternalAppsService); container.bind(MAIN_TOKENS.LlmGatewayService).to(LlmGatewayService); container.bind(MAIN_TOKENS.McpAppsService).to(McpAppsService); -container.bind(MAIN_TOKENS.FileWatcherService).to(FileWatcherService); container.bind(MAIN_TOKENS.FocusService).to(FocusService); container.bind(MAIN_TOKENS.FocusSyncService).to(FocusSyncService); container.bind(MAIN_TOKENS.FoldersService).to(FoldersService); diff --git a/apps/code/src/main/index.ts b/apps/code/src/main/index.ts index e07e7b4033..6e4d977d35 100644 --- a/apps/code/src/main/index.ts +++ b/apps/code/src/main/index.ts @@ -1,7 +1,9 @@ import "reflect-metadata"; import os from "node:os"; +import { createWorkspaceClient } from "@posthog/workspace-client/client"; import { app, BrowserWindow, dialog } from "electron"; import log from "electron-log/main"; +import { FileWatcherBridge } from "./services/file-watcher/bridge"; import "./utils/logger"; import "./services/index.js"; import { ANALYTICS_EVENTS } from "@shared/types/analytics"; @@ -231,12 +233,18 @@ app.whenReady().then(async () => { ensureClaudeConfigDir(); registerMcpSandboxProtocol(); createWindow(); + + const wsServer = container.get( + MAIN_TOKENS.WorkspaceServerService, + ); + const connection = await wsServer.start(); + const workspaceClient = createWorkspaceClient(connection); + container + .bind(MAIN_TOKENS.FileWatcherService) + .toConstantValue(new FileWatcherBridge(workspaceClient)); + await initializeServices(); initializeDeepLinks(); - container - .get(MAIN_TOKENS.WorkspaceServerService) - .start() - .catch((err) => log.error("workspace-server failed to start", err)); }); app.on("window-all-closed", () => { diff --git a/apps/code/src/main/services/archive/service.ts b/apps/code/src/main/services/archive/service.ts index 1c3dec2600..98830cfa3c 100644 --- a/apps/code/src/main/services/archive/service.ts +++ b/apps/code/src/main/services/archive/service.ts @@ -27,7 +27,7 @@ import type { WorktreeRepository } from "../../db/repositories/worktree-reposito import { MAIN_TOKENS } from "../../di/tokens"; import { logger } from "../../utils/logger"; import type { AgentService } from "../agent/service"; -import type { FileWatcherService } from "../file-watcher/service"; +import type { FileWatcherBridge } from "../file-watcher/bridge"; import type { ProcessTrackingService } from "../process-tracking/service"; import { getWorktreeLocation } from "../settingsStore"; import type { ArchivedTask, ArchiveTaskInput } from "./schemas"; @@ -44,7 +44,7 @@ export class ArchiveService { @inject(MAIN_TOKENS.ProcessTrackingService) private readonly processTracking: ProcessTrackingService, @inject(MAIN_TOKENS.FileWatcherService) - private readonly fileWatcher: FileWatcherService, + private readonly fileWatcher: FileWatcherBridge, @inject(MAIN_TOKENS.RepositoryRepository) private readonly repositoryRepo: RepositoryRepository, @inject(MAIN_TOKENS.WorkspaceRepository) diff --git a/apps/code/src/main/services/file-watcher/bridge.ts b/apps/code/src/main/services/file-watcher/bridge.ts new file mode 100644 index 0000000000..a70146818a --- /dev/null +++ b/apps/code/src/main/services/file-watcher/bridge.ts @@ -0,0 +1,36 @@ +import type { WorkspaceClient } from "@posthog/workspace-client/client"; +import type { FileWatcherEvent } from "@posthog/workspace-client/types"; +import { TypedEventEmitter } from "../../utils/typed-event-emitter"; + +type FileWatcherEventsByKind = { + [K in FileWatcherEvent["kind"]]: Extract; +}; + +export class FileWatcherBridge extends TypedEventEmitter { + private subs = new Map void }>(); + + constructor(private workspace: WorkspaceClient) { + super(); + } + + startWatching(repoPath: string): void { + if (this.subs.has(repoPath)) return; + const sub = this.workspace.fileWatcher.watch.subscribe( + { repoPath }, + { + onData: (event) => { + this.emit(event.kind, event as never); + }, + onError: () => {}, + }, + ); + this.subs.set(repoPath, sub); + } + + stopWatching(repoPath: string): void { + const sub = this.subs.get(repoPath); + if (!sub) return; + sub.unsubscribe(); + this.subs.delete(repoPath); + } +} diff --git a/apps/code/src/main/services/file-watcher/schemas.ts b/apps/code/src/main/services/file-watcher/schemas.ts deleted file mode 100644 index f4072853a0..0000000000 --- a/apps/code/src/main/services/file-watcher/schemas.ts +++ /dev/null @@ -1,60 +0,0 @@ -import { z } from "zod"; - -export const listDirectoryInput = z.object({ - dirPath: z.string(), -}); - -export const watcherInput = z.object({ - repoPath: z.string(), -}); - -const directoryEntry = z.object({ - name: z.string(), - path: z.string(), - type: z.enum(["file", "directory"]), -}); - -export const listDirectoryOutput = z.array(directoryEntry); - -export type ListDirectoryInput = z.infer; -export type WatcherInput = z.infer; -export type DirectoryEntry = z.infer; - -export const FileWatcherEvent = { - DirectoryChanged: "directory-changed", - FileChanged: "file-changed", - FileDeleted: "file-deleted", - GitStateChanged: "git-state-changed", - WorkingTreeChanged: "working-tree-changed", -} as const; - -export type DirectoryChangedPayload = { - repoPath: string; - dirPath: string; -}; - -export type FileChangedPayload = { - repoPath: string; - filePath: string; -}; - -export type FileDeletedPayload = { - repoPath: string; - filePath: string; -}; - -export type GitStateChangedPayload = { - repoPath: string; -}; - -export type WorkingTreeChangedPayload = { - repoPath: string; -}; - -export interface FileWatcherEvents { - [FileWatcherEvent.DirectoryChanged]: DirectoryChangedPayload; - [FileWatcherEvent.FileChanged]: FileChangedPayload; - [FileWatcherEvent.FileDeleted]: FileDeletedPayload; - [FileWatcherEvent.GitStateChanged]: GitStateChangedPayload; - [FileWatcherEvent.WorkingTreeChanged]: WorkingTreeChangedPayload; -} diff --git a/apps/code/src/main/services/file-watcher/service.test.ts b/apps/code/src/main/services/file-watcher/service.test.ts deleted file mode 100644 index 465ca0b573..0000000000 --- a/apps/code/src/main/services/file-watcher/service.test.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { beforeEach, describe, expect, it, vi } from "vitest"; -import { FileWatcherEvent } from "./schemas"; - -vi.mock("../../utils/logger.js", () => ({ - logger: { - scope: () => ({ - info: vi.fn(), - error: vi.fn(), - warn: vi.fn(), - debug: vi.fn(), - }), - }, -})); - -vi.mock("@parcel/watcher", () => ({ subscribe: vi.fn() })); - -import type { WatcherRegistryService } from "../watcher-registry/service"; -import { FileWatcherService } from "./service"; - -interface PendingChanges { - dirs: Set; - files: Set; - deletes: Set; - timer: ReturnType | null; -} - -const makePending = ( - overrides: Partial = {}, -): PendingChanges => ({ - dirs: new Set(["/repo/src"]), - files: new Set(["/repo/src/a.ts"]), - deletes: new Set(["/repo/src/b.ts"]), - timer: setTimeout(() => {}, 1_000_000), - ...overrides, -}); - -describe("FileWatcherService.flushPending", () => { - let registry: { isShutdown: boolean }; - let service: FileWatcherService; - - beforeEach(() => { - registry = { isShutdown: false }; - service = new FileWatcherService( - registry as unknown as WatcherRegistryService, - ); - }); - - it("does not emit events when the watcher registry is shut down", () => { - registry.isShutdown = true; - const emitSpy = vi.spyOn(service, "emit"); - const pending = makePending(); - - ( - service as unknown as { - flushPending: (repoPath: string, pending: PendingChanges) => void; - } - ).flushPending("/repo", pending); - - expect(emitSpy).not.toHaveBeenCalled(); - expect(pending.dirs.size).toBe(0); - expect(pending.files.size).toBe(0); - expect(pending.deletes.size).toBe(0); - expect(pending.timer).toBeNull(); - }); - - it("emits per-path events when the registry is active", () => { - const emitSpy = vi.spyOn(service, "emit"); - const pending = makePending(); - - ( - service as unknown as { - flushPending: (repoPath: string, pending: PendingChanges) => void; - } - ).flushPending("/repo", pending); - - expect(emitSpy).toHaveBeenCalledWith(FileWatcherEvent.DirectoryChanged, { - repoPath: "/repo", - dirPath: "/repo/src", - }); - expect(emitSpy).toHaveBeenCalledWith(FileWatcherEvent.FileChanged, { - repoPath: "/repo", - filePath: "/repo/src/a.ts", - }); - expect(emitSpy).toHaveBeenCalledWith(FileWatcherEvent.FileDeleted, { - repoPath: "/repo", - filePath: "/repo/src/b.ts", - }); - expect(pending.timer).toBeNull(); - }); -}); diff --git a/apps/code/src/main/services/file-watcher/service.ts b/apps/code/src/main/services/file-watcher/service.ts deleted file mode 100644 index e86e5b486f..0000000000 --- a/apps/code/src/main/services/file-watcher/service.ts +++ /dev/null @@ -1,266 +0,0 @@ -import { existsSync } from "node:fs"; -import fs from "node:fs/promises"; -import path from "node:path"; -import * as watcher from "@parcel/watcher"; -import { inject, injectable } from "inversify"; -import { MAIN_TOKENS } from "../../di/tokens"; -import { logger } from "../../utils/logger"; -import { TypedEventEmitter } from "../../utils/typed-event-emitter"; -import type { WatcherRegistryService } from "../watcher-registry/service"; -import { - type DirectoryEntry, - FileWatcherEvent, - type FileWatcherEvents, -} from "./schemas"; - -const log = logger.scope("file-watcher"); - -const IGNORE_PATTERNS = ["**/node_modules/**", "**/.git/**", "**/.jj/**"]; -const DEBOUNCE_MS = 500; -const BULK_THRESHOLD = 100; - -interface PendingChanges { - dirs: Set; - files: Set; - deletes: Set; - timer: ReturnType | null; -} - -interface RepoWatcher { - filesId: string; - gitIds: string[]; - pending: PendingChanges; -} - -@injectable() -export class FileWatcherService extends TypedEventEmitter { - private watchers = new Map(); - - constructor( - @inject(MAIN_TOKENS.WatcherRegistryService) - private watcherRegistry: WatcherRegistryService, - ) { - super(); - } - - async listDirectory(dirPath: string): Promise { - try { - const entries = await fs.readdir(dirPath, { withFileTypes: true }); - return entries - .filter((e) => !e.name.startsWith(".")) - .map((e) => ({ - name: e.name, - path: path.join(dirPath, e.name), - type: e.isDirectory() ? ("directory" as const) : ("file" as const), - })) - .sort((a, b) => - a.type !== b.type - ? a.type === "directory" - ? -1 - : 1 - : a.name.localeCompare(b.name), - ); - } catch (error) { - log.error("Failed to list directory:", error); - return []; - } - } - - async startWatching(repoPath: string): Promise { - if (this.watchers.has(repoPath)) return; - - const pending: PendingChanges = { - dirs: new Set(), - files: new Set(), - deletes: new Set(), - timer: null, - }; - - const filesId = `file-watcher:files:${repoPath}`; - - const filesSub = await this.watchFiles(repoPath, pending); - this.watcherRegistry.register(filesId, filesSub); - - const gitIds: string[] = []; - const gitSubs = await this.watchGit(repoPath); - if (gitSubs) { - for (let i = 0; i < gitSubs.length; i++) { - const gitId = `file-watcher:git:${repoPath}:${i}`; - this.watcherRegistry.register(gitId, gitSubs[i]); - gitIds.push(gitId); - } - } - - this.watchers.set(repoPath, { - filesId, - gitIds, - pending, - }); - } - - async stopWatching(repoPath: string): Promise { - const w = this.watchers.get(repoPath); - if (!w) return; - - if (w.pending.timer) clearTimeout(w.pending.timer); - await this.watcherRegistry.unregister(w.filesId); - for (const gitId of w.gitIds) { - await this.watcherRegistry.unregister(gitId); - } - this.watchers.delete(repoPath); - } - - private async watchFiles( - repoPath: string, - pending: PendingChanges, - ): Promise { - return watcher.subscribe( - repoPath, - (err, events) => { - if (this.watcherRegistry.isShutdown) return; - if (err) { - this.handleWatcherError(err, repoPath); - return; - } - this.queueEvents(repoPath, pending, events); - }, - { ignore: IGNORE_PATTERNS }, - ); - } - - private handleWatcherError(err: Error, repoPath: string): void { - if (!existsSync(repoPath)) { - log.info(`Directory deleted, stopping watcher: ${repoPath}`); - this.stopWatching(repoPath).catch((e) => - log.warn(`Failed to stop watcher: ${e}`), - ); - } else { - log.debug("Watcher error:", err); - } - } - - private queueEvents( - repoPath: string, - pending: PendingChanges, - events: watcher.Event[], - ): void { - for (const event of events) { - pending.dirs.add(path.dirname(event.path)); - if (event.type === "delete") { - pending.deletes.add(event.path); - } else { - pending.files.add(event.path); - } - } - - if (pending.timer) clearTimeout(pending.timer); - pending.timer = setTimeout( - () => this.flushPending(repoPath, pending), - DEBOUNCE_MS, - ); - } - - private flushPending(repoPath: string, pending: PendingChanges): void { - if (this.watcherRegistry.isShutdown) { - pending.dirs.clear(); - pending.files.clear(); - pending.deletes.clear(); - pending.timer = null; - return; - } - - const totalChanges = pending.files.size + pending.deletes.size; - - if (totalChanges > 0) { - this.emit(FileWatcherEvent.WorkingTreeChanged, { repoPath }); - } - - if (totalChanges > BULK_THRESHOLD) { - pending.dirs.clear(); - pending.files.clear(); - pending.deletes.clear(); - pending.timer = null; - return; - } - - for (const dirPath of pending.dirs) { - this.emit(FileWatcherEvent.DirectoryChanged, { repoPath, dirPath }); - } - for (const filePath of pending.files) { - this.emit(FileWatcherEvent.FileChanged, { repoPath, filePath }); - } - for (const filePath of pending.deletes) { - this.emit(FileWatcherEvent.FileDeleted, { repoPath, filePath }); - } - - pending.dirs.clear(); - pending.files.clear(); - pending.deletes.clear(); - pending.timer = null; - } - - private async watchGit( - repoPath: string, - ): Promise { - try { - const gitDir = await this.resolveGitDir(repoPath); - const subscriptions: watcher.AsyncSubscription[] = []; - - const handleEvents = (err: Error | null, events: watcher.Event[]) => { - if (this.watcherRegistry.isShutdown) return; - if (err) { - log.error("Git watcher error:", err); - return; - } - const isRelevant = events.some( - (e) => - e.path.endsWith("/HEAD") || - e.path.endsWith("/index") || - e.path.endsWith("/MERGE_HEAD") || - e.path.endsWith("/CHERRY_PICK_HEAD") || - e.path.endsWith("/REVERT_HEAD") || - e.path.includes("/rebase-merge") || - e.path.includes("/rebase-apply") || - e.path.includes("/refs/heads/"), - ); - if (isRelevant) { - this.emit(FileWatcherEvent.GitStateChanged, { repoPath }); - } - }; - - subscriptions.push(await watcher.subscribe(gitDir, handleEvents)); - - const commonDir = await this.resolveCommonDir(gitDir); - if (commonDir && commonDir !== gitDir) { - subscriptions.push(await watcher.subscribe(commonDir, handleEvents)); - } - - return subscriptions; - } catch (error) { - log.warn("Failed to set up git watcher:", error); - return null; - } - } - - private async resolveCommonDir(gitDir: string): Promise { - try { - const commonDirFile = path.join(gitDir, "commondir"); - const content = await fs.readFile(commonDirFile, "utf-8"); - return path.resolve(gitDir, content.trim()); - } catch { - return null; - } - } - - private async resolveGitDir(repoPath: string): Promise { - const gitPath = path.join(repoPath, ".git"); - const stat = await fs.stat(gitPath); - - if (stat.isDirectory()) return gitPath; - - const content = await fs.readFile(gitPath, "utf-8"); - const match = content.match(/gitdir:\s*(.+)/); - if (!match) throw new Error("Invalid .git file format"); - return path.resolve(match[1].trim()); - } -} diff --git a/apps/code/src/main/services/fs/service.ts b/apps/code/src/main/services/fs/service.ts index 189a5a739a..d6b220abfb 100644 --- a/apps/code/src/main/services/fs/service.ts +++ b/apps/code/src/main/services/fs/service.ts @@ -1,11 +1,11 @@ import fs from "node:fs"; import path from "node:path"; import { getChangedFiles, listAllFiles } from "@posthog/git/queries"; +import { FileWatcherEventKind as FileWatcherEvent } from "@posthog/workspace-server/services/watcher/schemas"; import { inject, injectable } from "inversify"; import { MAIN_TOKENS } from "../../di/tokens"; import { logger } from "../../utils/logger"; -import { FileWatcherEvent } from "../file-watcher/schemas"; -import type { FileWatcherService } from "../file-watcher/service"; +import type { FileWatcherBridge } from "../file-watcher/bridge"; import type { BoundedReadResult, FileEntry } from "./schemas"; const log = logger.scope("fs"); @@ -18,7 +18,7 @@ export class FsService { constructor( @inject(MAIN_TOKENS.FileWatcherService) - private fileWatcher: FileWatcherService, + private fileWatcher: FileWatcherBridge, ) { this.fileWatcher.on(FileWatcherEvent.FileChanged, ({ repoPath }) => { this.invalidateCache(repoPath); diff --git a/apps/code/src/main/services/suspension/service.test.ts b/apps/code/src/main/services/suspension/service.test.ts index 5447f26ccb..d3a8135eae 100644 --- a/apps/code/src/main/services/suspension/service.test.ts +++ b/apps/code/src/main/services/suspension/service.test.ts @@ -91,7 +91,7 @@ import type { Workspace } from "../../db/repositories/workspace-repository.js"; import { createMockWorkspaceRepository } from "../../db/repositories/workspace-repository.mock.js"; import { createMockWorktreeRepository } from "../../db/repositories/worktree-repository.mock.js"; import type { AgentService } from "../agent/service.js"; -import type { FileWatcherService } from "../file-watcher/service.js"; +import type { FileWatcherBridge } from "../file-watcher/bridge"; import type { ProcessTrackingService } from "../process-tracking/service.js"; import { SuspensionService } from "./service.js"; @@ -104,7 +104,7 @@ function createMocks() { } as unknown as ProcessTrackingService; const fileWatcher = { stopWatching: vi.fn(), - } as unknown as FileWatcherService; + } as unknown as FileWatcherBridge; const repositoryRepo = createMockRepositoryRepository(); const workspaceRepo = createMockWorkspaceRepository(); const worktreeRepo = createMockWorktreeRepository(); diff --git a/apps/code/src/main/services/suspension/service.ts b/apps/code/src/main/services/suspension/service.ts index 5409bcff61..ba765fc3c3 100644 --- a/apps/code/src/main/services/suspension/service.ts +++ b/apps/code/src/main/services/suspension/service.ts @@ -24,7 +24,7 @@ import { MAIN_TOKENS } from "../../di/tokens.js"; import { logger } from "../../utils/logger.js"; import { TypedEventEmitter } from "../../utils/typed-event-emitter.js"; import type { AgentService } from "../agent/service.js"; -import type { FileWatcherService } from "../file-watcher/service.js"; +import type { FileWatcherBridge } from "../file-watcher/bridge"; import type { ProcessTrackingService } from "../process-tracking/service.js"; import { getAutoSuspendAfterDays, @@ -65,7 +65,7 @@ export class SuspensionService extends TypedEventEmitter if (this.branchWatcherInitialized) return; this.branchWatcherInitialized = true; - const fileWatcher = container.get( + const fileWatcher = container.get( MAIN_TOKENS.FileWatcherService, ); const focusService = container.get(MAIN_TOKENS.FocusService); @@ -1188,7 +1188,7 @@ export class WorkspaceService extends TypedEventEmitter branchName: string | null, ): Promise { try { - const fileWatcher = container.get( + const fileWatcher = container.get( MAIN_TOKENS.FileWatcherService, ); await fileWatcher.stopWatching(worktreePath); diff --git a/apps/code/src/main/trpc/routers/file-watcher.ts b/apps/code/src/main/trpc/routers/file-watcher.ts index c442556d10..95e5e78ff4 100644 --- a/apps/code/src/main/trpc/routers/file-watcher.ts +++ b/apps/code/src/main/trpc/routers/file-watcher.ts @@ -1,34 +1,15 @@ +import { z } from "zod"; import { container } from "../../di/container"; import { MAIN_TOKENS } from "../../di/tokens"; -import { - FileWatcherEvent, - type FileWatcherEvents, - listDirectoryInput, - listDirectoryOutput, - watcherInput, -} from "../../services/file-watcher/schemas"; -import type { FileWatcherService } from "../../services/file-watcher/service"; +import type { FileWatcherBridge } from "../../services/file-watcher/bridge"; import { publicProcedure, router } from "../trpc"; -const getService = () => - container.get(MAIN_TOKENS.FileWatcherService); +const watcherInput = z.object({ repoPath: z.string() }); -function subscribe(event: K) { - return publicProcedure.subscription(async function* (opts) { - const service = getService(); - const iterable = service.toIterable(event, { signal: opts.signal }); - for await (const data of iterable) { - yield data; - } - }); -} +const getService = () => + container.get(MAIN_TOKENS.FileWatcherService); export const fileWatcherRouter = router({ - listDirectory: publicProcedure - .input(listDirectoryInput) - .output(listDirectoryOutput) - .query(({ input }) => getService().listDirectory(input.dirPath)), - start: publicProcedure .input(watcherInput) .mutation(({ input }) => getService().startWatching(input.repoPath)), @@ -36,10 +17,4 @@ export const fileWatcherRouter = router({ stop: publicProcedure .input(watcherInput) .mutation(({ input }) => getService().stopWatching(input.repoPath)), - - onDirectoryChanged: subscribe(FileWatcherEvent.DirectoryChanged), - onFileChanged: subscribe(FileWatcherEvent.FileChanged), - onFileDeleted: subscribe(FileWatcherEvent.FileDeleted), - onGitStateChanged: subscribe(FileWatcherEvent.GitStateChanged), - onWorkingTreeChanged: subscribe(FileWatcherEvent.WorkingTreeChanged), }); diff --git a/apps/code/src/renderer/features/task-detail/components/FileTreePanel.tsx b/apps/code/src/renderer/features/task-detail/components/FileTreePanel.tsx index a108e6914f..0a6018fb4a 100644 --- a/apps/code/src/renderer/features/task-detail/components/FileTreePanel.tsx +++ b/apps/code/src/renderer/features/task-detail/components/FileTreePanel.tsx @@ -9,13 +9,14 @@ import { import { useCwd } from "@features/sidebar/hooks/useCwd"; import { useCloudRunState } from "@features/task-detail/hooks/useCloudRunState"; import { Cloud } from "@phosphor-icons/react"; +import { useFileWatcher as useFileWatcherUI } from "@posthog/ui/features/file-watcher/useFileWatcher"; +import { useWorkspaceTRPC } from "@posthog/workspace-client/trpc"; import { Box, Button, Flex, Spinner, Text } from "@radix-ui/themes"; import { useIsCloudTask } from "@renderer/features/workspace/hooks/useIsCloudTask"; import { useWorkspace } from "@renderer/features/workspace/hooks/useWorkspace"; -import { trpcClient, useTRPC } from "@renderer/trpc/client"; +import { trpcClient } from "@renderer/trpc/client"; import type { Task } from "@shared/types"; import { useQuery, useQueryClient } from "@tanstack/react-query"; -import { useSubscription } from "@trpc/tanstack-react-query"; import { handleExternalAppAction } from "@utils/handleExternalAppAction"; import { toRelativePath } from "@utils/path"; @@ -52,10 +53,10 @@ function LazyTreeItem({ const collapseAll = useFileTreeStore((state) => state.collapseAll); const openFileInSplit = usePanelLayoutStore((state) => state.openFileInSplit); const workspace = useWorkspace(taskId); - const trpc = useTRPC(); + const wsTrpc = useWorkspaceTRPC(); const { data: children } = useQuery( - trpc.fileWatcher.listDirectory.queryOptions( + wsTrpc.fs.listDirectory.queryOptions( { dirPath: entry.path }, { enabled: entry.type === "directory" && isExpanded, @@ -205,34 +206,30 @@ export function FileTreePanel({ taskId, task }: FileTreePanelProps) { } function LocalFileTreePanel({ taskId, task: _task }: FileTreePanelProps) { - const trpc = useTRPC(); const workspace = useWorkspace(taskId); const repoPath = useCwd(taskId); const mainRepoPath = workspace?.folderPath; const queryClient = useQueryClient(); const layout = usePanelLayoutStore((state) => state.getLayout(taskId)); + const wsTrpc = useWorkspaceTRPC(); const { data: rootEntries, isLoading, error, } = useQuery( - trpc.fileWatcher.listDirectory.queryOptions( + wsTrpc.fs.listDirectory.queryOptions( { dirPath: repoPath as string }, { enabled: !!repoPath, staleTime: Infinity }, ), ); - useSubscription( - trpc.fileWatcher.onDirectoryChanged.subscriptionOptions(undefined, { - enabled: !!repoPath, - onData: ({ dirPath }) => { - queryClient.invalidateQueries( - trpc.fileWatcher.listDirectory.queryFilter({ dirPath }), - ); - }, - }), - ); + useFileWatcherUI(repoPath ?? null, (event) => { + if (event.kind !== "directory-changed") return; + queryClient.invalidateQueries( + wsTrpc.fs.listDirectory.queryFilter({ dirPath: event.dirPath }), + ); + }); const isFileActive = (relativePath: string): boolean => { if (!layout) return false; diff --git a/apps/code/src/renderer/hooks/useFileWatcher.ts b/apps/code/src/renderer/hooks/useFileWatcher.ts index c8134f1710..15c3513773 100644 --- a/apps/code/src/renderer/hooks/useFileWatcher.ts +++ b/apps/code/src/renderer/hooks/useFileWatcher.ts @@ -3,12 +3,13 @@ import { invalidateGitWorkingTreeQueries, } from "@features/git-interaction/utils/gitCacheKeys"; import { usePanelLayoutStore } from "@features/panels/store/panelLayoutStore"; +import { useFileWatcher as useFileWatcherUI } from "@posthog/ui/features/file-watcher/useFileWatcher"; +import type { FileWatcherEvent } from "@posthog/workspace-client/types"; import { trpcClient, useTRPC } from "@renderer/trpc/client"; import { useQueryClient } from "@tanstack/react-query"; -import { useSubscription } from "@trpc/tanstack-react-query"; import { logger } from "@utils/logger"; import { toRelativePath } from "@utils/path"; -import { useEffect } from "react"; +import { useCallback, useEffect } from "react"; const log = logger.scope("file-watcher"); @@ -19,67 +20,49 @@ export function useFileWatcher(repoPath: string | null, taskId?: string) { useEffect(() => { if (!repoPath) return; - trpcClient.fileWatcher.start.mutate({ repoPath }).catch((error) => { - log.error("Failed to start file watcher:", error); + log.error("Failed to start main-side file watcher:", error); }); - return () => { trpcClient.fileWatcher.stop.mutate({ repoPath }); }; }, [repoPath]); - useSubscription( - trpc.fileWatcher.onFileChanged.subscriptionOptions(undefined, { - enabled: !!repoPath, - onData: ({ repoPath: rp, filePath }) => { - if (rp !== repoPath) return; - const relativePath = toRelativePath(filePath, repoPath); - queryClient.invalidateQueries( - trpc.fs.readRepoFile.queryFilter({ - repoPath, - filePath: relativePath, - }), - ); - queryClient.invalidateQueries( - trpc.fs.readRepoFileBounded.queryFilter({ - repoPath, - filePath: relativePath, - }), - ); - }, - }), - ); - - useSubscription( - trpc.fileWatcher.onFileDeleted.subscriptionOptions(undefined, { - enabled: !!repoPath, - onData: ({ repoPath: rp, filePath }) => { - if (rp !== repoPath) return; - if (!taskId) return; - const relativePath = toRelativePath(filePath, repoPath); - closeTabsForFile(taskId, relativePath); - }, - }), + const onEvent = useCallback( + (event: FileWatcherEvent) => { + if (!repoPath) return; + switch (event.kind) { + case "file-changed": { + const relativePath = toRelativePath(event.filePath, repoPath); + queryClient.invalidateQueries( + trpc.fs.readRepoFile.queryFilter({ + repoPath, + filePath: relativePath, + }), + ); + queryClient.invalidateQueries( + trpc.fs.readRepoFileBounded.queryFilter({ + repoPath, + filePath: relativePath, + }), + ); + return; + } + case "file-deleted": { + if (!taskId) return; + closeTabsForFile(taskId, toRelativePath(event.filePath, repoPath)); + return; + } + case "git-state-changed": + invalidateGitBranchQueries(repoPath); + return; + case "working-tree-changed": + invalidateGitWorkingTreeQueries(repoPath); + return; + } + }, + [repoPath, taskId, queryClient, trpc, closeTabsForFile], ); - useSubscription( - trpc.fileWatcher.onGitStateChanged.subscriptionOptions(undefined, { - enabled: !!repoPath, - onData: ({ repoPath: rp }) => { - if (rp !== repoPath) return; - invalidateGitBranchQueries(repoPath); - }, - }), - ); - - useSubscription( - trpc.fileWatcher.onWorkingTreeChanged.subscriptionOptions(undefined, { - enabled: !!repoPath, - onData: ({ repoPath: rp }) => { - if (rp !== repoPath) return; - invalidateGitWorkingTreeQueries(repoPath); - }, - }), - ); + useFileWatcherUI(repoPath, onEvent); } diff --git a/biome.jsonc b/biome.jsonc index e5698c67ca..2526157d43 100644 --- a/biome.jsonc +++ b/biome.jsonc @@ -180,9 +180,13 @@ "react", "react-dom", "node:*", - "@posthog/*" + "@posthog/*", + "!@posthog/shared", + "!@posthog/platform", + "!@posthog/workspace-client", + "!@posthog/api-client" ], - "message": "core is a pure layer that must not depend on any platform-specific dependencies." + "message": "core is a pure layer. Allowed @posthog deps: shared, platform, workspace-client, api-client. Never ui/* or workspace-server/*." } ] } diff --git a/packages/core/package.json b/packages/core/package.json index 9652040aec..63a6b8e67e 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -4,17 +4,24 @@ "description": "Zero-dependency pure domain layer. Types, schemas, pure functions. Runs in any JS environment (Node, Bun, browser, RN, edge). No I/O, no platform calls, no framework deps.", "private": true, "type": "module", + "exports": { + "./*": [ + "./src/*.ts", + "./src/*.tsx" + ] + }, "scripts": { - "clean": "node ../../scripts/rimraf.mjs dist .turbo" + "typecheck": "tsc --noEmit", + "clean": "node ../../scripts/rimraf.mjs .turbo" + }, + "dependencies": { + "@posthog/workspace-client": "workspace:*" }, "devDependencies": { "@posthog/tsconfig": "workspace:*", - "@posthog/tsup-config": "workspace:*", - "tsup": "catalog:", "typescript": "catalog:" }, "files": [ - "dist/**/*", "src/**/*" ] } diff --git a/packages/core/src/placeholder.ts b/packages/core/src/placeholder.ts new file mode 100644 index 0000000000..cb0ff5c3b5 --- /dev/null +++ b/packages/core/src/placeholder.ts @@ -0,0 +1 @@ +export {}; diff --git a/packages/ui/src/features/file-watcher/useFileWatcher.ts b/packages/ui/src/features/file-watcher/useFileWatcher.ts new file mode 100644 index 0000000000..87baf1d2a6 --- /dev/null +++ b/packages/ui/src/features/file-watcher/useFileWatcher.ts @@ -0,0 +1,18 @@ +import { useWorkspaceTRPC } from "@posthog/workspace-client/trpc"; +import type { FileWatcherEvent } from "@posthog/workspace-client/types"; +import { useSubscription } from "@trpc/tanstack-react-query"; + +export type { FileWatcherEvent }; + +export function useFileWatcher( + repoPath: string | null, + onEvent: (event: FileWatcherEvent) => void, +): void { + const trpc = useWorkspaceTRPC(); + useSubscription( + trpc.fileWatcher.watch.subscriptionOptions( + { repoPath: repoPath ?? "" }, + { enabled: !!repoPath, onData: onEvent }, + ), + ); +} diff --git a/packages/workspace-client/src/client.ts b/packages/workspace-client/src/client.ts index e35f1e5d19..79b518f93e 100644 --- a/packages/workspace-client/src/client.ts +++ b/packages/workspace-client/src/client.ts @@ -1,5 +1,10 @@ import type { AppRouter } from "@posthog/workspace-server/trpc"; -import { createTRPCClient, httpBatchLink } from "@trpc/client"; +import { + createTRPCClient, + httpBatchLink, + httpSubscriptionLink, + splitLink, +} from "@trpc/client"; import superjson from "superjson"; const SECRET_HEADER = "x-workspace-secret"; @@ -12,12 +17,23 @@ export interface WorkspaceConnection { export type WorkspaceClient = ReturnType; export function createWorkspaceClient(connection: WorkspaceConnection) { + const url = `${connection.url.replace(/\/$/, "")}/trpc`; + const headers = { [SECRET_HEADER]: connection.secret }; + const subscriptionUrl = `${url}?secret=${encodeURIComponent(connection.secret)}`; + return createTRPCClient({ links: [ - httpBatchLink({ - url: `${connection.url.replace(/\/$/, "")}/trpc`, - transformer: superjson, - headers: () => ({ [SECRET_HEADER]: connection.secret }), + splitLink({ + condition: (op) => op.type === "subscription", + true: httpSubscriptionLink({ + url: subscriptionUrl, + transformer: superjson, + }), + false: httpBatchLink({ + url, + transformer: superjson, + headers: () => headers, + }), }), ], }); diff --git a/packages/workspace-client/src/provider.tsx b/packages/workspace-client/src/provider.tsx index cccbedb8ca..24593ac6c6 100644 --- a/packages/workspace-client/src/provider.tsx +++ b/packages/workspace-client/src/provider.tsx @@ -1,13 +1,12 @@ -import type { AppRouter } from "@posthog/workspace-server/trpc"; import type { QueryClient } from "@tanstack/react-query"; -import { createTRPCClient, httpBatchLink } from "@trpc/client"; import { type ReactNode, useMemo } from "react"; -import superjson from "superjson"; -import type { WorkspaceConnection } from "./client"; +import { createWorkspaceClient, type WorkspaceConnection } from "./client"; import { WorkspaceTRPCProvider } from "./trpc"; -const SECRET_HEADER = "x-workspace-secret"; -const UNAVAILABLE_URL = "http://127.0.0.1:1/trpc-unavailable"; +const UNAVAILABLE: WorkspaceConnection = { + url: "http://127.0.0.1:1/trpc-unavailable", + secret: "", +}; export interface WorkspaceClientProviderProps { connection: WorkspaceConnection | null | undefined; @@ -20,21 +19,9 @@ export function WorkspaceClientProvider({ queryClient, children, }: WorkspaceClientProviderProps) { - const url = connection?.url; - const secret = connection?.secret; - const client = useMemo( - () => - createTRPCClient({ - links: [ - httpBatchLink({ - transformer: superjson, - url: url ? `${url.replace(/\/$/, "")}/trpc` : UNAVAILABLE_URL, - headers: () => (secret ? { [SECRET_HEADER]: secret } : {}), - }), - ], - }), - [url, secret], + () => createWorkspaceClient(connection ?? UNAVAILABLE), + [connection], ); return ( diff --git a/packages/workspace-client/src/types.ts b/packages/workspace-client/src/types.ts new file mode 100644 index 0000000000..3e09b64000 --- /dev/null +++ b/packages/workspace-client/src/types.ts @@ -0,0 +1,4 @@ +export type { + FileWatcherEvent, + FileWatcherEventKind, +} from "@posthog/workspace-server/services/watcher/schemas"; diff --git a/packages/workspace-server/package.json b/packages/workspace-server/package.json index 72a7e00a05..43488babb4 100644 --- a/packages/workspace-server/package.json +++ b/packages/workspace-server/package.json @@ -17,6 +17,7 @@ "dependencies": { "@hono/node-server": "catalog:", "@hono/trpc-server": "catalog:", + "@parcel/watcher": "catalog:", "@posthog/git": "workspace:*", "@trpc/server": "catalog:", "hono": "catalog:", diff --git a/packages/workspace-server/src/app.ts b/packages/workspace-server/src/app.ts index ca31e6a74e..d0c08963bf 100644 --- a/packages/workspace-server/src/app.ts +++ b/packages/workspace-server/src/app.ts @@ -19,7 +19,11 @@ export function createApp(options: CreateAppOptions): Hono { const expected = Buffer.from(options.sharedSecret); const requireSecret = createMiddleware(async (c, next) => { - const provided = Buffer.from(c.req.header(SECRET_HEADER) ?? ""); + // EventSource (used by tRPC SSE subscriptions) can't send custom headers, + // so subscriptions authenticate via a `secret` query param instead. + const headerSecret = c.req.header(SECRET_HEADER); + const querySecret = c.req.query("secret"); + const provided = Buffer.from(headerSecret ?? querySecret ?? ""); if ( provided.length !== expected.length || !timingSafeEqual(provided, expected) diff --git a/packages/workspace-server/src/di/container.ts b/packages/workspace-server/src/di/container.ts index 59f20cd2e1..87e8cc36df 100644 --- a/packages/workspace-server/src/di/container.ts +++ b/packages/workspace-server/src/di/container.ts @@ -1,7 +1,11 @@ import "reflect-metadata"; import { Container } from "inversify"; +import { FsService } from "../services/fs/service"; import { GitService } from "../services/git/service"; +import { WatcherService } from "../services/watcher/service"; import { TOKENS } from "./tokens"; export const container = new Container(); container.bind(TOKENS.GitService).to(GitService).inSingletonScope(); +container.bind(TOKENS.FsService).to(FsService).inSingletonScope(); +container.bind(TOKENS.WatcherService).to(WatcherService).inSingletonScope(); diff --git a/packages/workspace-server/src/di/tokens.ts b/packages/workspace-server/src/di/tokens.ts index b204ca4808..fdd0a370a6 100644 --- a/packages/workspace-server/src/di/tokens.ts +++ b/packages/workspace-server/src/di/tokens.ts @@ -1,3 +1,5 @@ export const TOKENS = Object.freeze({ GitService: Symbol.for("WorkspaceServer.GitService"), + FsService: Symbol.for("WorkspaceServer.FsService"), + WatcherService: Symbol.for("WorkspaceServer.WatcherService"), }); diff --git a/packages/workspace-server/src/services/fs/schemas.ts b/packages/workspace-server/src/services/fs/schemas.ts new file mode 100644 index 0000000000..7301e6d9f7 --- /dev/null +++ b/packages/workspace-server/src/services/fs/schemas.ts @@ -0,0 +1,12 @@ +import { z } from "zod"; + +export const directoryEntrySchema = z.object({ + name: z.string(), + path: z.string(), + type: z.enum(["file", "directory"]), +}); + +export type DirectoryEntry = z.infer; + +export const listDirectoryInput = z.object({ dirPath: z.string().min(1) }); +export const listDirectoryOutput = z.array(directoryEntrySchema); diff --git a/packages/workspace-server/src/services/fs/service.ts b/packages/workspace-server/src/services/fs/service.ts new file mode 100644 index 0000000000..ef303beea1 --- /dev/null +++ b/packages/workspace-server/src/services/fs/service.ts @@ -0,0 +1,29 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import { injectable } from "inversify"; +import type { DirectoryEntry } from "./schemas"; + +@injectable() +export class FsService { + async listDirectory(dirPath: string): Promise { + try { + const entries = await fs.readdir(dirPath, { withFileTypes: true }); + return entries + .filter((e) => !e.name.startsWith(".")) + .map((e) => ({ + name: e.name, + path: path.join(dirPath, e.name), + type: e.isDirectory() ? ("directory" as const) : ("file" as const), + })) + .sort((a, b) => + a.type !== b.type + ? a.type === "directory" + ? -1 + : 1 + : a.name.localeCompare(b.name), + ); + } catch { + return []; + } + } +} diff --git a/packages/workspace-server/src/services/git/schemas.ts b/packages/workspace-server/src/services/git/schemas.ts new file mode 100644 index 0000000000..88e671109b --- /dev/null +++ b/packages/workspace-server/src/services/git/schemas.ts @@ -0,0 +1,11 @@ +import { z } from "zod"; + +export const diffStatsInput = z.object({ directoryPath: z.string().min(1) }); + +export const diffStatsSchema = z.object({ + filesChanged: z.number().int().nonnegative(), + linesAdded: z.number().int().nonnegative(), + linesRemoved: z.number().int().nonnegative(), +}); + +export type DiffStats = z.infer; diff --git a/packages/workspace-server/src/services/watcher/schemas.ts b/packages/workspace-server/src/services/watcher/schemas.ts new file mode 100644 index 0000000000..832bd94d50 --- /dev/null +++ b/packages/workspace-server/src/services/watcher/schemas.ts @@ -0,0 +1,58 @@ +import { z } from "zod"; + +export const watcherEventSchema = z.object({ + type: z.enum(["create", "update", "delete"]), + path: z.string(), +}); + +export type WatcherEvent = z.infer; + +export const watchInput = z.object({ + dirPath: z.string().min(1), + ignore: z.array(z.string()).optional(), +}); + +export const resolveGitDirsInput = z.object({ repoPath: z.string().min(1) }); +export const resolveGitDirsOutput = z.object({ + gitDir: z.string().nullable(), + commonDir: z.string().nullable(), +}); + +export const fileWatcherEventSchema = z.discriminatedUnion("kind", [ + z.object({ + kind: z.literal("directory-changed"), + repoPath: z.string(), + dirPath: z.string(), + }), + z.object({ + kind: z.literal("file-changed"), + repoPath: z.string(), + filePath: z.string(), + }), + z.object({ + kind: z.literal("file-deleted"), + repoPath: z.string(), + filePath: z.string(), + }), + z.object({ + kind: z.literal("git-state-changed"), + repoPath: z.string(), + }), + z.object({ + kind: z.literal("working-tree-changed"), + repoPath: z.string(), + }), +]); + +export type FileWatcherEvent = z.infer; +export type FileWatcherEventKind = FileWatcherEvent["kind"]; + +export const FileWatcherEventKind = { + DirectoryChanged: "directory-changed", + FileChanged: "file-changed", + FileDeleted: "file-deleted", + GitStateChanged: "git-state-changed", + WorkingTreeChanged: "working-tree-changed", +} as const; + +export const watchRepoInput = z.object({ repoPath: z.string().min(1) }); diff --git a/packages/workspace-server/src/services/watcher/service.ts b/packages/workspace-server/src/services/watcher/service.ts new file mode 100644 index 0000000000..427776f23e --- /dev/null +++ b/packages/workspace-server/src/services/watcher/service.ts @@ -0,0 +1,269 @@ +import { existsSync } from "node:fs"; +import fs from "node:fs/promises"; +import path from "node:path"; +import * as watcher from "@parcel/watcher"; +import { injectable } from "inversify"; +import type { FileWatcherEvent, WatcherEvent } from "./schemas"; + +export type WatchOptions = { + ignore?: string[]; +}; + +const IGNORE_PATTERNS = ["**/node_modules/**", "**/.git/**", "**/.jj/**"]; +const DEBOUNCE_MS = 500; +const BULK_THRESHOLD = 100; + +const dirname = (p: string): string => { + const i = Math.max(p.lastIndexOf("/"), p.lastIndexOf("\\")); + return i <= 0 ? p : p.slice(0, i); +}; + +const isRelevantGitEvent = (p: string): boolean => + p.endsWith("/HEAD") || + p.endsWith("/index") || + p.endsWith("/MERGE_HEAD") || + p.endsWith("/CHERRY_PICK_HEAD") || + p.endsWith("/REVERT_HEAD") || + p.includes("/rebase-merge") || + p.includes("/rebase-apply") || + p.includes("/refs/heads/"); + +interface Pending { + dirs: Set; + files: Set; + deletes: Set; +} + +const createPending = (): Pending => ({ + dirs: new Set(), + files: new Set(), + deletes: new Set(), +}); + +export const accumulateFsEvents = ( + pending: Pending, + events: WatcherEvent[], +): void => { + for (const event of events) { + pending.dirs.add(dirname(event.path)); + if (event.type === "delete") pending.deletes.add(event.path); + else pending.files.add(event.path); + } +}; + +export const drainPending = ( + repoPath: string, + pending: Pending, +): FileWatcherEvent[] => { + const totalChanges = pending.files.size + pending.deletes.size; + const out: FileWatcherEvent[] = []; + if (totalChanges === 0 && pending.dirs.size === 0) return out; + + if (totalChanges > 0) { + out.push({ kind: "working-tree-changed", repoPath }); + } + if (totalChanges <= BULK_THRESHOLD) { + for (const dirPath of pending.dirs) + out.push({ kind: "directory-changed", repoPath, dirPath }); + for (const filePath of pending.files) + out.push({ kind: "file-changed", repoPath, filePath }); + for (const filePath of pending.deletes) + out.push({ kind: "file-deleted", repoPath, filePath }); + } + pending.dirs.clear(); + pending.files.clear(); + pending.deletes.clear(); + return out; +}; + +@injectable() +export class WatcherService { + async *watch( + dirPath: string, + options: WatchOptions, + signal?: AbortSignal, + ): AsyncGenerator { + const effectiveSignal = signal ?? new AbortController().signal; + const queue: WatcherEvent[][] = []; + let resolve: ((value: WatcherEvent[][] | null) => void) | null = null; + let closed = false; + + const push = (events: WatcherEvent[]) => { + if (closed) return; + queue.push(events); + if (resolve) { + const r = resolve; + resolve = null; + r(queue.splice(0)); + } + }; + + const close = () => { + if (closed) return; + closed = true; + if (resolve) { + const r = resolve; + resolve = null; + r(null); + } + }; + + const subscription = await watcher.subscribe( + dirPath, + (err, events) => { + if (err) { + if (!existsSync(dirPath)) close(); + return; + } + push(events); + }, + { ignore: options.ignore }, + ); + + const onAbort = () => close(); + effectiveSignal.addEventListener("abort", onAbort, { once: true }); + + try { + while (!closed) { + if (queue.length > 0) { + for (const batch of queue.splice(0)) yield batch; + continue; + } + const next = await new Promise((r) => { + resolve = r; + }); + if (next === null) break; + for (const batch of next) yield batch; + } + } finally { + effectiveSignal.removeEventListener("abort", onAbort); + await subscription.unsubscribe().catch(() => {}); + } + } + + async *watchRepo( + repoPath: string, + signal?: AbortSignal, + ): AsyncGenerator { + const fileEvents = this.watch( + repoPath, + { ignore: IGNORE_PATTERNS }, + signal, + ); + const { gitDir, commonDir } = await this.resolveGitDirs(repoPath); + + const pending = createPending(); + let debounceTimer: ReturnType | null = null; + const outQueue: FileWatcherEvent[] = []; + let outResolve: ((next: FileWatcherEvent[] | null) => void) | null = null; + let outClosed = false; + + const pushOut = (events: FileWatcherEvent[]) => { + if (outClosed || events.length === 0) return; + outQueue.push(...events); + if (outResolve) { + const r = outResolve; + outResolve = null; + r(outQueue.splice(0)); + } + }; + + const closeOut = () => { + if (outClosed) return; + outClosed = true; + if (outResolve) { + const r = outResolve; + outResolve = null; + r(null); + } + }; + + const fileLoop = (async () => { + try { + for await (const batch of fileEvents) { + accumulateFsEvents(pending, batch); + if (debounceTimer) clearTimeout(debounceTimer); + debounceTimer = setTimeout(() => { + debounceTimer = null; + pushOut(drainPending(repoPath, pending)); + }, DEBOUNCE_MS); + } + } finally { + if (debounceTimer) clearTimeout(debounceTimer); + closeOut(); + } + })(); + + const gitLoops: Promise[] = []; + const gitDirs = [ + gitDir, + commonDir && commonDir !== gitDir ? commonDir : null, + ].filter((d): d is string => !!d); + for (const dir of gitDirs) { + gitLoops.push( + (async () => { + for await (const batch of this.watch(dir, {}, signal)) { + if (batch.some((e) => isRelevantGitEvent(e.path))) { + pushOut([{ kind: "git-state-changed", repoPath }]); + } + } + })().catch(() => {}), + ); + } + + signal?.addEventListener("abort", closeOut, { once: true }); + + try { + while (!outClosed) { + if (outQueue.length > 0) { + for (const event of outQueue.splice(0)) yield event; + continue; + } + const next = await new Promise((r) => { + outResolve = r; + }); + if (next === null) break; + for (const event of next) yield event; + } + } finally { + signal?.removeEventListener("abort", closeOut); + closeOut(); + await fileLoop.catch(() => {}); + await Promise.all(gitLoops).catch(() => {}); + } + } + + async resolveGitDirs( + repoPath: string, + ): Promise<{ gitDir: string | null; commonDir: string | null }> { + const gitDir = await this.resolveGitDir(repoPath); + const commonDir = gitDir ? await this.resolveCommonDir(gitDir) : null; + return { gitDir, commonDir }; + } + + async resolveGitDir(repoPath: string): Promise { + try { + const gitPath = path.join(repoPath, ".git"); + const stat = await fs.stat(gitPath); + if (stat.isDirectory()) return gitPath; + + const content = await fs.readFile(gitPath, "utf-8"); + const match = content.match(/gitdir:\s*(.+)/); + if (!match) return null; + return path.resolve(repoPath, match[1].trim()); + } catch { + return null; + } + } + + async resolveCommonDir(gitDir: string): Promise { + try { + const commonDirFile = path.join(gitDir, "commondir"); + const content = await fs.readFile(commonDirFile, "utf-8"); + const resolved = path.resolve(gitDir, content.trim()); + return resolved === gitDir ? null : resolved; + } catch { + return null; + } + } +} diff --git a/packages/workspace-server/src/trpc.ts b/packages/workspace-server/src/trpc.ts index 3632071d5f..c9bbcdab7b 100644 --- a/packages/workspace-server/src/trpc.ts +++ b/packages/workspace-server/src/trpc.ts @@ -1,29 +1,64 @@ import { initTRPC } from "@trpc/server"; import superjson from "superjson"; -import { z } from "zod"; import { container } from "./di/container"; import { TOKENS } from "./di/tokens"; +import { listDirectoryInput, listDirectoryOutput } from "./services/fs/schemas"; +import type { FsService } from "./services/fs/service"; +import { diffStatsInput, diffStatsSchema } from "./services/git/schemas"; import type { GitService } from "./services/git/service"; +import { + resolveGitDirsInput, + resolveGitDirsOutput, + watchInput, + watchRepoInput, +} from "./services/watcher/schemas"; +import type { WatcherService } from "./services/watcher/service"; const t = initTRPC.create({ transformer: superjson }); const gitService = () => container.get(TOKENS.GitService); +const fsService = () => container.get(TOKENS.FsService); +const watcherService = () => + container.get(TOKENS.WatcherService); -export const diffStatsSchema = z.object({ - filesChanged: z.number().int().nonnegative(), - linesAdded: z.number().int().nonnegative(), - linesRemoved: z.number().int().nonnegative(), -}); - -export type DiffStats = z.infer; +export { type DiffStats, diffStatsSchema } from "./services/git/schemas"; +export { + type FileWatcherEvent, + FileWatcherEventKind, +} from "./services/watcher/schemas"; export const appRouter = t.router({ diffStats: t.router({ getDiffStats: t.procedure - .input(z.object({ directoryPath: z.string().min(1) })) + .input(diffStatsInput) .output(diffStatsSchema) .query(({ input }) => gitService().getDiffStats(input.directoryPath)), }), + fs: t.router({ + listDirectory: t.procedure + .input(listDirectoryInput) + .output(listDirectoryOutput) + .query(({ input }) => fsService().listDirectory(input.dirPath)), + }), + watcher: t.router({ + resolveGitDirs: t.procedure + .input(resolveGitDirsInput) + .output(resolveGitDirsOutput) + .query(({ input }) => watcherService().resolveGitDirs(input.repoPath)), + + watch: t.procedure + .input(watchInput) + .subscription(({ input, signal }) => + watcherService().watch(input.dirPath, { ignore: input.ignore }, signal), + ), + }), + fileWatcher: t.router({ + watch: t.procedure + .input(watchRepoInput) + .subscription(({ input, signal }) => + watcherService().watchRepo(input.repoPath, signal), + ), + }), }); export type AppRouter = typeof appRouter; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b5a13d8ebf..6b4656e6e5 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -12,6 +12,9 @@ catalogs: '@hono/trpc-server': specifier: ^0.3.4 version: 0.3.4 + '@parcel/watcher': + specifier: ^2.5.6 + version: 2.5.6 '@phosphor-icons/react': specifier: ^2.1.10 version: 2.1.10 @@ -229,6 +232,9 @@ importers: '@posthog/api-client': specifier: workspace:* version: link:../../packages/api-client + '@posthog/core': + specifier: workspace:* + version: link:../../packages/core '@posthog/electron-trpc': specifier: workspace:* version: link:../../packages/electron-trpc @@ -891,16 +897,14 @@ importers: version: 2.1.9(@types/node@25.2.0)(jsdom@26.1.0)(lightningcss@1.32.0)(msw@2.12.8(@types/node@25.2.0)(typescript@5.9.3))(terser@5.46.0) packages/core: + dependencies: + '@posthog/workspace-client': + specifier: workspace:* + version: link:../workspace-client devDependencies: '@posthog/tsconfig': specifier: workspace:* version: link:../../tooling/typescript - '@posthog/tsup-config': - specifier: workspace:* - version: link:../../tooling/tsup-config - tsup: - specifier: 'catalog:' - version: 8.5.1(jiti@2.6.1)(postcss@8.5.6)(tsx@4.21.0)(typescript@5.9.3)(yaml@2.8.2) typescript: specifier: 'catalog:' version: 5.9.3 @@ -1085,6 +1089,9 @@ importers: '@hono/trpc-server': specifier: 'catalog:' version: 0.3.4(@trpc/server@11.12.0(typescript@5.9.3))(hono@4.11.7) + '@parcel/watcher': + specifier: 'catalog:' + version: 2.5.6 '@posthog/git': specifier: workspace:* version: link:../git diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index d8cb8225ef..782ebeac7a 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -25,6 +25,7 @@ catalog: "@posthog/quill": 0.3.0-beta.1 inversify: ^7.10.6 reflect-metadata: ^0.2.2 + "@parcel/watcher": ^2.5.6 ignoredBuiltDependencies: - msw