Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .air.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ root = "."
tmp_dir = "tmp"

[build]
poll = true
poll_interval = 500
cmd = "go build -o tmp/main cmd/outpost/main.go"
bin = "tmp/main"
delay = 100
Expand Down
7 changes: 6 additions & 1 deletion cmd/e2e/configs/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/hookdeck/outpost/internal/config"
"github.com/hookdeck/outpost/internal/idgen"
"github.com/hookdeck/outpost/internal/infra"
"github.com/hookdeck/outpost/internal/logging"
"github.com/hookdeck/outpost/internal/redis"
"github.com/hookdeck/outpost/internal/util/testinfra"
"github.com/hookdeck/outpost/internal/util/testutil"
Expand Down Expand Up @@ -97,10 +98,14 @@ func Basic(t *testing.T, opts BasicOpts) config.Config {
if err != nil {
log.Println("Failed to create redis client:", err)
}
logger, err := logging.NewLogger(logging.WithLogLevel("warn"))
if err != nil {
log.Println("Failed to create logger:", err)
}
outpostInfra := infra.NewInfra(infra.Config{
DeliveryMQ: c.MQs.ToInfraConfig("deliverymq"),
LogMQ: c.MQs.ToInfraConfig("logmq"),
}, redisClient)
}, redisClient, logger, c.MQs.GetInfraType())
if err := outpostInfra.Teardown(context.Background()); err != nil {
log.Println("Teardown failed:", err)
}
Expand Down
8 changes: 6 additions & 2 deletions docs/pages/references/configuration.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ Global configurations are provided through env variables or a YAML file. ConfigM
| `AZURE_SERVICEBUS_RESOURCE_GROUP` | Azure resource group name | `nil` | Yes |
| `AZURE_SERVICEBUS_SUBSCRIPTION_ID` | Azure subscription ID | `nil` | Yes |
| `AZURE_SERVICEBUS_TENANT_ID` | Azure Active Directory tenant ID | `nil` | Yes |
| `CLICKHOUSE_ADDR` | Address (host:port) of the ClickHouse server. Example: 'localhost:9000'. | `nil` | No |
| `CLICKHOUSE_ADDR` | Address (host:port) of the ClickHouse server. Example: 'localhost:9000' or 'host.clickhouse.cloud:9440' for ClickHouse Cloud. | `nil` | No |
| `CLICKHOUSE_DATABASE` | Database name in ClickHouse to use. | `outpost` | No |
| `CLICKHOUSE_PASSWORD` | Password for ClickHouse authentication. | `nil` | No |
| `CLICKHOUSE_TLS_ENABLED` | Enable TLS for ClickHouse connection. | `false` | No |
| `CLICKHOUSE_USERNAME` | Username for ClickHouse authentication. | `nil` | No |
| `DELIVERY_IDEMPOTENCY_KEY_TTL` | Time-to-live in seconds for delivery queue idempotency keys. Controls how long processed deliveries are remembered to prevent duplicate delivery attempts. Default: 3600 (1 hour). | `3600` | No |
| `DELIVERY_MAX_CONCURRENCY` | Maximum number of delivery attempts to process concurrently. | `1` | No |
Expand Down Expand Up @@ -178,7 +179,7 @@ alert:
audit_log: true

clickhouse:
# Address (host:port) of the ClickHouse server. Example: 'localhost:9000'.
# Address (host:port) of the ClickHouse server. Example: 'localhost:9000' or 'host.clickhouse.cloud:9440' for ClickHouse Cloud.
addr: ""

# Database name in ClickHouse to use.
Expand All @@ -187,6 +188,9 @@ clickhouse:
# Password for ClickHouse authentication.
password: ""

# Enable TLS for ClickHouse connection.
tls_enabled: false

# Username for ClickHouse authentication.
username: ""

Expand Down
8 changes: 8 additions & 0 deletions internal/apirouter/logger_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ func getErrorFields(err error) []zap.Field {
originalErr = err
}

// Handle nil error case (e.g., ErrorResponse with nil Err field)
if originalErr == nil {
return []zap.Field{
zap.String("error", "unknown error"),
zap.String("error_type", "nil"),
}
}

fields := []zap.Field{
zap.String("error", originalErr.Error()),
zap.String("error_type", fmt.Sprintf("%T", originalErr)),
Expand Down
1 change: 1 addition & 0 deletions internal/apirouter/tenant_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func (h *TenantHandlers) List(c *gin.Context) {
// Map errors to HTTP status codes
if errors.Is(err, models.ErrListTenantNotSupported) {
AbortWithError(c, http.StatusNotImplemented, ErrorResponse{
Err: err,
Code: http.StatusNotImplemented,
Message: err.Error(),
})
Expand Down
6 changes: 4 additions & 2 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package app
import (
"context"
"errors"
"fmt"
"os"
"os/signal"
"syscall"
Expand Down Expand Up @@ -49,14 +50,15 @@ func (a *App) Run(ctx context.Context) error {
}

// PreRun initializes all dependencies before starting the application
func (a *App) PreRun(ctx context.Context) error {
func (a *App) PreRun(ctx context.Context) (err error) {
if err := a.setupLogger(); err != nil {
return err
}

defer func() {
if r := recover(); r != nil {
a.logger.Error("panic during PreRun", zap.Any("panic", r))
err = fmt.Errorf("panic during PreRun: %v", r)
}
}()

Expand Down Expand Up @@ -217,7 +219,7 @@ func (a *App) initializeInfrastructure(ctx context.Context) error {
DeliveryMQ: a.config.MQs.ToInfraConfig("deliverymq"),
LogMQ: a.config.MQs.ToInfraConfig("logmq"),
AutoProvision: a.config.MQs.AutoProvision,
}, a.redisClient); err != nil {
}, a.redisClient, a.logger, a.config.MQs.GetInfraType()); err != nil {
a.logger.Error("infrastructure initialization failed", zap.Error(err))
return err
}
Expand Down
21 changes: 15 additions & 6 deletions internal/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package clickhouse

import (
"crypto/tls"

"github.com/ClickHouse/clickhouse-go/v2"
chdriver "github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
Expand All @@ -12,14 +14,15 @@ type (
)

type ClickHouseConfig struct {
Addr string
Username string
Password string
Database string
Addr string
Username string
Password string
Database string
TLSEnabled bool
}

func New(config *ClickHouseConfig) (DB, error) {
conn, err := clickhouse.Open(&clickhouse.Options{
opts := &clickhouse.Options{
Addr: []string{config.Addr},
Auth: clickhouse.Auth{
Database: config.Database,
Expand All @@ -31,6 +34,12 @@ func New(config *ClickHouseConfig) (DB, error) {
// Debugf: func(format string, v ...any) {
// fmt.Printf(format+"\n", v...)
// },
})
}

if config.TLSEnabled {
opts.TLS = &tls.Config{}
}

conn, err := clickhouse.Open(opts)
return conn, err
}
19 changes: 11 additions & 8 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,21 +382,23 @@ func (c *RedisConfig) ToConfig() *redis.RedisConfig {
}

type ClickHouseConfig struct {
Addr string `yaml:"addr" env:"CLICKHOUSE_ADDR" desc:"Address (host:port) of the ClickHouse server. Example: 'localhost:9000'." required:"N"`
Username string `yaml:"username" env:"CLICKHOUSE_USERNAME" desc:"Username for ClickHouse authentication." required:"N"`
Password string `yaml:"password" env:"CLICKHOUSE_PASSWORD" desc:"Password for ClickHouse authentication." required:"N"`
Database string `yaml:"database" env:"CLICKHOUSE_DATABASE" desc:"Database name in ClickHouse to use." required:"N"`
Addr string `yaml:"addr" env:"CLICKHOUSE_ADDR" desc:"Address (host:port) of the ClickHouse server. Example: 'localhost:9000' or 'host.clickhouse.cloud:9440' for ClickHouse Cloud." required:"N"`
Username string `yaml:"username" env:"CLICKHOUSE_USERNAME" desc:"Username for ClickHouse authentication." required:"N"`
Password string `yaml:"password" env:"CLICKHOUSE_PASSWORD" desc:"Password for ClickHouse authentication." required:"N"`
Database string `yaml:"database" env:"CLICKHOUSE_DATABASE" desc:"Database name in ClickHouse to use." required:"N"`
TLSEnabled bool `yaml:"tls_enabled" env:"CLICKHOUSE_TLS_ENABLED" desc:"Enable TLS for ClickHouse connection." required:"N"`
}

func (c *ClickHouseConfig) ToConfig() *clickhouse.ClickHouseConfig {
if c.Addr == "" {
return nil
}
return &clickhouse.ClickHouseConfig{
Addr: c.Addr,
Username: c.Username,
Password: c.Password,
Database: c.Database,
Addr: c.Addr,
Username: c.Username,
Password: c.Password,
Database: c.Database,
TLSEnabled: c.TLSEnabled,
}
}

Expand Down Expand Up @@ -480,6 +482,7 @@ func (c *Config) ToMigratorOpts() migrator.MigrationOpts {
Password: c.ClickHouse.Password,
Database: c.ClickHouse.Database,
DeploymentID: c.DeploymentID,
TLSEnabled: c.ClickHouse.TLSEnabled,
},
}
}
7 changes: 7 additions & 0 deletions internal/config/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ func (c *Config) LogConfigurationSummary() []zap.Field {
zap.Bool("postgres_configured", c.PostgresURL != ""),
zap.String("postgres_host", maskPostgresURLHost(c.PostgresURL)),

// ClickHouse
zap.Bool("clickhouse_configured", c.ClickHouse.Addr != ""),
zap.String("clickhouse_addr", c.ClickHouse.Addr),
zap.String("clickhouse_database", c.ClickHouse.Database),
zap.Bool("clickhouse_password_configured", c.ClickHouse.Password != ""),
zap.Bool("clickhouse_tls_enabled", c.ClickHouse.TLSEnabled),

// Message Queue
zap.String("mq_type", c.MQs.GetInfraType()),

Expand Down
20 changes: 20 additions & 0 deletions internal/deliverymq/messagehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (h *messageHandler) Handle(ctx context.Context, msg *mqs.Message) error {
}

h.logger.Ctx(ctx).Info("processing delivery event",
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", deliveryEvent.DestinationID),
Expand Down Expand Up @@ -200,6 +201,8 @@ func (h *messageHandler) doHandle(ctx context.Context, deliveryEvent models.Deli

h.logger.Ctx(ctx).Error("failed to publish event",
zap.Error(err),
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("delivery_id", delivery.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", destination.ID),
Expand All @@ -220,6 +223,8 @@ func (h *messageHandler) doHandle(ctx context.Context, deliveryEvent models.Deli
if err := h.retryScheduler.Cancel(ctx, deliveryEvent.GetRetryID()); err != nil {
h.logger.Ctx(ctx).Error("failed to cancel scheduled retry",
zap.Error(err),
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("delivery_id", delivery.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", destination.ID),
Expand All @@ -228,6 +233,8 @@ func (h *messageHandler) doHandle(ctx context.Context, deliveryEvent models.Deli
return h.logDeliveryResult(ctx, &deliveryEvent, destination, delivery, err)
}
logger.Audit("scheduled retry canceled",
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("delivery_id", delivery.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", destination.ID),
Expand All @@ -244,6 +251,8 @@ func (h *messageHandler) logDeliveryResult(ctx context.Context, deliveryEvent *m
deliveryEvent.Delivery = delivery

logger.Audit("event delivered",
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("delivery_id", deliveryEvent.Delivery.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", destination.ID),
Expand All @@ -256,6 +265,8 @@ func (h *messageHandler) logDeliveryResult(ctx context.Context, deliveryEvent *m
if logErr := h.logMQ.Publish(ctx, *deliveryEvent); logErr != nil {
logger.Error("failed to publish delivery log",
zap.Error(logErr),
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("delivery_id", deliveryEvent.Delivery.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", destination.ID),
Expand Down Expand Up @@ -328,6 +339,8 @@ func (h *messageHandler) handleAlertAttempt(ctx context.Context, deliveryEvent *
if monitorErr := h.alertMonitor.HandleAttempt(ctx, attempt); monitorErr != nil {
h.logger.Ctx(ctx).Error("failed to handle alert attempt",
zap.Error(monitorErr),
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("delivery_id", deliveryEvent.Delivery.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", destination.TenantID),
zap.String("destination_id", destination.ID),
Expand All @@ -336,6 +349,8 @@ func (h *messageHandler) handleAlertAttempt(ctx context.Context, deliveryEvent *
}

h.logger.Ctx(ctx).Info("alert attempt handled",
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("delivery_id", deliveryEvent.Delivery.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", destination.TenantID),
zap.String("destination_id", destination.ID),
Expand Down Expand Up @@ -412,6 +427,7 @@ func (h *messageHandler) scheduleRetry(ctx context.Context, deliveryEvent models
if err := h.retryScheduler.Schedule(ctx, retryMessageStr, backoffDuration, scheduler.WithTaskID(deliveryEvent.GetRetryID())); err != nil {
h.logger.Ctx(ctx).Error("failed to schedule retry",
zap.Error(err),
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", deliveryEvent.DestinationID),
Expand All @@ -421,6 +437,7 @@ func (h *messageHandler) scheduleRetry(ctx context.Context, deliveryEvent models
}

h.logger.Ctx(ctx).Audit("retry scheduled",
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", deliveryEvent.DestinationID),
Expand Down Expand Up @@ -462,6 +479,7 @@ func (h *messageHandler) ensurePublishableDestination(ctx context.Context, deliv
logger := h.logger.Ctx(ctx)
fields := []zap.Field{
zap.Error(err),
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", deliveryEvent.DestinationID),
Expand All @@ -477,13 +495,15 @@ func (h *messageHandler) ensurePublishableDestination(ctx context.Context, deliv
}
if destination == nil {
h.logger.Ctx(ctx).Info("destination not found",
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", deliveryEvent.DestinationID))
return nil, models.ErrDestinationNotFound
}
if destination.DisabledAt != nil {
h.logger.Ctx(ctx).Info("skipping disabled destination",
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", destination.ID),
Expand Down
Loading