Skip to content

Commit 683fe41

Browse files
committed
test(webapp): split runsReplicationService part5 + real measured shard weights
1 parent bc29649 commit 683fe41

8 files changed

Lines changed: 138 additions & 123 deletions

apps/webapp/test/runsReplicationService.part1.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import superjson from "superjson";
1010

1111
vi.setConfig({ testTimeout: 60_000 });
1212

13-
describe("RunsReplicationService (part 1/6)", () => {
13+
describe("RunsReplicationService (part 1/7)", () => {
1414
replicationContainerTest(
1515
"should replicate runs to clickhouse",
1616
async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {

apps/webapp/test/runsReplicationService.part2.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { TestReplicationClickhouseFactory } from "./utils/testReplicationClickho
1010

1111
vi.setConfig({ testTimeout: 60_000 });
1212

13-
describe("RunsReplicationService (part 2/6)", () => {
13+
describe("RunsReplicationService (part 2/7)", () => {
1414
replicationContainerTest(
1515
"should handover leadership to a second service, and the second service should be able to extend the leader lock",
1616
async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {

apps/webapp/test/runsReplicationService.part3.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { TestReplicationClickhouseFactory } from "./utils/testReplicationClickho
1010

1111
vi.setConfig({ testTimeout: 60_000 });
1212

13-
describe("RunsReplicationService (part 3/6)", () => {
13+
describe("RunsReplicationService (part 3/7)", () => {
1414
replicationContainerTest(
1515
"should insert TaskRuns even if there are incomplete Unicode escape sequences in the JSON",
1616
async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {

apps/webapp/test/runsReplicationService.part4.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import superjson from "superjson";
1010

1111
vi.setConfig({ testTimeout: 60_000 });
1212

13-
describe("RunsReplicationService (part 4/6)", () => {
13+
describe("RunsReplicationService (part 4/7)", () => {
1414
replicationContainerTest(
1515
"should replicate updates to an existing TaskRun to ClickHouse",
1616
async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {

apps/webapp/test/runsReplicationService.part5.test.ts

Lines changed: 1 addition & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { TestReplicationClickhouseFactory } from "./utils/testReplicationClickho
1010

1111
vi.setConfig({ testTimeout: 60_000 });
1212

13-
describe("RunsReplicationService (part 5/6)", () => {
13+
describe("RunsReplicationService (part 5/7)", () => {
1414
replicationContainerTest(
1515
"should replicate all events in a single transaction (insert, update)",
1616
async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
@@ -145,111 +145,4 @@ describe("RunsReplicationService (part 5/6)", () => {
145145
}
146146
);
147147

148-
replicationContainerTest(
149-
"should be able to handle processing transactions for a long period of time",
150-
{ timeout: 60_000 * 5 },
151-
async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
152-
await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);
153-
154-
const clickhouse = new ClickHouse({
155-
url: clickhouseContainer.getConnectionUrl(),
156-
name: "runs-replication-long-tx",
157-
logLevel: "warn",
158-
});
159-
160-
const runsReplicationService = new RunsReplicationService({
161-
clickhouseFactory: new TestReplicationClickhouseFactory(clickhouse),
162-
pgConnectionUrl: postgresContainer.getConnectionUri(),
163-
serviceName: "runs-replication-long-tx",
164-
slotName: "task_runs_to_clickhouse_v1",
165-
publicationName: "task_runs_to_clickhouse_v1_publication",
166-
redisOptions,
167-
maxFlushConcurrency: 1,
168-
flushIntervalMs: 100,
169-
flushBatchSize: 10,
170-
leaderLockTimeoutMs: 5000,
171-
leaderLockExtendIntervalMs: 1000,
172-
ackIntervalSeconds: 5,
173-
logLevel: "warn",
174-
});
175-
176-
await runsReplicationService.start();
177-
178-
const organization = await prisma.organization.create({
179-
data: {
180-
title: "test-long-tx",
181-
slug: "test-long-tx",
182-
},
183-
});
184-
185-
const project = await prisma.project.create({
186-
data: {
187-
name: "test-long-tx",
188-
slug: "test-long-tx",
189-
organizationId: organization.id,
190-
externalRef: "test-long-tx",
191-
},
192-
});
193-
194-
const runtimeEnvironment = await prisma.runtimeEnvironment.create({
195-
data: {
196-
slug: "test-long-tx",
197-
type: "DEVELOPMENT",
198-
projectId: project.id,
199-
organizationId: organization.id,
200-
apiKey: "test-long-tx",
201-
pkApiKey: "test-long-tx",
202-
shortcode: "test-long-tx",
203-
},
204-
});
205-
206-
// Start an interval that will create a new run every 500ms for 4 minutes
207-
const interval = setInterval(async () => {
208-
await prisma.taskRun.create({
209-
data: {
210-
friendlyId: `run_long_tx_${Date.now()}`,
211-
taskIdentifier: "my-task-long-tx",
212-
payload: JSON.stringify({ long: 1 }),
213-
payloadType: "application/json",
214-
traceId: `long-${Date.now()}`,
215-
spanId: `long-${Date.now()}`,
216-
queue: "test-long-tx",
217-
runtimeEnvironmentId: runtimeEnvironment.id,
218-
projectId: project.id,
219-
organizationId: organization.id,
220-
environmentType: "DEVELOPMENT",
221-
engine: "V2",
222-
status: "PENDING",
223-
attemptNumber: 1,
224-
createdAt: new Date(),
225-
updatedAt: new Date(),
226-
},
227-
});
228-
}, 500);
229-
230-
// Wait for 1 minute
231-
await setTimeout(1 * 60 * 1000);
232-
233-
// Stop the interval
234-
clearInterval(interval);
235-
236-
// Wait for replication
237-
await setTimeout(1000);
238-
239-
// Query ClickHouse for all runs using FINAL
240-
const queryRuns = clickhouse.reader.query({
241-
name: "runs-replication-long-tx",
242-
query: `SELECT * FROM trigger_dev.task_runs_v2 FINAL`,
243-
schema: z.any(),
244-
});
245-
246-
const [queryError, result] = await queryRuns({});
247-
expect(queryError).toBeNull();
248-
249-
expect(result?.length).toBeGreaterThanOrEqual(50);
250-
251-
await runsReplicationService.stop();
252-
}
253-
);
254-
255148
});

apps/webapp/test/runsReplicationService.part6.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { TestReplicationClickhouseFactory } from "./utils/testReplicationClickho
1010

1111
vi.setConfig({ testTimeout: 60_000 });
1212

13-
describe("RunsReplicationService (part 6/6)", () => {
13+
describe("RunsReplicationService (part 6/7)", () => {
1414
replicationContainerTest(
1515
"should sort batch inserts according to table schema ordering for optimal performance",
1616
async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
import { ClickHouse, getTaskRunField, getPayloadField } from "@internal/clickhouse";
2+
import { replicationContainerTest } from "@internal/testcontainers";
3+
import { Logger } from "@trigger.dev/core/logger";
4+
import { readFile } from "node:fs/promises";
5+
import { setTimeout } from "node:timers/promises";
6+
import { z } from "zod";
7+
import { RunsReplicationService } from "~/services/runsReplicationService.server";
8+
import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";
9+
import { TestReplicationClickhouseFactory } from "./utils/testReplicationClickhouseFactory";
10+
11+
vi.setConfig({ testTimeout: 60_000 });
12+
13+
describe("RunsReplicationService (part 7/7)", () => {
14+
replicationContainerTest(
15+
"should be able to handle processing transactions for a long period of time",
16+
{ timeout: 60_000 * 5 },
17+
async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
18+
await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);
19+
20+
const clickhouse = new ClickHouse({
21+
url: clickhouseContainer.getConnectionUrl(),
22+
name: "runs-replication-long-tx",
23+
logLevel: "warn",
24+
});
25+
26+
const runsReplicationService = new RunsReplicationService({
27+
clickhouseFactory: new TestReplicationClickhouseFactory(clickhouse),
28+
pgConnectionUrl: postgresContainer.getConnectionUri(),
29+
serviceName: "runs-replication-long-tx",
30+
slotName: "task_runs_to_clickhouse_v1",
31+
publicationName: "task_runs_to_clickhouse_v1_publication",
32+
redisOptions,
33+
maxFlushConcurrency: 1,
34+
flushIntervalMs: 100,
35+
flushBatchSize: 10,
36+
leaderLockTimeoutMs: 5000,
37+
leaderLockExtendIntervalMs: 1000,
38+
ackIntervalSeconds: 5,
39+
logLevel: "warn",
40+
});
41+
42+
await runsReplicationService.start();
43+
44+
const organization = await prisma.organization.create({
45+
data: {
46+
title: "test-long-tx",
47+
slug: "test-long-tx",
48+
},
49+
});
50+
51+
const project = await prisma.project.create({
52+
data: {
53+
name: "test-long-tx",
54+
slug: "test-long-tx",
55+
organizationId: organization.id,
56+
externalRef: "test-long-tx",
57+
},
58+
});
59+
60+
const runtimeEnvironment = await prisma.runtimeEnvironment.create({
61+
data: {
62+
slug: "test-long-tx",
63+
type: "DEVELOPMENT",
64+
projectId: project.id,
65+
organizationId: organization.id,
66+
apiKey: "test-long-tx",
67+
pkApiKey: "test-long-tx",
68+
shortcode: "test-long-tx",
69+
},
70+
});
71+
72+
// Start an interval that will create a new run every 500ms for 4 minutes
73+
const interval = setInterval(async () => {
74+
await prisma.taskRun.create({
75+
data: {
76+
friendlyId: `run_long_tx_${Date.now()}`,
77+
taskIdentifier: "my-task-long-tx",
78+
payload: JSON.stringify({ long: 1 }),
79+
payloadType: "application/json",
80+
traceId: `long-${Date.now()}`,
81+
spanId: `long-${Date.now()}`,
82+
queue: "test-long-tx",
83+
runtimeEnvironmentId: runtimeEnvironment.id,
84+
projectId: project.id,
85+
organizationId: organization.id,
86+
environmentType: "DEVELOPMENT",
87+
engine: "V2",
88+
status: "PENDING",
89+
attemptNumber: 1,
90+
createdAt: new Date(),
91+
updatedAt: new Date(),
92+
},
93+
});
94+
}, 500);
95+
96+
// Wait for 1 minute
97+
await setTimeout(1 * 60 * 1000);
98+
99+
// Stop the interval
100+
clearInterval(interval);
101+
102+
// Wait for replication
103+
await setTimeout(1000);
104+
105+
// Query ClickHouse for all runs using FINAL
106+
const queryRuns = clickhouse.reader.query({
107+
name: "runs-replication-long-tx",
108+
query: `SELECT * FROM trigger_dev.task_runs_v2 FINAL`,
109+
schema: z.any(),
110+
});
111+
112+
const [queryError, result] = await queryRuns({});
113+
expect(queryError).toBeNull();
114+
115+
expect(result?.length).toBeGreaterThanOrEqual(50);
116+
117+
await runsReplicationService.stop();
118+
}
119+
);
120+
121+
});

test-timings.json

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,13 @@
5353
"apps/webapp/test/replay-after-crash.test.ts": 2233,
5454
"apps/webapp/test/runsBackfiller.test.ts": 15478,
5555
"apps/webapp/test/runsReplicationBenchmark.test.ts": 0,
56-
"apps/webapp/test/runsRepository.part1.test.ts": 46000,
57-
"apps/webapp/test/runsRepository.part2.test.ts": 46000,
56+
"apps/webapp/test/runsRepository.part1.test.ts": 53000,
57+
"apps/webapp/test/runsRepository.part2.test.ts": 57000,
5858
"apps/webapp/test/sanitizeRowsOnParseError.test.ts": 8,
5959
"apps/webapp/test/sentryTenantContext.test.ts": 5,
6060
"apps/webapp/test/sentryTraceContext.server.test.ts": 12,
6161
"apps/webapp/test/sessionDuration.test.ts": 18416,
62-
"apps/webapp/test/sessionsReplicationService.test.ts": 32887,
62+
"apps/webapp/test/sessionsReplicationService.test.ts": 30000,
6363
"apps/webapp/test/shouldRevalidateRunsList.test.ts": 5,
6464
"apps/webapp/test/slackErrorAlerts.test.ts": 0,
6565
"apps/webapp/test/tenantContext.test.ts": 26,
@@ -74,7 +74,7 @@
7474
"apps/webapp/test/workerQueueSplit.test.ts": 3,
7575
"apps/webapp/test/components/DateTime.test.ts": 24,
7676
"apps/webapp/test/engine/batchPayloads.test.ts": 5018,
77-
"apps/webapp/test/engine/streamBatchItems.test.ts": 47000,
77+
"apps/webapp/test/engine/streamBatchItems.test.ts": 45000,
7878
"apps/webapp/test/engine/taskIdentifierRegistry.test.ts": 13152,
7979
"apps/webapp/test/engine/triggerTask.test.ts": 31630,
8080
"apps/webapp/test/presenters/mapRunToLiveFields.test.ts": 3,
@@ -209,12 +209,13 @@
209209
"internal-packages/clickhouse/src/client/client.test.ts": 9138,
210210
"internal-packages/sdk-compat-tests/src/tests/bundler.test.ts": 348,
211211
"internal-packages/sdk-compat-tests/src/tests/import.test.ts": 4742,
212-
"apps/webapp/test/runsReplicationService.part1.test.ts": 73000,
212+
"apps/webapp/test/runsReplicationService.part1.test.ts": 74000,
213213
"apps/webapp/test/runsReplicationService.part2.test.ts": 64000,
214-
"apps/webapp/test/runsReplicationService.part3.test.ts": 43000,
215-
"apps/webapp/test/runsReplicationService.part4.test.ts": 61000,
214+
"apps/webapp/test/runsReplicationService.part3.test.ts": 30000,
215+
"apps/webapp/test/runsReplicationService.part4.test.ts": 70000,
216216
"apps/webapp/test/runsReplicationService.part5.test.ts": 43000,
217-
"apps/webapp/test/runsReplicationService.part6.test.ts": 43000,
218-
"apps/webapp/test/runsRepository.part3.test.ts": 34000,
219-
"apps/webapp/test/runsRepository.part4.test.ts": 46000
217+
"apps/webapp/test/runsReplicationService.part6.test.ts": 32000,
218+
"apps/webapp/test/runsRepository.part3.test.ts": 43000,
219+
"apps/webapp/test/runsRepository.part4.test.ts": 57000,
220+
"apps/webapp/test/runsReplicationService.part7.test.ts": 43000
220221
}

0 commit comments

Comments
 (0)