From 0201476b06a79be93951b347a495b51368ccfd50 Mon Sep 17 00:00:00 2001 From: Peep van Puijenbroek Date: Thu, 22 Jan 2026 15:47:30 +0100 Subject: [PATCH] feat: add Cloudflare Queues destination Add support for publishing events to Cloudflare Queues via their HTTP API. Implements #655 - Add destcfqueues Go provider with Publish, Validate, and ComputeTarget - Add provider metadata (metadata.json and instructions.md) - Update OpenAPI spec with CloudflareQueues schemas - Add user documentation (cloudflare-queues.mdx) - Register provider in default.go - Add validation and publish unit tests - Add SDK test factory and integration tests --- docs/apis/openapi.yaml | 171 +++++++ docs/pages/destinations.mdx | 1 + docs/pages/destinations/cloudflare-queues.mdx | 92 ++++ .../cloudflare_queues/instructions.md | 176 +++++++ .../providers/cloudflare_queues/metadata.json | 33 ++ internal/destregistry/providers/default.go | 7 + .../providers/destcfqueues/destcfqueues.go | 294 ++++++++++++ .../destcfqueues/destcfqueues_publish_test.go | 394 ++++++++++++++++ .../destcfqueues_validate_test.go | 130 ++++++ .../factories/destination.factory.ts | 18 + spec-sdk-tests/package.json | 2 +- .../destinations/cloudflare-queues.test.ts | 440 ++++++++++++++++++ 12 files changed, 1757 insertions(+), 1 deletion(-) create mode 100644 docs/pages/destinations/cloudflare-queues.mdx create mode 100644 internal/destregistry/metadata/providers/cloudflare_queues/instructions.md create mode 100644 internal/destregistry/metadata/providers/cloudflare_queues/metadata.json create mode 100644 internal/destregistry/providers/destcfqueues/destcfqueues.go create mode 100644 internal/destregistry/providers/destcfqueues/destcfqueues_publish_test.go create mode 100644 internal/destregistry/providers/destcfqueues/destcfqueues_validate_test.go create mode 100644 spec-sdk-tests/tests/destinations/cloudflare-queues.test.ts diff --git a/docs/apis/openapi.yaml b/docs/apis/openapi.yaml index 39760b65..6ce27e00 100644 --- a/docs/apis/openapi.yaml +++ b/docs/apis/openapi.yaml @@ -268,6 +268,25 @@ components: type: string description: Optional AWS Session Token (for temporary credentials). example: "AQoDYXdzEPT//////////wEXAMPLE..." + CloudflareQueuesConfig: + type: object + required: [account_id, queue_id] + properties: + account_id: + type: string + description: Cloudflare Account ID + example: "abc123def456789" + queue_id: + type: string + description: Cloudflare Queue ID + example: "my-queue" + CloudflareQueuesCredentials: + type: object + required: [api_token] + properties: + api_token: + type: string + description: Cloudflare API Token with queues:write permission RabbitMQConfig: type: object required: [server_url, exchange] @@ -1094,6 +1113,92 @@ components: credentials: service_account_json: '{"type":"service_account","project_id":"my-project-123",...}' + DestinationCloudflareQueues: + type: object + required: + [ + id, + type, + topics, + config, + credentials, + created_at, + updated_at, + disabled_at, + ] + properties: + id: + type: string + description: Control plane generated ID or user provided ID for the destination. + example: "des_12345" + type: + type: string + description: Type of the destination. + enum: [cloudflare_queues] + example: "cloudflare_queues" + topics: + $ref: "#/components/schemas/Topics" + filter: + $ref: "#/components/schemas/Filter" + disabled_at: + type: string + format: date-time + nullable: true + description: ISO Date when the destination was disabled, or null if enabled. + example: null + created_at: + type: string + format: date-time + description: ISO Date when the destination was created. + example: "2024-01-01T00:00:00Z" + updated_at: + type: string + format: date-time + description: ISO Date when the destination was last updated. + example: "2024-01-01T00:00:00Z" + config: + $ref: "#/components/schemas/CloudflareQueuesConfig" + credentials: + $ref: "#/components/schemas/CloudflareQueuesCredentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } + target: + type: string + description: A human-readable representation of the destination target (Cloudflare Queue ID). Read-only. + readOnly: true + example: "my-queue" + target_url: + type: string + format: url + nullable: true + description: A URL link to the destination target (Cloudflare Dashboard link to the queue). Read-only. + readOnly: true + example: "https://dash.cloudflare.com/abc123def456789/queues/my-queue" + example: + id: "des_cf_queues_123" + type: "cloudflare_queues" + topics: ["*"] + disabled_at: null + created_at: "2024-03-10T14:30:00Z" + updated_at: "2024-03-10T14:30:00Z" + config: + account_id: "abc123def456789" + queue_id: "my-queue" + credentials: + api_token: "cf_token_..." + # Polymorphic Destination Schema (for Responses) Destination: oneOf: @@ -1105,6 +1210,7 @@ components: - $ref: "#/components/schemas/DestinationAzureServiceBus" - $ref: "#/components/schemas/DestinationAWSS3" - $ref: "#/components/schemas/DestinationGCPPubSub" + - $ref: "#/components/schemas/DestinationCloudflareQueues" discriminator: propertyName: type mapping: @@ -1116,6 +1222,7 @@ components: azure_servicebus: "#/components/schemas/DestinationAzureServiceBus" aws_s3: "#/components/schemas/DestinationAWSS3" gcp_pubsub: "#/components/schemas/DestinationGCPPubSub" + cloudflare_queues: "#/components/schemas/DestinationCloudflareQueues" DestinationCreateWebhook: type: object @@ -1391,6 +1498,40 @@ components: nullable: true description: Arbitrary contextual information stored with the destination. example: { "internal-id": "123", "team": "platform" } + DestinationCreateCloudflareQueues: + type: object + required: [type, topics, config, credentials] + properties: + id: + type: string + description: Optional user-provided ID. A UUID will be generated if empty. + example: "user-provided-id" + type: + type: string + description: Type of the destination. Must be 'cloudflare_queues'. + enum: [cloudflare_queues] + topics: + $ref: "#/components/schemas/Topics" + filter: + $ref: "#/components/schemas/Filter" + config: + $ref: "#/components/schemas/CloudflareQueuesConfig" + credentials: + $ref: "#/components/schemas/CloudflareQueuesCredentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } # Polymorphic Destination Creation Schema (for Request Bodies) DestinationCreate: @@ -1403,6 +1544,7 @@ components: - $ref: "#/components/schemas/DestinationCreateAzureServiceBus" - $ref: "#/components/schemas/DestinationCreateAWSS3" - $ref: "#/components/schemas/DestinationCreateGCPPubSub" + - $ref: "#/components/schemas/DestinationCreateCloudflareQueues" discriminator: propertyName: type mapping: @@ -1414,6 +1556,7 @@ components: azure_servicebus: "#/components/schemas/DestinationCreateAzureServiceBus" aws_s3: "#/components/schemas/DestinationCreateAWSS3" gcp_pubsub: "#/components/schemas/DestinationCreateGCPPubSub" + cloudflare_queues: "#/components/schemas/DestinationCreateCloudflareQueues" # Type-Specific Destination Update Schemas (for Request Bodies) WebhookCredentialsUpdate: @@ -1640,6 +1783,32 @@ components: nullable: true description: Arbitrary contextual information stored with the destination. example: { "internal-id": "123", "team": "platform" } + DestinationUpdateCloudflareQueues: + type: object + # Properties duplicated from DestinationUpdateBase + properties: + topics: + $ref: "#/components/schemas/Topics" + filter: + $ref: "#/components/schemas/Filter" + config: + $ref: "#/components/schemas/CloudflareQueuesConfig" # account_id/queue_id required here, but PATCH means optional + credentials: + $ref: "#/components/schemas/CloudflareQueuesCredentials" # api_token required here, but PATCH means optional + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } # Polymorphic Destination Update Schema (for Request Bodies) DestinationUpdate: @@ -1652,6 +1821,7 @@ components: - $ref: "#/components/schemas/DestinationUpdateAzureServiceBus" - $ref: "#/components/schemas/DestinationUpdateAWSS3" - $ref: "#/components/schemas/DestinationUpdateGCPPubSub" + - $ref: "#/components/schemas/DestinationUpdateCloudflareQueues" # Event Schemas PublishRequest: type: object @@ -1908,6 +2078,7 @@ components: - azure_servicebus - aws_s3 - gcp_pubsub + - cloudflare_queues description: Type of destination. example: "webhook" DestinationTypeSchema: diff --git a/docs/pages/destinations.mdx b/docs/pages/destinations.mdx index a902f88e..41196a0a 100644 --- a/docs/pages/destinations.mdx +++ b/docs/pages/destinations.mdx @@ -18,6 +18,7 @@ Outpost supports multiple event destination types. Each tenant can have multiple | [Azure Service Bus](/docs/destinations/azure-service-bus) | Send events to Azure Service Bus | | [GCP Pub/Sub](/docs/destinations/gcp-pubsub) | Publish events to Google Cloud Pub/Sub | | [RabbitMQ](/docs/destinations/rabbitmq) | Send events to a RabbitMQ exchange | +| [Cloudflare Queues](/docs/destinations/cloudflare-queues) | Send events to Cloudflare Queues | See the [roadmap](/docs/references/roadmap) for planned destination types. To be eligible as a destination type, it must be asynchronous in nature and not run any business logic. diff --git a/docs/pages/destinations/cloudflare-queues.mdx b/docs/pages/destinations/cloudflare-queues.mdx new file mode 100644 index 00000000..1dab186d --- /dev/null +++ b/docs/pages/destinations/cloudflare-queues.mdx @@ -0,0 +1,92 @@ +--- +title: Cloudflare Queues +--- + +Send events to Cloudflare Queues. + +## Configuration + +### Config + +| Field | Type | Required | Description | +| ----- | ---- | -------- | ----------- | +| `config.account_id` | string | Yes | Cloudflare Account ID | +| `config.queue_id` | string | Yes | Queue ID | + +### Credentials + +| Field | Type | Required | Description | +| ----- | ---- | -------- | ----------- | +| `credentials.api_token` | string | Yes | Cloudflare API Token | + +### Example + +```sh +curl --location 'https:///api/v1/tenants//destinations' \ +--header 'Content-Type: application/json' \ +--header 'Authorization: Bearer ' \ +--data '{ + "type": "cloudflare_queues", + "topics": ["orders"], + "config": { + "account_id": "", + "queue_id": "" + }, + "credentials": { + "api_token": "" + } +}' +``` + +## Message Format + +Events are sent to Cloudflare Queues as JSON messages with the following structure: + +```json +{ + "data": , + "metadata": +} +``` + +### Example Message + +If you publish an event: + +```json +{ + "topic": "orders", + "data": { + "order_id": "123", + "status": "created" + }, + "metadata": { + "source": "checkout-service" + } +} +``` + +The message sent to Cloudflare Queues will be: + +```json +{ + "data": { + "order_id": "123", + "status": "created" + }, + "metadata": { + "event-id": "evt_123", + "topic": "orders", + "timestamp": "1704067200", + "source": "checkout-service" + } +} +``` + +The `metadata` field contains system metadata (`event-id`, `topic`, `timestamp`) merged with any custom event metadata. + +## Required Permissions + +The Cloudflare API Token must have the following permission: + +- `queues:write` - Required to send messages to the queue diff --git a/internal/destregistry/metadata/providers/cloudflare_queues/instructions.md b/internal/destregistry/metadata/providers/cloudflare_queues/instructions.md new file mode 100644 index 00000000..0fce3872 --- /dev/null +++ b/internal/destregistry/metadata/providers/cloudflare_queues/instructions.md @@ -0,0 +1,176 @@ +# Cloudflare Queues Configuration Instructions + +[Cloudflare Queues](https://developers.cloudflare.com/queues/) is a global message queue that integrates natively with Cloudflare Workers. It enables you to: + +- Send and receive messages with guaranteed delivery +- Process messages asynchronously using Workers +- Build reliable, distributed architectures +- Scale automatically with no capacity planning + +## Prerequisites + +- **Cloudflare Account**: A Cloudflare account with a Workers Paid plan (required for Queues) +- **Wrangler CLI** (optional): Install via `npm install -g wrangler` for CLI-based setup + +## How to Find Your Account ID + +Your Cloudflare Account ID is required for API access. + +### Via Dashboard + +1. Log in to the [Cloudflare Dashboard](https://dash.cloudflare.com/) +2. Select your account +3. The Account ID is displayed in the URL: `https://dash.cloudflare.com//...` +4. Alternatively, go to **Workers & Pages** > **Overview** and find the Account ID in the right sidebar + +### Via Wrangler CLI + +```bash +# Authenticate with Cloudflare +npx wrangler login + +# List accounts and their IDs +npx wrangler whoami +``` + +## How to Create a Queue + +### Via Dashboard + +1. Log in to the [Cloudflare Dashboard](https://dash.cloudflare.com/) +2. Navigate to **Workers & Pages** > **Queues** +3. Click **Create Queue** +4. Enter a name for your queue +5. Click **Create** +6. Copy the **Queue ID** from the queue details page + +### Via Wrangler CLI + +```bash +# Create a new queue +npx wrangler queues create my-queue + +# List all queues to get the Queue ID +npx wrangler queues list +``` + +The output will show your queue with its ID: + +``` +┌──────────────────────────────────────┬──────────┐ +│ id │ name │ +├──────────────────────────────────────┼──────────┤ +│ 12345678-1234-1234-1234-123456789abc │ my-queue │ +└──────────────────────────────────────┴──────────┘ +``` + +## How to Create an API Token + +You need a Cloudflare API Token with permissions to write to Queues. + +### Via Dashboard + +1. Go to [Cloudflare API Tokens](https://dash.cloudflare.com/profile/api-tokens) +2. Click **Create Token** +3. Select **Create Custom Token** +4. Configure the token: + - **Token name**: e.g., "Outpost Queues Publisher" + - **Permissions**: + - Account > Queues > Edit + - **Account Resources**: + - Include > Your Account (or specific account) +5. Click **Continue to summary** +6. Click **Create Token** +7. Copy the token immediately (it won't be shown again) + +### Permission Details + +The API Token requires the following permission: +- **Account** > **Queues** > **Edit** - This grants `queues:write` access to send messages to queues + +## Configuration + +When configuring your Cloudflare Queues destination, you'll need: + +1. **Account ID**: Your Cloudflare Account ID +2. **Queue ID**: The UUID of your Cloudflare Queue +3. **API Token**: A Cloudflare API Token with Queues write permission + +## Message Format + +When events are sent to Cloudflare Queues, each message contains: + +- **body**: The event payload as a JSON object +- **contentType**: Set to `application/json` + +Messages are sent using the [Cloudflare Queues REST API](https://developers.cloudflare.com/api/operations/queue-send-messages). + +## Testing the Integration + +### Create a Consumer Worker + +To verify messages are being delivered, create a simple consumer Worker: + +```javascript +export default { + async queue(batch, env) { + for (const message of batch.messages) { + console.log('Received message:', JSON.stringify(message.body)); + message.ack(); + } + }, +}; +``` + +Deploy with wrangler.toml: + +```toml +name = "queue-consumer" +main = "src/index.js" + +[[queues.consumers]] +queue = "my-queue" +max_batch_size = 10 +max_batch_timeout = 30 +``` + +### View Queue Metrics + +1. Go to the [Cloudflare Dashboard](https://dash.cloudflare.com/) +2. Navigate to **Workers & Pages** > **Queues** +3. Select your queue +4. View metrics for messages sent, delivered, and acknowledged + +## Troubleshooting + +### Authentication Errors (401) + +- Verify your API Token is correct and hasn't been revoked +- Ensure the token has **Queues > Edit** permission +- Check the token is scoped to the correct account + +### Queue Not Found (404) + +- Verify the Queue ID is correct (it's a UUID, not the queue name) +- Ensure the queue exists in the account associated with your API Token +- Check the Account ID matches where the queue was created + +### Permission Denied (403) + +- Verify your API Token has the **Queues > Edit** permission +- Ensure the token is scoped to the account containing the queue + +### Rate Limiting (429) + +Cloudflare Queues has rate limits. If you encounter rate limiting: +- Implement backoff/retry logic +- Consider batching messages +- Review [Cloudflare Queues limits](https://developers.cloudflare.com/queues/platform/limits/) + +## Additional Resources + +- [Cloudflare Queues Documentation](https://developers.cloudflare.com/queues/) +- [Queues REST API Reference](https://developers.cloudflare.com/api/operations/queue-send-messages) +- [Cloudflare API Tokens](https://developers.cloudflare.com/fundamentals/api/get-started/create-token/) +- [Wrangler CLI Documentation](https://developers.cloudflare.com/workers/wrangler/) +- [Queues Pricing](https://developers.cloudflare.com/queues/platform/pricing/) diff --git a/internal/destregistry/metadata/providers/cloudflare_queues/metadata.json b/internal/destregistry/metadata/providers/cloudflare_queues/metadata.json new file mode 100644 index 00000000..fe4e20ab --- /dev/null +++ b/internal/destregistry/metadata/providers/cloudflare_queues/metadata.json @@ -0,0 +1,33 @@ +{ + "type": "cloudflare_queues", + "config_fields": [ + { + "key": "account_id", + "type": "text", + "label": "Account ID", + "description": "Your Cloudflare Account ID", + "required": true + }, + { + "key": "queue_id", + "type": "text", + "label": "Queue ID", + "description": "The ID of your Cloudflare Queue", + "required": true + } + ], + "credential_fields": [ + { + "key": "api_token", + "type": "text", + "label": "API Token", + "description": "Cloudflare API Token with queues:write permission", + "required": true, + "sensitive": true + } + ], + "label": "Cloudflare Queues", + "link": "https://developers.cloudflare.com/queues/", + "description": "Send events to Cloudflare Queues, a message queue that integrates natively with Cloudflare Workers.", + "icon": "" +} diff --git a/internal/destregistry/providers/default.go b/internal/destregistry/providers/default.go index ae19b0df..b47a3a95 100644 --- a/internal/destregistry/providers/default.go +++ b/internal/destregistry/providers/default.go @@ -6,6 +6,7 @@ import ( "github.com/hookdeck/outpost/internal/destregistry/providers/destawss3" "github.com/hookdeck/outpost/internal/destregistry/providers/destawssqs" "github.com/hookdeck/outpost/internal/destregistry/providers/destazureservicebus" + "github.com/hookdeck/outpost/internal/destregistry/providers/destcfqueues" "github.com/hookdeck/outpost/internal/destregistry/providers/destgcppubsub" "github.com/hookdeck/outpost/internal/destregistry/providers/desthookdeck" "github.com/hookdeck/outpost/internal/destregistry/providers/destrabbitmq" @@ -138,5 +139,11 @@ func RegisterDefault(registry destregistry.Registry, opts RegisterDefaultDestina } registry.RegisterProvider("rabbitmq", rabbitmq) + cloudflareQueues, err := destcfqueues.New(loader, basePublisherOpts) + if err != nil { + return err + } + registry.RegisterProvider("cloudflare_queues", cloudflareQueues) + return nil } diff --git a/internal/destregistry/providers/destcfqueues/destcfqueues.go b/internal/destregistry/providers/destcfqueues/destcfqueues.go new file mode 100644 index 00000000..964001ed --- /dev/null +++ b/internal/destregistry/providers/destcfqueues/destcfqueues.go @@ -0,0 +1,294 @@ +package destcfqueues + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/destregistry/metadata" + "github.com/hookdeck/outpost/internal/models" +) + +const ( + cloudflareAPIBaseURL = "https://api.cloudflare.com/client/v4" + providerType = "cloudflare_queues" +) + +// CloudflareQueuesDestination implements the destregistry.Provider interface for Cloudflare Queues. +type CloudflareQueuesDestination struct { + *destregistry.BaseProvider +} + +// CloudflareQueuesConfig holds the configuration for a Cloudflare Queues destination. +type CloudflareQueuesConfig struct { + AccountID string `json:"account_id" mapstructure:"account_id"` + QueueID string `json:"queue_id" mapstructure:"queue_id"` +} + +// CloudflareQueuesCredentials holds the credentials for authenticating with Cloudflare. +type CloudflareQueuesCredentials struct { + APIToken string `json:"api_token" mapstructure:"api_token"` +} + +var _ destregistry.Provider = (*CloudflareQueuesDestination)(nil) + +// New creates a new CloudflareQueuesDestination provider. +func New(loader metadata.MetadataLoader, basePublisherOpts []destregistry.BasePublisherOption) (*CloudflareQueuesDestination, error) { + base, err := destregistry.NewBaseProvider(loader, providerType, basePublisherOpts...) + if err != nil { + return nil, err + } + + return &CloudflareQueuesDestination{ + BaseProvider: base, + }, nil +} + +// Validate validates the destination configuration. +func (d *CloudflareQueuesDestination) Validate(ctx context.Context, destination *models.Destination) error { + _, _, err := d.resolveMetadata(ctx, destination) + if err != nil { + return err + } + return nil +} + +// CreatePublisher creates a new publisher for the destination. +func (d *CloudflareQueuesDestination) CreatePublisher(ctx context.Context, destination *models.Destination) (destregistry.Publisher, error) { + cfg, creds, err := d.resolveMetadata(ctx, destination) + if err != nil { + return nil, err + } + + httpClient, err := d.BaseProvider.MakeHTTPClient(destregistry.HTTPClientConfig{}) + if err != nil { + return nil, err + } + + return &CloudflareQueuesPublisher{ + BasePublisher: d.BaseProvider.NewPublisher(destregistry.WithDeliveryMetadata(destination.DeliveryMetadata)), + httpClient: httpClient, + accountID: cfg.AccountID, + queueID: cfg.QueueID, + apiToken: creds.APIToken, + }, nil +} + +// ComputeTarget returns the target information for display purposes. +func (d *CloudflareQueuesDestination) ComputeTarget(destination *models.Destination) destregistry.DestinationTarget { + accountID := destination.Config["account_id"] + queueID := destination.Config["queue_id"] + + return destregistry.DestinationTarget{ + Target: queueID, + TargetURL: makeCloudflareQueuesDashboardURL(accountID, queueID), + } +} + +// resolveMetadata validates and resolves the destination configuration and credentials. +func (d *CloudflareQueuesDestination) resolveMetadata(ctx context.Context, destination *models.Destination) (*CloudflareQueuesConfig, *CloudflareQueuesCredentials, error) { + if err := d.BaseProvider.Validate(ctx, destination); err != nil { + return nil, nil, err + } + + return &CloudflareQueuesConfig{ + AccountID: destination.Config["account_id"], + QueueID: destination.Config["queue_id"], + }, &CloudflareQueuesCredentials{ + APIToken: destination.Credentials["api_token"], + }, nil +} + +// makeCloudflareQueuesDashboardURL constructs the Cloudflare dashboard URL for a queue. +func makeCloudflareQueuesDashboardURL(accountID, queueID string) string { + if accountID == "" || queueID == "" { + return "" + } + return fmt.Sprintf("https://dash.cloudflare.com/%s/queues/%s", accountID, queueID) +} + +// CloudflareQueuesPublisher handles publishing events to Cloudflare Queues. +type CloudflareQueuesPublisher struct { + *destregistry.BasePublisher + httpClient *http.Client + accountID string + queueID string + apiToken string +} + +// Close gracefully shuts down the publisher. +func (p *CloudflareQueuesPublisher) Close() error { + p.BasePublisher.StartClose() + return nil +} + +// SetHTTPClient allows setting a custom HTTP client, primarily for testing purposes. +func (p *CloudflareQueuesPublisher) SetHTTPClient(client *http.Client) { + p.httpClient = client +} + +// cloudflareMessage represents a single message in the Cloudflare Queues API request. +type cloudflareMessage struct { + Body interface{} `json:"body"` +} + +// cloudflareMessagesRequest represents the request body for the Cloudflare Queues API. +type cloudflareMessagesRequest struct { + Messages []cloudflareMessage `json:"messages"` +} + +// cloudflareAPIResponse represents the response from the Cloudflare API. +type cloudflareAPIResponse struct { + Success bool `json:"success"` + Errors []cloudflareAPIError `json:"errors"` + Messages []string `json:"messages"` + Result []map[string]interface{} `json:"result"` +} + +// cloudflareAPIError represents an error from the Cloudflare API. +type cloudflareAPIError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +// messageBody represents the body structure sent to Cloudflare Queues. +type messageBody struct { + Data interface{} `json:"data"` + Metadata map[string]string `json:"metadata"` +} + +// Format builds the HTTP request for publishing to Cloudflare Queues. +func (p *CloudflareQueuesPublisher) Format(ctx context.Context, event *models.Event) (*http.Request, error) { + now := time.Now() + metadata := p.BasePublisher.MakeMetadata(event, now) + + // Build the message body with data and metadata + body := messageBody{ + Data: event.Data, + Metadata: metadata, + } + + // Build the request payload + reqPayload := cloudflareMessagesRequest{ + Messages: []cloudflareMessage{ + {Body: body}, + }, + } + + payloadBytes, err := json.Marshal(reqPayload) + if err != nil { + return nil, fmt.Errorf("failed to marshal request payload: %w", err) + } + + // Build the API URL + url := fmt.Sprintf("%s/accounts/%s/queues/%s/messages", cloudflareAPIBaseURL, p.accountID, p.queueID) + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(payloadBytes)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", p.apiToken)) + + return req, nil +} + +// Publish sends an event to Cloudflare Queues. +func (p *CloudflareQueuesPublisher) Publish(ctx context.Context, event *models.Event) (*destregistry.Delivery, error) { + if err := p.BasePublisher.StartPublish(); err != nil { + return nil, err + } + defer p.BasePublisher.FinishPublish() + + req, err := p.Format(ctx, event) + if err != nil { + return nil, err + } + + resp, err := p.httpClient.Do(req) + if err != nil { + return &destregistry.Delivery{ + Status: "failed", + Code: "ERR", + Response: map[string]interface{}{ + "error": err.Error(), + }, + }, destregistry.NewErrDestinationPublishAttempt(err, providerType, map[string]interface{}{ + "error": err.Error(), + }) + } + defer resp.Body.Close() + + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + return &destregistry.Delivery{ + Status: "failed", + Code: "ERR", + Response: map[string]interface{}{ + "error": fmt.Sprintf("failed to read response body: %s", err.Error()), + }, + }, destregistry.NewErrDestinationPublishAttempt(err, providerType, map[string]interface{}{ + "error": fmt.Sprintf("failed to read response body: %s", err.Error()), + }) + } + + var apiResponse cloudflareAPIResponse + if err := json.Unmarshal(bodyBytes, &apiResponse); err != nil { + // If we can't parse the response, check status code + if resp.StatusCode >= 400 { + return &destregistry.Delivery{ + Status: "failed", + Code: fmt.Sprintf("%d", resp.StatusCode), + Response: map[string]interface{}{ + "status": resp.StatusCode, + "body": string(bodyBytes), + }, + }, destregistry.NewErrDestinationPublishAttempt( + fmt.Errorf("request failed with status %d", resp.StatusCode), + providerType, + map[string]interface{}{ + "status": resp.StatusCode, + "body": string(bodyBytes), + }) + } + } + + // Check for API-level errors + if !apiResponse.Success || len(apiResponse.Errors) > 0 { + errorMsg := "unknown error" + if len(apiResponse.Errors) > 0 { + errorMsg = apiResponse.Errors[0].Message + } + + return &destregistry.Delivery{ + Status: "failed", + Code: fmt.Sprintf("%d", resp.StatusCode), + Response: map[string]interface{}{ + "status": resp.StatusCode, + "success": apiResponse.Success, + "errors": apiResponse.Errors, + }, + }, destregistry.NewErrDestinationPublishAttempt( + fmt.Errorf("cloudflare API error: %s", errorMsg), + providerType, + map[string]interface{}{ + "status": resp.StatusCode, + "errors": apiResponse.Errors, + }) + } + + return &destregistry.Delivery{ + Status: "success", + Code: "OK", + Response: map[string]interface{}{ + "status": resp.StatusCode, + "result": apiResponse.Result, + }, + }, nil +} diff --git a/internal/destregistry/providers/destcfqueues/destcfqueues_publish_test.go b/internal/destregistry/providers/destcfqueues/destcfqueues_publish_test.go new file mode 100644 index 00000000..a0907ac6 --- /dev/null +++ b/internal/destregistry/providers/destcfqueues/destcfqueues_publish_test.go @@ -0,0 +1,394 @@ +package destcfqueues_test + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/hookdeck/outpost/internal/destregistry/providers/destcfqueues" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// cloudflareAPIResponse mirrors the response structure from Cloudflare API +type cloudflareAPIResponse struct { + Success bool `json:"success"` + Errors []cloudflareAPIError `json:"errors"` + Messages []string `json:"messages"` + Result []map[string]interface{} `json:"result"` +} + +type cloudflareAPIError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +// cloudflareMessagesRequest mirrors the request structure for Cloudflare Queues API +type cloudflareMessagesRequest struct { + Messages []cloudflareMessage `json:"messages"` +} + +type cloudflareMessage struct { + Body messageBody `json:"body"` +} + +type messageBody struct { + Data interface{} `json:"data"` + Metadata map[string]string `json:"metadata"` +} + +func TestCloudflareQueuesPublisher_Format(t *testing.T) { + t.Parallel() + + provider, err := destcfqueues.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("cloudflare_queues"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "account_id": "test-account-id", + "queue_id": "test-queue-id", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "api_token": "test-api-token", + }), + ) + + publisher, err := provider.CreatePublisher(context.Background(), &destination) + require.NoError(t, err) + defer publisher.Close() + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithID("evt_123"), + testutil.EventFactory.WithTopic("order.created"), + testutil.EventFactory.WithData(map[string]interface{}{ + "order_id": "test-order-123", + "amount": 99.99, + }), + testutil.EventFactory.WithMetadata(map[string]string{ + "source": "test-service", + }), + ) + + t.Run("should produce correct HTTP request structure", func(t *testing.T) { + t.Parallel() + req, err := publisher.(*destcfqueues.CloudflareQueuesPublisher).Format(context.Background(), &event) + require.NoError(t, err) + + // Verify HTTP method + assert.Equal(t, http.MethodPost, req.Method) + + // Verify URL structure + assert.Equal(t, "https://api.cloudflare.com/client/v4/accounts/test-account-id/queues/test-queue-id/messages", req.URL.String()) + + // Verify Content-Type header + assert.Equal(t, "application/json", req.Header.Get("Content-Type")) + }) + + t.Run("should contain bearer token in Authorization header", func(t *testing.T) { + t.Parallel() + req, err := publisher.(*destcfqueues.CloudflareQueuesPublisher).Format(context.Background(), &event) + require.NoError(t, err) + + authHeader := req.Header.Get("Authorization") + assert.Equal(t, "Bearer test-api-token", authHeader) + }) + + t.Run("should contain event data and metadata in request body", func(t *testing.T) { + t.Parallel() + req, err := publisher.(*destcfqueues.CloudflareQueuesPublisher).Format(context.Background(), &event) + require.NoError(t, err) + + // Read and parse the request body + bodyBytes, err := io.ReadAll(req.Body) + require.NoError(t, err) + + var reqPayload cloudflareMessagesRequest + err = json.Unmarshal(bodyBytes, &reqPayload) + require.NoError(t, err) + + // Verify the message structure + require.Len(t, reqPayload.Messages, 1) + + // Verify event data is in the body + dataMap, ok := reqPayload.Messages[0].Body.Data.(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "test-order-123", dataMap["order_id"]) + assert.Equal(t, 99.99, dataMap["amount"]) + + // Verify metadata is present + metadata := reqPayload.Messages[0].Body.Metadata + assert.Equal(t, "evt_123", metadata["event-id"]) + assert.Equal(t, "order.created", metadata["topic"]) + assert.Equal(t, "test-service", metadata["source"]) + assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present in metadata") + }) +} + +func TestCloudflareQueuesPublisher_Publish_Success(t *testing.T) { + t.Parallel() + + // Create a mock server that simulates successful Cloudflare API response + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Verify the request + assert.Equal(t, http.MethodPost, r.Method) + assert.Equal(t, "application/json", r.Header.Get("Content-Type")) + assert.Equal(t, "Bearer test-api-token", r.Header.Get("Authorization")) + assert.True(t, strings.HasSuffix(r.URL.Path, "/accounts/test-account-id/queues/test-queue-id/messages")) + + // Return success response + response := cloudflareAPIResponse{ + Success: true, + Errors: []cloudflareAPIError{}, + Messages: []string{}, + Result: []map[string]interface{}{ + {"messageId": "msg-123"}, + }, + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + // Create provider with custom HTTP client that routes to test server + provider, err := destcfqueues.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("cloudflare_queues"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "account_id": "test-account-id", + "queue_id": "test-queue-id", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "api_token": "test-api-token", + }), + ) + + publisher, err := provider.CreatePublisher(context.Background(), &destination) + require.NoError(t, err) + defer publisher.Close() + + // Replace the HTTP client with one that routes to our test server + cfPublisher := publisher.(*destcfqueues.CloudflareQueuesPublisher) + cfPublisher.SetHTTPClient(&http.Client{ + Transport: &testTransport{serverURL: server.URL}, + }) + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithID("evt_123"), + testutil.EventFactory.WithTopic("order.created"), + testutil.EventFactory.WithData(map[string]interface{}{ + "order_id": "test-order-123", + }), + ) + + delivery, err := publisher.Publish(context.Background(), &event) + require.NoError(t, err) + assert.Equal(t, "success", delivery.Status) + assert.Equal(t, "OK", delivery.Code) +} + +func TestCloudflareQueuesPublisher_Publish_APIError(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + statusCode int + response cloudflareAPIResponse + expectedStatus string + expectedCode string + }{ + { + name: "401 Unauthorized", + statusCode: http.StatusUnauthorized, + response: cloudflareAPIResponse{ + Success: false, + Errors: []cloudflareAPIError{ + {Code: 10000, Message: "Authentication error"}, + }, + }, + expectedStatus: "failed", + expectedCode: "401", + }, + { + name: "403 Forbidden", + statusCode: http.StatusForbidden, + response: cloudflareAPIResponse{ + Success: false, + Errors: []cloudflareAPIError{ + {Code: 10001, Message: "Access denied"}, + }, + }, + expectedStatus: "failed", + expectedCode: "403", + }, + { + name: "404 Not Found", + statusCode: http.StatusNotFound, + response: cloudflareAPIResponse{ + Success: false, + Errors: []cloudflareAPIError{ + {Code: 10002, Message: "Queue not found"}, + }, + }, + expectedStatus: "failed", + expectedCode: "404", + }, + { + name: "500 Internal Server Error", + statusCode: http.StatusInternalServerError, + response: cloudflareAPIResponse{ + Success: false, + Errors: []cloudflareAPIError{ + {Code: 10003, Message: "Internal error"}, + }, + }, + expectedStatus: "failed", + expectedCode: "500", + }, + { + name: "API success false with errors", + statusCode: http.StatusOK, + response: cloudflareAPIResponse{ + Success: false, + Errors: []cloudflareAPIError{ + {Code: 10004, Message: "Validation error"}, + }, + }, + expectedStatus: "failed", + expectedCode: "200", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(tt.statusCode) + json.NewEncoder(w).Encode(tt.response) + })) + defer server.Close() + + provider, err := destcfqueues.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("cloudflare_queues"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "account_id": "test-account-id", + "queue_id": "test-queue-id", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "api_token": "test-api-token", + }), + ) + + publisher, err := provider.CreatePublisher(context.Background(), &destination) + require.NoError(t, err) + defer publisher.Close() + + cfPublisher := publisher.(*destcfqueues.CloudflareQueuesPublisher) + cfPublisher.SetHTTPClient(&http.Client{ + Transport: &testTransport{serverURL: server.URL}, + }) + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithData(map[string]interface{}{"key": "value"}), + ) + + delivery, err := publisher.Publish(context.Background(), &event) + require.Error(t, err) + require.NotNil(t, delivery, "delivery should not be nil for API errors") + assert.Equal(t, tt.expectedStatus, delivery.Status) + assert.Equal(t, tt.expectedCode, delivery.Code) + }) + } +} + +func TestCloudflareQueuesPublisher_Publish_HTTPSuccess(t *testing.T) { + t.Parallel() + + successCodes := []int{ + http.StatusOK, + http.StatusCreated, + http.StatusAccepted, + } + + for _, statusCode := range successCodes { + t.Run(http.StatusText(statusCode), func(t *testing.T) { + t.Parallel() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + response := cloudflareAPIResponse{ + Success: true, + Errors: []cloudflareAPIError{}, + Messages: []string{}, + Result: []map[string]interface{}{ + {"messageId": "msg-123"}, + }, + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) + json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + provider, err := destcfqueues.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("cloudflare_queues"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "account_id": "test-account-id", + "queue_id": "test-queue-id", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "api_token": "test-api-token", + }), + ) + + publisher, err := provider.CreatePublisher(context.Background(), &destination) + require.NoError(t, err) + defer publisher.Close() + + cfPublisher := publisher.(*destcfqueues.CloudflareQueuesPublisher) + cfPublisher.SetHTTPClient(&http.Client{ + Transport: &testTransport{serverURL: server.URL}, + }) + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithData(map[string]interface{}{"key": "value"}), + ) + + delivery, err := publisher.Publish(context.Background(), &event) + require.NoError(t, err) + assert.Equal(t, "success", delivery.Status) + assert.Equal(t, "OK", delivery.Code) + }) + } +} + +// testTransport redirects requests to the test server +type testTransport struct { + serverURL string +} + +func (t *testTransport) RoundTrip(req *http.Request) (*http.Response, error) { + // Replace the URL with test server URL while keeping the path + newURL := t.serverURL + req.URL.Path + newReq, err := http.NewRequestWithContext(req.Context(), req.Method, newURL, req.Body) + if err != nil { + return nil, err + } + newReq.Header = req.Header + return http.DefaultTransport.RoundTrip(newReq) +} diff --git a/internal/destregistry/providers/destcfqueues/destcfqueues_validate_test.go b/internal/destregistry/providers/destcfqueues/destcfqueues_validate_test.go new file mode 100644 index 00000000..f05c5708 --- /dev/null +++ b/internal/destregistry/providers/destcfqueues/destcfqueues_validate_test.go @@ -0,0 +1,130 @@ +package destcfqueues_test + +import ( + "context" + "testing" + + "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/destregistry/providers/destcfqueues" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCloudflareQueuesDestination_Validate(t *testing.T) { + t.Parallel() + + validDestination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("cloudflare_queues"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "account_id": "test-account-id", + "queue_id": "test-queue-id", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "api_token": "test-api-token", + }), + ) + + cloudflareQueuesDestination, err := destcfqueues.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + t.Run("should validate valid destination", func(t *testing.T) { + t.Parallel() + assert.NoError(t, cloudflareQueuesDestination.Validate(context.Background(), &validDestination)) + }) + + t.Run("should validate invalid type", func(t *testing.T) { + t.Parallel() + invalidDestination := validDestination + invalidDestination.Type = "invalid" + err := cloudflareQueuesDestination.Validate(context.Background(), &invalidDestination) + var validationErr *destregistry.ErrDestinationValidation + assert.ErrorAs(t, err, &validationErr) + assert.Equal(t, "type", validationErr.Errors[0].Field) + assert.Equal(t, "invalid_type", validationErr.Errors[0].Type) + }) + + t.Run("should validate missing account_id", func(t *testing.T) { + t.Parallel() + invalidDestination := validDestination + invalidDestination.Config = map[string]string{ + "queue_id": "test-queue-id", + } + err := cloudflareQueuesDestination.Validate(context.Background(), &invalidDestination) + var validationErr *destregistry.ErrDestinationValidation + assert.ErrorAs(t, err, &validationErr) + assert.Equal(t, "config.account_id", validationErr.Errors[0].Field) + assert.Equal(t, "required", validationErr.Errors[0].Type) + }) + + t.Run("should validate missing queue_id", func(t *testing.T) { + t.Parallel() + invalidDestination := validDestination + invalidDestination.Config = map[string]string{ + "account_id": "test-account-id", + } + err := cloudflareQueuesDestination.Validate(context.Background(), &invalidDestination) + var validationErr *destregistry.ErrDestinationValidation + assert.ErrorAs(t, err, &validationErr) + assert.Equal(t, "config.queue_id", validationErr.Errors[0].Field) + assert.Equal(t, "required", validationErr.Errors[0].Type) + }) + + t.Run("should validate missing api_token", func(t *testing.T) { + t.Parallel() + invalidDestination := validDestination + invalidDestination.Credentials = map[string]string{} + err := cloudflareQueuesDestination.Validate(context.Background(), &invalidDestination) + var validationErr *destregistry.ErrDestinationValidation + assert.ErrorAs(t, err, &validationErr) + assert.Equal(t, "credentials.api_token", validationErr.Errors[0].Field) + assert.Equal(t, "required", validationErr.Errors[0].Type) + }) +} + +func TestCloudflareQueuesDestination_ComputeTarget(t *testing.T) { + t.Parallel() + + cloudflareQueuesDestination, err := destcfqueues.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + t.Run("should return queue_id as target and dashboard URL", func(t *testing.T) { + t.Parallel() + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("cloudflare_queues"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "account_id": "my-account-123", + "queue_id": "my-queue-456", + }), + ) + target := cloudflareQueuesDestination.ComputeTarget(&destination) + assert.Equal(t, "my-queue-456", target.Target) + assert.Equal(t, "https://dash.cloudflare.com/my-account-123/queues/my-queue-456", target.TargetURL) + }) + + t.Run("should return empty target URL when account_id is missing", func(t *testing.T) { + t.Parallel() + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("cloudflare_queues"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "queue_id": "my-queue-456", + }), + ) + target := cloudflareQueuesDestination.ComputeTarget(&destination) + assert.Equal(t, "my-queue-456", target.Target) + assert.Equal(t, "", target.TargetURL) + }) + + t.Run("should return empty target URL when queue_id is missing", func(t *testing.T) { + t.Parallel() + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("cloudflare_queues"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "account_id": "my-account-123", + }), + ) + target := cloudflareQueuesDestination.ComputeTarget(&destination) + assert.Equal(t, "", target.Target) + assert.Equal(t, "", target.TargetURL) + }) +} diff --git a/spec-sdk-tests/factories/destination.factory.ts b/spec-sdk-tests/factories/destination.factory.ts index 99cdb167..a2e054c8 100644 --- a/spec-sdk-tests/factories/destination.factory.ts +++ b/spec-sdk-tests/factories/destination.factory.ts @@ -7,6 +7,7 @@ import type { DestinationCreateAzureServiceBus, DestinationCreateAwss3, DestinationCreateGCPPubSub, + DestinationCreateCloudflareQueues, } from '../../sdks/outpost-typescript/dist/commonjs/models/components/index'; export function createWebhookDestination( @@ -143,3 +144,20 @@ export function createGcpPubSubDestination( ...overrides, }; } + +export function createCloudflareQueuesDestination( + overrides?: Partial +): DestinationCreateCloudflareQueues { + return { + type: 'cloudflare_queues', + topics: ['*'], + config: { + accountId: 'abc123def456', + queueId: 'my-queue-id', + }, + credentials: { + apiToken: 'cf-api-token-example', + }, + ...overrides, + }; +} diff --git a/spec-sdk-tests/package.json b/spec-sdk-tests/package.json index 727e43af..b4f10707 100644 --- a/spec-sdk-tests/package.json +++ b/spec-sdk-tests/package.json @@ -24,7 +24,7 @@ "author": "Outpost Team", "license": "Apache-2.0", "dependencies": { - "@hookdeck/outpost-sdk": "file:../../../sdks/outpost-typescript" + "@hookdeck/outpost-sdk": "file:../sdks/outpost-typescript" }, "devDependencies": { "@stoplight/spectral-cli": "^6.11.0", diff --git a/spec-sdk-tests/tests/destinations/cloudflare-queues.test.ts b/spec-sdk-tests/tests/destinations/cloudflare-queues.test.ts new file mode 100644 index 00000000..2ecc30ea --- /dev/null +++ b/spec-sdk-tests/tests/destinations/cloudflare-queues.test.ts @@ -0,0 +1,440 @@ +import { describe, it, before, after } from 'mocha'; +import { expect } from 'chai'; +import { SdkClient, createSdkClient } from '../../utils/sdk-client'; +import { createCloudflareQueuesDestination } from '../../factories/destination.factory'; +/* eslint-disable no-console */ +/* eslint-disable no-undef */ + +// Get configured test topics from environment (required) +if (!process.env.TEST_TOPICS) { + throw new Error('TEST_TOPICS environment variable is required. Please set it in .env file.'); +} +const TEST_TOPICS = process.env.TEST_TOPICS.split(',').map((t) => t.trim()); + +describe('Cloudflare Queues Destinations - Contract Tests (SDK-based validation)', () => { + let client: SdkClient; + + before(async () => { + client = createSdkClient(); + + // Create tenant if it doesn't exist (idempotent operation) + try { + await client.upsertTenant(); + } catch (error) { + console.warn('Failed to create tenant (may already exist):', error); + } + }); + + after(async () => { + // Cleanup: delete all destinations for the test tenant + try { + const destinations = await client.listDestinations(); + console.log(`Cleaning up ${destinations.length} destinations...`); + + for (const destination of destinations) { + try { + await client.deleteDestination(destination.id); + console.log(`Deleted destination: ${destination.id}`); + } catch (error) { + console.warn(`Failed to delete destination ${destination.id}:`, error); + } + } + + console.log('All destinations cleaned up'); + } catch (error) { + console.warn('Failed to list destinations for cleanup:', error); + } + + // Cleanup: delete the test tenant + try { + await client.deleteTenant(); + console.log('Test tenant deleted'); + } catch (error) { + console.warn('Failed to delete tenant:', error); + } + }); + + describe('POST /api/v1/tenants/{tenant_id}/destinations - Create Cloudflare Queues Destination', () => { + it('should create a Cloudflare Queues destination with valid config', async () => { + const destinationData = createCloudflareQueuesDestination(); + const destination = await client.createDestination(destinationData); + + expect(destination.type).to.equal('cloudflare_queues'); + expect(destination.config.accountId).to.equal(destinationData.config.accountId); + expect(destination.config.queueId).to.equal(destinationData.config.queueId); + }); + + it('should create a Cloudflare Queues destination with array of topics', async () => { + const destinationData = createCloudflareQueuesDestination({ + topics: TEST_TOPICS, + }); + const destination = await client.createDestination(destinationData); + + expect(destination.topics).to.have.lengthOf(TEST_TOPICS.length); + TEST_TOPICS.forEach((topic) => { + expect(destination.topics).to.include(topic); + }); + + // Cleanup + await client.deleteDestination(destination.id); + }); + + it('should create destination with user-provided ID', async () => { + const customId = `custom-cf-queues-${Date.now()}`; + const destinationData = createCloudflareQueuesDestination({ + id: customId, + }); + const destination = await client.createDestination(destinationData); + + expect(destination.id).to.equal(customId); + + // Cleanup + await client.deleteDestination(destination.id); + }); + + it('should reject creation with missing required config field: account_id', async () => { + let errorThrown = false; + try { + await client.createDestination({ + type: 'cloudflare_queues', + topics: '*', + config: { + // Missing accountId + queueId: 'my-queue-id', + }, + credentials: { + apiToken: 'cf-api-token-example', + }, + } as any); + } catch (error: any) { + errorThrown = true; + expect(error).to.exist; + if (error.response) { + expect(error.response.status).to.be.oneOf([400, 422]); + } else { + expect(error.message).to.exist; + } + } + if (!errorThrown) { + expect.fail('Should have thrown an error'); + } + }); + + it('should reject creation with missing required config field: queue_id', async () => { + let errorThrown = false; + try { + await client.createDestination({ + type: 'cloudflare_queues', + topics: '*', + config: { + accountId: 'abc123def456', + // Missing queueId + }, + credentials: { + apiToken: 'cf-api-token-example', + }, + } as any); + } catch (error: any) { + errorThrown = true; + expect(error).to.exist; + if (error.response) { + expect(error.response.status).to.be.oneOf([400, 422]); + } else { + expect(error.message).to.exist; + } + } + if (!errorThrown) { + expect.fail('Should have thrown an error'); + } + }); + + it('should reject creation with missing required credential field: api_token', async () => { + let errorThrown = false; + try { + await client.createDestination({ + type: 'cloudflare_queues', + topics: '*', + config: { + accountId: 'abc123def456', + queueId: 'my-queue-id', + }, + credentials: { + // Missing apiToken + }, + } as any); + } catch (error: any) { + errorThrown = true; + expect(error).to.exist; + if (error.response) { + expect(error.response.status).to.be.oneOf([400, 422]); + } else { + expect(error.message).to.exist; + } + } + if (!errorThrown) { + expect.fail('Should have thrown an error'); + } + }); + + it('should reject creation with missing credentials', async () => { + let errorThrown = false; + try { + await client.createDestination({ + type: 'cloudflare_queues', + topics: '*', + config: { + accountId: 'abc123def456', + queueId: 'my-queue-id', + }, + // Missing credentials + } as any); + } catch (error: any) { + errorThrown = true; + expect(error).to.exist; + if (error.response) { + expect(error.response.status).to.be.oneOf([400, 422]); + } else { + expect(error.message).to.exist; + } + } + if (!errorThrown) { + expect.fail('Should have thrown an error'); + } + }); + + it('should reject creation with missing type field', async () => { + let errorThrown = false; + try { + await client.createDestination({ + topics: '*', + config: { + accountId: 'abc123def456', + queueId: 'my-queue-id', + }, + credentials: { + apiToken: 'cf-api-token-example', + }, + } as any); + } catch (error: any) { + errorThrown = true; + expect(error).to.exist; + if (error.response) { + expect(error.response.status).to.be.oneOf([400, 422]); + } else { + expect(error.message).to.exist; + } + } + if (!errorThrown) { + expect.fail('Should have thrown an error'); + } + }); + + it('should reject creation with empty topics', async () => { + let errorThrown = false; + try { + const destinationData = createCloudflareQueuesDestination({ + topics: [], + }); + await client.createDestination(destinationData); + } catch (error: any) { + errorThrown = true; + expect(error).to.exist; + if (error.response) { + expect(error.response.status).to.be.oneOf([400, 422]); + } else { + expect(error.message).to.exist; + } + } + if (!errorThrown) { + expect.fail('Should have thrown an error'); + } + }); + }); + + describe('GET /api/v1/tenants/{tenant_id}/destinations/{id} - Retrieve Cloudflare Queues Destination', () => { + let destinationId: string; + + before(async () => { + const destinationData = createCloudflareQueuesDestination(); + const destination = await client.createDestination(destinationData); + destinationId = destination.id; + }); + + after(async () => { + try { + await client.deleteDestination(destinationId); + } catch (error) { + console.warn('Failed to cleanup destination:', error); + } + }); + + it('should retrieve an existing Cloudflare Queues destination', async () => { + const destination = await client.getDestination(destinationId); + + expect(destination.id).to.equal(destinationId); + expect(destination.type).to.equal('cloudflare_queues'); + expect(destination.config.accountId).to.exist; + expect(destination.config.queueId).to.exist; + }); + + it('should return 404 for non-existent destination', async () => { + let errorThrown = false; + try { + await client.getDestination('non-existent-id-12345'); + } catch (error: any) { + errorThrown = true; + expect(error).to.exist; + if (error.response) { + expect(error.response.status).to.equal(404); + } else { + expect(error.message).to.exist; + } + } + if (!errorThrown) { + expect.fail('Should have thrown an error'); + } + }); + }); + + describe('GET /api/v1/tenants/{tenant_id}/destinations - List Cloudflare Queues Destinations', () => { + before(async () => { + // Create multiple Cloudflare Queues destinations for listing + await client.createDestination(createCloudflareQueuesDestination()); + await client.createDestination( + createCloudflareQueuesDestination({ + topics: [TEST_TOPICS[0]], + config: { + accountId: 'abc123def456', + queueId: 'my-queue-2', + }, + }) + ); + }); + + it('should list all destinations', async () => { + const destinations = await client.listDestinations(); + + expect(destinations.length).to.be.greaterThan(0); + }); + + it('should filter destinations by type', async () => { + const destinations = await client.listDestinations({ type: 'cloudflare_queues' }); + + destinations.forEach((dest) => { + expect(dest.type).to.equal('cloudflare_queues'); + }); + }); + }); + + describe('PATCH /api/v1/tenants/{tenant_id}/destinations/{id} - Update Cloudflare Queues Destination', () => { + let destinationId: string; + + before(async () => { + const destinationData = createCloudflareQueuesDestination(); + const destination = await client.createDestination(destinationData); + destinationId = destination.id; + }); + + after(async () => { + try { + await client.deleteDestination(destinationId); + } catch (error) { + console.warn('Failed to cleanup destination:', error); + } + }); + + it('should update destination topics', async () => { + const updated = await client.updateDestination(destinationId, { + topics: ['user.created', 'user.updated'], + }); + + expect(updated.id).to.equal(destinationId); + expect(updated.type).to.equal('cloudflare_queues'); + expect(updated.topics).to.include('user.created'); + expect(updated.topics).to.include('user.updated'); + }); + + it('should update destination config', async () => { + const updated = await client.updateDestination(destinationId, { + config: { + accountId: 'updated-account-id', + queueId: 'updated-queue-id', + }, + }); + + expect(updated.id).to.equal(destinationId); + expect(updated.config).to.exist; + if (updated.config) { + expect(updated.config.accountId).to.equal('updated-account-id'); + expect(updated.config.queueId).to.equal('updated-queue-id'); + } + }); + + it('should update destination credentials', async () => { + const updated = await client.updateDestination(destinationId, { + credentials: { + apiToken: 'updated-api-token', + }, + }); + + expect(updated.id).to.equal(destinationId); + }); + + it('should return 404 for updating non-existent destination', async () => { + let errorThrown = false; + try { + await client.updateDestination('non-existent-id-12345', { + topics: ['test'], + }); + } catch (error: any) { + errorThrown = true; + expect(error).to.exist; + if (error.response) { + expect(error.response.status).to.equal(404); + } else { + expect(error.message).to.exist; + } + } + if (!errorThrown) { + expect.fail('Should have thrown an error'); + } + }); + }); + + describe('DELETE /api/v1/tenants/{tenant_id}/destinations/{id} - Delete Cloudflare Queues Destination', () => { + it('should delete an existing destination', async () => { + const destinationData = createCloudflareQueuesDestination(); + const destination = await client.createDestination(destinationData); + + await client.deleteDestination(destination.id); + + // Verify deletion by trying to get the destination + let errorThrown = false; + try { + await client.getDestination(destination.id); + } catch (error: any) { + errorThrown = true; + expect(error).to.exist; + } + expect(errorThrown).to.be.true; + }); + + it('should return 404 for deleting non-existent destination', async () => { + let errorThrown = false; + try { + await client.deleteDestination('non-existent-id-12345'); + } catch (error: any) { + errorThrown = true; + expect(error).to.exist; + if (error.response) { + expect(error.response.status).to.equal(404); + } else { + expect(error.message).to.exist; + } + } + if (!errorThrown) { + expect.fail('Should have thrown an error'); + } + }); + }); +});