From 881a7a3c640e6f8ab6b4c5c148c9ea6e3a6f482c Mon Sep 17 00:00:00 2001 From: Maria Garcia Luque Date: Sat, 23 May 2026 00:59:32 +0200 Subject: [PATCH 1/6] [STEP-2.14-001] Add event-bus types, interfaces and config token Define the public contract for the EventBus module (CORE #16): DomainEvent, EventHandler, EventFilter, QueueStatus, FlushResult, EventBusOptions, EventBusConfig, EVENT_BUS_CONFIG. --- .../core/src/lib/event-bus/event-bus.types.ts | 166 ++++++++++++++++++ 1 file changed, 166 insertions(+) create mode 100644 packages/core/src/lib/event-bus/event-bus.types.ts diff --git a/packages/core/src/lib/event-bus/event-bus.types.ts b/packages/core/src/lib/event-bus/event-bus.types.ts new file mode 100644 index 0000000..c78b8dd --- /dev/null +++ b/packages/core/src/lib/event-bus/event-bus.types.ts @@ -0,0 +1,166 @@ +import { InjectionToken } from '@angular/core'; + +// --------------------------------------------------------------------------- +// Domain event +// --------------------------------------------------------------------------- + +/** + * A typed domain event for internal pub/sub communication. + * + * Represents a message published through the `EventBusService` to + * coordinate features and modules without direct coupling. + * + * @typeParam T - Type of the event payload. Defaults to `unknown`. + * + * @example + * ```typescript + * const event: DomainEvent<{ userId: string }> = { + * type: 'auth:login', + * payload: { userId: '123' }, + * timestamp: Date.now(), + * source: 'auth-module', + * }; + * ``` + */ +export interface DomainEvent { + /** Event type identifier (colon-separated namespace, e.g. `'auth:login'`). */ + readonly type: string; + + /** Typed payload attached to the event. */ + readonly payload: T; + + /** Unix timestamp (ms) of when the event was created. */ + readonly timestamp: number; + + /** Optional correlation ID for tracing related events. */ + readonly correlationId?: string; + + /** Module or feature that originated the event. */ + readonly source?: string; + + /** Arbitrary metadata attached to the event. */ + readonly metadata?: Record; +} + +// --------------------------------------------------------------------------- +// Event handler +// --------------------------------------------------------------------------- + +/** + * Callback invoked when a matching event is received. + * + * @typeParam T - Type of the event payload. + */ +export type EventHandler = (event: DomainEvent) => void; + +// --------------------------------------------------------------------------- +// Event filter (for history queries) +// --------------------------------------------------------------------------- + +/** + * Filter criteria for querying event history. + * + * All fields are optional — when omitted, no filtering is applied + * for that criterion. + */ +export interface EventFilter { + /** Filter by exact event type. */ + readonly type?: string; + + /** Filter by event source. */ + readonly source?: string; + + /** Return only events after this Unix timestamp (ms). */ + readonly since?: number; +} + +// --------------------------------------------------------------------------- +// Queue status +// --------------------------------------------------------------------------- + +/** + * Status of the persistent event queue. + */ +export type QueueStatus = 'idle' | 'flushing' | 'error'; + +/** + * Result returned by `EventQueueService.flush()`. + */ +export interface FlushResult { + /** Number of events successfully delivered. */ + readonly delivered: number; + + /** Number of events that failed delivery. */ + readonly failed: number; +} + +// --------------------------------------------------------------------------- +// Module configuration (user-facing) +// --------------------------------------------------------------------------- + +/** + * Configuration input for `provideEventBus()`. + * + * All fields are optional — defaults provide a sensible configuration + * for most products. + * + * @example + * ```typescript + * provideEventBus({ + * enableQueue: true, + * maxQueueSize: 500, + * enableHistory: true, + * historySize: 200, + * }) + * ``` + */ +export interface EventBusOptions { + /** Enable the persistent event queue. Defaults to `true`. */ + readonly enableQueue?: boolean; + + /** Maximum number of events in the queue before oldest are dropped. Defaults to `1000`. */ + readonly maxQueueSize?: number; + + /** Enable the in-memory event history ring buffer. Defaults to `false`. */ + readonly enableHistory?: boolean; + + /** Maximum number of events kept in history. Defaults to `100`. */ + readonly historySize?: number; +} + +// --------------------------------------------------------------------------- +// Resolved config (internal) +// --------------------------------------------------------------------------- + +/** + * Resolved configuration stored in the DI container. + * + * Built from `EventBusOptions` by `provideEventBus()`. + * Services inject this via `EVENT_BUS_CONFIG`. + */ +export interface EventBusConfig { + /** Whether the persistent event queue is enabled. */ + readonly enableQueue: boolean; + + /** Maximum queue size. */ + readonly maxQueueSize: number; + + /** Whether the event history ring buffer is enabled. */ + readonly enableHistory: boolean; + + /** Maximum history size. */ + readonly historySize: number; +} + +// --------------------------------------------------------------------------- +// Injection token +// --------------------------------------------------------------------------- + +/** + * Injection token for the event-bus module configuration. + * + * Provided by `provideEventBus()`. Services inject this to read config. + */ +export const EVENT_BUS_CONFIG = new InjectionToken( + 'EVENT_BUS_CONFIG', +); From 6ded49d31b35ad3e4a1155fd987497bf9b7e08d3 Mon Sep 17 00:00:00 2001 From: Maria Garcia Luque Date: Sat, 23 May 2026 01:06:43 +0200 Subject: [PATCH 2/6] [STEP-2.14-002] Add EventBusService with pub/sub, pattern matching and signal support Implement central event bus service with publish, subscribe, subscribePattern (wildcard/RegExp), stream (Observable), signal (via runInInjectionContext), history (stub) and destroy methods. 19 unit tests covering all API surface. --- .../lib/event-bus/event-bus.service.spec.ts | 267 ++++++++++++++++++ .../src/lib/event-bus/event-bus.service.ts | 138 +++++++++ 2 files changed, 405 insertions(+) create mode 100644 packages/core/src/lib/event-bus/event-bus.service.spec.ts create mode 100644 packages/core/src/lib/event-bus/event-bus.service.ts diff --git a/packages/core/src/lib/event-bus/event-bus.service.spec.ts b/packages/core/src/lib/event-bus/event-bus.service.spec.ts new file mode 100644 index 0000000..8ad6d99 --- /dev/null +++ b/packages/core/src/lib/event-bus/event-bus.service.spec.ts @@ -0,0 +1,267 @@ +import { TestBed } from '@angular/core/testing'; +import { EventBusService } from './event-bus.service'; +import { DomainEvent } from './event-bus.types'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function createEvent( + overrides?: Partial>, +): DomainEvent { + return { + type: 'test:event', + payload: undefined as T, + timestamp: Date.now(), + ...overrides, + } as DomainEvent; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('EventBusService', () => { + function setup() { + TestBed.configureTestingModule({ + providers: [EventBusService], + }); + return TestBed.inject(EventBusService); + } + + // ----------------------------------------------------------------------- + // publish + subscribe + // ----------------------------------------------------------------------- + describe('publish / subscribe', () => { + it('should deliver events to matching subscribers', () => { + const bus = setup(); + const received: DomainEvent[] = []; + bus.subscribe('user:created', (e) => received.push(e)); + + const event = createEvent({ type: 'user:created', payload: { id: 1 } }); + bus.publish(event); + + expect(received).toHaveLength(1); + expect(received[0]).toEqual(event); + }); + + it('should not deliver events to non-matching subscribers', () => { + const bus = setup(); + const received: DomainEvent[] = []; + bus.subscribe('user:created', (e) => received.push(e)); + + bus.publish(createEvent({ type: 'user:deleted' })); + + expect(received).toHaveLength(0); + }); + + it('should deliver to multiple subscribers of the same type', () => { + const bus = setup(); + const r1: DomainEvent[] = []; + const r2: DomainEvent[] = []; + bus.subscribe('order:placed', (e) => r1.push(e)); + bus.subscribe('order:placed', (e) => r2.push(e)); + + bus.publish(createEvent({ type: 'order:placed' })); + + expect(r1).toHaveLength(1); + expect(r2).toHaveLength(1); + }); + + it('should unsubscribe correctly', () => { + const bus = setup(); + const received: DomainEvent[] = []; + const sub = bus.subscribe('ping', (e) => received.push(e)); + + bus.publish(createEvent({ type: 'ping' })); + expect(received).toHaveLength(1); + + sub.unsubscribe(); + bus.publish(createEvent({ type: 'ping' })); + expect(received).toHaveLength(1); + }); + + it('should auto-set timestamp if not provided', () => { + const bus = setup(); + const received: DomainEvent[] = []; + bus.subscribe('ts:test', (e) => received.push(e)); + + const before = Date.now(); + bus.publish({ type: 'ts:test', payload: null, timestamp: 0 }); + // timestamp 0 is falsy, so it should be auto-set + expect(received[0].timestamp).toBeGreaterThanOrEqual(before); + }); + + it('should preserve explicit timestamp', () => { + const bus = setup(); + const received: DomainEvent[] = []; + bus.subscribe('ts:explicit', (e) => received.push(e)); + + bus.publish(createEvent({ type: 'ts:explicit', timestamp: 42 })); + expect(received[0].timestamp).toBe(42); + }); + }); + + // ----------------------------------------------------------------------- + // subscribePattern + // ----------------------------------------------------------------------- + describe('subscribePattern', () => { + it('should match wildcard patterns', () => { + const bus = setup(); + const received: DomainEvent[] = []; + bus.subscribePattern('auth:*', (e) => received.push(e)); + + bus.publish(createEvent({ type: 'auth:login' })); + bus.publish(createEvent({ type: 'auth:logout' })); + bus.publish(createEvent({ type: 'order:placed' })); + + expect(received).toHaveLength(2); + expect(received[0].type).toBe('auth:login'); + expect(received[1].type).toBe('auth:logout'); + }); + + it('should match RegExp patterns', () => { + const bus = setup(); + const received: DomainEvent[] = []; + bus.subscribePattern(/^error:/, (e) => received.push(e)); + + bus.publish(createEvent({ type: 'error:network' })); + bus.publish(createEvent({ type: 'error:timeout' })); + bus.publish(createEvent({ type: 'info:deploy' })); + + expect(received).toHaveLength(2); + }); + + it('should support multiple wildcards in pattern', () => { + const bus = setup(); + const received: DomainEvent[] = []; + bus.subscribePattern('*:created', (e) => received.push(e)); + + bus.publish(createEvent({ type: 'user:created' })); + bus.publish(createEvent({ type: 'order:created' })); + bus.publish(createEvent({ type: 'order:deleted' })); + + expect(received).toHaveLength(2); + }); + + it('should unsubscribe correctly', () => { + const bus = setup(); + const received: DomainEvent[] = []; + const sub = bus.subscribePattern('test:*', (e) => received.push(e)); + + bus.publish(createEvent({ type: 'test:one' })); + expect(received).toHaveLength(1); + + sub.unsubscribe(); + bus.publish(createEvent({ type: 'test:two' })); + expect(received).toHaveLength(1); + }); + }); + + // ----------------------------------------------------------------------- + // stream + // ----------------------------------------------------------------------- + describe('stream', () => { + it('should return an Observable filtered by event type', () => { + const bus = setup(); + const received: DomainEvent[] = []; + bus.stream('data:updated').subscribe((e) => received.push(e)); + + bus.publish(createEvent({ type: 'data:updated', payload: { v: 1 } })); + bus.publish(createEvent({ type: 'data:deleted' })); + bus.publish(createEvent({ type: 'data:updated', payload: { v: 2 } })); + + expect(received).toHaveLength(2); + expect((received[0].payload as { v: number }).v).toBe(1); + expect((received[1].payload as { v: number }).v).toBe(2); + }); + + it('should allow RxJS operators on the returned Observable', () => { + const bus = setup(); + const types: string[] = []; + bus + .stream('pipe:test') + .subscribe((e) => types.push(e.type)); + + bus.publish(createEvent({ type: 'pipe:test' })); + expect(types).toEqual(['pipe:test']); + }); + }); + + // ----------------------------------------------------------------------- + // signal + // ----------------------------------------------------------------------- + describe('signal', () => { + it('should return a Signal that starts as undefined', () => { + const bus = setup(); + const sig = bus.signal('sig:test'); + expect(sig()).toBeUndefined(); + }); + + it('should update when a matching event is published', () => { + const bus = setup(); + const sig = bus.signal<{ n: number }>('sig:update'); + + bus.publish(createEvent({ type: 'sig:update', payload: { n: 42 } })); + + expect(sig()).toBeDefined(); + expect(sig()!.payload.n).toBe(42); + }); + + it('should always reflect the latest event', () => { + const bus = setup(); + const sig = bus.signal('sig:latest'); + + bus.publish(createEvent({ type: 'sig:latest', payload: 1 })); + bus.publish(createEvent({ type: 'sig:latest', payload: 2 })); + bus.publish(createEvent({ type: 'sig:latest', payload: 3 })); + + expect(sig()!.payload).toBe(3); + }); + }); + + // ----------------------------------------------------------------------- + // history (stub — delegated to EventHistoryService) + // ----------------------------------------------------------------------- + describe('history', () => { + it('should return an empty array (stub for EventHistoryService)', () => { + const bus = setup(); + bus.publish(createEvent({ type: 'any' })); + expect(bus.history()).toEqual([]); + }); + }); + + // ----------------------------------------------------------------------- + // destroy + // ----------------------------------------------------------------------- + describe('destroy', () => { + it('should complete the internal Subject', () => { + const bus = setup(); + let completed = false; + bus.stream('test').subscribe({ complete: () => (completed = true) }); + + bus.destroy(); + + expect(completed).toBe(true); + }); + + it('should unsubscribe all tracked subscriptions', () => { + const bus = setup(); + const received: DomainEvent[] = []; + bus.subscribe('destroy:test', (e) => received.push(e)); + bus.subscribePattern('destroy:*', (e) => received.push(e)); + + bus.destroy(); + + // Publishing after destroy should not deliver (Subject completed) + // But even if it could, subscriptions are unsubscribed + expect(received).toHaveLength(0); + }); + + it('should not throw when called multiple times', () => { + const bus = setup(); + bus.destroy(); + expect(() => bus.destroy()).not.toThrow(); + }); + }); +}); diff --git a/packages/core/src/lib/event-bus/event-bus.service.ts b/packages/core/src/lib/event-bus/event-bus.service.ts new file mode 100644 index 0000000..6ad3f0a --- /dev/null +++ b/packages/core/src/lib/event-bus/event-bus.service.ts @@ -0,0 +1,138 @@ +import { inject, Injectable, Injector, runInInjectionContext, Signal } from '@angular/core'; +import { toSignal } from '@angular/core/rxjs-interop'; +import { Observable, Subject, Subscription, filter } from 'rxjs'; + +import { DomainEvent, EventHandler } from './event-bus.types'; + +/** + * Central pub/sub service for internal event-driven communication. + * + * Allows features and modules to communicate without direct coupling. + * Events are dispatched synchronously to all matching subscribers. + * + * Supports exact type matching via `subscribe()` and pattern matching + * (wildcards / RegExp) via `subscribePattern()`. + * + * @example + * ```typescript + * const bus = inject(EventBusService); + * + * // Subscribe to a specific event type + * const sub = bus.subscribe<{ userId: string }>('auth:login', (event) => { + * console.log('User logged in:', event.payload.userId); + * }); + * + * // Publish an event + * bus.publish({ type: 'auth:login', payload: { userId: '123' }, timestamp: Date.now() }); + * + * // Clean up + * sub.unsubscribe(); + * ``` + */ +@Injectable() +export class EventBusService { + private readonly injector = inject(Injector); + private readonly subject = new Subject(); + private readonly subscriptions: Subscription[] = []; + + /** + * Publish an event to all matching subscribers. + * + * If `timestamp` is not set, it will be added automatically. + */ + publish(event: DomainEvent): void { + const stamped = event.timestamp + ? event + : { ...event, timestamp: Date.now() }; + this.subject.next(stamped as DomainEvent); + } + + /** + * Subscribe to events of an exact type. + * + * @returns A `Subscription` that can be used to unsubscribe. + */ + subscribe( + eventType: string, + handler: EventHandler, + ): Subscription { + const sub = this.subject + .pipe(filter((e) => e.type === eventType)) + .subscribe((e) => handler(e as DomainEvent)); + this.subscriptions.push(sub); + return sub; + } + + /** + * Subscribe to events matching a pattern. + * + * Accepts either: + * - A string with `*` wildcard (e.g. `'auth:*'` matches `'auth:login'`, `'auth:logout'`) + * - A `RegExp` for full control + * + * @returns A `Subscription` that can be used to unsubscribe. + */ + subscribePattern( + pattern: string | RegExp, + handler: EventHandler, + ): Subscription { + const regex = + pattern instanceof RegExp + ? pattern + : new RegExp(`^${pattern.replace(/\*/g, '.*')}$`); + + const sub = this.subject + .pipe(filter((e) => regex.test(e.type))) + .subscribe((e) => handler(e as DomainEvent)); + this.subscriptions.push(sub); + return sub; + } + + /** + * Get an Observable stream of events for a specific type. + * + * Useful for integrating with RxJS pipelines. + */ + stream(eventType: string): Observable> { + return this.subject.pipe( + filter((e) => e.type === eventType), + ) as Observable>; + } + + /** + * Get a Signal that holds the latest event of a specific type. + * + * Starts as `undefined` until the first matching event arrives. + * Uses `toSignal()` from `@angular/core/rxjs-interop` with the + * service's own injector to avoid requiring a component context. + */ + signal(eventType: string): Signal | undefined> { + return runInInjectionContext(this.injector, () => + toSignal(this.stream(eventType)), + ); + } + + /** + * Returns a snapshot of all events that have passed through the bus + * during the current session (since last `destroy()`). + * + * Note: actual history storage is managed by `EventHistoryService`. + * This method returns an empty array — use `EventHistoryService.query()` + * for persistent history queries. + */ + history(): ReadonlyArray { + return []; + } + + /** + * Tear down the bus: complete the internal Subject and unsubscribe + * all tracked subscriptions. + * + * After calling `destroy()`, no further events can be published. + */ + destroy(): void { + this.subscriptions.forEach((s) => s.unsubscribe()); + this.subscriptions.length = 0; + this.subject.complete(); + } +} From 2f9890f6bd5da1931da5bcd51d64d45ad4d0b696 Mon Sep 17 00:00:00 2001 From: Maria Garcia Luque Date: Sat, 23 May 2026 01:10:08 +0200 Subject: [PATCH 3/6] [STEP-2.14-003] Add EventQueueService with persistent queue via StorageService Implement event queue with enqueue (bounded by maxQueueSize), flush (delivers all pending via EventBusService), status and pending as reactive Signals. Persists to localStorage via StorageService. 14 unit tests. --- .../lib/event-bus/event-queue.service.spec.ts | 220 ++++++++++++++++++ .../src/lib/event-bus/event-queue.service.ts | 131 +++++++++++ 2 files changed, 351 insertions(+) create mode 100644 packages/core/src/lib/event-bus/event-queue.service.spec.ts create mode 100644 packages/core/src/lib/event-bus/event-queue.service.ts diff --git a/packages/core/src/lib/event-bus/event-queue.service.spec.ts b/packages/core/src/lib/event-bus/event-queue.service.spec.ts new file mode 100644 index 0000000..7f2d4dd --- /dev/null +++ b/packages/core/src/lib/event-bus/event-queue.service.spec.ts @@ -0,0 +1,220 @@ +import { TestBed } from '@angular/core/testing'; +import { EventQueueService } from './event-queue.service'; +import { EventBusService } from './event-bus.service'; +import { DomainEvent, EVENT_BUS_CONFIG, EventBusConfig } from './event-bus.types'; +import { StorageService } from '../storage/storage.service'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function createEvent(overrides?: Partial): DomainEvent { + return { + type: 'queue:test', + payload: null, + timestamp: Date.now(), + ...overrides, + }; +} + +/** + * Minimal StorageService mock using an in-memory Map. + */ +function createMockStorage() { + const store = new Map(); + return { + get: vi.fn((key: string): T | null => (store.get(key) as T) ?? null), + set: vi.fn((key: string, value: T): void => { store.set(key, value); }), + remove: vi.fn((key: string): void => { store.delete(key); }), + _store: store, + }; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('EventQueueService', () => { + function setup(config?: Partial) { + const mockStorage = createMockStorage(); + const publishSpy = vi.fn(); + + TestBed.configureTestingModule({ + providers: [ + EventQueueService, + EventBusService, + { provide: StorageService, useValue: mockStorage }, + ...(config + ? [{ provide: EVENT_BUS_CONFIG, useValue: { enableQueue: true, maxQueueSize: 1000, enableHistory: false, historySize: 100, ...config } }] + : []), + ], + }); + + const queue = TestBed.inject(EventQueueService); + const bus = TestBed.inject(EventBusService); + + // Spy on publish after injection + vi.spyOn(bus, 'publish').mockImplementation(publishSpy); + + return { queue, bus, mockStorage, publishSpy }; + } + + // ----------------------------------------------------------------------- + // enqueue + // ----------------------------------------------------------------------- + describe('enqueue', () => { + it('should add an event to the queue', () => { + const { queue, mockStorage } = setup(); + const event = createEvent(); + queue.enqueue(event); + + expect(mockStorage.set).toHaveBeenCalled(); + const savedQueue = mockStorage.set.mock.calls[0][1] as DomainEvent[]; + expect(savedQueue).toHaveLength(1); + expect(savedQueue[0]).toEqual(event); + }); + + it('should update pending count', () => { + const { queue } = setup(); + expect(queue.pending()()).toBe(0); + + queue.enqueue(createEvent()); + expect(queue.pending()()).toBe(1); + + queue.enqueue(createEvent()); + expect(queue.pending()()).toBe(2); + }); + + it('should drop oldest events when maxQueueSize is exceeded', () => { + const { queue } = setup({ maxQueueSize: 3 }); + + queue.enqueue(createEvent({ type: 'e1' })); + queue.enqueue(createEvent({ type: 'e2' })); + queue.enqueue(createEvent({ type: 'e3' })); + queue.enqueue(createEvent({ type: 'e4' })); + + expect(queue.pending()()).toBe(3); + + // Verify the last save has e2, e3, e4 (e1 dropped) + const lastCall = (queue as any).storage.set.mock.calls.at(-1); + const savedQueue = lastCall[1] as DomainEvent[]; + expect(savedQueue.map((e: DomainEvent) => e.type)).toEqual(['e2', 'e3', 'e4']); + }); + + it('should persist events via StorageService', () => { + const { queue, mockStorage } = setup(); + queue.enqueue(createEvent()); + + expect(mockStorage.set).toHaveBeenCalledWith( + 'event-bus.queue', + expect.any(Array), + ); + }); + }); + + // ----------------------------------------------------------------------- + // flush + // ----------------------------------------------------------------------- + describe('flush', () => { + it('should publish all queued events through EventBusService', () => { + const { queue, publishSpy } = setup(); + + queue.enqueue(createEvent({ type: 'f1' })); + queue.enqueue(createEvent({ type: 'f2' })); + + const result = queue.flush(); + + expect(publishSpy).toHaveBeenCalledTimes(2); + expect(result.delivered).toBe(2); + expect(result.failed).toBe(0); + }); + + it('should clear the queue after flush', () => { + const { queue } = setup(); + queue.enqueue(createEvent()); + + queue.flush(); + + expect(queue.pending()()).toBe(0); + }); + + it('should return { delivered: 0, failed: 0 } for empty queue', () => { + const { queue, publishSpy } = setup(); + const result = queue.flush(); + + expect(result).toEqual({ delivered: 0, failed: 0 }); + expect(publishSpy).not.toHaveBeenCalled(); + }); + + it('should count failed events when publish throws', () => { + const { queue, publishSpy } = setup(); + + queue.enqueue(createEvent({ type: 'ok' })); + queue.enqueue(createEvent({ type: 'fail' })); + queue.enqueue(createEvent({ type: 'ok2' })); + + publishSpy.mockImplementation((event: DomainEvent) => { + if (event.type === 'fail') throw new Error('subscriber error'); + }); + + const result = queue.flush(); + + expect(result.delivered).toBe(2); + expect(result.failed).toBe(1); + }); + + it('should set status to flushing during flush', () => { + const { queue, publishSpy } = setup(); + queue.enqueue(createEvent()); + + let statusDuringFlush: string | undefined; + publishSpy.mockImplementation(() => { + statusDuringFlush = queue.status()(); + }); + + queue.flush(); + + expect(statusDuringFlush).toBe('flushing'); + }); + + it('should set status to idle after successful flush', () => { + const { queue } = setup(); + queue.enqueue(createEvent()); + queue.flush(); + + expect(queue.status()()).toBe('idle'); + }); + + it('should set status to error if any event fails', () => { + const { queue, publishSpy } = setup(); + queue.enqueue(createEvent()); + + publishSpy.mockImplementation(() => { throw new Error('fail'); }); + queue.flush(); + + expect(queue.status()()).toBe('error'); + }); + }); + + // ----------------------------------------------------------------------- + // status / pending + // ----------------------------------------------------------------------- + describe('status / pending', () => { + it('should start with idle status', () => { + const { queue } = setup(); + expect(queue.status()()).toBe('idle'); + }); + + it('should start with 0 pending', () => { + const { queue } = setup(); + expect(queue.pending()()).toBe(0); + }); + + it('should return readonly signals', () => { + const { queue } = setup(); + // These are readonly — no set method exposed + expect(typeof queue.status()).toBe('function'); + expect(typeof queue.pending()).toBe('function'); + }); + }); +}); diff --git a/packages/core/src/lib/event-bus/event-queue.service.ts b/packages/core/src/lib/event-bus/event-queue.service.ts new file mode 100644 index 0000000..ee11ddd --- /dev/null +++ b/packages/core/src/lib/event-bus/event-queue.service.ts @@ -0,0 +1,131 @@ +import { inject, Injectable, signal, Signal } from '@angular/core'; + +import { StorageService } from '../storage/storage.service'; +import { EventBusService } from './event-bus.service'; +import { + DomainEvent, + EventBusConfig, + EVENT_BUS_CONFIG, + FlushResult, + QueueStatus, +} from './event-bus.types'; + +/** Storage key for the persisted queue. */ +const QUEUE_STORAGE_KEY = 'event-bus.queue'; + +/** + * Persistent event queue that buffers events for later delivery. + * + * Events are persisted via `StorageService` (localStorage with + * in-memory fallback). When `flush()` is called, all pending events + * are published through the `EventBusService` and the queue is cleared. + * + * Queue size is bounded by `maxQueueSize` from `EventBusConfig` — + * when the limit is reached, the oldest event is dropped. + * + * @example + * ```typescript + * const queue = inject(EventQueueService); + * + * // Enqueue when offline or deferred + * queue.enqueue({ type: 'sync:pending', payload: { id: 42 }, timestamp: Date.now() }); + * + * // Later, flush all pending events + * const result = queue.flush(); + * console.log(`Delivered: ${result.delivered}, Failed: ${result.failed}`); + * ``` + */ +@Injectable() +export class EventQueueService { + private readonly storage = inject(StorageService); + private readonly eventBus = inject(EventBusService); + private readonly config = inject(EVENT_BUS_CONFIG, { optional: true }); + + private readonly maxQueueSize = this.config?.maxQueueSize ?? 1000; + + private readonly _status = signal('idle'); + private readonly _pending = signal(this._loadQueue().length); + + /** + * Current queue status as a reactive Signal. + */ + status(): Signal { + return this._status.asReadonly(); + } + + /** + * Number of pending events as a reactive Signal. + */ + pending(): Signal { + return this._pending.asReadonly(); + } + + /** + * Add an event to the persistent queue. + * + * If the queue exceeds `maxQueueSize`, the oldest event is dropped. + */ + enqueue(event: DomainEvent): void { + const queue = this._loadQueue(); + queue.push(event); + + // Enforce max queue size — drop oldest + while (queue.length > this.maxQueueSize) { + queue.shift(); + } + + this._saveQueue(queue); + this._pending.set(queue.length); + } + + /** + * Flush all pending events through the `EventBusService`. + * + * Each event is published individually. Events that fail to publish + * (due to errors in subscribers) are counted as failed but do NOT + * block other events from being delivered. + * + * The queue is cleared after flush regardless of failures. + * + * @returns Result with counts of delivered and failed events. + */ + flush(): FlushResult { + const queue = this._loadQueue(); + if (queue.length === 0) { + return { delivered: 0, failed: 0 }; + } + + this._status.set('flushing'); + + let delivered = 0; + let failed = 0; + + for (const event of queue) { + try { + this.eventBus.publish(event); + delivered++; + } catch { + failed++; + } + } + + // Clear the queue + this._saveQueue([]); + this._pending.set(0); + this._status.set(failed > 0 ? 'error' : 'idle'); + + return { delivered, failed }; + } + + // ------------------------------------------------------------------------- + // Private helpers + // ------------------------------------------------------------------------- + + private _loadQueue(): DomainEvent[] { + return this.storage.get(QUEUE_STORAGE_KEY) ?? []; + } + + private _saveQueue(queue: DomainEvent[]): void { + this.storage.set(QUEUE_STORAGE_KEY, queue); + } +} From 86b65544f7a611b7d815c03a3e6b93cb3e7b0898 Mon Sep 17 00:00:00 2001 From: Maria Garcia Luque Date: Sat, 23 May 2026 01:13:05 +0200 Subject: [PATCH 4/6] [STEP-2.14-004] Add EventHistoryService with in-memory ring buffer Implement event history service with add (bounded by historySize), query (filter by type, source, since with AND logic), clear and size methods. Default capacity 100 events. 15 unit tests. --- .../event-bus/event-history.service.spec.ts | 193 ++++++++++++++++++ .../lib/event-bus/event-history.service.ts | 83 ++++++++ 2 files changed, 276 insertions(+) create mode 100644 packages/core/src/lib/event-bus/event-history.service.spec.ts create mode 100644 packages/core/src/lib/event-bus/event-history.service.ts diff --git a/packages/core/src/lib/event-bus/event-history.service.spec.ts b/packages/core/src/lib/event-bus/event-history.service.spec.ts new file mode 100644 index 0000000..7bb5f99 --- /dev/null +++ b/packages/core/src/lib/event-bus/event-history.service.spec.ts @@ -0,0 +1,193 @@ +import { TestBed } from '@angular/core/testing'; +import { EventHistoryService } from './event-history.service'; +import { DomainEvent, EVENT_BUS_CONFIG } from './event-bus.types'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function createEvent(overrides?: Partial): DomainEvent { + return { + type: 'test:event', + payload: null, + timestamp: Date.now(), + ...overrides, + }; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('EventHistoryService', () => { + function setup(historySize?: number) { + TestBed.configureTestingModule({ + providers: [ + EventHistoryService, + ...(historySize != null + ? [{ + provide: EVENT_BUS_CONFIG, + useValue: { enableQueue: true, maxQueueSize: 1000, enableHistory: true, historySize }, + }] + : []), + ], + }); + return TestBed.inject(EventHistoryService); + } + + // ----------------------------------------------------------------------- + // add + // ----------------------------------------------------------------------- + describe('add', () => { + it('should add an event to the buffer', () => { + const svc = setup(); + svc.add(createEvent()); + expect(svc.size()).toBe(1); + }); + + it('should preserve events in insertion order', () => { + const svc = setup(); + svc.add(createEvent({ type: 'a' })); + svc.add(createEvent({ type: 'b' })); + svc.add(createEvent({ type: 'c' })); + + const all = svc.query(); + expect(all.map((e) => e.type)).toEqual(['a', 'b', 'c']); + }); + + it('should drop oldest events when historySize is exceeded', () => { + const svc = setup(3); + + svc.add(createEvent({ type: 'e1' })); + svc.add(createEvent({ type: 'e2' })); + svc.add(createEvent({ type: 'e3' })); + svc.add(createEvent({ type: 'e4' })); + + expect(svc.size()).toBe(3); + const types = svc.query().map((e) => e.type); + expect(types).toEqual(['e2', 'e3', 'e4']); + }); + + it('should default to historySize 100 when no config', () => { + const svc = setup(); + for (let i = 0; i < 105; i++) { + svc.add(createEvent({ type: `e${i}` })); + } + expect(svc.size()).toBe(100); + // First event should be e5 (e0-e4 dropped) + expect(svc.query()[0].type).toBe('e5'); + }); + }); + + // ----------------------------------------------------------------------- + // query + // ----------------------------------------------------------------------- + describe('query', () => { + it('should return all events when no filter is provided', () => { + const svc = setup(); + svc.add(createEvent({ type: 'a' })); + svc.add(createEvent({ type: 'b' })); + + expect(svc.query()).toHaveLength(2); + }); + + it('should filter by type', () => { + const svc = setup(); + svc.add(createEvent({ type: 'auth:login' })); + svc.add(createEvent({ type: 'auth:logout' })); + svc.add(createEvent({ type: 'order:placed' })); + + const result = svc.query({ type: 'auth:login' }); + expect(result).toHaveLength(1); + expect(result[0].type).toBe('auth:login'); + }); + + it('should filter by source', () => { + const svc = setup(); + svc.add(createEvent({ source: 'module-a' })); + svc.add(createEvent({ source: 'module-b' })); + svc.add(createEvent({ source: 'module-a' })); + + const result = svc.query({ source: 'module-a' }); + expect(result).toHaveLength(2); + }); + + it('should filter by since timestamp', () => { + const svc = setup(); + const now = Date.now(); + svc.add(createEvent({ timestamp: now - 5000 })); + svc.add(createEvent({ timestamp: now - 1000 })); + svc.add(createEvent({ timestamp: now })); + + const result = svc.query({ since: now - 2000 }); + expect(result).toHaveLength(2); + }); + + it('should combine multiple filters (AND logic)', () => { + const svc = setup(); + const now = Date.now(); + svc.add(createEvent({ type: 'auth:login', source: 'web', timestamp: now })); + svc.add(createEvent({ type: 'auth:login', source: 'mobile', timestamp: now })); + svc.add(createEvent({ type: 'auth:logout', source: 'web', timestamp: now })); + + const result = svc.query({ type: 'auth:login', source: 'web' }); + expect(result).toHaveLength(1); + expect(result[0].source).toBe('web'); + }); + + it('should return empty array when no events match', () => { + const svc = setup(); + svc.add(createEvent({ type: 'a' })); + + expect(svc.query({ type: 'nonexistent' })).toEqual([]); + }); + + it('should return a copy (not a reference to internal buffer)', () => { + const svc = setup(); + svc.add(createEvent()); + const result = svc.query(); + expect(result).not.toBe((svc as any).buffer); + }); + }); + + // ----------------------------------------------------------------------- + // clear + // ----------------------------------------------------------------------- + describe('clear', () => { + it('should remove all events from the buffer', () => { + const svc = setup(); + svc.add(createEvent()); + svc.add(createEvent()); + expect(svc.size()).toBe(2); + + svc.clear(); + expect(svc.size()).toBe(0); + expect(svc.query()).toEqual([]); + }); + + it('should not throw when buffer is already empty', () => { + const svc = setup(); + expect(() => svc.clear()).not.toThrow(); + }); + }); + + // ----------------------------------------------------------------------- + // size + // ----------------------------------------------------------------------- + describe('size', () => { + it('should return 0 for empty buffer', () => { + const svc = setup(); + expect(svc.size()).toBe(0); + }); + + it('should track the current number of events', () => { + const svc = setup(); + svc.add(createEvent()); + expect(svc.size()).toBe(1); + svc.add(createEvent()); + expect(svc.size()).toBe(2); + svc.clear(); + expect(svc.size()).toBe(0); + }); + }); +}); diff --git a/packages/core/src/lib/event-bus/event-history.service.ts b/packages/core/src/lib/event-bus/event-history.service.ts new file mode 100644 index 0000000..f3d8e8b --- /dev/null +++ b/packages/core/src/lib/event-bus/event-history.service.ts @@ -0,0 +1,83 @@ +import { inject, Injectable } from '@angular/core'; + +import { + DomainEvent, + EventBusConfig, + EVENT_BUS_CONFIG, + EventFilter, +} from './event-bus.types'; + +/** + * In-memory ring buffer that stores recent domain events for querying. + * + * When the buffer reaches `historySize` (from `EventBusConfig`), the + * oldest event is automatically dropped. + * + * @example + * ```typescript + * const history = inject(EventHistoryService); + * + * // Query all events of a specific type + * const loginEvents = history.query({ type: 'auth:login' }); + * + * // Query events from a specific source since a timestamp + * const recent = history.query({ source: 'sync', since: Date.now() - 60_000 }); + * + * // Get total stored events + * console.log('History size:', history.size()); + * ``` + */ +@Injectable() +export class EventHistoryService { + private readonly config = inject(EVENT_BUS_CONFIG, { optional: true }); + private readonly maxSize = this.config?.historySize ?? 100; + private readonly buffer: DomainEvent[] = []; + + /** + * Add an event to the history buffer. + * + * If the buffer is at capacity, the oldest event is dropped. + */ + add(event: DomainEvent): void { + this.buffer.push(event); + while (this.buffer.length > this.maxSize) { + this.buffer.shift(); + } + } + + /** + * Query events in the history buffer. + * + * All filter fields are optional — when omitted, no filtering is + * applied for that criterion. Returns events in chronological order. + * + * @param filter - Optional filter criteria (type, source, since) + * @returns Matching events as a read-only array + */ + query(filter?: EventFilter): ReadonlyArray { + if (!filter) { + return [...this.buffer]; + } + + return this.buffer.filter((event) => { + if (filter.type && event.type !== filter.type) return false; + if (filter.source && event.source !== filter.source) return false; + if (filter.since && event.timestamp < filter.since) return false; + return true; + }); + } + + /** + * Clear the entire history buffer. + */ + clear(): void { + this.buffer.length = 0; + } + + /** + * Get the number of events currently stored in the buffer. + */ + size(): number { + return this.buffer.length; + } +} From f5ea07300ab32cf4bb91b05312b837905cba8f8a Mon Sep 17 00:00:00 2001 From: Maria Garcia Luque Date: Sat, 23 May 2026 01:20:08 +0200 Subject: [PATCH 5/6] [STEP-2.14-005] Add provideEventBus() factory and barrel exports MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Create provideEventBus(options?) with makeEnvironmentProviders() - Resolve EventBusOptions → EventBusConfig with sensible defaults - Register EVENT_BUS_CONFIG, EventBusService, EventQueueService, EventHistoryService - Add event-bus/index.ts barrel with typed exports - Re-export event-bus module from packages/core/src/index.ts - 6 new tests (832 total, 0 failures) --- packages/core/src/index.ts | 1 + packages/core/src/lib/event-bus/index.ts | 19 ++++ .../lib/event-bus/provide-event-bus.spec.ts | 88 +++++++++++++++++++ .../src/lib/event-bus/provide-event-bus.ts | 63 +++++++++++++ 4 files changed, 171 insertions(+) create mode 100644 packages/core/src/lib/event-bus/index.ts create mode 100644 packages/core/src/lib/event-bus/provide-event-bus.spec.ts create mode 100644 packages/core/src/lib/event-bus/provide-event-bus.ts diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 51564c3..7ae47a7 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -15,3 +15,4 @@ export * from './lib/environment'; export * from './lib/security'; export * from './lib/files'; export * from './lib/notifications'; +export * from './lib/event-bus'; diff --git a/packages/core/src/lib/event-bus/index.ts b/packages/core/src/lib/event-bus/index.ts new file mode 100644 index 0000000..f80a445 --- /dev/null +++ b/packages/core/src/lib/event-bus/index.ts @@ -0,0 +1,19 @@ +// Types +export type { + DomainEvent, + EventHandler, + EventFilter, + EventBusOptions, + EventBusConfig, + FlushResult, +} from './event-bus.types'; +export { EVENT_BUS_CONFIG } from './event-bus.types'; +export type { QueueStatus } from './event-bus.types'; + +// Provider factory +export { provideEventBus } from './provide-event-bus'; + +// Services +export { EventBusService } from './event-bus.service'; +export { EventQueueService } from './event-queue.service'; +export { EventHistoryService } from './event-history.service'; diff --git a/packages/core/src/lib/event-bus/provide-event-bus.spec.ts b/packages/core/src/lib/event-bus/provide-event-bus.spec.ts new file mode 100644 index 0000000..2f9c8de --- /dev/null +++ b/packages/core/src/lib/event-bus/provide-event-bus.spec.ts @@ -0,0 +1,88 @@ +import { TestBed } from '@angular/core/testing'; +import { provideEventBus } from './provide-event-bus'; +import { EventBusService } from './event-bus.service'; +import { EventQueueService } from './event-queue.service'; +import { EventHistoryService } from './event-history.service'; +import { EVENT_BUS_CONFIG, EventBusConfig } from './event-bus.types'; +import { StorageService } from '../storage/storage.service'; + +// --------------------------------------------------------------------------- +// Mock StorageService (EventQueueService depends on it) +// --------------------------------------------------------------------------- + +function createMockStorage() { + const store = new Map(); + return { + get: vi.fn((key: string): T | null => (store.get(key) as T) ?? null), + set: vi.fn((key: string, value: T): void => { store.set(key, value); }), + remove: vi.fn((key: string): void => { store.delete(key); }), + }; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('provideEventBus', () => { + function setup(options?: Parameters[0]) { + TestBed.configureTestingModule({ + providers: [ + provideEventBus(options), + { provide: StorageService, useValue: createMockStorage() }, + ], + }); + } + + it('should register EventBusService', () => { + setup(); + expect(TestBed.inject(EventBusService)).toBeDefined(); + }); + + it('should register EventQueueService', () => { + setup(); + expect(TestBed.inject(EventQueueService)).toBeDefined(); + }); + + it('should register EventHistoryService', () => { + setup(); + expect(TestBed.inject(EventHistoryService)).toBeDefined(); + }); + + it('should provide default config when no options given', () => { + setup(); + const config = TestBed.inject(EVENT_BUS_CONFIG); + expect(config).toEqual({ + enableQueue: true, + maxQueueSize: 1000, + enableHistory: false, + historySize: 100, + }); + }); + + it('should merge user options with defaults', () => { + setup({ enableHistory: true, historySize: 50 }); + const config = TestBed.inject(EVENT_BUS_CONFIG); + expect(config).toEqual({ + enableQueue: true, + maxQueueSize: 1000, + enableHistory: true, + historySize: 50, + }); + }); + + it('should allow overriding all options', () => { + setup({ + enableQueue: false, + maxQueueSize: 500, + enableHistory: true, + historySize: 200, + }); + const config = TestBed.inject(EVENT_BUS_CONFIG); + expect(config).toEqual({ + enableQueue: false, + maxQueueSize: 500, + enableHistory: true, + historySize: 200, + }); + }); +}); diff --git a/packages/core/src/lib/event-bus/provide-event-bus.ts b/packages/core/src/lib/event-bus/provide-event-bus.ts new file mode 100644 index 0000000..4aad987 --- /dev/null +++ b/packages/core/src/lib/event-bus/provide-event-bus.ts @@ -0,0 +1,63 @@ +import { EnvironmentProviders, makeEnvironmentProviders } from '@angular/core'; + +import { + EVENT_BUS_CONFIG, + EventBusConfig, + EventBusOptions, +} from './event-bus.types'; +import { EventBusService } from './event-bus.service'; +import { EventQueueService } from './event-queue.service'; +import { EventHistoryService } from './event-history.service'; + +/** Default configuration values. */ +const DEFAULTS: EventBusConfig = { + enableQueue: true, + maxQueueSize: 1000, + enableHistory: false, + historySize: 100, +}; + +/** + * Configure the EventBus module. + * + * Registers `EventBusService`, `EventQueueService`, and + * `EventHistoryService` with the resolved configuration. + * + * This is a **CORE** module — call `provideEventBus()` in your + * `appConfig` providers to enable internal event-driven communication. + * + * @example + * ```ts + * import { provideEventBus } from '@fireflyframework/core'; + * + * export const appConfig: ApplicationConfig = { + * providers: [ + * provideEventBus({ + * enableQueue: true, + * enableHistory: true, + * historySize: 200, + * }), + * ], + * }; + * ``` + * + * @param options - Optional configuration. Defaults: enableQueue=true, maxQueueSize=1000, enableHistory=false, historySize=100 + * @returns EnvironmentProviders to register in the application config + */ +export function provideEventBus( + options?: EventBusOptions, +): EnvironmentProviders { + const config: EventBusConfig = { + enableQueue: options?.enableQueue ?? DEFAULTS.enableQueue, + maxQueueSize: options?.maxQueueSize ?? DEFAULTS.maxQueueSize, + enableHistory: options?.enableHistory ?? DEFAULTS.enableHistory, + historySize: options?.historySize ?? DEFAULTS.historySize, + }; + + return makeEnvironmentProviders([ + { provide: EVENT_BUS_CONFIG, useValue: config }, + EventBusService, + EventQueueService, + EventHistoryService, + ]); +} From 30787f13c9a928669e969ef837b3a6a520ea67f6 Mon Sep 17 00:00:00 2001 From: Maria Garcia Luque Date: Sat, 23 May 2026 01:25:11 +0200 Subject: [PATCH 6/6] [STEP-2.14-006] Bump core version to 0.15.0 with EventBus module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Version bump 0.14.0 → 0.15.0 in package.json - Add CHANGELOG entry for 0.15.0 documenting EventBus module (CORE #16) - Update compare links for [Unreleased] and [0.15.0] --- packages/core/CHANGELOG.md | 13 ++++++++++++- packages/core/package.json | 2 +- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/packages/core/CHANGELOG.md b/packages/core/CHANGELOG.md index b94f604..bb4b1af 100644 --- a/packages/core/CHANGELOG.md +++ b/packages/core/CHANGELOG.md @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.15.0] - 2026-05-23 + +### Added +- Event Bus module (CORE #16): `provideEventBus(options?)` factory with `EVENT_BUS_CONFIG` injection token +- `EventBusService` — central pub/sub with typed `DomainEvent`, `publish()`, `subscribe()`, `subscribePattern()` (wildcard/regex) +- Dual reactive API: `stream(eventType)` returns `Observable>`, `signal(eventType)` returns `Signal | undefined>` +- `EventQueueService` — persistent event queue backed by `StorageService` (localStorage), bounded by `maxQueueSize`, with `enqueue()`, `flush()`, `clear()` +- `EventHistoryService` — in-memory ring buffer bounded by `historySize`, with `query(filter?)` supporting AND-logic filters (type, source, since) +- Types: `DomainEvent`, `EventHandler`, `EventFilter`, `QueueStatus`, `FlushResult`, `EventBusOptions`, `EventBusConfig` + ## [0.14.0] - 2026-05-22 ### Added @@ -214,7 +224,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - User context module: `UserContextService` - API runtime module: `ApiClient`, `TransportRegistry`, `HttpTransportAdapter` -[Unreleased]: https://github.com/fireflyframework/firefly-frontend-framework/compare/core@0.14.0...HEAD +[Unreleased]: https://github.com/fireflyframework/firefly-frontend-framework/compare/core@0.15.0...HEAD +[0.15.0]: https://github.com/fireflyframework/firefly-frontend-framework/compare/core@0.14.0...core@0.15.0 [0.14.0]: https://github.com/fireflyframework/firefly-frontend-framework/compare/core@0.13.0...core@0.14.0 [0.13.0]: https://github.com/fireflyframework/firefly-frontend-framework/compare/core@0.12.0...core@0.13.0 [0.12.0]: https://github.com/fireflyframework/firefly-frontend-framework/compare/core@0.11.0...core@0.12.0 diff --git a/packages/core/package.json b/packages/core/package.json index b54db61..327fac3 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "@fireflyframework/core", - "version": "0.14.0", + "version": "0.15.0", "publishConfig": { "registry": "https://npm.pkg.github.com" },