Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,28 @@ coverage-all: coverage-clean
install:
python3 -m pip install .

DAPR_BRANCH ?= main
PROTO_URL = https://raw.githubusercontent.com/dapr/durabletask-protobuf/refs/heads/$(DAPR_BRANCH)/protos

gen-proto:
curl -o durabletask/internal/orchestrator_service.proto https://raw.githubusercontent.com/dapr/durabletask-protobuf/refs/heads/main/protos/orchestrator_service.proto
curl -H "Accept: application/vnd.github.v3+json" "https://api.github.com/repos/dapr/durabletask-protobuf/commits?path=protos/orchestrator_service.proto&sha=main&per_page=1" | jq -r '.[0].sha' > durabletask/internal/PROTO_SOURCE_COMMIT_HASH
curl -o durabletask/internal/orchestrator_service.proto $(PROTO_URL)/orchestrator_service.proto
curl -o durabletask/internal/orchestration.proto $(PROTO_URL)/orchestration.proto
curl -o durabletask/internal/history_events.proto $(PROTO_URL)/history_events.proto
curl -o durabletask/internal/orchestrator_actions.proto $(PROTO_URL)/orchestrator_actions.proto
curl -H "Accept: application/vnd.github.v3+json" "https://api.github.com/repos/dapr/durabletask-protobuf/commits?path=protos/orchestrator_service.proto&sha=$(DAPR_BRANCH)&per_page=1" | jq -r '.[0].sha' > durabletask/internal/PROTO_SOURCE_COMMIT_HASH
# NOTE: remember to check/update pyproject.toml protobuf version to follow https://github.com/grpc/grpc/blob/v{{VERSION GRPC IO TOOL BELLOW}}/tools/distrib/python/grpcio_tools/setup.py
pip install .[dev]
python3 -m grpc_tools.protoc --proto_path=. --python_out=. --pyi_out=. --grpc_python_out=. ./durabletask/internal/orchestrator_service.proto
# Rewrite bare inter-proto imports to full paths so protoc generates package-qualified Python imports
perl -i -pe 's|import "([^/]+\.proto)"|import "durabletask/internal/$$1"|g' durabletask/internal/*.proto
python3 -m grpc_tools.protoc \
--proto_path=. \
--python_out=. \
--pyi_out=. \
--grpc_python_out=. \
durabletask/internal/orchestrator_service.proto \
durabletask/internal/orchestration.proto \
durabletask/internal/history_events.proto \
durabletask/internal/orchestrator_actions.proto
rm durabletask/internal/*.proto

.PHONY: init test-unit test-e2e coverage-clean coverage-all gen-proto install
7 changes: 4 additions & 3 deletions durabletask/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
from google.protobuf import wrappers_pb2

import durabletask.internal.helpers as helpers
import durabletask.internal.orchestrator_service_pb2 as pb
import durabletask.internal.orchestrator_service_pb2_grpc as stubs
import durabletask.internal.protos as pb
import durabletask.internal.shared as shared
from durabletask import task
from durabletask.aio.internal.grpc_interceptor import DefaultClientInterceptorImpl
from durabletask.aio.internal.shared import ClientInterceptor, get_grpc_aio_channel
from durabletask.client import (
OrchestrationIdReusePolicy,
OrchestrationState,
OrchestrationStatus,
TInput,
Expand Down Expand Up @@ -81,7 +82,7 @@ async def schedule_new_orchestration(
input: Optional[TInput] = None,
instance_id: Optional[str] = None,
start_at: Optional[datetime] = None,
reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None,
reuse_id_policy: Optional[OrchestrationIdReusePolicy] = None,
) -> str:
name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)

Expand All @@ -93,7 +94,7 @@ async def schedule_new_orchestration(
else None,
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
version=helpers.get_string_value(None),
orchestrationIdReusePolicy=reuse_id_policy,
orchestrationIdReusePolicy=reuse_id_policy._to_pb() if reuse_id_policy else None,
)

self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
Expand Down
28 changes: 25 additions & 3 deletions durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from google.protobuf import wrappers_pb2

import durabletask.internal.helpers as helpers
import durabletask.internal.orchestrator_service_pb2 as pb
import durabletask.internal.orchestrator_service_pb2_grpc as stubs
import durabletask.internal.protos as pb
import durabletask.internal.shared as shared
from durabletask import task
from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
Expand Down Expand Up @@ -47,6 +47,28 @@ def __str__(self):
return helpers.get_orchestration_status_str(self.value)


class OrchestrationIdReuseAction(Enum):
"""Action to take when scheduling an orchestration whose ID already exists."""

ERROR = pb.ERROR
IGNORE = pb.IGNORE
TERMINATE = pb.TERMINATE


@dataclass
class OrchestrationIdReusePolicy:
"""Policy controlling what happens when a new orchestration is scheduled with an ID that already exists."""

action: OrchestrationIdReuseAction
operation_status: list[OrchestrationStatus]

def _to_pb(self) -> pb.OrchestrationIdReusePolicy:
return pb.OrchestrationIdReusePolicy(
operationStatus=[s.value for s in self.operation_status],
action=self.action.value,
)


@dataclass
class OrchestrationState:
instance_id: str
Expand Down Expand Up @@ -166,7 +188,7 @@ def schedule_new_orchestration(
input: Optional[TInput] = None,
instance_id: Optional[str] = None,
start_at: Optional[datetime] = None,
reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None,
reuse_id_policy: Optional[OrchestrationIdReusePolicy] = None,
) -> str:
name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)

Expand All @@ -180,7 +202,7 @@ def schedule_new_orchestration(
input=input_pb,
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
version=wrappers_pb2.StringValue(value=""),
orchestrationIdReusePolicy=reuse_id_policy,
orchestrationIdReusePolicy=reuse_id_policy._to_pb() if reuse_id_policy else None,
)

self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
Expand Down
2 changes: 1 addition & 1 deletion durabletask/internal/PROTO_SOURCE_COMMIT_HASH
Original file line number Diff line number Diff line change
@@ -1 +1 @@
889781bbe90e6ec84ebe169978c4f2fd0df74ff0
a70949377cd95e5af1003f0b5e29e3772dea0d9e
2 changes: 1 addition & 1 deletion durabletask/internal/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from google.protobuf import timestamp_pb2, wrappers_pb2

import durabletask.internal.orchestrator_service_pb2 as pb
import durabletask.internal.protos as pb

# TODO: The new_xxx_event methods are only used by test code and should be moved elsewhere

Expand Down
82 changes: 82 additions & 0 deletions durabletask/internal/history_events_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading