-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathsql-server-event-store.ddl
More file actions
217 lines (190 loc) · 5.44 KB
/
sql-server-event-store.ddl
File metadata and controls
217 lines (190 loc) · 5.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
-- Event store schema for SQL Server 2025 (T-SQL).
-- SQL Server 2025 is required: the ledger uses the native JSON data type,
-- and the optional JSON INDEX further down needs it as well.
-- Both session options are switched on in one statement before any object is created.
-- NOTE(review): filtered indexes need QUOTED_IDENTIFIER and ANSI_NULLS ON — confirm against the docs.
SET ANSI_NULLS, QUOTED_IDENTIFIER ON;
GO
-- Append-only event log: one row per event, across all entities.
-- Every constraint is explicitly named so that violation messages and
-- later migrations can refer to a stable, greppable identifier instead of
-- a system-generated one.
CREATE TABLE ledger
(
    -- entity type and the key of the entity instance (together: one stream)
    entity      NVARCHAR(255)    NOT NULL,
    entity_key  NVARCHAR(255)    NOT NULL,
    -- event name and its JSON payload
    event       NVARCHAR(255)    NOT NULL,
    data        JSON             NOT NULL,
    -- caller-supplied key, unique across the whole ledger;
    -- can be anything, like a ULID, nanoid, etc.
    append_key  NVARCHAR(255)    NOT NULL
        CONSTRAINT ledger_append_key_uq UNIQUE,
    -- event_id of the previous event; NULL for the first event of an entity instance.
    -- Deliberately NOT declared UNIQUE here: a SQL Server UNIQUE constraint would
    -- allow only a single NULL in the whole table. Uniqueness of the non-NULL
    -- values is enforced by the filtered index idx_previous_id_unique instead.
    previous_id UNIQUEIDENTIFIER NULL,
    event_id    UNIQUEIDENTIFIER NOT NULL
        CONSTRAINT ledger_event_id_uq UNIQUE,
    -- insertion time, including the UTC offset of the server
    timestamp   DATETIMEOFFSET   NOT NULL
        CONSTRAINT ledger_timestamp_df DEFAULT SYSDATETIMEOFFSET(),
    -- sequence for all events in all entities
    sequence    BIGINT IDENTITY(1,1)
        CONSTRAINT ledger_pk PRIMARY KEY
);
GO
-- Lookup index for all events of one stream, i.e. one (entity, entity_key) pair.
CREATE NONCLUSTERED INDEX entity_index
    ON ledger (entity ASC, entity_key ASC);
GO
-- An event may be referenced as "previous" by at most one other event.
-- The filter leaves NULL rows out of the index, so any number of
-- first events (previous_id IS NULL) can coexist.
CREATE UNIQUE NONCLUSTERED INDEX idx_previous_id_unique
    ON ledger (previous_id ASC)
    WHERE previous_id IS NOT NULL;
GO
-- At most one first event (previous_id IS NULL) per (entity, entity_key).
-- This closes the race where two writers both try to start the same stream.
CREATE UNIQUE NONCLUSTERED INDEX uq_first_event_per_stream
    ON ledger (entity ASC, entity_key ASC)
    WHERE previous_id IS NULL;
GO
-- JSON index on all paths for efficient querying (SQL Server 2025)
-- Left commented out because it slows writes down considerably; uncomment to enable.
-- CREATE JSON INDEX idx_data ON ledger (data);
-- GO
-- Immutable ledger: prevent DELETE.
-- The INSTEAD OF trigger runs in place of the DELETE statement and only raises
-- error 50001, so no row is ever removed through DELETE.
-- CREATE OR ALTER matches the other programmable objects in this script and
-- lets the trigger be redeployed without dropping it first.
-- NOTE(review): TRUNCATE TABLE does not fire DML triggers; presumably it has to
-- be blocked through permissions — confirm.
CREATE OR ALTER TRIGGER no_delete_ledger ON ledger
INSTEAD OF DELETE
AS
BEGIN
    THROW 50001, 'Cannot delete events from the ledger', 1;
END;
GO
-- Immutable ledger: prevent UPDATE.
-- The INSTEAD OF trigger runs in place of the UPDATE statement and only raises
-- error 50002, so stored events are never modified through UPDATE.
-- CREATE OR ALTER matches the other programmable objects in this script and
-- lets the trigger be redeployed without dropping it first.
CREATE OR ALTER TRIGGER no_update_ledger ON ledger
INSTEAD OF UPDATE
AS
BEGIN
    THROW 50002, 'Cannot update events in the ledger', 1;
END;
GO
-- append_event: validates and appends one event to a stream.
--
-- Parameters
--   @entity, @entity_key  identify the stream the event belongs to
--   @event                event name
--   @data                 JSON payload
--   @append_key           caller-supplied key stored in the UNIQUE append_key column
--   @previous_id          event_id of the stream's current newest event;
--                         NULL (the default) when this is the stream's first event
--   @event_id (OUTPUT)    receives the NEWID() generated for the new row
--
-- Errors raised by the procedure itself
--   50005  @previous_id is NULL but the stream already has events
--   50006  @previous_id does not belong to the given stream
--   50007  @previous_id is not the newest event of the stream
-- Any other error (for example a unique-index violation on insert) is
-- rolled back and re-thrown unchanged.
CREATE OR ALTER PROCEDURE append_event
    @entity      NVARCHAR(255),
    @entity_key  NVARCHAR(255),
    @event       NVARCHAR(255),
    @data        JSON,
    @append_key  NVARCHAR(255),
    @previous_id UNIQUEIDENTIFIER = NULL,
    @event_id    UNIQUEIDENTIFIER OUTPUT
AS
BEGIN
    SET NOCOUNT ON;
    SET XACT_ABORT ON;

    BEGIN TRY
        -- Raise the session flag that the no_direct_insert_ledger trigger checks.
        EXEC sys.sp_set_session_context
            @key = N'allow_direct_ledger_insert',
            @value = 1;

        BEGIN TRANSACTION;

        IF (@previous_id IS NOT NULL)
        BEGIN
            -- The referenced event has to live in the same stream ...
            IF NOT EXISTS (
                SELECT 1
                FROM ledger AS parent
                WHERE parent.entity = @entity
                  AND parent.entity_key = @entity_key
                  AND parent.event_id = @previous_id
            )
            BEGIN
                THROW 50006, 'previous_id must be in the same entity', 1;
            END;

            -- ... and no event of that stream may have a higher sequence.
            IF EXISTS (
                SELECT 1
                FROM ledger AS parent
                INNER JOIN ledger AS later
                    ON later.sequence > parent.sequence
                WHERE parent.event_id = @previous_id
                  AND later.entity = @entity
                  AND later.entity_key = @entity_key
            )
            BEGIN
                THROW 50007, 'previous_id must reference the newest event in entity', 1;
            END;
        END
        ELSE
        BEGIN
            -- No predecessor given: only valid while the stream is still empty.
            IF EXISTS (
                SELECT 1
                FROM ledger AS existing
                WHERE existing.entity = @entity
                  AND existing.entity_key = @entity_key
            )
            BEGIN
                THROW 50005, 'previous_id can only be null for first entity event', 1;
            END;
        END;

        -- Generate the id first so it can be handed back through the OUTPUT parameter.
        SET @event_id = NEWID();

        INSERT INTO ledger
            (entity, entity_key, event, data, append_key, previous_id, event_id)
        VALUES
            (@entity, @entity_key, @event, @data, @append_key, @previous_id, @event_id);

        COMMIT TRANSACTION;

        -- Lower the session flag again after a successful append.
        EXEC sys.sp_set_session_context
            @key = N'allow_direct_ledger_insert',
            @value = NULL;
    END TRY
    BEGIN CATCH
        -- Undo whatever part of the transaction is still open.
        IF XACT_STATE() <> 0
        BEGIN
            ROLLBACK TRANSACTION;
        END;

        -- The flag must not stay raised after a failure either.
        EXEC sys.sp_set_session_context
            @key = N'allow_direct_ledger_insert',
            @value = NULL;

        THROW;
    END CATCH;
END;
GO
-- Rejects INSERT statements on ledger unless the session flag
-- 'allow_direct_ledger_insert' equals 1 (append_event sets it around its insert).
-- Any other flag value, including an unset (NULL) flag, raises error 50003.
CREATE OR ALTER TRIGGER no_direct_insert_ledger
ON ledger
AFTER INSERT
AS
BEGIN
    SET XACT_ABORT ON;
    SET NOCOUNT ON;

    DECLARE @insert_allowed SQL_VARIANT = SESSION_CONTEXT(N'allow_direct_ledger_insert');

    IF @insert_allowed = 1
    BEGIN
        RETURN;
    END;

    THROW 50003, 'Use append_event procedure to insert events into the ledger', 1;
END;
GO
-- Read-only projection of every column of every event in the ledger.
-- The view has no ORDER BY; callers that need replay order must sort
-- themselves (for example by sequence).
CREATE OR ALTER VIEW replay_events
AS
    SELECT
        l.entity,
        l.entity_key,
        l.event,
        l.data,
        l.append_key,
        l.previous_id,
        l.event_id,
        l.timestamp,
        l.sequence
    FROM ledger AS l;
GO
-- Inline table-valued function: all events whose sequence is greater than
-- the sequence of the event identified by @after_event_id.
-- Returns no rows when @after_event_id is NULL or matches no event.
-- The result is unordered; callers sort as needed.
CREATE OR ALTER FUNCTION replay_events_after(@after_event_id UNIQUEIDENTIFIER)
RETURNS TABLE
AS
RETURN
(
    SELECT
        later.entity,
        later.entity_key,
        later.event,
        later.data,
        later.append_key,
        later.previous_id,
        later.event_id,
        later.timestamp,
        later.sequence
    -- event_id is UNIQUE, so anchor matches at most one row
    FROM ledger AS anchor
    INNER JOIN ledger AS later
        ON later.sequence > anchor.sequence
    WHERE anchor.event_id = @after_event_id
);
GO