-
Notifications
You must be signed in to change notification settings - Fork 30
Expand file tree
/
Copy pathapp.py
More file actions
90 lines (70 loc) · 3.09 KB
/
app.py
File metadata and controls
90 lines (70 loc) · 3.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import logging
import restate
from pydantic import BaseModel
from restate.exceptions import TerminalError
from sagas.clients import flight_client
from sagas.clients import car_rental_client
from sagas.clients import hotel_client
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] [%(process)d] [%(levelname)s] - %(message)s",
)
class BookingRequest(BaseModel):
customer_id: str
flight: flight_client.FlightRequest
car: car_rental_client.CarRentalRequest
hotel: hotel_client.HotelRequest
"""
Trip reservation workflow using sagas:
Restate infinitely retries failures and recovers previous progress.
But for some types of failures (terminal errors), we don't want to retry
but want to undo the previous actions and finish.
Restate guarantees the execution of your code. This makes it very easy to implement sagas.
We execute actions and keep a list of undo actions.
When a terminal exception occurs, Restate ensures execution of all compensations.
+------ Initialize compensations list ------+
|
v
+------------------ Try --------------------+
| 1. Reserve Flights & Register Cancel |
| 2. Reserve Car & Register Cancel |
| 3. Reserve Hotel & Register Cancel |
+------------------ Catch ------------------+
| If TerminalException: |
| Execute compensations in reverse order |
| Rethrow error |
+-------------------------------------------+
Note: that the compensation logic is purely implemented in user code (no special Restate API)
"""
booking_workflow = restate.Service("BookingWorkflow")
@booking_workflow.handler()
async def run(ctx: restate.Context, req: BookingRequest):
# Create a list of undo actions
compensations = []
try:
# For each action, we register a compensation that will be executed on failures
compensations.append(
lambda: ctx.run_typed("Cancel flight", flight_client.cancel, customer_id=req.customer_id)
)
await ctx.run_typed("Book flight", flight_client.book, customer_id=req.customer_id, flight=req.flight)
compensations.append(
lambda: ctx.run_typed("Cancel car", car_rental_client.cancel, customer_id=req.customer_id)
)
await ctx.run_typed("Book car", car_rental_client.book, customer_id=req.customer_id, car=req.car)
compensations.append(
lambda: ctx.run_typed("Cancel hotel", hotel_client.cancel, customer_id=req.customer_id)
)
await ctx.run_typed("Book hotel", hotel_client.book, customer_id=req.customer_id, hotel=req.hotel)
# Terminal errors are not retried by Restate, so undo previous actions and fail the workflow
except TerminalError as e:
# Restate guarantees that all compensations are executed
for compensation in reversed(compensations):
await compensation()
raise e
app = restate.app([booking_workflow])
if __name__ == "__main__":
import hypercorn
import asyncio
conf = hypercorn.Config()
conf.bind = ["0.0.0.0:9080"]
asyncio.run(hypercorn.asyncio.serve(app, conf))