From be909535d12b5295958ed9693c5f87fa83239e13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guilhem=20Sant=C3=A9?= <76043368+guilhem-sante@users.noreply.github.com> Date: Tue, 24 Mar 2026 00:29:10 +0100 Subject: [PATCH 1/4] wip: synchronized clock --- configs/default.yml | 5 ++ go.mod | 3 + go.sum | 6 ++ internal/clock/clock.go | 17 +++++ internal/clock/doc.go | 1 + internal/clock/election.go | 151 +++++++++++++++++++++++++++++++++++++ internal/config/config.go | 18 +++++ internal/config/resolve.go | 33 ++++++++ 8 files changed, 234 insertions(+) create mode 100644 internal/clock/clock.go create mode 100644 internal/clock/doc.go create mode 100644 internal/clock/election.go create mode 100644 internal/config/resolve.go diff --git a/configs/default.yml b/configs/default.yml index 42ab2f2..2926820 100644 --- a/configs/default.yml +++ b/configs/default.yml @@ -4,3 +4,8 @@ server: shutdown_timeout: 30 log: level: info +redis: + addr: localhost:6739 + # password: "" + # password_file: "/var/run/secrets/redis_password" + db: 0 diff --git a/go.mod b/go.mod index 8232a73..0d8e83f 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( require ( github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/fatih/structs v1.1.0 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/go-logr/logr v1.4.3 // indirect @@ -32,10 +33,12 @@ require ( github.com/knadh/koanf/maps v0.1.2 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/redis/go-redis/v9 v9.18.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/otel/metric v1.42.0 // indirect go.opentelemetry.io/otel/trace v1.42.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect + go.uber.org/atomic v1.11.0 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/net v0.52.0 // indirect golang.org/x/sys v0.42.0 // indirect diff --git a/go.sum b/go.sum index fca7d70..b0f9caf 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= @@ -45,6 +47,8 @@ github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zx github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs= +github.com/redis/go-redis/v9 v9.18.0/go.mod h1:k3ufPphLU5YXwNTUcCRXGxUoF1fqxnhFQmscfkCoDA0= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= @@ -75,6 +79,8 @@ go.opentelemetry.io/otel/trace v1.42.0 h1:OUCgIPt+mzOnaUTpOQcBiM/PLQ/Op7oq6g4Len go.opentelemetry.io/otel/trace v1.42.0/go.mod h1:f3K9S+IFqnumBkKhRJMeaZeNk9epyhnCmQh/EysQCdc= go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0= diff --git a/internal/clock/clock.go b/internal/clock/clock.go new file mode 100644 index 0000000..39b69c4 --- /dev/null +++ b/internal/clock/clock.go @@ -0,0 +1,17 @@ +package clock + +import ( + "time" +) + +const ( + clockChannel = "samarkand:clock" + tickInterval = 100 * time.Millisecond +) + +// Tick is the payload published on every clock advance. +type Tick struct { + Seq uint64 `json:"seq"` // monotonic tick counter + MarketTime time.Time `json:"market_time"` // virtual market time + PublishedAt time.Time `json:"published_at"` // wall time of publication +} diff --git a/internal/clock/doc.go b/internal/clock/doc.go new file mode 100644 index 0000000..fa04f4c --- /dev/null +++ b/internal/clock/doc.go @@ -0,0 +1 @@ +package clock diff --git a/internal/clock/election.go b/internal/clock/election.go new file mode 100644 index 0000000..4a428a9 --- /dev/null +++ b/internal/clock/election.go @@ -0,0 +1,151 @@ +package clock + +import ( + "context" + "errors" + "time" + "log/slog" + + "github.com/redis/go-redis/v9" +) + +const ( + leaderKey = "market:leader" + + leaseTTL = 3 * time.Second // how long the lock lives without renewal + renewInterval = leaseTTL / 3 // renew well before expiry + retryInterval = 500 * time.Millisecond // how often a standby retries acquisition +) + +type Election struct { + rdb *redis.Client + leaderID string // unique ID for this instance (e.g. hostname + pid) + log *slog.Logger +} + +func NewElection(rdb *redis.Client, leaderID string) *Election { + return &Election{rdb: rdb, leaderID: leaderID} +} + + +// Campaign blocks until this instance wins the election, then calls +// onElected in a goroutine. When onElected returns (or ctx is cancelled), +// the lease is released and Campaign returns — callers should loop: +// +// for { +// if err := e.Campaign(ctx, onElected); !errors.Is(err, ErrNotRenewed) { break } +// } +func (e *Election) Campaign(ctx context.Context, onElected func(ctx context.Context)) error { + // --- Phase 1: acquire the lock --- + for { + ok, err := e.rdb.SetNX(ctx, leaderKey, e.leaderID, leaseTTL).Result() + if err != nil { + if ctx.Err() != nil { + return ctx.Err() + } + + e.log.Error("acquire error", "error", err) + } else if ok { + break // we are the leader + } + + // Lock is held by someone else; wait before retrying. + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(retryInterval): + } + } + + e.log.Error("leader elected", "leader_id", e.leaderID) + + // --- Phase 2: hold the lock via a renewal goroutine --- + leaderCtx, abdicate := context.WithCancel(ctx) + defer abdicate() + + renewErr := make(chan error, 1) + go func() { + renewErr <- e.holdLease(leaderCtx) + }() + + // Run the master work in a separate goroutine so we can watch for + // lease loss at the same time. + done := make(chan struct{}) + go func() { + defer close(done) + onElected(leaderCtx) + }() + + var result error + select { + case err := <-renewErr: + // Lease lost — tell onElected to stop. + abdicate() + <-done + result = err + case <-done: + // onElected finished on its own (shouldn't happen in normal operation). + result = nil + } + + // Best-effort release so the next leader doesn't wait for TTL. + e.release(context.Background()) + return result +} + +// holdLease renews the lease until ctx is cancelled or renewal fails. +func (e *Election) holdLease(ctx context.Context) error { + ticker := time.NewTicker(renewInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + // Only renew if we still own the key. + renewed, err := e.tryRenew(ctx) + if err != nil { + e.log.Error("renewal error", "error", err) + return ErrNotRenewed + } + if !renewed { + e.log.Error("lost leadership", "leader_id", e.leaderID) + return ErrNotRenewed + } + } + } +} + +// tryRenew uses a Lua script so the check-and-set is atomic. +var renewScript = redis.NewScript(` + if redis.call("GET", KEYS[1]) == ARGV[1] then + return redis.call("PEXPIRE", KEYS[1], ARGV[2]) + else + return 0 + end +`) + +func (e *Election) tryRenew(ctx context.Context) (bool, error) { + ttlMs := leaseTTL.Milliseconds() + res, err := renewScript.Run(ctx, e.rdb, []string{leaderKey}, e.leaderID, ttlMs).Int() + return res == 1, err +} + +// release deletes the key only if we still own it. +var releaseScript = redis.NewScript(` + if redis.call("GET", KEYS[1]) == ARGV[1] then + return redis.call("DEL", KEYS[1]) + else + return 0 + end +`) + +func (e *Election) release(ctx context.Context) { + if _, err := releaseScript.Run(ctx, e.rdb, []string{leaderKey}, e.leaderID).Int(); err != nil { + e.log.Error("release error (non-fatal)", "error", err) + } +} + +// ErrNotRenewed is returned when a leader loses its lease. +var ErrNotRenewed = errors.New("leader lease not renewed") diff --git a/internal/config/config.go b/internal/config/config.go index 8800db6..4dc4fe8 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -5,6 +5,7 @@ import "time" type Config struct { Server ServerConfig `koanf:"server"` Log LogConfig `koanf:"log"` + Redis RedisConfig `koanf:"redis"` } type ServerConfig struct { @@ -16,6 +17,17 @@ type LogConfig struct { Level string `koanf:"level"` } +type RedisConfig struct { + Addr string `koanf:"addr"` + Password *string `koanf:"password"` + PasswordFile *string `koanf:"password_file"` + Db string `koanf:"db"` + + // Resolved at load time — not populated from config directly. + // Unexported so it isn't accidentally (de)serialised. + resolvedPassword *string +} + func Default() Config { return Config{ Server: ServerConfig{ @@ -25,5 +37,11 @@ func Default() Config { Log: LogConfig{ Level: "info", }, + Redis: RedisConfig{ + Addr: "localhost:6739", + Password: nil, + PasswordFile: nil, + Db: "0", + }, } } diff --git a/internal/config/resolve.go b/internal/config/resolve.go new file mode 100644 index 0000000..0737a58 --- /dev/null +++ b/internal/config/resolve.go @@ -0,0 +1,33 @@ +package config + +import ( + "errors" + "fmt" + "os" + "strings" +) + +func (r *RedisConfig) ResolvedPassword() *string { + return r.resolvedPassword +} + +// Resolve validates the password configuration and, if PasswordFile is set, +// reads the file and stores its trimmed contents as the effective password. +func (r *RedisConfig) Resolve() error { + if r.Password != nil && r.PasswordFile != nil { + return errors.New("redis: password and password_file are mutually exclusive") + } + + if r.PasswordFile != nil { + data, err := os.ReadFile(*r.PasswordFile) + if err != nil { + return fmt.Errorf("redis: reading password_file: %w", err) + } + s := strings.TrimRight(string(data), "\r\n") + r.resolvedPassword = &s + return nil + } + + r.resolvedPassword = r.Password + return nil +} From 1998236f0dc8fc8bf87c9982c9bf1d1c8880be9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guilhem=20Sant=C3=A9?= <76043368+guilhem-sante@users.noreply.github.com> Date: Mon, 13 Apr 2026 15:29:10 +0200 Subject: [PATCH 2/4] feat: configuration loading from JSON schema Load defaults and validate configuration from JSON schema. Enable less Go specific configuration definition, improving application general comprehension. --- .gitignore | 3 + .goreleaser.yml | 2 +- Dockerfile | 2 +- cmd/samarkand/samarkand.go | 71 +---------- configs/default.yml | 11 -- go.mod | 8 +- go.sum | 14 +++ internal/app/command.go | 111 ++++++++++++++++++ .../app/command_test.go | 4 +- internal/clock/clock.go | 4 +- internal/clock/election.go | 11 +- internal/config/config.go | 42 +++---- internal/config/config.schema.json | 107 +++++++++++++++++ internal/config/loader.go | 73 ++++++++++-- internal/config/resolve.go | 11 ++ internal/logger/logger.go | 12 +- 16 files changed, 348 insertions(+), 138 deletions(-) delete mode 100644 configs/default.yml create mode 100644 internal/app/command.go rename cmd/samarkand/samarkand_test.go => internal/app/command_test.go (98%) create mode 100644 internal/config/config.schema.json diff --git a/.gitignore b/.gitignore index 45e81b3..e04f741 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,6 @@ dist/ build/ bin/ .task/ + +# Debug file +.debug* diff --git a/.goreleaser.yml b/.goreleaser.yml index 669d4cb..cb14825 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -17,7 +17,7 @@ builds: - -trimpath ldflags: - "-s -w" - - "{{if .IsNightly}}-X github.com/coding-kelps/samarkand/internal/version.version={{.Version}}{{end}}" + - "{{if .IsNightly}}-X github.com/coding-kelps/samarkand/internal/metadata.version={{.Version}}{{end}}" skip_upload: true gomod: diff --git a/Dockerfile b/Dockerfile index 6cbfa2a..5fe550f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,7 +4,7 @@ ARG TARGETPLATFORM COPY --chown=nonroot:nonroot $TARGETPLATFORM/samarkand /bin/ -EXPOSE 8080 50051 +EXPOSE 50051 ENTRYPOINT ["/bin/samarkand"] diff --git a/cmd/samarkand/samarkand.go b/cmd/samarkand/samarkand.go index cca45eb..92a9a35 100644 --- a/cmd/samarkand/samarkand.go +++ b/cmd/samarkand/samarkand.go @@ -2,83 +2,16 @@ package main import ( "context" - "fmt" - "io" "log" - "log/slog" "os" - "github.com/urfave/cli/v3" - "go.opentelemetry.io/contrib/bridges/otelslog" - + "github.com/coding-kelps/samarkand/internal/app" "github.com/coding-kelps/samarkand/internal/config" - "github.com/coding-kelps/samarkand/internal/logger" - "github.com/coding-kelps/samarkand/internal/server" - "github.com/coding-kelps/samarkand/internal/metadata" ) func main() { - cmd := newCommand(os.Stdout, config.Load) + cmd := app.NewCommand(os.Stdout, config.Load) if err := cmd.Run(context.Background(), os.Args); err != nil { log.Fatal(err) } } - -func newCommand(w io.Writer, loader func(string) (config.Config, error)) *cli.Command { - return &cli.Command{ - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "config", - Aliases: []string{"c"}, - Usage: "Path to configuration file", - Sources: cli.EnvVars("SAMARKAND_CONFIG"), - }, - }, - Commands: []*cli.Command{ - { - Name: "start", - Usage: "start samarkand market server.", - Action: func(ctx context.Context, cmd *cli.Command) error { - cfg, err := loader(cmd.Root().String("config")) - if err != nil { - slog.Error("failed to load configuration", "error", err) - return err - } - - shutdown, err := logger.SetupOTelLogger(&cfg, ctx) - if err != nil { - slog.Error("failed to set up OTel logger", "error", err) - return err - } - defer shutdown(ctx) - - level, err := logger.ParseLogLevel(cfg.Log.Level) - if err != nil { - return err - } - - otelHandler := otelslog.NewHandler(metadata.GetName()) - consoleHandler := slog.NewTextHandler(w, &slog.HandlerOptions{ - Level: level, - AddSource: true, - }) - - logger := slog.New(logger.NewMultiHandler(otelHandler, consoleHandler)) - slog.SetDefault(logger) - - s := server.New(&cfg, logger) - - return s.Start() - }, - }, - { - Name: "version", - Usage: "Get the version samarkand.", - Action: func(context.Context, *cli.Command) error { - fmt.Fprintln(w, metadata.GetVersionWithBuildInfo()) - return nil - }, - }, - }, - } -} diff --git a/configs/default.yml b/configs/default.yml deleted file mode 100644 index 2926820..0000000 --- a/configs/default.yml +++ /dev/null @@ -1,11 +0,0 @@ ---- -server: - addr: :50051 - shutdown_timeout: 30 -log: - level: info -redis: - addr: localhost:6739 - # password: "" - # password_file: "/var/run/secrets/redis_password" - db: 0 diff --git a/go.mod b/go.mod index 0d8e83f..302e445 100644 --- a/go.mod +++ b/go.mod @@ -25,11 +25,17 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/fatih/structs v1.1.0 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect + github.com/go-json-experiment/json v0.0.0-20260214004413-d219187c3433 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.5.0 // indirect + github.com/goccy/go-yaml v1.19.2 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect + github.com/kaptinlin/go-i18n v0.4.0 // indirect + github.com/kaptinlin/jsonpointer v0.4.17 // indirect + github.com/kaptinlin/jsonschema v0.7.7 // indirect + github.com/kaptinlin/messageformat-go v0.4.20 // indirect github.com/knadh/koanf/maps v0.1.2 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect @@ -42,7 +48,7 @@ require ( go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/net v0.52.0 // indirect golang.org/x/sys v0.42.0 // indirect - golang.org/x/text v0.35.0 // indirect + golang.org/x/text v0.36.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 // indirect google.golang.org/protobuf v1.36.11 // indirect diff --git a/go.sum b/go.sum index b0f9caf..cbd5298 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,8 @@ github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/go-json-experiment/json v0.0.0-20260214004413-d219187c3433 h1:vymEbVwYFP/L05h5TKQxvkXoKxNvTpjxYKdF1Nlwuao= +github.com/go-json-experiment/json v0.0.0-20260214004413-d219187c3433/go.mod h1:tphK2c80bpPhMOI4v6bIc2xWywPfbqi1Z06+RcrMkDg= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -17,6 +19,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-viper/mapstructure/v2 v2.5.0 h1:vM5IJoUAy3d7zRSVtIwQgBj7BiWtMPfmPEgAXnvj1Ro= github.com/go-viper/mapstructure/v2 v2.5.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/goccy/go-yaml v1.19.2 h1:PmFC1S6h8ljIz6gMRBopkjP1TVT7xuwrButHID66PoM= +github.com/goccy/go-yaml v1.19.2/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= @@ -25,6 +29,14 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 h1:HWRh5R2+9EifMyIHV7ZV+MIZqgz+PMpZ14Jynv3O2Zs= github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0/go.mod h1:JfhWUomR1baixubs02l85lZYYOm7LV6om4ceouMv45c= +github.com/kaptinlin/go-i18n v0.4.0 h1:i7L3U2yurg+xhokITtJ0k+mjHnXqkoyz8ju5Wb7W8Oc= +github.com/kaptinlin/go-i18n v0.4.0/go.mod h1:njA6x0+4MWGcLWT0KLrwekhRPmze1Hnstf2+VJFzwpM= +github.com/kaptinlin/jsonpointer v0.4.17 h1:mY9k8ciWncxbsECyaxKnR0MdmxamNdp2tLQkAKVrtSk= +github.com/kaptinlin/jsonpointer v0.4.17/go.mod h1:SsfsjqnHG5zuKo1DTBzk1VknaHlL4osHw+X9kZKukpU= +github.com/kaptinlin/jsonschema v0.7.7 h1:41BlQJ9dskH0oE5DSzBUrl/w4JQYIr6N6L0B5GNyDoM= +github.com/kaptinlin/jsonschema v0.7.7/go.mod h1:rKjWfyySHSxAD7Li2ctYkPlOu960igoKBvZ2ADRtd5Q= +github.com/kaptinlin/messageformat-go v0.4.20 h1:a0ufTd5liiUubIGeGxpSTnNS8ZSrN4DV01/wGFmfzMs= +github.com/kaptinlin/messageformat-go v0.4.20/go.mod h1:FqdEPfQLkqVBX7OBRMPgYwUPvKYJohFD9Ok1BMzCfIo= github.com/knadh/koanf/maps v0.1.2 h1:RBfmAW5CnZT+PJ1CVc1QSJKf4Xu9kxfQgYVQSu8hpbo= github.com/knadh/koanf/maps v0.1.2/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= github.com/knadh/koanf/parsers/yaml v1.1.0 h1:3ltfm9ljprAHt4jxgeYLlFPmUaunuCgu1yILuTXRdM4= @@ -91,6 +103,8 @@ golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA= +golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= +golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 h1:JLQynH/LBHfCTSbDWl+py8C+Rg/k1OVH3xfcaiANuF0= diff --git a/internal/app/command.go b/internal/app/command.go new file mode 100644 index 0000000..2e2d834 --- /dev/null +++ b/internal/app/command.go @@ -0,0 +1,111 @@ +package app + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + + "github.com/urfave/cli/v3" + "go.opentelemetry.io/contrib/bridges/otelslog" + + "github.com/coding-kelps/samarkand/internal/config" + "github.com/coding-kelps/samarkand/internal/logger" + "github.com/coding-kelps/samarkand/internal/metadata" + "github.com/coding-kelps/samarkand/internal/server" +) + +func NewCommand(w io.Writer, loader func(string) (config.Config, error)) *cli.Command { + return &cli.Command{ + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "config", + Aliases: []string{"c"}, + Usage: "Path to configuration file", + Sources: cli.EnvVars("SAMARKAND__CONFIG"), + }, + }, + Commands: []*cli.Command{ + { + Name: "start", + Usage: "start samarkand market server", + Action: startAction(w, loader), + }, + { + Name: "validate", + Usage: "validate the loaded configuration", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "insecure-debug", + Usage: "Display loaded configuration (for debugging purposes only!)", + }, + }, + Action: validateAction(w, loader), + }, + { + Name: "version", + Usage: "Get the version of samarkand", + Action: func(_ context.Context, _ *cli.Command) error { + fmt.Fprintln(w, metadata.GetVersionWithBuildInfo()) + return nil + }, + }, + }, + } +} + +func startAction(w io.Writer, loader func(string) (config.Config, error)) cli.ActionFunc { + return func(ctx context.Context, cmd *cli.Command) error { + cfg, err := loader(cmd.Root().String("config")) + if err != nil { + return err + } + + shutdown, err := logger.SetupOTelLogger(&cfg, ctx) + if err != nil { + return err + } + defer shutdown(ctx) + + level, err := logger.ParseLogLevel(cfg.Log.Level) + if err != nil { + return err + } + + otelHandler := otelslog.NewHandler(metadata.GetName()) + consoleHandler := slog.NewTextHandler(w, &slog.HandlerOptions{ + Level: level, + AddSource: true, + }) + + log := slog.New(logger.NewMultiHandler(otelHandler, consoleHandler)) + slog.SetDefault(log) + + s := server.New(&cfg, log) + return s.Start() + } +} + +func validateAction(w io.Writer, loader func(string) (config.Config, error)) cli.ActionFunc { + return func(ctx context.Context, cmd *cli.Command) error { + cfg, err := loader(cmd.Root().String("config")) + if err != nil { + return err + } + + fmt.Fprintln(w, "configuration is valid") + + if cmd.Bool("insecure-debug") { + + cfgJSON, err := json.Marshal(cfg) + if err != nil { + return err + } + + fmt.Println(string(cfgJSON)) + } + + return nil + } +} diff --git a/cmd/samarkand/samarkand_test.go b/internal/app/command_test.go similarity index 98% rename from cmd/samarkand/samarkand_test.go rename to internal/app/command_test.go index 792906a..433c450 100644 --- a/cmd/samarkand/samarkand_test.go +++ b/internal/app/command_test.go @@ -1,4 +1,4 @@ -package main +package app import ( "bytes" @@ -16,7 +16,7 @@ func failingLoader(path string) (config.Config, error) { func run(args []string, loader func(string) (config.Config, error)) (string, error) { var buf bytes.Buffer - cmd := newCommand(&buf, loader) + cmd := NewCommand(&buf, loader) err := cmd.Run(context.Background(), args) return buf.String(), err } diff --git a/internal/clock/clock.go b/internal/clock/clock.go index 39b69c4..752fd9c 100644 --- a/internal/clock/clock.go +++ b/internal/clock/clock.go @@ -11,7 +11,7 @@ const ( // Tick is the payload published on every clock advance. type Tick struct { - Seq uint64 `json:"seq"` // monotonic tick counter - MarketTime time.Time `json:"market_time"` // virtual market time + Seq uint64 `json:"seq"` // monotonic tick counter + MarketTime time.Time `json:"market_time"` // virtual market time PublishedAt time.Time `json:"published_at"` // wall time of publication } diff --git a/internal/clock/election.go b/internal/clock/election.go index 4a428a9..cac0829 100644 --- a/internal/clock/election.go +++ b/internal/clock/election.go @@ -3,8 +3,8 @@ package clock import ( "context" "errors" - "time" "log/slog" + "time" "github.com/redis/go-redis/v9" ) @@ -12,22 +12,21 @@ import ( const ( leaderKey = "market:leader" - leaseTTL = 3 * time.Second // how long the lock lives without renewal - renewInterval = leaseTTL / 3 // renew well before expiry - retryInterval = 500 * time.Millisecond // how often a standby retries acquisition + leaseTTL = 3 * time.Second // how long the lock lives without renewal + renewInterval = leaseTTL / 3 // renew well before expiry + retryInterval = 500 * time.Millisecond // how often a standby retries acquisition ) type Election struct { rdb *redis.Client leaderID string // unique ID for this instance (e.g. hostname + pid) - log *slog.Logger + log *slog.Logger } func NewElection(rdb *redis.Client, leaderID string) *Election { return &Election{rdb: rdb, leaderID: leaderID} } - // Campaign blocks until this instance wins the election, then calls // onElected in a goroutine. When onElected returns (or ctx is cancelled), // the lease is released and Campaign returns — callers should loop: diff --git a/internal/config/config.go b/internal/config/config.go index 4dc4fe8..1880344 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,45 +3,31 @@ package config import "time" type Config struct { - Server ServerConfig `koanf:"server"` - Log LogConfig `koanf:"log"` - Redis RedisConfig `koanf:"redis"` + Server ServerConfig `koanf:"server" json:"server"` + Log LogConfig `koanf:"log" json:"log"` + Redis RedisConfig `koanf:"redis" json:"redis"` } type ServerConfig struct { - Addr string `koanf:"addr"` - ShutdownTimeout time.Duration `koanf:"shutdown_timeout"` + Addr string `koanf:"addr" json:"addr"` + ShutdownTimeout int `koanf:"shutdown_timeout" json:"shutdown_timeout"` + + // Resolved at load time — not populated from config directly. + // Unexported so it isn't accidentally (de)serialised. + resolvedShutdownTimeout time.Duration } type LogConfig struct { - Level string `koanf:"level"` + Level string `koanf:"level" json:"level"` } type RedisConfig struct { - Addr string `koanf:"addr"` - Password *string `koanf:"password"` - PasswordFile *string `koanf:"password_file"` - Db string `koanf:"db"` + Addr string `koanf:"addr" json:"addr"` + Password *string `koanf:"password" json:"password,omitempty"` + PasswordFile *string `koanf:"password_file" json:"password_file,omitempty"` + Db int `koanf:"db" json:"db"` // Resolved at load time — not populated from config directly. // Unexported so it isn't accidentally (de)serialised. resolvedPassword *string } - -func Default() Config { - return Config{ - Server: ServerConfig{ - Addr: ":50051", - ShutdownTimeout: 30 * time.Second, - }, - Log: LogConfig{ - Level: "info", - }, - Redis: RedisConfig{ - Addr: "localhost:6739", - Password: nil, - PasswordFile: nil, - Db: "0", - }, - } -} diff --git a/internal/config/config.schema.json b/internal/config/config.schema.json new file mode 100644 index 0000000..f80af14 --- /dev/null +++ b/internal/config/config.schema.json @@ -0,0 +1,107 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/coding-kelps/samarkand/internal/config/config.schema.json", + "title": "Samarkand configuration", + "description": "Configuration schema for the Samarkand application.", + "type": "object", + "additionalProperties": false, + "properties": { + "server": { + "description": "API server (gRPC & HTTP).", + "type": "object", + "additionalProperties": false, + "properties": { + "addr": { + "description": "Listening address of the server", + "type": "string", + "default": "0.0.0.0:50051" + }, + "shutdown_timeout": { + "description": "Timeout delay duration when shutting down in seconds.", + "type": "integer", + "minimum": 0, + "maximum": 300, + "default": 30 + } + }, + "default": {} + }, + "log": { + "description": "Application logging.", + "type": "object", + "additionalProperties": false, + "properties": { + "level": { + "description": "Log level.", + "type": "string", + "enum": [ + "debug", + "info", + "warn", + "error", + "fatal" + ], + "default": "info" + } + }, + "default": {} + }, + "redis": { + "description": "Redis client.", + "type": "object", + "additionalProperties": false, + "properties": { + "addr": { + "description": "Address of the Redis service.", + "type": "string", + "default": "localhost:6379" + }, + "password": { + "description": "Authentication password.", + "type": "string" + }, + "password_file": { + "description": "Filepath of the file containing the authentication password.", + "type": "string" + }, + "db": { + "description": "Redis database index to use.", + "type": "integer", + "minimum": 0, + "maximum": 15, + "default": 0 + } + }, + "oneOf": [ + { + "required": [ + "password" + ] + }, + { + "required": [ + "password_file" + ] + }, + { + "not": { + "anyOf": [ + { + "required": [ + "password" + ] + }, + { + "required": [ + "password_file" + ] + } + ] + } + } + ], + "default": {} + } + }, + "default": {} +} \ No newline at end of file diff --git a/internal/config/loader.go b/internal/config/loader.go index b614682..8478186 100644 --- a/internal/config/loader.go +++ b/internal/config/loader.go @@ -1,37 +1,88 @@ package config import ( + // use embed macro for JSON schema. + _ "embed" + "fmt" + "sort" "strings" + "github.com/kaptinlin/jsonschema" "github.com/knadh/koanf/parsers/yaml" "github.com/knadh/koanf/providers/env/v2" "github.com/knadh/koanf/providers/file" - "github.com/knadh/koanf/providers/structs" "github.com/knadh/koanf/v2" ) -func Load(cfgFilePath string) (Config, error) { +//go:embed config.schema.json +var jsonSchema []byte + +type ErrDefaultsLoading struct { + Err error +} + +func (e *ErrDefaultsLoading) Error() string { + return fmt.Sprintf("failed to load defaults: %v", e.Err) +} + +type ErrConfigValidation struct { + Errors map[string]string +} + +func (e *ErrConfigValidation) Error() string { + paths := make([]string, 0, len(e.Errors)) + for p := range e.Errors { + paths = append(paths, p) + } + sort.Strings(paths) + + msgs := make([]string, 0, len(paths)) + for _, p := range paths { + msgs = append(msgs, fmt.Sprintf("%s: %s", p, e.Errors[p])) + } + return "configuration validation failed: " + strings.Join(msgs, "; ") +} + +func Load(fp string) (Config, error) { + var cfg Config + k := koanf.New(".") - defaults := Default() - if err := k.Load(structs.Provider(defaults, "koanf"), nil); err != nil { + compiler := jsonschema.NewCompiler() + schema, err := compiler.Compile(jsonSchema) + if err != nil { return Config{}, err } - if cfgFilePath != "" { - if err := k.Load(file.Provider(cfgFilePath), yaml.Parser()); err != nil { + // Loads defaults from JSON schema + if err := schema.Unmarshal(&cfg, []byte("{}")); err != nil { + return Config{}, &ErrDefaultsLoading{Err: err} + } + + if fp != "" { + if err := k.Load(file.Provider(fp), yaml.Parser()); err != nil { return Config{}, err } } - k.Load(env.Provider(".", env.Opt{ - Prefix: "SAMARKAND_", + err = k.Load(env.Provider(".", env.Opt{ + Prefix: "SAMARKAND__", TransformFunc: func(k, v string) (string, any) { - k = strings.ReplaceAll(strings.ToLower(strings.TrimPrefix(k, "SAMARKAND_")), "_", ".") + k = strings.ReplaceAll(strings.ToLower(strings.TrimPrefix(k, "SAMARKAND__")), "_", ".") return k, v }, }), nil) + if err != nil { + return Config{}, err + } - var cfg Config - return cfg, k.UnmarshalWithConf("", &cfg, koanf.UnmarshalConf{Tag: "koanf"}) + if err := k.UnmarshalWithConf("", &cfg, koanf.UnmarshalConf{Tag: "koanf"}); err != nil { + return Config{}, err + } + + if result := schema.ValidateStruct(cfg); !result.IsValid() { + return cfg, &ErrConfigValidation{Errors: result.DetailedErrors()} + } + + return cfg, nil } diff --git a/internal/config/resolve.go b/internal/config/resolve.go index 0737a58..cc17ebb 100644 --- a/internal/config/resolve.go +++ b/internal/config/resolve.go @@ -5,8 +5,19 @@ import ( "fmt" "os" "strings" + "time" ) +func (s *ServerConfig) ResolvedShutdownTimeout() *string { + return nil +} + +func (s *ServerConfig) Resolve() error { + s.resolvedShutdownTimeout = time.Duration(s.ShutdownTimeout) * time.Second + + return nil +} + func (r *RedisConfig) ResolvedPassword() *string { return r.resolvedPassword } diff --git a/internal/logger/logger.go b/internal/logger/logger.go index cdca675..3ccc2c0 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -2,9 +2,9 @@ package logger import ( "context" + "fmt" "log/slog" "time" - "fmt" otlp "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" "go.opentelemetry.io/otel/log/global" @@ -17,11 +17,11 @@ import ( ) func ParseLogLevel(s string) (slog.Level, error) { - var level slog.Level - if err := level.UnmarshalText([]byte(s)); err != nil { - return level, fmt.Errorf("invalid log level %q: %w", s, err) - } - return level, nil + var level slog.Level + if err := level.UnmarshalText([]byte(s)); err != nil { + return level, fmt.Errorf("invalid log level %q: %w", s, err) + } + return level, nil } func SetupOTelLogger(cfg *config.Config, ctx context.Context) (shutdown func(context.Context) error, err error) { From ee360b969a0634137b16537063707467a74b2c41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guilhem=20Sant=C3=A9?= <76043368+guilhem-sante@users.noreply.github.com> Date: Fri, 1 May 2026 15:35:48 +0200 Subject: [PATCH 3/4] refactor: isolate application logic Move application logic out of command code. Also refactored code to use local OTeL logger provider rather than global. --- cmd/samarkand/samarkand.go | 4 +- go.mod | 8 ++-- go.sum | 16 ++++--- internal/app/app.go | 58 +++++++++++++++++++++++ internal/{app => command}/command.go | 48 ++++++++----------- internal/{app => command}/command_test.go | 2 +- internal/config/loader.go | 3 ++ internal/logger/logger.go | 10 ++-- internal/server/grpc.go | 11 +---- internal/server/server.go | 29 +++++++----- 10 files changed, 118 insertions(+), 71 deletions(-) create mode 100644 internal/app/app.go rename internal/{app => command}/command.go (57%) rename internal/{app => command}/command_test.go (99%) diff --git a/cmd/samarkand/samarkand.go b/cmd/samarkand/samarkand.go index 92a9a35..67ad58e 100644 --- a/cmd/samarkand/samarkand.go +++ b/cmd/samarkand/samarkand.go @@ -5,12 +5,12 @@ import ( "log" "os" - "github.com/coding-kelps/samarkand/internal/app" + "github.com/coding-kelps/samarkand/internal/command" "github.com/coding-kelps/samarkand/internal/config" ) func main() { - cmd := app.NewCommand(os.Stdout, config.Load) + cmd := command.NewCommand(os.Stdout, config.Load) if err := cmd.Run(context.Background(), os.Args); err != nil { log.Fatal(err) } diff --git a/go.mod b/go.mod index 302e445..80bd65e 100644 --- a/go.mod +++ b/go.mod @@ -3,16 +3,16 @@ module github.com/coding-kelps/samarkand go 1.26.1 require ( + github.com/kaptinlin/jsonschema v0.7.7 github.com/knadh/koanf/parsers/yaml v1.1.0 github.com/knadh/koanf/providers/env/v2 v2.0.0 github.com/knadh/koanf/providers/file v1.2.1 - github.com/knadh/koanf/providers/structs v1.0.0 github.com/knadh/koanf/v2 v2.3.4 + github.com/redis/go-redis/v9 v9.18.0 github.com/urfave/cli/v3 v3.7.0 go.opentelemetry.io/contrib/bridges/otelslog v0.17.0 go.opentelemetry.io/otel v1.42.0 go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.18.0 - go.opentelemetry.io/otel/log v0.18.0 go.opentelemetry.io/otel/sdk v1.42.0 go.opentelemetry.io/otel/sdk/log v0.18.0 golang.org/x/sync v0.20.0 @@ -23,7 +23,6 @@ require ( github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/fatih/structs v1.1.0 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/go-json-experiment/json v0.0.0-20260214004413-d219187c3433 // indirect github.com/go-logr/logr v1.4.3 // indirect @@ -34,13 +33,12 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect github.com/kaptinlin/go-i18n v0.4.0 // indirect github.com/kaptinlin/jsonpointer v0.4.17 // indirect - github.com/kaptinlin/jsonschema v0.7.7 // indirect github.com/kaptinlin/messageformat-go v0.4.20 // indirect github.com/knadh/koanf/maps v0.1.2 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect - github.com/redis/go-redis/v9 v9.18.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/otel/log v0.18.0 // indirect go.opentelemetry.io/otel/metric v1.42.0 // indirect go.opentelemetry.io/otel/trace v1.42.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect diff --git a/go.sum b/go.sum index cbd5298..5a00afc 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,7 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -6,8 +10,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= -github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= -github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/go-json-experiment/json v0.0.0-20260214004413-d219187c3433 h1:vymEbVwYFP/L05h5TKQxvkXoKxNvTpjxYKdF1Nlwuao= @@ -37,6 +39,8 @@ github.com/kaptinlin/jsonschema v0.7.7 h1:41BlQJ9dskH0oE5DSzBUrl/w4JQYIr6N6L0B5G github.com/kaptinlin/jsonschema v0.7.7/go.mod h1:rKjWfyySHSxAD7Li2ctYkPlOu960igoKBvZ2ADRtd5Q= github.com/kaptinlin/messageformat-go v0.4.20 h1:a0ufTd5liiUubIGeGxpSTnNS8ZSrN4DV01/wGFmfzMs= github.com/kaptinlin/messageformat-go v0.4.20/go.mod h1:FqdEPfQLkqVBX7OBRMPgYwUPvKYJohFD9Ok1BMzCfIo= +github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/knadh/koanf/maps v0.1.2 h1:RBfmAW5CnZT+PJ1CVc1QSJKf4Xu9kxfQgYVQSu8hpbo= github.com/knadh/koanf/maps v0.1.2/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= github.com/knadh/koanf/parsers/yaml v1.1.0 h1:3ltfm9ljprAHt4jxgeYLlFPmUaunuCgu1yILuTXRdM4= @@ -45,8 +49,6 @@ github.com/knadh/koanf/providers/env/v2 v2.0.0 h1:Ad5H3eun722u+FvchiIcEIJZsZ2M6o github.com/knadh/koanf/providers/env/v2 v2.0.0/go.mod h1:1g01PE+Ve1gBfWNNw2wmULRP0tc8RJrjn5p2N/jNCIc= github.com/knadh/koanf/providers/file v1.2.1 h1:bEWbtQwYrA+W2DtdBrQWyXqJaJSG3KrP3AESOJYp9wM= github.com/knadh/koanf/providers/file v1.2.1/go.mod h1:bp1PM5f83Q+TOUu10J/0ApLBd9uIzg+n9UgthfY+nRA= -github.com/knadh/koanf/providers/structs v1.0.0 h1:DznjB7NQykhqCar2LvNug3MuxEQsZ5KvfgMbio+23u4= -github.com/knadh/koanf/providers/structs v1.0.0/go.mod h1:kjo5TFtgpaZORlpoJqcbeLowM2cINodv8kX+oFAeQ1w= github.com/knadh/koanf/v2 v2.3.4 h1:fnynNSDlujWE+v83hAp8wKr/cdoxHLO0629SN+U8Urc= github.com/knadh/koanf/v2 v2.3.4/go.mod h1:gRb40VRAbd4iJMYYD5IxZ6hfuopFcXBpc9bbQpZwo28= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -57,6 +59,8 @@ github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa1 github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/pelletier/go-toml/v2 v2.3.0 h1:k59bC/lIZREW0/iVaQR8nDHxVq8OVlIzYCOJf421CaM= +github.com/pelletier/go-toml/v2 v2.3.0/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs= @@ -67,6 +71,8 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/urfave/cli/v3 v3.7.0 h1:AGSnbUyjtLiM+WJUb4dzXKldl/gL+F8OwmRDtVr6g2U= github.com/urfave/cli/v3 v3.7.0/go.mod h1:ysVLtOEmg2tOy6PknnYVhDoouyC/6N42TMeoMzskhso= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/contrib/bridges/otelslog v0.17.0 h1:NFIS6x7wyObQ7cR84x7bt1sr8nYBx89s3x3GwRjw40k= @@ -101,8 +107,6 @@ golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= -golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= -golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA= golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= diff --git a/internal/app/app.go b/internal/app/app.go new file mode 100644 index 0000000..0f257f6 --- /dev/null +++ b/internal/app/app.go @@ -0,0 +1,58 @@ +package app + +import ( + "context" + "log/slog" + "os" + + "go.opentelemetry.io/contrib/bridges/otelslog" + sdklog "go.opentelemetry.io/otel/sdk/log" + + "github.com/coding-kelps/samarkand/internal/config" + "github.com/coding-kelps/samarkand/internal/logger" + "github.com/coding-kelps/samarkand/internal/metadata" + "github.com/coding-kelps/samarkand/internal/server" +) + +type App struct { + otelProvider *sdklog.LoggerProvider + logger *slog.Logger + server *server.Server +} + +func NewApp(ctx context.Context, cfg *config.Config) (App, error) { + level, err := logger.ParseLogLevel(cfg.Log.Level) + if err != nil { + return App{}, err + } + + otelProvider, err := logger.NewOTelLoggerProvider(ctx) + if err != nil { + return App{}, err + } + otelHandler := otelslog.NewHandler(metadata.GetName(), otelslog.WithLoggerProvider(otelProvider)) + consoleHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + Level: level, + AddSource: true, + }) + + logger := slog.New(logger.NewMultiHandler(otelHandler, consoleHandler)) + server := server.NewServer(&server.ServerConfig{ + Addr: cfg.Server.Addr, + Logger: logger, + }) + + return App{ + logger: logger, + server: server, + }, nil +} + +func (a *App) Run() error { + + err := a.server.Start() + if err != nil { + return err + } + return nil +} diff --git a/internal/app/command.go b/internal/command/command.go similarity index 57% rename from internal/app/command.go rename to internal/command/command.go index 2e2d834..7471fd0 100644 --- a/internal/app/command.go +++ b/internal/command/command.go @@ -1,19 +1,16 @@ -package app +package command import ( "context" "encoding/json" "fmt" "io" - "log/slog" "github.com/urfave/cli/v3" - "go.opentelemetry.io/contrib/bridges/otelslog" + "github.com/coding-kelps/samarkand/internal/app" "github.com/coding-kelps/samarkand/internal/config" - "github.com/coding-kelps/samarkand/internal/logger" "github.com/coding-kelps/samarkand/internal/metadata" - "github.com/coding-kelps/samarkand/internal/server" ) func NewCommand(w io.Writer, loader func(string) (config.Config, error)) *cli.Command { @@ -30,7 +27,7 @@ func NewCommand(w io.Writer, loader func(string) (config.Config, error)) *cli.Co { Name: "start", Usage: "start samarkand market server", - Action: startAction(w, loader), + Action: start(w, loader), }, { Name: "validate", @@ -41,53 +38,39 @@ func NewCommand(w io.Writer, loader func(string) (config.Config, error)) *cli.Co Usage: "Display loaded configuration (for debugging purposes only!)", }, }, - Action: validateAction(w, loader), + Action: validate(w, loader), }, { - Name: "version", - Usage: "Get the version of samarkand", - Action: func(_ context.Context, _ *cli.Command) error { - fmt.Fprintln(w, metadata.GetVersionWithBuildInfo()) - return nil - }, + Name: "version", + Usage: "Get the version of samarkand", + Action: version(w, loader), }, }, } } -func startAction(w io.Writer, loader func(string) (config.Config, error)) cli.ActionFunc { +func start(w io.Writer, loader func(string) (config.Config, error)) cli.ActionFunc { return func(ctx context.Context, cmd *cli.Command) error { cfg, err := loader(cmd.Root().String("config")) if err != nil { return err } - shutdown, err := logger.SetupOTelLogger(&cfg, ctx) + a, err := app.NewApp(ctx, &cfg) if err != nil { return err } - defer shutdown(ctx) - level, err := logger.ParseLogLevel(cfg.Log.Level) + err = a.Run() if err != nil { return err } - otelHandler := otelslog.NewHandler(metadata.GetName()) - consoleHandler := slog.NewTextHandler(w, &slog.HandlerOptions{ - Level: level, - AddSource: true, - }) - - log := slog.New(logger.NewMultiHandler(otelHandler, consoleHandler)) - slog.SetDefault(log) - - s := server.New(&cfg, log) - return s.Start() + return nil } } -func validateAction(w io.Writer, loader func(string) (config.Config, error)) cli.ActionFunc { +func validate(w io.Writer, loader func(string) (config.Config, error)) cli.ActionFunc { return func(ctx context.Context, cmd *cli.Command) error { cfg, err := loader(cmd.Root().String("config")) if err != nil { @@ -109,3 +92,10 @@ func validateAction(w io.Writer, loader func(string) (config.Config, error)) cli return nil } } + +func version(w io.Writer, _ func(string) (config.Config, error)) cli.ActionFunc { + return func(_ context.Context, _ *cli.Command) error { + fmt.Fprintln(w, metadata.GetVersionWithBuildInfo()) + return nil + } +} diff --git a/internal/app/command_test.go b/internal/command/command_test.go similarity index 99% rename from internal/app/command_test.go rename to internal/command/command_test.go index 433c450..f9f13d6 100644 --- a/internal/app/command_test.go +++ b/internal/command/command_test.go @@ -1,4 +1,4 @@ -package app +package command import ( "bytes" diff --git a/internal/config/loader.go b/internal/config/loader.go index 8478186..e61b88e 100644 --- a/internal/config/loader.go +++ b/internal/config/loader.go @@ -68,6 +68,9 @@ func Load(fp string) (Config, error) { err = k.Load(env.Provider(".", env.Opt{ Prefix: "SAMARKAND__", TransformFunc: func(k, v string) (string, any) { + if k == "SAMARKAND__CONFIG" { + return "", nil + } k = strings.ReplaceAll(strings.ToLower(strings.TrimPrefix(k, "SAMARKAND__")), "_", ".") return k, v }, diff --git a/internal/logger/logger.go b/internal/logger/logger.go index 3ccc2c0..5e19e79 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -7,12 +7,10 @@ import ( "time" otlp "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" - "go.opentelemetry.io/otel/log/global" sdklog "go.opentelemetry.io/otel/sdk/log" "go.opentelemetry.io/otel/sdk/resource" - semconv "go.opentelemetry.io/otel/semconv/v1.9.0" + semconv "go.opentelemetry.io/otel/semconv/v1.40.0" - "github.com/coding-kelps/samarkand/internal/config" "github.com/coding-kelps/samarkand/internal/metadata" ) @@ -24,7 +22,7 @@ func ParseLogLevel(s string) (slog.Level, error) { return level, nil } -func SetupOTelLogger(cfg *config.Config, ctx context.Context) (shutdown func(context.Context) error, err error) { +func NewOTelLoggerProvider(ctx context.Context) (*sdklog.LoggerProvider, error) { exporter, err := otlp.New(ctx) if err != nil { return nil, err @@ -50,9 +48,7 @@ func SetupOTelLogger(cfg *config.Config, ctx context.Context) (shutdown func(con ), ) - global.SetLoggerProvider(provider) - - return provider.Shutdown, nil + return provider, nil } type multiHandler struct { diff --git a/internal/server/grpc.go b/internal/server/grpc.go index df2706e..88e2cb3 100644 --- a/internal/server/grpc.go +++ b/internal/server/grpc.go @@ -7,8 +7,8 @@ import ( "google.golang.org/grpc/reflection" ) -func (s *Server) newGRPCServer() *grpc.Server { - srv := grpc.NewServer(s.grpcServerOptions()...) +func newGRPCServer() *grpc.Server { + srv := grpc.NewServer(grpc.ChainUnaryInterceptor(), grpc.ChainStreamInterceptor()) healthSrv := health.NewServer() grpc_health_v1.RegisterHealthServer(srv, healthSrv) @@ -18,10 +18,3 @@ func (s *Server) newGRPCServer() *grpc.Server { return srv } - -func (s *Server) grpcServerOptions() []grpc.ServerOption { - return []grpc.ServerOption{ - grpc.ChainUnaryInterceptor(), - grpc.ChainStreamInterceptor(), - } -} diff --git a/internal/server/server.go b/internal/server/server.go index 473168c..2595d1b 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -11,20 +11,25 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc" - - "github.com/coding-kelps/samarkand/internal/config" ) type Server struct { - cfg *config.Config - log *slog.Logger - grpc *grpc.Server + addr string + logger *slog.Logger + grpc *grpc.Server +} + +type ServerConfig struct { + Addr string + Logger *slog.Logger } -func New(cfg *config.Config, log *slog.Logger) *Server { - s := &Server{cfg: cfg, log: log} - s.grpc = s.newGRPCServer() - return s +func NewServer(cfg *ServerConfig) *Server { + return &Server{ + addr: cfg.Addr, + grpc: newGRPCServer(), + logger: cfg.Logger, + } } func (s *Server) Start() error { @@ -34,12 +39,12 @@ func (s *Server) Start() error { g, ctx := errgroup.WithContext(ctx) g.Go(func() error { - addr := s.cfg.Server.Addr + addr := s.addr lis, err := net.Listen("tcp", addr) if err != nil { return fmt.Errorf("grpc listen %s: %w", addr, err) } - s.log.Info("gRPC server listening", "addr", addr) + s.logger.Info("gRPC server listening", "addr", addr) if err := s.grpc.Serve(lis); err != nil { return fmt.Errorf("grpc serve: %w", err) } @@ -50,6 +55,6 @@ func (s *Server) Start() error { } func (s *Server) Stop() { - s.log.Info("stopping gRPC server") + s.logger.Info("stopping gRPC server") s.grpc.GracefulStop() } From 5d36432dc41757f35c1d850b0f2958ee2f17181f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guilhem=20Sant=C3=A9?= <76043368+guilhem-sante@users.noreply.github.com> Date: Fri, 15 May 2026 20:15:11 +0200 Subject: [PATCH 4/4] feat: synchronized clock Added synchronization system to enable distributed clock within application cluster. Also added application graceful shutdown feature. --- .github/workflows/release.yml | 4 +- .github/workflows/test.yml | 4 +- .goreleaser.yml | 7 +- .pre-commit-config.yaml | 18 ++-- Dockerfile => Containerfile | 0 Taskfile.yml | 2 +- go.mod | 2 +- internal/app/app.go | 86 +++++++++++++--- internal/clock/clock.go | 157 +++++++++++++++++++++++++++-- internal/clock/election.go | 76 ++++++++------ internal/clock/errors.go | 20 ++++ internal/command/command.go | 4 +- internal/config/config.go | 14 ++- internal/config/config.schema.json | 21 ++++ internal/config/loader.go | 14 ++- internal/config/resolve.go | 32 ++++-- internal/server/server.go | 11 +- mkdocs.yml | 2 +- 18 files changed, 385 insertions(+), 89 deletions(-) rename Dockerfile => Containerfile (100%) create mode 100644 internal/clock/errors.go diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index acf4347..68597cc 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -18,10 +18,10 @@ jobs: with: fetch-depth: 0 - - name: Set up Go + - name: Set up Go 1.26.3 uses: actions/setup-go@v6.3.0 with: - go-version: 1.26.x + go-version: 1.26.3 - name: Install Syft uses: anchore/sbom-action/download-syft@v0.23.0 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 054c748..7aa2311 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,10 +17,10 @@ jobs: platform: [ubuntu-latest] runs-on: ${{matrix.platform}} steps: - - name: Set up Go 1.26.1 + - name: Set up Go 1.26.3 uses: actions/setup-go@v6.3.0 with: - go-version: 1.26.1 + go-version: 1.26.3 id: go - name: Check out code into the Go module directory diff --git a/.goreleaser.yml b/.goreleaser.yml index cb14825..5d7b278 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -18,13 +18,13 @@ builds: ldflags: - "-s -w" - "{{if .IsNightly}}-X github.com/coding-kelps/samarkand/internal/metadata.version={{.Version}}{{end}}" - skip_upload: true gomod: proxy: true dockers_v2: - - images: + - dockerfile: Containerfile + images: - coding-kelps/samarkand - ghcr.io/coding-kelps/samarkand tags: @@ -33,7 +33,6 @@ dockers_v2: - "{{if .IsNightly}}nightly{{end}}" labels: "org.opencontainers.image.name": "{{.ProjectName}}" - "org.opencontainers.image.name": "Samarkand" "org.opencontainers.image.description": "A low-latency virtual market server." "org.opencontainers.image.created": "{{.Date}}" "org.opencontainers.image.revision": "{{.FullCommit}}" @@ -42,7 +41,7 @@ dockers_v2: platforms: - linux/amd64 - linux/arm64 - disable: "{{ .IsSnapshot }}" + # disable: "{{ .IsSnapshot }}" sbom: "{{ not .IsNightly }}" git: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 078f876..d2036b5 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,8 +1,14 @@ --- repos: -- repo: https://github.com/pre-commit/pre-commit-hooks - rev: v6.0.0 - hooks: - - id: check-yaml - - id: end-of-file-fixer - - id: trailing-whitespace +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: v6.0.0 + hooks: + - id: check-yaml + - id: end-of-file-fixer + - id: trailing-whitespace + +- repo: https://github.com/golangci/golangci-lint + rev: v2.12.2 + hooks: + - id: golangci-lint + args: ["--timeout=5m"] diff --git a/Dockerfile b/Containerfile similarity index 100% rename from Dockerfile rename to Containerfile diff --git a/Taskfile.yml b/Taskfile.yml index fe4974b..84e2788 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -86,7 +86,7 @@ tasks: vars: [IMAGE_REF] cmds: - cosign verify - --certificate-identity-regexp="https://github.com/yourorg/samarkand" + --certificate-identity-regexp="https://github.com/coding-kelps/samarkand" --certificate-oidc-issuer="https://token.actions.githubusercontent.com" {{.IMAGE_REF}} diff --git a/go.mod b/go.mod index 80bd65e..262c53d 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/coding-kelps/samarkand -go 1.26.1 +go 1.26.3 require ( github.com/kaptinlin/jsonschema v0.7.7 diff --git a/internal/app/app.go b/internal/app/app.go index 0f257f6..eb3192f 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -4,10 +4,14 @@ import ( "context" "log/slog" "os" + "os/signal" + "syscall" + "time" "go.opentelemetry.io/contrib/bridges/otelslog" sdklog "go.opentelemetry.io/otel/sdk/log" + "github.com/coding-kelps/samarkand/internal/clock" "github.com/coding-kelps/samarkand/internal/config" "github.com/coding-kelps/samarkand/internal/logger" "github.com/coding-kelps/samarkand/internal/metadata" @@ -17,42 +21,100 @@ import ( type App struct { otelProvider *sdklog.LoggerProvider logger *slog.Logger + clock *clock.MarketClock server *server.Server } func NewApp(ctx context.Context, cfg *config.Config) (App, error) { - level, err := logger.ParseLogLevel(cfg.Log.Level) - if err != nil { - return App{}, err - } + var otelProvider *sdklog.LoggerProvider + var loggerHandlers []slog.Handler - otelProvider, err := logger.NewOTelLoggerProvider(ctx) + level, err := logger.ParseLogLevel(cfg.Log.Level) if err != nil { return App{}, err } - otelHandler := otelslog.NewHandler(metadata.GetName(), otelslog.WithLoggerProvider(otelProvider)) consoleHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ Level: level, AddSource: true, }) + loggerHandlers = append(loggerHandlers, consoleHandler) + + if cfg.Log.OpenTelemetry.Enabled { + otelProvider, err := logger.NewOTelLoggerProvider(ctx) + if err != nil { + return App{}, err + } + otelHandler := otelslog.NewHandler(metadata.GetName(), otelslog.WithLoggerProvider(otelProvider)) + loggerHandlers = append(loggerHandlers, otelHandler) + } - logger := slog.New(logger.NewMultiHandler(otelHandler, consoleHandler)) + logger := slog.New(logger.NewMultiHandler(loggerHandlers...)) server := server.NewServer(&server.ServerConfig{ Addr: cfg.Server.Addr, Logger: logger, }) + clock, err := clock.NewMarketClock(&clock.MarketClockConfig{ + Redis: clock.RedisConfig{ + Addr: cfg.Redis.Addr, + Username: cfg.Redis.ResolvedUsername(), + Password: cfg.Redis.ResolvedPassword(), + DB: cfg.Redis.DB, + }, + Logger: logger, + }) + if err != nil { + return App{}, err + } return App{ - logger: logger, - server: server, + otelProvider: otelProvider, + logger: logger, + server: server, + clock: clock, }, nil } -func (a *App) Run() error { +func (a *App) Run(ctx context.Context) error { + ctx, stop := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM) - err := a.server.Start() - if err != nil { + go func() { + <-ctx.Done() + a.logger.Info("SIGTERM signal catched gracefully shutdown") + stop() + a.Shutdown() + }() + + if err := a.clock.Start(ctx); err != nil { return err } + + go func() { + tickInterval := 10 * time.Second + ticker := time.NewTicker(tickInterval) + + for { + select { + case <-ticker.C: + a.logger.Info("clock time", "time", a.clock.Now().Format("2006-01-02 15:04:05")) + case <-ctx.Done(): + return + } + } + }() + + if err := a.server.Start(ctx); err != nil { + return err + } + + return nil +} + +func (a *App) Shutdown() error { + a.server.Stop() + + if err := a.clock.Close(); err != nil { + return err + } + return nil } diff --git a/internal/clock/clock.go b/internal/clock/clock.go index 752fd9c..7ff0f6a 100644 --- a/internal/clock/clock.go +++ b/internal/clock/clock.go @@ -1,17 +1,162 @@ package clock import ( + "context" + "log/slog" + "sync" "time" + + "github.com/redis/go-redis/v9" ) const ( - clockChannel = "samarkand:clock" + // the election leader Redis lock + leaderLockName = "samarkand:leader" + // the Redis key of the clock key-value store + clockKeyName = "samarkand:clock" + // the clock Redis PUB/SUB channel + clockChannelName = "samarkand:clock:ticks" + // how often is the clock updated tickInterval = 100 * time.Millisecond + // how long the lock lives without renewal + leaseTTL = 60 * time.Second + // renew well before expiry + renewInterval = leaseTTL / 3 + // how often a standby retries acquisition + retryInterval = 500 * time.Millisecond ) -// Tick is the payload published on every clock advance. -type Tick struct { - Seq uint64 `json:"seq"` // monotonic tick counter - MarketTime time.Time `json:"market_time"` // virtual market time - PublishedAt time.Time `json:"published_at"` // wall time of publication +type MarketClock struct { + rdb *redis.Client + election *Election + value time.Time + lock sync.RWMutex + logger *slog.Logger +} + +type MarketClockConfig struct { + Redis RedisConfig + Logger *slog.Logger +} + +type RedisConfig struct { + Addr string + Username string + Password string + DB int +} + +func NewMarketClock(cfg *MarketClockConfig) (*MarketClock, error) { + rdb := redis.NewClient(&redis.Options{ + Addr: cfg.Redis.Addr, + Username: cfg.Redis.Username, + Password: cfg.Redis.Password, + DB: cfg.Redis.DB, + }) + logger := cfg.Logger + + e, err := NewElection(&ElectionConfig{ + Rdb: rdb, + Lock: leaderLockName, + LeaseTTL: leaseTTL, + RenewInterval: renewInterval, + RetryInterval: retryInterval, + Logger: logger, + }) + if err != nil { + return nil, &ErrElectionError{err: err} + } + + return &MarketClock{ + rdb: rdb, + election: e, + logger: logger, + }, nil +} + +func (c *MarketClock) Start(ctx context.Context) error { + go func() { + for { + err := c.election.Campaign(ctx, c.send) + if _, ok := err.(*ErrNotRenewed); !ok { + break + } + } + }() + + go c.receive(ctx) + + return nil +} + +func (c *MarketClock) Close() error { + if err := c.rdb.Close(); err != nil { + return err + } + + return nil +} + +func (c *MarketClock) send(ctx context.Context) error { + var clock time.Time + + ticker := time.NewTicker(tickInterval) + res, err := c.rdb.Get(ctx, clockKeyName).Result() + if err == redis.Nil { + clock = time.Now() // Replace time.Now() with configured starting time. + serialzedClock, err := clock.MarshalBinary() + if err != nil { + return err + } + if err := c.rdb.Set(ctx, clockKeyName, serialzedClock, 0).Err(); err != nil { + return err + } + } else if err != nil { + return err + } + if err := clock.UnmarshalBinary([]byte(res)); err != nil { + return err + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case _ = <-ticker.C: + clock = clock.Add(tickInterval) + serialzedClock, err := clock.MarshalBinary() + + if err = c.rdb.Set(ctx, clockKeyName, serialzedClock, 0).Err(); err != nil { + return err + } + if err = c.rdb.Publish(ctx, clockChannelName, serialzedClock).Err(); err != nil { + return err + } + } + } +} + +func (c *MarketClock) receive(ctx context.Context) error { + pubsub := c.rdb.Subscribe(ctx, clockChannelName) + defer pubsub.Close() + ch := pubsub.Channel() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case m := <-ch: + c.lock.Lock() + if err := c.value.UnmarshalBinary([]byte(m.Payload)); err != nil { + return err + } + c.lock.Unlock() + } + } +} + +func (c *MarketClock) Now() time.Time { + c.lock.RLock() + defer c.lock.RUnlock() + return c.value } diff --git a/internal/clock/election.go b/internal/clock/election.go index cac0829..fcdb429 100644 --- a/internal/clock/election.go +++ b/internal/clock/election.go @@ -2,29 +2,47 @@ package clock import ( "context" - "errors" + "fmt" "log/slog" + "os" "time" "github.com/redis/go-redis/v9" ) -const ( - leaderKey = "market:leader" +type Election struct { + rdb *redis.Client + lock string + // unique ID for this instance (e.g. hostname + pid) + instanceID string + leaseTTL time.Duration + renewInterval time.Duration + retryInterval time.Duration + logger *slog.Logger +} - leaseTTL = 3 * time.Second // how long the lock lives without renewal - renewInterval = leaseTTL / 3 // renew well before expiry - retryInterval = 500 * time.Millisecond // how often a standby retries acquisition -) +type ElectionConfig struct { + Rdb *redis.Client + Lock string + LeaseTTL time.Duration + RenewInterval time.Duration + RetryInterval time.Duration -type Election struct { - rdb *redis.Client - leaderID string // unique ID for this instance (e.g. hostname + pid) - log *slog.Logger + Logger *slog.Logger } -func NewElection(rdb *redis.Client, leaderID string) *Election { - return &Election{rdb: rdb, leaderID: leaderID} +func NewElection(cfg *ElectionConfig) (*Election, error) { + hostname, err := os.Hostname() + if err != nil { + return nil, err + } + instanceID := fmt.Sprintf("%s-%d", hostname, os.Getpid()) + + return &Election{ + rdb: cfg.Rdb, + instanceID: instanceID, + logger: cfg.Logger, + }, nil } // Campaign blocks until this instance wins the election, then calls @@ -34,17 +52,20 @@ func NewElection(rdb *redis.Client, leaderID string) *Election { // for { // if err := e.Campaign(ctx, onElected); !errors.Is(err, ErrNotRenewed) { break } // } -func (e *Election) Campaign(ctx context.Context, onElected func(ctx context.Context)) error { +func (e *Election) Campaign(ctx context.Context, onElected func(ctx context.Context) error) error { // --- Phase 1: acquire the lock --- for { - ok, err := e.rdb.SetNX(ctx, leaderKey, e.leaderID, leaseTTL).Result() - if err != nil { + res, err := e.rdb.SetArgs(ctx, e.lock, e.instanceID, redis.SetArgs{ + Mode: "NX", + TTL: leaseTTL, + }).Result() + if err != redis.Nil && err != nil { if ctx.Err() != nil { return ctx.Err() } - e.log.Error("acquire error", "error", err) - } else if ok { + e.logger.Error("acquire error", "error", err) + } else if res == "OK" { break // we are the leader } @@ -56,7 +77,7 @@ func (e *Election) Campaign(ctx context.Context, onElected func(ctx context.Cont } } - e.log.Error("leader elected", "leader_id", e.leaderID) + e.logger.Info("gained leadership", "instance_id", e.instanceID) // --- Phase 2: hold the lock via a renewal goroutine --- leaderCtx, abdicate := context.WithCancel(ctx) @@ -105,12 +126,12 @@ func (e *Election) holdLease(ctx context.Context) error { // Only renew if we still own the key. renewed, err := e.tryRenew(ctx) if err != nil { - e.log.Error("renewal error", "error", err) - return ErrNotRenewed + e.logger.Error("renewal error", "error", err) + return &ErrNotRenewed{} } if !renewed { - e.log.Error("lost leadership", "leader_id", e.leaderID) - return ErrNotRenewed + e.logger.Error("lost leadership", "instance_id", e.instanceID) + return &ErrNotRenewed{} } } } @@ -127,7 +148,7 @@ var renewScript = redis.NewScript(` func (e *Election) tryRenew(ctx context.Context) (bool, error) { ttlMs := leaseTTL.Milliseconds() - res, err := renewScript.Run(ctx, e.rdb, []string{leaderKey}, e.leaderID, ttlMs).Int() + res, err := renewScript.Run(ctx, e.rdb, []string{e.lock}, e.instanceID, ttlMs).Int() return res == 1, err } @@ -141,10 +162,7 @@ var releaseScript = redis.NewScript(` `) func (e *Election) release(ctx context.Context) { - if _, err := releaseScript.Run(ctx, e.rdb, []string{leaderKey}, e.leaderID).Int(); err != nil { - e.log.Error("release error (non-fatal)", "error", err) + if _, err := releaseScript.Run(ctx, e.rdb, []string{e.lock}, e.instanceID).Int(); err != nil { + e.logger.Error("release error (non-fatal)", "error", err) } } - -// ErrNotRenewed is returned when a leader loses its lease. -var ErrNotRenewed = errors.New("leader lease not renewed") diff --git a/internal/clock/errors.go b/internal/clock/errors.go new file mode 100644 index 0000000..ec493e4 --- /dev/null +++ b/internal/clock/errors.go @@ -0,0 +1,20 @@ +package clock + +import ( + "fmt" +) + +type ErrNotRenewed struct { +} + +func (e *ErrNotRenewed) Error() string { + return "leader lease not renewed" +} + +type ErrElectionError struct { + err error +} + +func (e *ErrElectionError) Error() string { + return fmt.Sprintf("Election error: %s", e.err) +} diff --git a/internal/command/command.go b/internal/command/command.go index 7471fd0..e39c2ba 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -49,7 +49,7 @@ func NewCommand(w io.Writer, loader func(string) (config.Config, error)) *cli.Co } } -func start(w io.Writer, loader func(string) (config.Config, error)) cli.ActionFunc { +func start(_ io.Writer, loader func(string) (config.Config, error)) cli.ActionFunc { return func(ctx context.Context, cmd *cli.Command) error { cfg, err := loader(cmd.Root().String("config")) if err != nil { @@ -61,7 +61,7 @@ func start(w io.Writer, loader func(string) (config.Config, error)) cli.ActionFu return err } - err = a.Run() + err = a.Run(ctx) if err != nil { return err } diff --git a/internal/config/config.go b/internal/config/config.go index 1880344..e2a69f5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -18,16 +18,24 @@ type ServerConfig struct { } type LogConfig struct { - Level string `koanf:"level" json:"level"` + Level string `koanf:"level" json:"level"` + OpenTelemetry OpenTelemetryConfig `koanf:"opentelemetry" json:"opentelemetry"` +} + +type OpenTelemetryConfig struct { + Enabled bool `koanf:"enabled" json:"enabled"` } type RedisConfig struct { Addr string `koanf:"addr" json:"addr"` + Username *string `koanf:"username" json:"username,omitempty"` + UsernameFile *string `koanf:"username_file" json:"username_file,omitempty"` Password *string `koanf:"password" json:"password,omitempty"` PasswordFile *string `koanf:"password_file" json:"password_file,omitempty"` - Db int `koanf:"db" json:"db"` + DB int `koanf:"db" json:"db"` // Resolved at load time — not populated from config directly. // Unexported so it isn't accidentally (de)serialised. - resolvedPassword *string + resolvedUsername string + resolvedPassword string } diff --git a/internal/config/config.schema.json b/internal/config/config.schema.json index f80af14..e91d2db 100644 --- a/internal/config/config.schema.json +++ b/internal/config/config.schema.json @@ -42,6 +42,19 @@ "fatal" ], "default": "info" + }, + "opentelemetry": { + "description": "OpenTelemetry exporting configuration of the logger.", + "type": "object", + "additionalProperties": false, + "properties": { + "enabled": { + "description": "Enable the logger OpenTelemetry exporter.", + "type": "boolean", + "default": false + } + }, + "default": {} } }, "default": {} @@ -56,6 +69,14 @@ "type": "string", "default": "localhost:6379" }, + "username": { + "description": "Authentication username.", + "type": "string" + }, + "username_file": { + "description": "Filepath of the file containing the authentication username.", + "type": "string" + }, "password": { "description": "Authentication password.", "type": "string" diff --git a/internal/config/loader.go b/internal/config/loader.go index e61b88e..94ec02e 100644 --- a/internal/config/loader.go +++ b/internal/config/loader.go @@ -14,6 +14,10 @@ import ( "github.com/knadh/koanf/v2" ) +const ( + AppPrefix = "SAMARKAND__" +) + //go:embed config.schema.json var jsonSchema []byte @@ -66,12 +70,12 @@ func Load(fp string) (Config, error) { } err = k.Load(env.Provider(".", env.Opt{ - Prefix: "SAMARKAND__", + Prefix: AppPrefix, TransformFunc: func(k, v string) (string, any) { - if k == "SAMARKAND__CONFIG" { + if k == fmt.Sprintf("%sCONFIG", AppPrefix) { return "", nil } - k = strings.ReplaceAll(strings.ToLower(strings.TrimPrefix(k, "SAMARKAND__")), "_", ".") + k = strings.ReplaceAll(strings.ToLower(strings.TrimPrefix(k, AppPrefix)), "__", ".") return k, v }, }), nil) @@ -87,5 +91,9 @@ func Load(fp string) (Config, error) { return cfg, &ErrConfigValidation{Errors: result.DetailedErrors()} } + // Temporary + // I need to find a generalized way of handling resolved values + _ = cfg.Redis.Resolve() + return cfg, nil } diff --git a/internal/config/resolve.go b/internal/config/resolve.go index cc17ebb..731b388 100644 --- a/internal/config/resolve.go +++ b/internal/config/resolve.go @@ -1,8 +1,6 @@ package config import ( - "errors" - "fmt" "os" "strings" "time" @@ -18,27 +16,41 @@ func (s *ServerConfig) Resolve() error { return nil } -func (r *RedisConfig) ResolvedPassword() *string { +func (r *RedisConfig) ResolvedUsername() string { + return r.resolvedUsername +} + +func (r *RedisConfig) ResolvedPassword() string { return r.resolvedPassword } -// Resolve validates the password configuration and, if PasswordFile is set, -// reads the file and stores its trimmed contents as the effective password. +// Resolve validates the username and password configuration and, if +// UsernameFile or PasswordFile is set, reads the file and stores its trimmed +// contents as the effective username or password. func (r *RedisConfig) Resolve() error { - if r.Password != nil && r.PasswordFile != nil { - return errors.New("redis: password and password_file are mutually exclusive") + if r.UsernameFile != nil { + data, err := os.ReadFile(*r.UsernameFile) + if err != nil { + return err + } + s := strings.TrimRight(string(data), "\r\n") + r.resolvedUsername = s + return nil + } else { + r.resolvedUsername = *r.Username } if r.PasswordFile != nil { data, err := os.ReadFile(*r.PasswordFile) if err != nil { - return fmt.Errorf("redis: reading password_file: %w", err) + return err } s := strings.TrimRight(string(data), "\r\n") - r.resolvedPassword = &s + r.resolvedPassword = s return nil + } else { + r.resolvedPassword = *r.Password } - r.resolvedPassword = r.Password return nil } diff --git a/internal/server/server.go b/internal/server/server.go index 2595d1b..3949552 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -5,9 +5,6 @@ import ( "fmt" "log/slog" "net" - "os" - "os/signal" - "syscall" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -32,22 +29,22 @@ func NewServer(cfg *ServerConfig) *Server { } } -func (s *Server) Start() error { - ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) - defer stop() - +func (s *Server) Start(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) g.Go(func() error { addr := s.addr + lis, err := net.Listen("tcp", addr) if err != nil { return fmt.Errorf("grpc listen %s: %w", addr, err) } + s.logger.Info("gRPC server listening", "addr", addr) if err := s.grpc.Serve(lis); err != nil { return fmt.Errorf("grpc serve: %w", err) } + return nil }) diff --git a/mkdocs.yml b/mkdocs.yml index e6687ce..a0fd140 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -6,7 +6,7 @@ site_author: Coding Kelps copyright: Copyright (c) 2026 Coding Kelps. repo_name: coding-kelps/samarkand repo_url: https://github.com/coding-kelps/samarkand -edit_uri: blob/master/docs/ +edit_uri: blob/main/docs/ theme: name: material features: