Skip to content
71 changes: 57 additions & 14 deletions node.js/messaging.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,54 @@ status: released
<!--- % include links-for-node.md %} -->
<!--- % include _chapters toc="2,3" %} -->

## Overview

Messaging enables decoupled communication between services using events.
CAP distinguishes between the logical and technical messaging layers, separating business concerns from technical infrastructure.

The **logical layer** consists of three primary components:

**Modeled Events**: Events are defined in CDS models with typed schemas, providing compile-time validation and IDE support. These events represent business occurrences like `'orderProcessed'`, or `'stockUpdated'`.

**Event Topics**: Topics organize events into logical channels and are responsible for event routing. Topics can be explicitly defined as anootation or derived from service and event names.

**CAP Services**: Services act as event producers or consumers, using simple APIs like `srv.emit('reviewed', data)` or `srv.on('orderProcessed', handler)`. Services communicate using logical event names without needing to know the underlying infrastructure details.

The **technical layer** handles the actual message transport and delivery:

**CAP Messaging Service**: Acts as the translation layer between logical events and technical infrastructure. It manages topic resolution, message serialization, and routing logic.
For topic resolution the logical events are delegated to the messaging service, the corresponding event name on the technical service is either the fully qualified event name or the value of the @topic annotation if given.

**Message Brokers**: Form the core of the technical infrastructure, handling message persistence, delivery guarantees, and cross-service communication. Examples include SAP Event Mesh, Apache Kafka, or Redis Streams.

The message flow follows a clear path through both layers:

**Outbound Flow (Publisher)**: A CAP service calls `srv.emit('reviewed', data)` → CAP Messaging Service resolves the event name to a fully qualified topic (e.g., `OrderSrv.reviewed`) → Message is serialized and sent to the Event Broker → Broker stores and distributes the message to all subscribers.

**Inbound Flow (Subscriber)**: Event Broker delivers message from subscribed topic → CAP Messaging Service receives the message → Service name and event name are resolved from the topic → Message is routed to the appropriate CAP service handler via `srv.on('reviewed', handler)`. Registering a modeled `srv.on(...)` event handler causes the broker to listen to those events, e.g. creates a subscription for Event Mesh.

**Alternatively** custom handlers can bypass the service layer and work directly with the messaging service.

### Summary Table


| CDS Event Declaration | Emitting via `srv.emit` | Emitting via `messaging.emit` | Broker Topic | Receiving via `srv.on` | Receiving via `messaging.on` |
|------------------------------|-------------------------|-------------------------------|----------------------|------------------------|------------------------------|
| No `@topic` | `'reviewed'` | `'OrderSrv.reviewed'` | `OrderSrv.reviewed` | `'reviewed'` | `'OrderSrv.reviewed'` |
| With `@topic: 'foo.bar'` | `'reviewed'` | `'foo.bar'` | `foo.bar` | `'reviewed'` | `'foo.bar'` |


### Key Points

- Logical service API (srv.emit, srv.on) uses the event name as declared in CDS.
- Technical messaging API (messaging.emit, messaging.on) uses the resolved topic name.
- If no @topic is specified, the topic defaults to the event’s fully qualified name.
- If @topic is specified, it overrides the default topic name.

## cds.**MessagingService** <i> class </i>

Class `cds.MessagingService` and subclasses thereof are technical services representing asynchronous messaging channels.
They can be used directly/low-level, or behind the scenes on higher-level service-to-service eventing.
Class `cds.MessagingService` and subclasses thereof are technical services representing asynchronous messaging channels.
They can be used directly/low-level, or behind the scenes on higher-level service-to-service eventing.

### class cds.**MessagingService** <i> extends cds.Service </i>

Expand Down Expand Up @@ -55,7 +98,7 @@ In _srv/external/external.cds_:
service ExternalService {
event ExternalEvent {
ID: UUID;
name: String;
rating: Decimal;
}
}
```
Expand All @@ -66,7 +109,7 @@ In _srv/own.cds_:
service OwnService {
event OwnEvent {
ID: UUID;
name: String;
rating: Decimal;
}
}
```
Expand Down Expand Up @@ -94,7 +137,7 @@ Example:
```cds
service OwnService {
@topic: 'my.custom.topic'
event OwnEvent { ID: UUID; name: String; }
event OwnEvent { ID: UUID; rating: Decimal; }
}
```

Expand All @@ -109,19 +152,19 @@ Example:
const messaging = await cds.connect.to('messaging')

this.after(['CREATE', 'UPDATE', 'DELETE'], 'Reviews', async (_, req) => {
const { subject } = req.data
const { ID } = req.data
const { rating } = await cds.run(
SELECT.one(['round(avg(rating),2) as rating'])
.from(Reviews)
.where({ subject }))
.where({ ID }))

// send to a topic
await messaging.emit('cap/msg/system/review/reviewed',
{ subject, rating })
await messaging.emit('cap/msg/system/my/custom/topic',
{ ID, rating })

// alternative if you want to send custom headers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

headers can also be the 3rd argument, i.e. messaging.emit('topic', data, headers)

await messaging.emit({ event: 'cap/msg/system/review/reviewed',
data: { subject, rating },
await messaging.emit({ event: 'cap/msg/system/my/custom/topic',
data: { ID, rating },
headers: { 'X-Correlation-ID': req.headers['X-Correlation-ID'] }})
})
```
Expand All @@ -141,9 +184,9 @@ Example:
const messaging = await cds.connect.to('messaging')

// listen to a topic
messaging.on('cap/msg/system/review/reviewed', msg => {
const { subject, rating } = msg.data
return cds.run(UPDATE(Books, subject).with({ rating }))
messaging.on('cap/msg/system/my/custom/topic', msg => {
const { ID, rating } = msg.data
return cds.run(UPDATE(Books, ID).with({ rating }))
})
```

Expand Down
Loading