-
-
Notifications
You must be signed in to change notification settings - Fork 48
Expand file tree
/
Copy pathoptions.go
More file actions
193 lines (175 loc) · 6.53 KB
/
options.go
File metadata and controls
193 lines (175 loc) · 6.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
package queue
import (
"context"
"runtime"
"time"
"github.com/golang-queue/queue/core"
)
var (
defaultCapacity = 0
defaultWorkerCount = int64(runtime.NumCPU())
defaultNewLogger = NewLogger()
defaultFn = func(context.Context, core.TaskMessage) error { return nil }
defaultMetric = NewMetric()
)
// Option is a functional option for configuring a Queue.
// It follows the functional options pattern for flexible and extensible configuration.
type Option interface {
apply(*Options)
}
// OptionFunc is a function adapter that implements the Option interface.
// It allows regular functions to be used as Options.
type OptionFunc func(*Options)
// apply implements the Option interface by calling the function with the provided options.
func (f OptionFunc) apply(option *Options) {
f(option)
}
// WithWorkerCount sets the number of concurrent worker goroutines that will process jobs.
// If num is less than or equal to 0, it defaults to runtime.NumCPU().
// More workers allow higher concurrency but consume more system resources.
//
// Example:
//
// q := NewPool(10, WithWorkerCount(4)) // Creates a pool with 4 workers
func WithWorkerCount(num int64) Option {
return OptionFunc(func(q *Options) {
if num <= 0 {
num = defaultWorkerCount
}
q.workerCount = num
})
}
// WithQueueSize sets the maximum capacity of the queue.
// When set to 0 (default), the queue has unlimited capacity and will grow dynamically.
// When set to a positive value, Queue() will return ErrMaxCapacity when the limit is reached.
// Use this to prevent memory exhaustion under high load.
//
// Example:
//
// q := NewPool(5, WithQueueSize(1000)) // Queue will hold at most 1000 pending tasks
func WithQueueSize(num int) Option {
return OptionFunc(func(q *Options) {
q.queueSize = num
})
}
// WithLogger sets a custom logger for queue events and errors.
// By default, the queue uses a standard logger that writes to stderr.
// Use NewEmptyLogger() to disable logging entirely.
//
// Example:
//
// q := NewPool(5, WithLogger(myCustomLogger))
// // or disable logging:
// q := NewPool(5, WithLogger(NewEmptyLogger()))
func WithLogger(l Logger) Option {
return OptionFunc(func(q *Options) {
q.logger = l
})
}
// WithMetric sets a custom metrics collector for tracking queue statistics.
// The default metric tracks busy workers, success/failure counts, and submitted tasks.
// Implement the Metric interface to integrate with custom monitoring systems.
//
// Example:
//
// q := NewPool(5, WithMetric(myPrometheusMetric))
func WithMetric(m Metric) Option {
return OptionFunc(func(q *Options) {
q.metric = m
})
}
// WithWorker sets a custom worker implementation for the queue backend.
// By default, NewPool uses an in-memory Ring buffer worker.
// Use this to integrate external queue systems like NSQ, NATS, Redis, or RabbitMQ.
// This option is required when using NewQueue() instead of NewPool().
//
// Example:
//
// q, _ := NewQueue(WithWorker(myNSQWorker), WithWorkerCount(10))
func WithWorker(w core.Worker) Option {
return OptionFunc(func(q *Options) {
q.worker = w
})
}
// WithFn sets a custom handler function that will be called to process tasks.
// This function is used by the worker's Run method when processing job messages.
// The context allows cancellation and timeout control during task execution.
// If not set, defaults to a no-op function that returns nil.
//
// Example:
//
// handler := func(ctx context.Context, msg core.TaskMessage) error {
// // Process the message
// return processTask(msg)
// }
// q := NewPool(5, WithFn(handler))
func WithFn(fn func(context.Context, core.TaskMessage) error) Option {
return OptionFunc(func(q *Options) {
q.fn = fn
})
}
// WithAfterFn sets a callback function that will be executed after each job completes.
// This callback runs regardless of whether the job succeeded or failed.
// It executes after metrics are updated but before the worker picks up the next task.
// Useful for cleanup, logging, or triggering post-processing workflows.
//
// Example:
//
// q := NewPool(5, WithAfterFn(func() {
// log.Println("Job completed")
// }))
func WithAfterFn(afterFn func()) Option {
return OptionFunc(func(q *Options) {
q.afterFn = afterFn
})
}
// WithRetryInterval sets the interval at which the queue polls for new tasks when the queue is empty.
// This determines how often Request() is retried after receiving ErrNoTaskInQueue.
// Lower values provide faster response to new tasks but increase CPU usage.
// Defaults to 1 second.
//
// Example:
//
// q := NewPool(5, WithRetryInterval(100*time.Millisecond)) // Poll every 100ms
func WithRetryInterval(d time.Duration) Option {
return OptionFunc(func(q *Options) {
q.retryInterval = d
})
}
// Options holds the configuration parameters for a Queue.
// Use the With* functions to configure these options when creating a queue.
type Options struct {
workerCount int64 // Number of concurrent worker goroutines (default: runtime.NumCPU())
logger Logger // Logger for queue events (default: stderr logger)
queueSize int // Maximum queue capacity, 0 means unlimited (default: 0)
worker core.Worker // Worker implementation for queue backend (default: Ring buffer)
fn func(context.Context, core.TaskMessage) error // Task handler function (default: no-op)
afterFn func() // Callback executed after each job (default: nil)
metric Metric // Metrics collector (default: built-in metric)
retryInterval time.Duration // Polling interval when queue is empty (default: 1 second)
}
// NewOptions creates an Options struct with default values and applies any provided options.
// Default values:
// - workerCount: runtime.NumCPU()
// - queueSize: 0 (unlimited)
// - logger: stderr logger with timestamps
// - worker: nil (must be provided via WithWorker or use NewPool which sets Ring)
// - fn: no-op function returning nil
// - metric: built-in metric tracker
// - retryInterval: 1 second
func NewOptions(opts ...Option) *Options {
o := &Options{
workerCount: defaultWorkerCount,
queueSize: defaultCapacity,
logger: defaultNewLogger,
worker: nil,
fn: defaultFn,
metric: defaultMetric,
retryInterval: time.Second,
}
// Apply each provided option to override defaults
for _, opt := range opts {
opt.apply(o)
}
return o
}