diff --git a/packages/core/CHANGELOG.md b/packages/core/CHANGELOG.md index b94f604..bb4b1af 100644 --- a/packages/core/CHANGELOG.md +++ b/packages/core/CHANGELOG.md @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.15.0] - 2026-05-23 + +### Added +- Event Bus module (CORE #16): `provideEventBus(options?)` factory with `EVENT_BUS_CONFIG` injection token +- `EventBusService` — central pub/sub with typed `DomainEvent`, `publish()`, `subscribe()`, `subscribePattern()` (wildcard/regex) +- Dual reactive API: `stream(eventType)` returns `Observable>`, `signal(eventType)` returns `Signal | undefined>` +- `EventQueueService` — persistent event queue backed by `StorageService` (localStorage), bounded by `maxQueueSize`, with `enqueue()`, `flush()`, `clear()` +- `EventHistoryService` — in-memory ring buffer bounded by `historySize`, with `query(filter?)` supporting AND-logic filters (type, source, since) +- Types: `DomainEvent`, `EventHandler`, `EventFilter`, `QueueStatus`, `FlushResult`, `EventBusOptions`, `EventBusConfig` + ## [0.14.0] - 2026-05-22 ### Added @@ -214,7 +224,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - User context module: `UserContextService` - API runtime module: `ApiClient`, `TransportRegistry`, `HttpTransportAdapter` -[Unreleased]: https://github.com/fireflyframework/firefly-frontend-framework/compare/core@0.14.0...HEAD +[Unreleased]: https://github.com/fireflyframework/firefly-frontend-framework/compare/core@0.15.0...HEAD +[0.15.0]: https://github.com/fireflyframework/firefly-frontend-framework/compare/core@0.14.0...core@0.15.0 [0.14.0]: https://github.com/fireflyframework/firefly-frontend-framework/compare/core@0.13.0...core@0.14.0 [0.13.0]: https://github.com/fireflyframework/firefly-frontend-framework/compare/core@0.12.0...core@0.13.0 [0.12.0]: https://github.com/fireflyframework/firefly-frontend-framework/compare/core@0.11.0...core@0.12.0 diff --git a/packages/core/package.json b/packages/core/package.json index b54db61..327fac3 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "@fireflyframework/core", - "version": "0.14.0", + "version": "0.15.0", "publishConfig": { "registry": "https://npm.pkg.github.com" }, diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 51564c3..7ae47a7 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -15,3 +15,4 @@ export * from './lib/environment'; export * from './lib/security'; export * from './lib/files'; export * from './lib/notifications'; +export * from './lib/event-bus'; diff --git a/packages/core/src/lib/event-bus/event-bus.service.spec.ts b/packages/core/src/lib/event-bus/event-bus.service.spec.ts new file mode 100644 index 0000000..8ad6d99 --- /dev/null +++ b/packages/core/src/lib/event-bus/event-bus.service.spec.ts @@ -0,0 +1,267 @@ +import { TestBed } from '@angular/core/testing'; +import { EventBusService } from './event-bus.service'; +import { DomainEvent } from './event-bus.types'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function createEvent( + overrides?: Partial>, +): DomainEvent { + return { + type: 'test:event', + payload: undefined as T, + timestamp: Date.now(), + ...overrides, + } as DomainEvent; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('EventBusService', () => { + function setup() { + TestBed.configureTestingModule({ + providers: [EventBusService], + }); + return TestBed.inject(EventBusService); + } + + // ----------------------------------------------------------------------- + // publish + subscribe + // ----------------------------------------------------------------------- + describe('publish / subscribe', () => { + it('should deliver events to matching subscribers', () => { + const bus = setup(); + const received: DomainEvent[] = []; + bus.subscribe('user:created', (e) => received.push(e)); + + const event = createEvent({ type: 'user:created', payload: { id: 1 } }); + bus.publish(event); + + expect(received).toHaveLength(1); + expect(received[0]).toEqual(event); + }); + + it('should not deliver events to non-matching subscribers', () => { + const bus = setup(); + const received: DomainEvent[] = []; + bus.subscribe('user:created', (e) => received.push(e)); + + bus.publish(createEvent({ type: 'user:deleted' })); + + expect(received).toHaveLength(0); + }); + + it('should deliver to multiple subscribers of the same type', () => { + const bus = setup(); + const r1: DomainEvent[] = []; + const r2: DomainEvent[] = []; + bus.subscribe('order:placed', (e) => r1.push(e)); + bus.subscribe('order:placed', (e) => r2.push(e)); + + bus.publish(createEvent({ type: 'order:placed' })); + + expect(r1).toHaveLength(1); + expect(r2).toHaveLength(1); + }); + + it('should unsubscribe correctly', () => { + const bus = setup(); + const received: DomainEvent[] = []; + const sub = bus.subscribe('ping', (e) => received.push(e)); + + bus.publish(createEvent({ type: 'ping' })); + expect(received).toHaveLength(1); + + sub.unsubscribe(); + bus.publish(createEvent({ type: 'ping' })); + expect(received).toHaveLength(1); + }); + + it('should auto-set timestamp if not provided', () => { + const bus = setup(); + const received: DomainEvent[] = []; + bus.subscribe('ts:test', (e) => received.push(e)); + + const before = Date.now(); + bus.publish({ type: 'ts:test', payload: null, timestamp: 0 }); + // timestamp 0 is falsy, so it should be auto-set + expect(received[0].timestamp).toBeGreaterThanOrEqual(before); + }); + + it('should preserve explicit timestamp', () => { + const bus = setup(); + const received: DomainEvent[] = []; + bus.subscribe('ts:explicit', (e) => received.push(e)); + + bus.publish(createEvent({ type: 'ts:explicit', timestamp: 42 })); + expect(received[0].timestamp).toBe(42); + }); + }); + + // ----------------------------------------------------------------------- + // subscribePattern + // ----------------------------------------------------------------------- + describe('subscribePattern', () => { + it('should match wildcard patterns', () => { + const bus = setup(); + const received: DomainEvent[] = []; + bus.subscribePattern('auth:*', (e) => received.push(e)); + + bus.publish(createEvent({ type: 'auth:login' })); + bus.publish(createEvent({ type: 'auth:logout' })); + bus.publish(createEvent({ type: 'order:placed' })); + + expect(received).toHaveLength(2); + expect(received[0].type).toBe('auth:login'); + expect(received[1].type).toBe('auth:logout'); + }); + + it('should match RegExp patterns', () => { + const bus = setup(); + const received: DomainEvent[] = []; + bus.subscribePattern(/^error:/, (e) => received.push(e)); + + bus.publish(createEvent({ type: 'error:network' })); + bus.publish(createEvent({ type: 'error:timeout' })); + bus.publish(createEvent({ type: 'info:deploy' })); + + expect(received).toHaveLength(2); + }); + + it('should support multiple wildcards in pattern', () => { + const bus = setup(); + const received: DomainEvent[] = []; + bus.subscribePattern('*:created', (e) => received.push(e)); + + bus.publish(createEvent({ type: 'user:created' })); + bus.publish(createEvent({ type: 'order:created' })); + bus.publish(createEvent({ type: 'order:deleted' })); + + expect(received).toHaveLength(2); + }); + + it('should unsubscribe correctly', () => { + const bus = setup(); + const received: DomainEvent[] = []; + const sub = bus.subscribePattern('test:*', (e) => received.push(e)); + + bus.publish(createEvent({ type: 'test:one' })); + expect(received).toHaveLength(1); + + sub.unsubscribe(); + bus.publish(createEvent({ type: 'test:two' })); + expect(received).toHaveLength(1); + }); + }); + + // ----------------------------------------------------------------------- + // stream + // ----------------------------------------------------------------------- + describe('stream', () => { + it('should return an Observable filtered by event type', () => { + const bus = setup(); + const received: DomainEvent[] = []; + bus.stream('data:updated').subscribe((e) => received.push(e)); + + bus.publish(createEvent({ type: 'data:updated', payload: { v: 1 } })); + bus.publish(createEvent({ type: 'data:deleted' })); + bus.publish(createEvent({ type: 'data:updated', payload: { v: 2 } })); + + expect(received).toHaveLength(2); + expect((received[0].payload as { v: number }).v).toBe(1); + expect((received[1].payload as { v: number }).v).toBe(2); + }); + + it('should allow RxJS operators on the returned Observable', () => { + const bus = setup(); + const types: string[] = []; + bus + .stream('pipe:test') + .subscribe((e) => types.push(e.type)); + + bus.publish(createEvent({ type: 'pipe:test' })); + expect(types).toEqual(['pipe:test']); + }); + }); + + // ----------------------------------------------------------------------- + // signal + // ----------------------------------------------------------------------- + describe('signal', () => { + it('should return a Signal that starts as undefined', () => { + const bus = setup(); + const sig = bus.signal('sig:test'); + expect(sig()).toBeUndefined(); + }); + + it('should update when a matching event is published', () => { + const bus = setup(); + const sig = bus.signal<{ n: number }>('sig:update'); + + bus.publish(createEvent({ type: 'sig:update', payload: { n: 42 } })); + + expect(sig()).toBeDefined(); + expect(sig()!.payload.n).toBe(42); + }); + + it('should always reflect the latest event', () => { + const bus = setup(); + const sig = bus.signal('sig:latest'); + + bus.publish(createEvent({ type: 'sig:latest', payload: 1 })); + bus.publish(createEvent({ type: 'sig:latest', payload: 2 })); + bus.publish(createEvent({ type: 'sig:latest', payload: 3 })); + + expect(sig()!.payload).toBe(3); + }); + }); + + // ----------------------------------------------------------------------- + // history (stub — delegated to EventHistoryService) + // ----------------------------------------------------------------------- + describe('history', () => { + it('should return an empty array (stub for EventHistoryService)', () => { + const bus = setup(); + bus.publish(createEvent({ type: 'any' })); + expect(bus.history()).toEqual([]); + }); + }); + + // ----------------------------------------------------------------------- + // destroy + // ----------------------------------------------------------------------- + describe('destroy', () => { + it('should complete the internal Subject', () => { + const bus = setup(); + let completed = false; + bus.stream('test').subscribe({ complete: () => (completed = true) }); + + bus.destroy(); + + expect(completed).toBe(true); + }); + + it('should unsubscribe all tracked subscriptions', () => { + const bus = setup(); + const received: DomainEvent[] = []; + bus.subscribe('destroy:test', (e) => received.push(e)); + bus.subscribePattern('destroy:*', (e) => received.push(e)); + + bus.destroy(); + + // Publishing after destroy should not deliver (Subject completed) + // But even if it could, subscriptions are unsubscribed + expect(received).toHaveLength(0); + }); + + it('should not throw when called multiple times', () => { + const bus = setup(); + bus.destroy(); + expect(() => bus.destroy()).not.toThrow(); + }); + }); +}); diff --git a/packages/core/src/lib/event-bus/event-bus.service.ts b/packages/core/src/lib/event-bus/event-bus.service.ts new file mode 100644 index 0000000..6ad3f0a --- /dev/null +++ b/packages/core/src/lib/event-bus/event-bus.service.ts @@ -0,0 +1,138 @@ +import { inject, Injectable, Injector, runInInjectionContext, Signal } from '@angular/core'; +import { toSignal } from '@angular/core/rxjs-interop'; +import { Observable, Subject, Subscription, filter } from 'rxjs'; + +import { DomainEvent, EventHandler } from './event-bus.types'; + +/** + * Central pub/sub service for internal event-driven communication. + * + * Allows features and modules to communicate without direct coupling. + * Events are dispatched synchronously to all matching subscribers. + * + * Supports exact type matching via `subscribe()` and pattern matching + * (wildcards / RegExp) via `subscribePattern()`. + * + * @example + * ```typescript + * const bus = inject(EventBusService); + * + * // Subscribe to a specific event type + * const sub = bus.subscribe<{ userId: string }>('auth:login', (event) => { + * console.log('User logged in:', event.payload.userId); + * }); + * + * // Publish an event + * bus.publish({ type: 'auth:login', payload: { userId: '123' }, timestamp: Date.now() }); + * + * // Clean up + * sub.unsubscribe(); + * ``` + */ +@Injectable() +export class EventBusService { + private readonly injector = inject(Injector); + private readonly subject = new Subject(); + private readonly subscriptions: Subscription[] = []; + + /** + * Publish an event to all matching subscribers. + * + * If `timestamp` is not set, it will be added automatically. + */ + publish(event: DomainEvent): void { + const stamped = event.timestamp + ? event + : { ...event, timestamp: Date.now() }; + this.subject.next(stamped as DomainEvent); + } + + /** + * Subscribe to events of an exact type. + * + * @returns A `Subscription` that can be used to unsubscribe. + */ + subscribe( + eventType: string, + handler: EventHandler, + ): Subscription { + const sub = this.subject + .pipe(filter((e) => e.type === eventType)) + .subscribe((e) => handler(e as DomainEvent)); + this.subscriptions.push(sub); + return sub; + } + + /** + * Subscribe to events matching a pattern. + * + * Accepts either: + * - A string with `*` wildcard (e.g. `'auth:*'` matches `'auth:login'`, `'auth:logout'`) + * - A `RegExp` for full control + * + * @returns A `Subscription` that can be used to unsubscribe. + */ + subscribePattern( + pattern: string | RegExp, + handler: EventHandler, + ): Subscription { + const regex = + pattern instanceof RegExp + ? pattern + : new RegExp(`^${pattern.replace(/\*/g, '.*')}$`); + + const sub = this.subject + .pipe(filter((e) => regex.test(e.type))) + .subscribe((e) => handler(e as DomainEvent)); + this.subscriptions.push(sub); + return sub; + } + + /** + * Get an Observable stream of events for a specific type. + * + * Useful for integrating with RxJS pipelines. + */ + stream(eventType: string): Observable> { + return this.subject.pipe( + filter((e) => e.type === eventType), + ) as Observable>; + } + + /** + * Get a Signal that holds the latest event of a specific type. + * + * Starts as `undefined` until the first matching event arrives. + * Uses `toSignal()` from `@angular/core/rxjs-interop` with the + * service's own injector to avoid requiring a component context. + */ + signal(eventType: string): Signal | undefined> { + return runInInjectionContext(this.injector, () => + toSignal(this.stream(eventType)), + ); + } + + /** + * Returns a snapshot of all events that have passed through the bus + * during the current session (since last `destroy()`). + * + * Note: actual history storage is managed by `EventHistoryService`. + * This method returns an empty array — use `EventHistoryService.query()` + * for persistent history queries. + */ + history(): ReadonlyArray { + return []; + } + + /** + * Tear down the bus: complete the internal Subject and unsubscribe + * all tracked subscriptions. + * + * After calling `destroy()`, no further events can be published. + */ + destroy(): void { + this.subscriptions.forEach((s) => s.unsubscribe()); + this.subscriptions.length = 0; + this.subject.complete(); + } +} diff --git a/packages/core/src/lib/event-bus/event-bus.types.ts b/packages/core/src/lib/event-bus/event-bus.types.ts new file mode 100644 index 0000000..c78b8dd --- /dev/null +++ b/packages/core/src/lib/event-bus/event-bus.types.ts @@ -0,0 +1,166 @@ +import { InjectionToken } from '@angular/core'; + +// --------------------------------------------------------------------------- +// Domain event +// --------------------------------------------------------------------------- + +/** + * A typed domain event for internal pub/sub communication. + * + * Represents a message published through the `EventBusService` to + * coordinate features and modules without direct coupling. + * + * @typeParam T - Type of the event payload. Defaults to `unknown`. + * + * @example + * ```typescript + * const event: DomainEvent<{ userId: string }> = { + * type: 'auth:login', + * payload: { userId: '123' }, + * timestamp: Date.now(), + * source: 'auth-module', + * }; + * ``` + */ +export interface DomainEvent { + /** Event type identifier (colon-separated namespace, e.g. `'auth:login'`). */ + readonly type: string; + + /** Typed payload attached to the event. */ + readonly payload: T; + + /** Unix timestamp (ms) of when the event was created. */ + readonly timestamp: number; + + /** Optional correlation ID for tracing related events. */ + readonly correlationId?: string; + + /** Module or feature that originated the event. */ + readonly source?: string; + + /** Arbitrary metadata attached to the event. */ + readonly metadata?: Record; +} + +// --------------------------------------------------------------------------- +// Event handler +// --------------------------------------------------------------------------- + +/** + * Callback invoked when a matching event is received. + * + * @typeParam T - Type of the event payload. + */ +export type EventHandler = (event: DomainEvent) => void; + +// --------------------------------------------------------------------------- +// Event filter (for history queries) +// --------------------------------------------------------------------------- + +/** + * Filter criteria for querying event history. + * + * All fields are optional — when omitted, no filtering is applied + * for that criterion. + */ +export interface EventFilter { + /** Filter by exact event type. */ + readonly type?: string; + + /** Filter by event source. */ + readonly source?: string; + + /** Return only events after this Unix timestamp (ms). */ + readonly since?: number; +} + +// --------------------------------------------------------------------------- +// Queue status +// --------------------------------------------------------------------------- + +/** + * Status of the persistent event queue. + */ +export type QueueStatus = 'idle' | 'flushing' | 'error'; + +/** + * Result returned by `EventQueueService.flush()`. + */ +export interface FlushResult { + /** Number of events successfully delivered. */ + readonly delivered: number; + + /** Number of events that failed delivery. */ + readonly failed: number; +} + +// --------------------------------------------------------------------------- +// Module configuration (user-facing) +// --------------------------------------------------------------------------- + +/** + * Configuration input for `provideEventBus()`. + * + * All fields are optional — defaults provide a sensible configuration + * for most products. + * + * @example + * ```typescript + * provideEventBus({ + * enableQueue: true, + * maxQueueSize: 500, + * enableHistory: true, + * historySize: 200, + * }) + * ``` + */ +export interface EventBusOptions { + /** Enable the persistent event queue. Defaults to `true`. */ + readonly enableQueue?: boolean; + + /** Maximum number of events in the queue before oldest are dropped. Defaults to `1000`. */ + readonly maxQueueSize?: number; + + /** Enable the in-memory event history ring buffer. Defaults to `false`. */ + readonly enableHistory?: boolean; + + /** Maximum number of events kept in history. Defaults to `100`. */ + readonly historySize?: number; +} + +// --------------------------------------------------------------------------- +// Resolved config (internal) +// --------------------------------------------------------------------------- + +/** + * Resolved configuration stored in the DI container. + * + * Built from `EventBusOptions` by `provideEventBus()`. + * Services inject this via `EVENT_BUS_CONFIG`. + */ +export interface EventBusConfig { + /** Whether the persistent event queue is enabled. */ + readonly enableQueue: boolean; + + /** Maximum queue size. */ + readonly maxQueueSize: number; + + /** Whether the event history ring buffer is enabled. */ + readonly enableHistory: boolean; + + /** Maximum history size. */ + readonly historySize: number; +} + +// --------------------------------------------------------------------------- +// Injection token +// --------------------------------------------------------------------------- + +/** + * Injection token for the event-bus module configuration. + * + * Provided by `provideEventBus()`. Services inject this to read config. + */ +export const EVENT_BUS_CONFIG = new InjectionToken( + 'EVENT_BUS_CONFIG', +); diff --git a/packages/core/src/lib/event-bus/event-history.service.spec.ts b/packages/core/src/lib/event-bus/event-history.service.spec.ts new file mode 100644 index 0000000..7bb5f99 --- /dev/null +++ b/packages/core/src/lib/event-bus/event-history.service.spec.ts @@ -0,0 +1,193 @@ +import { TestBed } from '@angular/core/testing'; +import { EventHistoryService } from './event-history.service'; +import { DomainEvent, EVENT_BUS_CONFIG } from './event-bus.types'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function createEvent(overrides?: Partial): DomainEvent { + return { + type: 'test:event', + payload: null, + timestamp: Date.now(), + ...overrides, + }; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('EventHistoryService', () => { + function setup(historySize?: number) { + TestBed.configureTestingModule({ + providers: [ + EventHistoryService, + ...(historySize != null + ? [{ + provide: EVENT_BUS_CONFIG, + useValue: { enableQueue: true, maxQueueSize: 1000, enableHistory: true, historySize }, + }] + : []), + ], + }); + return TestBed.inject(EventHistoryService); + } + + // ----------------------------------------------------------------------- + // add + // ----------------------------------------------------------------------- + describe('add', () => { + it('should add an event to the buffer', () => { + const svc = setup(); + svc.add(createEvent()); + expect(svc.size()).toBe(1); + }); + + it('should preserve events in insertion order', () => { + const svc = setup(); + svc.add(createEvent({ type: 'a' })); + svc.add(createEvent({ type: 'b' })); + svc.add(createEvent({ type: 'c' })); + + const all = svc.query(); + expect(all.map((e) => e.type)).toEqual(['a', 'b', 'c']); + }); + + it('should drop oldest events when historySize is exceeded', () => { + const svc = setup(3); + + svc.add(createEvent({ type: 'e1' })); + svc.add(createEvent({ type: 'e2' })); + svc.add(createEvent({ type: 'e3' })); + svc.add(createEvent({ type: 'e4' })); + + expect(svc.size()).toBe(3); + const types = svc.query().map((e) => e.type); + expect(types).toEqual(['e2', 'e3', 'e4']); + }); + + it('should default to historySize 100 when no config', () => { + const svc = setup(); + for (let i = 0; i < 105; i++) { + svc.add(createEvent({ type: `e${i}` })); + } + expect(svc.size()).toBe(100); + // First event should be e5 (e0-e4 dropped) + expect(svc.query()[0].type).toBe('e5'); + }); + }); + + // ----------------------------------------------------------------------- + // query + // ----------------------------------------------------------------------- + describe('query', () => { + it('should return all events when no filter is provided', () => { + const svc = setup(); + svc.add(createEvent({ type: 'a' })); + svc.add(createEvent({ type: 'b' })); + + expect(svc.query()).toHaveLength(2); + }); + + it('should filter by type', () => { + const svc = setup(); + svc.add(createEvent({ type: 'auth:login' })); + svc.add(createEvent({ type: 'auth:logout' })); + svc.add(createEvent({ type: 'order:placed' })); + + const result = svc.query({ type: 'auth:login' }); + expect(result).toHaveLength(1); + expect(result[0].type).toBe('auth:login'); + }); + + it('should filter by source', () => { + const svc = setup(); + svc.add(createEvent({ source: 'module-a' })); + svc.add(createEvent({ source: 'module-b' })); + svc.add(createEvent({ source: 'module-a' })); + + const result = svc.query({ source: 'module-a' }); + expect(result).toHaveLength(2); + }); + + it('should filter by since timestamp', () => { + const svc = setup(); + const now = Date.now(); + svc.add(createEvent({ timestamp: now - 5000 })); + svc.add(createEvent({ timestamp: now - 1000 })); + svc.add(createEvent({ timestamp: now })); + + const result = svc.query({ since: now - 2000 }); + expect(result).toHaveLength(2); + }); + + it('should combine multiple filters (AND logic)', () => { + const svc = setup(); + const now = Date.now(); + svc.add(createEvent({ type: 'auth:login', source: 'web', timestamp: now })); + svc.add(createEvent({ type: 'auth:login', source: 'mobile', timestamp: now })); + svc.add(createEvent({ type: 'auth:logout', source: 'web', timestamp: now })); + + const result = svc.query({ type: 'auth:login', source: 'web' }); + expect(result).toHaveLength(1); + expect(result[0].source).toBe('web'); + }); + + it('should return empty array when no events match', () => { + const svc = setup(); + svc.add(createEvent({ type: 'a' })); + + expect(svc.query({ type: 'nonexistent' })).toEqual([]); + }); + + it('should return a copy (not a reference to internal buffer)', () => { + const svc = setup(); + svc.add(createEvent()); + const result = svc.query(); + expect(result).not.toBe((svc as any).buffer); + }); + }); + + // ----------------------------------------------------------------------- + // clear + // ----------------------------------------------------------------------- + describe('clear', () => { + it('should remove all events from the buffer', () => { + const svc = setup(); + svc.add(createEvent()); + svc.add(createEvent()); + expect(svc.size()).toBe(2); + + svc.clear(); + expect(svc.size()).toBe(0); + expect(svc.query()).toEqual([]); + }); + + it('should not throw when buffer is already empty', () => { + const svc = setup(); + expect(() => svc.clear()).not.toThrow(); + }); + }); + + // ----------------------------------------------------------------------- + // size + // ----------------------------------------------------------------------- + describe('size', () => { + it('should return 0 for empty buffer', () => { + const svc = setup(); + expect(svc.size()).toBe(0); + }); + + it('should track the current number of events', () => { + const svc = setup(); + svc.add(createEvent()); + expect(svc.size()).toBe(1); + svc.add(createEvent()); + expect(svc.size()).toBe(2); + svc.clear(); + expect(svc.size()).toBe(0); + }); + }); +}); diff --git a/packages/core/src/lib/event-bus/event-history.service.ts b/packages/core/src/lib/event-bus/event-history.service.ts new file mode 100644 index 0000000..f3d8e8b --- /dev/null +++ b/packages/core/src/lib/event-bus/event-history.service.ts @@ -0,0 +1,83 @@ +import { inject, Injectable } from '@angular/core'; + +import { + DomainEvent, + EventBusConfig, + EVENT_BUS_CONFIG, + EventFilter, +} from './event-bus.types'; + +/** + * In-memory ring buffer that stores recent domain events for querying. + * + * When the buffer reaches `historySize` (from `EventBusConfig`), the + * oldest event is automatically dropped. + * + * @example + * ```typescript + * const history = inject(EventHistoryService); + * + * // Query all events of a specific type + * const loginEvents = history.query({ type: 'auth:login' }); + * + * // Query events from a specific source since a timestamp + * const recent = history.query({ source: 'sync', since: Date.now() - 60_000 }); + * + * // Get total stored events + * console.log('History size:', history.size()); + * ``` + */ +@Injectable() +export class EventHistoryService { + private readonly config = inject(EVENT_BUS_CONFIG, { optional: true }); + private readonly maxSize = this.config?.historySize ?? 100; + private readonly buffer: DomainEvent[] = []; + + /** + * Add an event to the history buffer. + * + * If the buffer is at capacity, the oldest event is dropped. + */ + add(event: DomainEvent): void { + this.buffer.push(event); + while (this.buffer.length > this.maxSize) { + this.buffer.shift(); + } + } + + /** + * Query events in the history buffer. + * + * All filter fields are optional — when omitted, no filtering is + * applied for that criterion. Returns events in chronological order. + * + * @param filter - Optional filter criteria (type, source, since) + * @returns Matching events as a read-only array + */ + query(filter?: EventFilter): ReadonlyArray { + if (!filter) { + return [...this.buffer]; + } + + return this.buffer.filter((event) => { + if (filter.type && event.type !== filter.type) return false; + if (filter.source && event.source !== filter.source) return false; + if (filter.since && event.timestamp < filter.since) return false; + return true; + }); + } + + /** + * Clear the entire history buffer. + */ + clear(): void { + this.buffer.length = 0; + } + + /** + * Get the number of events currently stored in the buffer. + */ + size(): number { + return this.buffer.length; + } +} diff --git a/packages/core/src/lib/event-bus/event-queue.service.spec.ts b/packages/core/src/lib/event-bus/event-queue.service.spec.ts new file mode 100644 index 0000000..7f2d4dd --- /dev/null +++ b/packages/core/src/lib/event-bus/event-queue.service.spec.ts @@ -0,0 +1,220 @@ +import { TestBed } from '@angular/core/testing'; +import { EventQueueService } from './event-queue.service'; +import { EventBusService } from './event-bus.service'; +import { DomainEvent, EVENT_BUS_CONFIG, EventBusConfig } from './event-bus.types'; +import { StorageService } from '../storage/storage.service'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function createEvent(overrides?: Partial): DomainEvent { + return { + type: 'queue:test', + payload: null, + timestamp: Date.now(), + ...overrides, + }; +} + +/** + * Minimal StorageService mock using an in-memory Map. + */ +function createMockStorage() { + const store = new Map(); + return { + get: vi.fn((key: string): T | null => (store.get(key) as T) ?? null), + set: vi.fn((key: string, value: T): void => { store.set(key, value); }), + remove: vi.fn((key: string): void => { store.delete(key); }), + _store: store, + }; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('EventQueueService', () => { + function setup(config?: Partial) { + const mockStorage = createMockStorage(); + const publishSpy = vi.fn(); + + TestBed.configureTestingModule({ + providers: [ + EventQueueService, + EventBusService, + { provide: StorageService, useValue: mockStorage }, + ...(config + ? [{ provide: EVENT_BUS_CONFIG, useValue: { enableQueue: true, maxQueueSize: 1000, enableHistory: false, historySize: 100, ...config } }] + : []), + ], + }); + + const queue = TestBed.inject(EventQueueService); + const bus = TestBed.inject(EventBusService); + + // Spy on publish after injection + vi.spyOn(bus, 'publish').mockImplementation(publishSpy); + + return { queue, bus, mockStorage, publishSpy }; + } + + // ----------------------------------------------------------------------- + // enqueue + // ----------------------------------------------------------------------- + describe('enqueue', () => { + it('should add an event to the queue', () => { + const { queue, mockStorage } = setup(); + const event = createEvent(); + queue.enqueue(event); + + expect(mockStorage.set).toHaveBeenCalled(); + const savedQueue = mockStorage.set.mock.calls[0][1] as DomainEvent[]; + expect(savedQueue).toHaveLength(1); + expect(savedQueue[0]).toEqual(event); + }); + + it('should update pending count', () => { + const { queue } = setup(); + expect(queue.pending()()).toBe(0); + + queue.enqueue(createEvent()); + expect(queue.pending()()).toBe(1); + + queue.enqueue(createEvent()); + expect(queue.pending()()).toBe(2); + }); + + it('should drop oldest events when maxQueueSize is exceeded', () => { + const { queue } = setup({ maxQueueSize: 3 }); + + queue.enqueue(createEvent({ type: 'e1' })); + queue.enqueue(createEvent({ type: 'e2' })); + queue.enqueue(createEvent({ type: 'e3' })); + queue.enqueue(createEvent({ type: 'e4' })); + + expect(queue.pending()()).toBe(3); + + // Verify the last save has e2, e3, e4 (e1 dropped) + const lastCall = (queue as any).storage.set.mock.calls.at(-1); + const savedQueue = lastCall[1] as DomainEvent[]; + expect(savedQueue.map((e: DomainEvent) => e.type)).toEqual(['e2', 'e3', 'e4']); + }); + + it('should persist events via StorageService', () => { + const { queue, mockStorage } = setup(); + queue.enqueue(createEvent()); + + expect(mockStorage.set).toHaveBeenCalledWith( + 'event-bus.queue', + expect.any(Array), + ); + }); + }); + + // ----------------------------------------------------------------------- + // flush + // ----------------------------------------------------------------------- + describe('flush', () => { + it('should publish all queued events through EventBusService', () => { + const { queue, publishSpy } = setup(); + + queue.enqueue(createEvent({ type: 'f1' })); + queue.enqueue(createEvent({ type: 'f2' })); + + const result = queue.flush(); + + expect(publishSpy).toHaveBeenCalledTimes(2); + expect(result.delivered).toBe(2); + expect(result.failed).toBe(0); + }); + + it('should clear the queue after flush', () => { + const { queue } = setup(); + queue.enqueue(createEvent()); + + queue.flush(); + + expect(queue.pending()()).toBe(0); + }); + + it('should return { delivered: 0, failed: 0 } for empty queue', () => { + const { queue, publishSpy } = setup(); + const result = queue.flush(); + + expect(result).toEqual({ delivered: 0, failed: 0 }); + expect(publishSpy).not.toHaveBeenCalled(); + }); + + it('should count failed events when publish throws', () => { + const { queue, publishSpy } = setup(); + + queue.enqueue(createEvent({ type: 'ok' })); + queue.enqueue(createEvent({ type: 'fail' })); + queue.enqueue(createEvent({ type: 'ok2' })); + + publishSpy.mockImplementation((event: DomainEvent) => { + if (event.type === 'fail') throw new Error('subscriber error'); + }); + + const result = queue.flush(); + + expect(result.delivered).toBe(2); + expect(result.failed).toBe(1); + }); + + it('should set status to flushing during flush', () => { + const { queue, publishSpy } = setup(); + queue.enqueue(createEvent()); + + let statusDuringFlush: string | undefined; + publishSpy.mockImplementation(() => { + statusDuringFlush = queue.status()(); + }); + + queue.flush(); + + expect(statusDuringFlush).toBe('flushing'); + }); + + it('should set status to idle after successful flush', () => { + const { queue } = setup(); + queue.enqueue(createEvent()); + queue.flush(); + + expect(queue.status()()).toBe('idle'); + }); + + it('should set status to error if any event fails', () => { + const { queue, publishSpy } = setup(); + queue.enqueue(createEvent()); + + publishSpy.mockImplementation(() => { throw new Error('fail'); }); + queue.flush(); + + expect(queue.status()()).toBe('error'); + }); + }); + + // ----------------------------------------------------------------------- + // status / pending + // ----------------------------------------------------------------------- + describe('status / pending', () => { + it('should start with idle status', () => { + const { queue } = setup(); + expect(queue.status()()).toBe('idle'); + }); + + it('should start with 0 pending', () => { + const { queue } = setup(); + expect(queue.pending()()).toBe(0); + }); + + it('should return readonly signals', () => { + const { queue } = setup(); + // These are readonly — no set method exposed + expect(typeof queue.status()).toBe('function'); + expect(typeof queue.pending()).toBe('function'); + }); + }); +}); diff --git a/packages/core/src/lib/event-bus/event-queue.service.ts b/packages/core/src/lib/event-bus/event-queue.service.ts new file mode 100644 index 0000000..ee11ddd --- /dev/null +++ b/packages/core/src/lib/event-bus/event-queue.service.ts @@ -0,0 +1,131 @@ +import { inject, Injectable, signal, Signal } from '@angular/core'; + +import { StorageService } from '../storage/storage.service'; +import { EventBusService } from './event-bus.service'; +import { + DomainEvent, + EventBusConfig, + EVENT_BUS_CONFIG, + FlushResult, + QueueStatus, +} from './event-bus.types'; + +/** Storage key for the persisted queue. */ +const QUEUE_STORAGE_KEY = 'event-bus.queue'; + +/** + * Persistent event queue that buffers events for later delivery. + * + * Events are persisted via `StorageService` (localStorage with + * in-memory fallback). When `flush()` is called, all pending events + * are published through the `EventBusService` and the queue is cleared. + * + * Queue size is bounded by `maxQueueSize` from `EventBusConfig` — + * when the limit is reached, the oldest event is dropped. + * + * @example + * ```typescript + * const queue = inject(EventQueueService); + * + * // Enqueue when offline or deferred + * queue.enqueue({ type: 'sync:pending', payload: { id: 42 }, timestamp: Date.now() }); + * + * // Later, flush all pending events + * const result = queue.flush(); + * console.log(`Delivered: ${result.delivered}, Failed: ${result.failed}`); + * ``` + */ +@Injectable() +export class EventQueueService { + private readonly storage = inject(StorageService); + private readonly eventBus = inject(EventBusService); + private readonly config = inject(EVENT_BUS_CONFIG, { optional: true }); + + private readonly maxQueueSize = this.config?.maxQueueSize ?? 1000; + + private readonly _status = signal('idle'); + private readonly _pending = signal(this._loadQueue().length); + + /** + * Current queue status as a reactive Signal. + */ + status(): Signal { + return this._status.asReadonly(); + } + + /** + * Number of pending events as a reactive Signal. + */ + pending(): Signal { + return this._pending.asReadonly(); + } + + /** + * Add an event to the persistent queue. + * + * If the queue exceeds `maxQueueSize`, the oldest event is dropped. + */ + enqueue(event: DomainEvent): void { + const queue = this._loadQueue(); + queue.push(event); + + // Enforce max queue size — drop oldest + while (queue.length > this.maxQueueSize) { + queue.shift(); + } + + this._saveQueue(queue); + this._pending.set(queue.length); + } + + /** + * Flush all pending events through the `EventBusService`. + * + * Each event is published individually. Events that fail to publish + * (due to errors in subscribers) are counted as failed but do NOT + * block other events from being delivered. + * + * The queue is cleared after flush regardless of failures. + * + * @returns Result with counts of delivered and failed events. + */ + flush(): FlushResult { + const queue = this._loadQueue(); + if (queue.length === 0) { + return { delivered: 0, failed: 0 }; + } + + this._status.set('flushing'); + + let delivered = 0; + let failed = 0; + + for (const event of queue) { + try { + this.eventBus.publish(event); + delivered++; + } catch { + failed++; + } + } + + // Clear the queue + this._saveQueue([]); + this._pending.set(0); + this._status.set(failed > 0 ? 'error' : 'idle'); + + return { delivered, failed }; + } + + // ------------------------------------------------------------------------- + // Private helpers + // ------------------------------------------------------------------------- + + private _loadQueue(): DomainEvent[] { + return this.storage.get(QUEUE_STORAGE_KEY) ?? []; + } + + private _saveQueue(queue: DomainEvent[]): void { + this.storage.set(QUEUE_STORAGE_KEY, queue); + } +} diff --git a/packages/core/src/lib/event-bus/index.ts b/packages/core/src/lib/event-bus/index.ts new file mode 100644 index 0000000..f80a445 --- /dev/null +++ b/packages/core/src/lib/event-bus/index.ts @@ -0,0 +1,19 @@ +// Types +export type { + DomainEvent, + EventHandler, + EventFilter, + EventBusOptions, + EventBusConfig, + FlushResult, +} from './event-bus.types'; +export { EVENT_BUS_CONFIG } from './event-bus.types'; +export type { QueueStatus } from './event-bus.types'; + +// Provider factory +export { provideEventBus } from './provide-event-bus'; + +// Services +export { EventBusService } from './event-bus.service'; +export { EventQueueService } from './event-queue.service'; +export { EventHistoryService } from './event-history.service'; diff --git a/packages/core/src/lib/event-bus/provide-event-bus.spec.ts b/packages/core/src/lib/event-bus/provide-event-bus.spec.ts new file mode 100644 index 0000000..2f9c8de --- /dev/null +++ b/packages/core/src/lib/event-bus/provide-event-bus.spec.ts @@ -0,0 +1,88 @@ +import { TestBed } from '@angular/core/testing'; +import { provideEventBus } from './provide-event-bus'; +import { EventBusService } from './event-bus.service'; +import { EventQueueService } from './event-queue.service'; +import { EventHistoryService } from './event-history.service'; +import { EVENT_BUS_CONFIG, EventBusConfig } from './event-bus.types'; +import { StorageService } from '../storage/storage.service'; + +// --------------------------------------------------------------------------- +// Mock StorageService (EventQueueService depends on it) +// --------------------------------------------------------------------------- + +function createMockStorage() { + const store = new Map(); + return { + get: vi.fn((key: string): T | null => (store.get(key) as T) ?? null), + set: vi.fn((key: string, value: T): void => { store.set(key, value); }), + remove: vi.fn((key: string): void => { store.delete(key); }), + }; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('provideEventBus', () => { + function setup(options?: Parameters[0]) { + TestBed.configureTestingModule({ + providers: [ + provideEventBus(options), + { provide: StorageService, useValue: createMockStorage() }, + ], + }); + } + + it('should register EventBusService', () => { + setup(); + expect(TestBed.inject(EventBusService)).toBeDefined(); + }); + + it('should register EventQueueService', () => { + setup(); + expect(TestBed.inject(EventQueueService)).toBeDefined(); + }); + + it('should register EventHistoryService', () => { + setup(); + expect(TestBed.inject(EventHistoryService)).toBeDefined(); + }); + + it('should provide default config when no options given', () => { + setup(); + const config = TestBed.inject(EVENT_BUS_CONFIG); + expect(config).toEqual({ + enableQueue: true, + maxQueueSize: 1000, + enableHistory: false, + historySize: 100, + }); + }); + + it('should merge user options with defaults', () => { + setup({ enableHistory: true, historySize: 50 }); + const config = TestBed.inject(EVENT_BUS_CONFIG); + expect(config).toEqual({ + enableQueue: true, + maxQueueSize: 1000, + enableHistory: true, + historySize: 50, + }); + }); + + it('should allow overriding all options', () => { + setup({ + enableQueue: false, + maxQueueSize: 500, + enableHistory: true, + historySize: 200, + }); + const config = TestBed.inject(EVENT_BUS_CONFIG); + expect(config).toEqual({ + enableQueue: false, + maxQueueSize: 500, + enableHistory: true, + historySize: 200, + }); + }); +}); diff --git a/packages/core/src/lib/event-bus/provide-event-bus.ts b/packages/core/src/lib/event-bus/provide-event-bus.ts new file mode 100644 index 0000000..4aad987 --- /dev/null +++ b/packages/core/src/lib/event-bus/provide-event-bus.ts @@ -0,0 +1,63 @@ +import { EnvironmentProviders, makeEnvironmentProviders } from '@angular/core'; + +import { + EVENT_BUS_CONFIG, + EventBusConfig, + EventBusOptions, +} from './event-bus.types'; +import { EventBusService } from './event-bus.service'; +import { EventQueueService } from './event-queue.service'; +import { EventHistoryService } from './event-history.service'; + +/** Default configuration values. */ +const DEFAULTS: EventBusConfig = { + enableQueue: true, + maxQueueSize: 1000, + enableHistory: false, + historySize: 100, +}; + +/** + * Configure the EventBus module. + * + * Registers `EventBusService`, `EventQueueService`, and + * `EventHistoryService` with the resolved configuration. + * + * This is a **CORE** module — call `provideEventBus()` in your + * `appConfig` providers to enable internal event-driven communication. + * + * @example + * ```ts + * import { provideEventBus } from '@fireflyframework/core'; + * + * export const appConfig: ApplicationConfig = { + * providers: [ + * provideEventBus({ + * enableQueue: true, + * enableHistory: true, + * historySize: 200, + * }), + * ], + * }; + * ``` + * + * @param options - Optional configuration. Defaults: enableQueue=true, maxQueueSize=1000, enableHistory=false, historySize=100 + * @returns EnvironmentProviders to register in the application config + */ +export function provideEventBus( + options?: EventBusOptions, +): EnvironmentProviders { + const config: EventBusConfig = { + enableQueue: options?.enableQueue ?? DEFAULTS.enableQueue, + maxQueueSize: options?.maxQueueSize ?? DEFAULTS.maxQueueSize, + enableHistory: options?.enableHistory ?? DEFAULTS.enableHistory, + historySize: options?.historySize ?? DEFAULTS.historySize, + }; + + return makeEnvironmentProviders([ + { provide: EVENT_BUS_CONFIG, useValue: config }, + EventBusService, + EventQueueService, + EventHistoryService, + ]); +}