From b1c6e919137085df8d223e80df6e19f911ac424b Mon Sep 17 00:00:00 2001 From: PavelAgarkov Date: Sun, 21 Sep 2025 22:28:43 +0600 Subject: [PATCH] =?UTF-8?q?=D0=BF=D0=B5=D1=80=D0=B2=D0=B0=D1=8F=20=D0=BF?= =?UTF-8?q?=D0=BE=D0=BF=D1=8B=D1=82=D0=BA=D0=B0=20=D1=81=D0=B4=D0=B5=D0=BB?= =?UTF-8?q?=D0=B0=D1=82=D1=8C=20=D0=B4=D0=B0=D1=82=D0=B0=20=D0=BF=D1=80?= =?UTF-8?q?=D0=BE=D0=B2=D0=B0=D0=B9=D0=B4=D0=B5=D1=80,=20=D0=BF=D0=BE?= =?UTF-8?q?=D0=BA=D0=B0=20=D1=81=D1=8B=D1=80=D0=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- acceptanse_test.go | 24 +++++++-- contract.go | 2 + docker-compose.yaml | 46 +++++++++++++++++ envelope.go | 13 +++++ go.mod | 2 + go.sum | 18 +++++++ provider/contract.go | 14 +++++ provider/data_processor.go | 13 +++++ provider/redis_provider.go | 97 +++++++++++++++++++++++++++++++++++ provider/utils.go | 13 +++++ queue.go | 101 +++++++++++++++++++++++++++++++++++++ utils.go | 21 ++++++++ 12 files changed, 360 insertions(+), 4 deletions(-) create mode 100644 docker-compose.yaml create mode 100644 provider/contract.go create mode 100644 provider/data_processor.go create mode 100644 provider/redis_provider.go create mode 100644 provider/utils.go diff --git a/acceptanse_test.go b/acceptanse_test.go index a6dddf9..d010c46 100644 --- a/acceptanse_test.go +++ b/acceptanse_test.go @@ -11,13 +11,14 @@ import ( "testing" "time" + "github.com/PavelAgarkov/rate-envelope-queue/provider" "k8s.io/component-base/metrics/legacyregistry" ) type User struct { - Name string - Email string - Age int + Name string `json:"name"` + Email string `json:"email"` + Age int `json:"age"` } func Test_Acceptance(t *testing.T) { @@ -39,7 +40,22 @@ func Test_Acceptance(t *testing.T) { Email: "@gmail.com", Age: 30, } + + redisProvider, err := provider.NewRedisProvider( + parent, + provider.RedisConfig{ + Address: "localhost:6379", + Username: "", + Password: "", + DB: 0, + }, + "processing_table", + "fallback_table", + ) + defer redisProvider.Close() + emailEnvelope, err := NewEnvelope( + WithDataProvider(redisProvider), WithId(1), WithType("email_1"), //WithInterval(6*time.Second), @@ -225,7 +241,7 @@ func Test_Acceptance(t *testing.T) { start() stop() - envelopeQueue.Terminate() + //envelopeQueue.Terminate() err = envelopeQueue.Send(foodEnvelope) if err != nil { fmt.Println("add err after stop:", err) diff --git a/contract.go b/contract.go index 7bad66e..0b0a92a 100644 --- a/contract.go +++ b/contract.go @@ -13,6 +13,8 @@ var ( ErrAdditionEnvelopeToQueueBadFields = errors.New(fmt.Sprintf("%s: addition envelope to queue has bad fields", service)) ErrAdditionEnvelopeToQueueBadIntervals = errors.New(fmt.Sprintf("%s: addition envelope to queue has bad intervals", service)) ErrAllowedQueueCapacityExceeded = errors.New(fmt.Sprintf("%s: allowed queue capacity exceeded", service)) + + ErrPassToDataProvider = errors.New(fmt.Sprintf("%s: can't pass envelope to data provider", service)) ) type ( diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..a588d9e --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,46 @@ +services: + redis: + image: redis:7.2.1 + hostname: redis1 + container_name: rate-redis + ports: + - '6379:6379' + command: redis-server --save 20 1 --appendonly yes --loglevel warning + restart: unless-stopped + healthcheck: + test: [ 'CMD', 'redis-cli', 'ping' ] + interval: 2s + retries: 2 + timeout: 2s + networks: + - rate-queue + volumes: + - redis-data:/data + + postgres: + image: postgres:15.0 + container_name: rate-postgres + ports: + - "5433:5432" + volumes: + - pgdata:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready", "-d", "clients_database"] + interval: 10s + timeout: 3s + retries: 3 + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: qwerty12345 + POSTGRES_DB: postgres + networks: + - rate-queue + restart: unless-stopped + +volumes: + pgdata: + redis-data: + +networks: + rate-queue: + driver: bridge \ No newline at end of file diff --git a/envelope.go b/envelope.go index a062b6b..e0b7802 100644 --- a/envelope.go +++ b/envelope.go @@ -4,12 +4,17 @@ import ( "context" "errors" "time" + + "github.com/PavelAgarkov/rate-envelope-queue/provider" ) type Envelope struct { id uint64 _type string + internalId string + fallbackId string + interval time.Duration deadline time.Duration @@ -24,6 +29,8 @@ type Envelope struct { stamps []Stamp // per-envelope stamps payload interface{} + + dataProvider provider.DataProvider } func NewDynamicEnvelope(deadline time.Duration, invoke Invoker, payload interface{}) (*Envelope, error) { @@ -146,3 +153,9 @@ func WithPayload(p interface{}) func(*Envelope) { e.payload = p } } + +func WithDataProvider(dp provider.DataProvider) func(*Envelope) { + return func(e *Envelope) { + e.dataProvider = dp + } +} diff --git a/go.mod b/go.mod index f07911b..752bf1e 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/PavelAgarkov/rate-envelope-queue go 1.24.0 require ( + github.com/go-redis/redis/v8 v8.11.5 github.com/stretchr/testify v1.10.0 k8s.io/client-go v0.34.0 k8s.io/component-base v0.34.0 @@ -13,6 +14,7 @@ require ( github.com/blang/semver/v4 v4.0.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index 69a5dff..9d35199 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,14 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= @@ -20,6 +26,12 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/gomega v1.35.1 h1:Cwbd75ZBPxFSuZ6T+rN/WCb/gOc6YgFBXLlZLhC7Ds4= +github.com/onsi/gomega v1.35.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q= @@ -42,8 +54,12 @@ go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= +golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= +golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= @@ -51,6 +67,8 @@ google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojt gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= k8s.io/apimachinery v0.34.0 h1:eR1WO5fo0HyoQZt1wdISpFDffnWOvFLOOeJ7MgIv4z0= diff --git a/provider/contract.go b/provider/contract.go new file mode 100644 index 0000000..0602d2f --- /dev/null +++ b/provider/contract.go @@ -0,0 +1,14 @@ +package provider + +import "context" + +type DataProvider interface { + SaveOne(ctx context.Context, data Kv) error + TakeOne(ctx context.Context, data Kv) (any, error) + FinishOne(ctx context.Context, data Kv) error + NextPosition() error + Close() error + + GenerateProcessingPK() (string, error) + GenerateFallbackPK(pk string) (string, error) +} diff --git a/provider/data_processor.go b/provider/data_processor.go new file mode 100644 index 0000000..2f2a457 --- /dev/null +++ b/provider/data_processor.go @@ -0,0 +1,13 @@ +package provider + +type Kv struct { + Key string + Value any +} + +type dataProcessor struct { +} + +func newDataProcessor() *dataProcessor { + return &dataProcessor{} +} diff --git a/provider/redis_provider.go b/provider/redis_provider.go new file mode 100644 index 0000000..c2515ca --- /dev/null +++ b/provider/redis_provider.go @@ -0,0 +1,97 @@ +package provider + +import ( + "context" + "encoding/hex" + "encoding/json" + "fmt" + "log" + + "github.com/go-redis/redis/v8" +) + +type RedisConfig struct { + Address string + Username string + Password string + DB int +} + +type RedisProvider struct { + redisClient *redis.Client + processingTable, fallbackTable string +} + +func NewRedisProvider(ctx context.Context, redisCfg RedisConfig, processingTable, fallbackTable string) (*RedisProvider, error) { + redisClient := redis.NewClient(&redis.Options{ + Addr: redisCfg.Address, + Username: redisCfg.Username, + Password: redisCfg.Password, + }) + + if err := redisClient.Ping(ctx).Err(); err != nil { + log.Printf("RedisProvider: can't connect to redis: %v", err) + return nil, err + } + + return &RedisProvider{ + redisClient: redisClient, + processingTable: processingTable, + fallbackTable: fallbackTable, + }, nil +} + +func (rp *RedisProvider) SaveOne(ctx context.Context, data Kv) error { + bytes, err := json.Marshal(data.Value) + if err != nil { + return fmt.Errorf("marshal error: %w", err) + } + + err = rp.redisClient.Set(ctx, string(data.Key), bytes, 0).Err() + if err != nil { + return fmt.Errorf("RedisProvider: SaveOne: can't set key %s: %w", data.Key, err) + } + return nil +} + +func (rp *RedisProvider) TakeOne(ctx context.Context, data Kv) (any, error) { + val, err := rp.redisClient.Get(ctx, data.Key).Result() + if err != nil { + return nil, fmt.Errorf("RedisProvider: TakeOne: can't get key %s: %w", data.Key, err) + } + + var out any + if err := json.Unmarshal([]byte(val), &out); err != nil { + return nil, fmt.Errorf("unmarshal error: %w", err) + } + + return out, nil +} + +func (rp *RedisProvider) GenerateProcessingPK() (string, error) { + b := RandomBytes(16) + if b == nil { + return "", fmt.Errorf("RedisProvider: GeneratePK: can't generate random bytes") + } + return rp.processingTable + hex.EncodeToString(b), nil +} + +func (rp *RedisProvider) GenerateFallbackPK(pk string) (string, error) { + return rp.fallbackTable + pk, nil +} + +func (rp *RedisProvider) FinishOne(ctx context.Context, data Kv) error { + err := rp.redisClient.Del(ctx, string(data.Key)).Err() + if err != nil { + return fmt.Errorf("RedisProvider: FinishOne: can't delete key %s: %w", data.Key, err) + } + return nil +} + +func (rp *RedisProvider) NextPosition() error { + return nil +} + +func (rp *RedisProvider) Close() error { + return rp.redisClient.Close() +} diff --git a/provider/utils.go b/provider/utils.go new file mode 100644 index 0000000..7e828b6 --- /dev/null +++ b/provider/utils.go @@ -0,0 +1,13 @@ +package provider + +import ( + "crypto/rand" +) + +func RandomBytes(n int) []byte { + b := make([]byte, n) + if _, err := rand.Read(b); err != nil { + return nil + } + return b +} diff --git a/queue.go b/queue.go index 4b52b57..6bde6e8 100644 --- a/queue.go +++ b/queue.go @@ -3,12 +3,14 @@ package rate_envelope_queue import ( "context" "errors" + "fmt" "log" "runtime/debug" "sync" "sync/atomic" "time" + "github.com/PavelAgarkov/rate-envelope-queue/provider" "k8s.io/client-go/util/workqueue" _ "k8s.io/component-base/metrics/prometheus/workqueue" ) @@ -252,6 +254,12 @@ func (q *RateEnvelopeQueue) worker(ctx context.Context) { // ErrStopEnvelope — забыть и не перепланировать. Ошибка от пользователя о том, что задача больше не нужна case errors.Is(important, ErrStopEnvelope): queue.Forget(envelope) + if envelope.dataProvider != nil && envelope.internalId != "" { + err := q.replaceToFailureProvider(envelope) + if err != nil { + log.Printf(service+": envelope %s/%d: can't replace to failure data provider: %v", envelope._type, envelope.id, err) + } + } return nil @@ -314,6 +322,14 @@ func (q *RateEnvelopeQueue) worker(ctx context.Context) { } } } + + if envelope.dataProvider != nil && envelope.internalId != "" { + err := q.replaceToFailureProvider(envelope) + if err != nil { + log.Printf(service+": envelope %s/%d: can't replace to failure data provider: %v", envelope._type, envelope.id, err) + } + } + return nil default: @@ -329,6 +345,13 @@ func (q *RateEnvelopeQueue) worker(ctx context.Context) { envelope.successHook(hctx, envelope) cancel() } + + if envelope.dataProvider != nil && envelope.internalId != "" { + err := envelope.dataProvider.FinishOne(tctx, provider.Kv{Key: envelope.internalId}) + if err != nil { + log.Printf(service+": envelope %s/%d: can't finish in data provider: %v", envelope._type, envelope.id, err) + } + } return nil } }(envelope) @@ -381,6 +404,12 @@ func (q *RateEnvelopeQueue) Send(envelopes ...*Envelope) error { } q.pending = append(q.pending, envelopes...) q.pendingMu.Unlock() + + err := q.passToDataProvider(envelopes) + if err != nil { + return ErrPassToDataProvider + } + return nil } q.pendingMu.Unlock() @@ -414,6 +443,11 @@ func (q *RateEnvelopeQueue) Send(envelopes ...*Envelope) error { local.Add(e) } q.lifecycleMu.Unlock() + err := q.passToDataProvider(envelopes) + if err != nil { + return ErrPassToDataProvider + } + return nil case StateStopping, StateStopped: @@ -425,6 +459,73 @@ func (q *RateEnvelopeQueue) Send(envelopes ...*Envelope) error { } } +func (q *RateEnvelopeQueue) passToDataProvider(envelopes []*Envelope) error { + ctx, cancel := context.WithTimeout(q.terminateCtx, 5*time.Second) + for _, envelope := range envelopes { + if envelope != nil && envelope.dataProvider != nil { + pk, err := envelope.dataProvider.GenerateProcessingPK() + if err != nil { + cancel() + return fmt.Errorf("envelope %s/%d: %v", envelope._type, envelope.id, err) + } + kv := provider.Kv{ + Key: pk, + Value: envelope.payload, + } + err = envelope.dataProvider.SaveOne(ctx, kv) + if err != nil { + log.Printf(service+": failed to save envelope %s/%d to provider: %v", envelope._type, envelope.id, err) + cancel() + return fmt.Errorf("can't pass envelope to data provider: %w", err) + } + envelope.internalId = pk + } + } + cancel() + return nil +} + +func (q *RateEnvelopeQueue) replaceToFailureProvider(envelope *Envelope) error { + ctx, cancel := context.WithTimeout(q.terminateCtx, 5*time.Second) + kv := provider.Kv{Key: envelope.internalId} + entity, err := envelope.dataProvider.TakeOne(ctx, kv) + if err != nil { + log.Printf(service+": envelope %s/%d: can't take from data provider: %v", envelope._type, envelope.id, err) + cancel() + } + if entity != nil { + newPk, err := envelope.dataProvider.GenerateFallbackPK(kv.Key) + if err != nil { + log.Printf(service+": envelope %s/%d: can't generate fallback PK: %v", envelope._type, envelope.id, err) + cancel() + + return fmt.Errorf("can't pass envelope to failure data provider: %w", err) + } + newKv := provider.Kv{ + Key: newPk, + Value: entity, + } + + err = envelope.dataProvider.SaveOne(ctx, newKv) + if err != nil { + log.Printf(service+": envelope %s/%d: can't save to fallback in data provider: %v", envelope._type, envelope.id, err) + cancel() + + return fmt.Errorf("can't pass envelope to failure data provider: %w", err) + } + err = envelope.dataProvider.FinishOne(ctx, kv) + if err != nil { + log.Printf(service+": envelope %s/%d: can't finish in data provider: %v", envelope._type, envelope.id, err) + cancel() + + return fmt.Errorf("can't pass envelope to failure data provider: %w", err) + } + } + cancel() + + return nil +} + func (q *RateEnvelopeQueue) Start() { q.lifecycleMu.Lock() defer q.lifecycleMu.Unlock() diff --git a/utils.go b/utils.go index 55a5e2b..579665b 100644 --- a/utils.go +++ b/utils.go @@ -2,6 +2,7 @@ package rate_envelope_queue import ( "context" + "encoding/binary" "log" "runtime/debug" "time" @@ -23,3 +24,23 @@ func recoverWrap() { log.Printf(service+": panic recovered: %v\n%s", r, debug.Stack()) } } + +func Uint64ToBytesBE(v uint64) []byte { + var b [8]byte + binary.BigEndian.PutUint64(b[:], v) + return b[:] +} + +func Uint64ToBytesLE(v uint64) []byte { + var b [8]byte + binary.LittleEndian.PutUint64(b[:], v) + return b[:] +} + +func BytesToUint64BE(b []byte) uint64 { + return binary.BigEndian.Uint64(b) +} + +func BytesToUint64LE(b []byte) uint64 { + return binary.LittleEndian.Uint64(b) +}