1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -34,6 +34,7 @@
* [ENHANCEMENT] Ingester: Instrument Ingester CPU profile with userID for read APIs. #7184
* [ENHANCEMENT] Ingester: Add fetch timeout for Ingester expanded postings cache. #7185
* [ENHANCEMENT] Ingester: Add feature flag to collect metrics of how expensive an unoptimized regex matcher is and new limits to protect Ingester query path against expensive unoptimized regex matchers. #7194 #7210
* [ENHANCEMENT] Querier: Add logging of in-flight API calls in the querier so they can be identified after an OOMKill. #7216
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
* [BUGFIX] Querier: propagate Prometheus info annotations in protobuf responses. #7132
42 changes: 30 additions & 12 deletions pkg/api/handlers.go
@@ -32,6 +32,7 @@ import (
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/request_tracker"
)

const (
@@ -285,35 +286,52 @@ func NewQuerierHandler(

	queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)

	requestTracker := request_tracker.NewRequestTracker(querierCfg.ActiveQueryTrackerDir, "apis.active", querierCfg.MaxConcurrent, util_log.GoKitLogToSlog(logger))
	var apiHandler http.Handler
	var instantQueryHandler http.Handler
	var rangedQueryHandler http.Handler
	var legacyAPIHandler http.Handler
	if requestTracker != nil {
		apiHandler = request_tracker.NewRequestWrapper(promRouter, requestTracker, &request_tracker.ApiExtractor{})
		legacyAPIHandler = request_tracker.NewRequestWrapper(legacyPromRouter, requestTracker, &request_tracker.ApiExtractor{})
		instantQueryHandler = request_tracker.NewRequestWrapper(queryAPI.Wrap(queryAPI.InstantQueryHandler), requestTracker, &request_tracker.InstantQueryExtractor{})
		rangedQueryHandler = request_tracker.NewRequestWrapper(queryAPI.Wrap(queryAPI.RangeQueryHandler), requestTracker, &request_tracker.RangedQueryExtractor{})
	} else {
		apiHandler = promRouter
		legacyAPIHandler = legacyPromRouter
		instantQueryHandler = queryAPI.Wrap(queryAPI.InstantQueryHandler)
		rangedQueryHandler = queryAPI.Wrap(queryAPI.RangeQueryHandler)
	}

	// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
	// https://github.com/prometheus/prometheus/pull/7125/files
	router.Path(path.Join(prefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
	router.Path(path.Join(prefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
	router.Path(path.Join(prefix, "/api/v1/read")).Methods("POST").Handler(promRouter)
	router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler))
	router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler))
	router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(instantQueryHandler)
	router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(rangedQueryHandler)
	router.Path(path.Join(prefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(promRouter)
	router.Path(path.Join(prefix, "/api/v1/format_query")).Methods("GET", "POST").Handler(promRouter)
	router.Path(path.Join(prefix, "/api/v1/parse_query")).Methods("GET", "POST").Handler(promRouter)
	router.Path(path.Join(prefix, "/api/v1/labels")).Methods("GET", "POST").Handler(promRouter)
	router.Path(path.Join(prefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(promRouter)
	router.Path(path.Join(prefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(promRouter)
	router.Path(path.Join(prefix, "/api/v1/metadata")).Methods("GET").Handler(promRouter)
	router.Path(path.Join(prefix, "/api/v1/labels")).Methods("GET", "POST").Handler(apiHandler)
	router.Path(path.Join(prefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(apiHandler)
	router.Path(path.Join(prefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(apiHandler)
	router.Path(path.Join(prefix, "/api/v1/metadata")).Methods("GET").Handler(apiHandler)

	// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
	// https://github.com/prometheus/prometheus/pull/7125/files
	router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
	router.Path(path.Join(legacyPrefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
	router.Path(path.Join(legacyPrefix, "/api/v1/read")).Methods("POST").Handler(legacyPromRouter)
	router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler))
	router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler))
	router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(instantQueryHandler)
	router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(rangedQueryHandler)
	router.Path(path.Join(legacyPrefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(legacyPromRouter)
	router.Path(path.Join(legacyPrefix, "/api/v1/format_query")).Methods("GET", "POST").Handler(legacyPromRouter)
	router.Path(path.Join(legacyPrefix, "/api/v1/parse_query")).Methods("GET", "POST").Handler(legacyPromRouter)
	router.Path(path.Join(legacyPrefix, "/api/v1/labels")).Methods("GET", "POST").Handler(legacyPromRouter)
	router.Path(path.Join(legacyPrefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(legacyPromRouter)
	router.Path(path.Join(legacyPrefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(legacyPromRouter)
	router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Methods("GET").Handler(legacyPromRouter)
	router.Path(path.Join(legacyPrefix, "/api/v1/labels")).Methods("GET", "POST").Handler(legacyAPIHandler)
	router.Path(path.Join(legacyPrefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(legacyAPIHandler)
	router.Path(path.Join(legacyPrefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(legacyAPIHandler)
	router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Methods("GET").Handler(legacyAPIHandler)

	if cfg.buildInfoEnabled {
		router.Path(path.Join(prefix, "/api/v1/status/buildinfo")).Methods("GET").Handler(promRouter)
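Note on the wiring above: the query, label, series, and metadata routes now go through `request_tracker.NewRequestWrapper` whenever a tracker could be created, i.e. whenever `querierCfg.ActiveQueryTrackerDir` is set; otherwise the unwrapped handlers are used as before. `request_wrapper.go` itself is not part of this section, so the following is only a hedged sketch of what such a wrapper presumably looks like, inferred from the `Extractor`, `Insert`, and `Delete` signatures introduced below; treat the function body as an assumption, not the PR's actual code.

```go
package request_tracker

import "net/http"

// Sketch only (request_wrapper.go is not shown in this diff): record the
// request in the mmapped log before serving it, and clear the slot once the
// handler returns, so only genuinely in-flight requests survive an OOMKill.
func NewRequestWrapper(next http.Handler, tracker *RequestTracker, extractor Extractor) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		idx, err := tracker.Insert(r.Context(), extractor.Extract(r)) // blocks until a slot is free
		if err != nil {
			// Context cancelled while waiting for a slot; serve without tracking.
			next.ServeHTTP(w, r)
			return
		}
		defer tracker.Delete(idx) // zero the slot and recycle its index
		next.ServeHTTP(w, r)
	})
}
```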
113 changes: 113 additions & 0 deletions pkg/util/request_tracker/request_extractor.go
@@ -0,0 +1,113 @@
package request_tracker

import (
"encoding/json"
"net/http"
"strings"
"time"
"unicode/utf8"

"github.com/cortexproject/cortex/pkg/util/requestmeta"
"github.com/cortexproject/cortex/pkg/util/users"
)

type Extractor interface {
	Extract(r *http.Request) []byte
}

type DefaultExtractor struct{}

type ApiExtractor struct{}

type InstantQueryExtractor struct{}

type RangedQueryExtractor struct{}

func generateCommonMap(r *http.Request) map[string]interface{} {
	ctx := r.Context()
	entryMap := make(map[string]interface{})
	entryMap["timestamp_sec"] = time.Now().Unix()
	entryMap["Path"] = r.URL.Path
	entryMap["Method"] = r.Method
	entryMap["X-Scope-OrgID"], _ = users.TenantID(ctx)
	entryMap["X-Request-ID"] = requestmeta.RequestIdFromContext(ctx)

	return entryMap
}

func (e *DefaultExtractor) Extract(r *http.Request) []byte {
	entryMap := generateCommonMap(r)

	return generateJSONEntry(entryMap)
}

func (e *ApiExtractor) Extract(r *http.Request) []byte {
	entryMap := generateCommonMap(r)
	entryMap["limit"] = r.URL.Query().Get("limit")
	entryMap["start"] = r.URL.Query().Get("start")
	entryMap["end"] = r.URL.Query().Get("end")

	matches := r.URL.Query()["match[]"]
	entryMap["number-of-matches"] = len(matches)

	minEntryJSON := generateJSONEntry(entryMap)

	matchesStr := strings.Join(matches, ",")
	matchesStr = trimStringByBytes(matchesStr, maxEntrySize-(len(minEntryJSON)+1))
	entryMap["matches"] = matchesStr

	return generateJSONEntry(entryMap)
}

func (e *InstantQueryExtractor) Extract(r *http.Request) []byte {
	entryMap := generateCommonMap(r)
	entryMap["limit"] = r.URL.Query().Get("limit")
	entryMap["time"] = r.URL.Query().Get("time")
	entryMap["lookback_delta"] = r.URL.Query().Get("lookback_delta")

	minEntryJSON := generateJSONEntry(entryMap)
	query := r.URL.Query().Get("query")
	query = trimStringByBytes(query, maxEntrySize-(len(minEntryJSON)+1))
	entryMap["query"] = query

	return generateJSONEntry(entryMap)
}

func (e *RangedQueryExtractor) Extract(r *http.Request) []byte {
	entryMap := generateCommonMap(r)
	entryMap["limit"] = r.URL.Query().Get("limit")
	entryMap["start"] = r.URL.Query().Get("start")
	entryMap["end"] = r.URL.Query().Get("end")
	entryMap["step"] = r.URL.Query().Get("step")
	entryMap["lookback_delta"] = r.URL.Query().Get("lookback_delta")

	minEntryJSON := generateJSONEntry(entryMap)
	query := r.URL.Query().Get("query")
	query = trimStringByBytes(query, maxEntrySize-(len(minEntryJSON)+1))
	entryMap["query"] = query

	return generateJSONEntry(entryMap)
}

func generateJSONEntry(entryMap map[string]interface{}) []byte {
	jsonEntry, err := json.Marshal(entryMap)
	if err != nil {
		return []byte{}
	}

	return jsonEntry
}

func trimStringByBytes(str string, size int) string {
	bytesStr := []byte(str)

	trimIndex := len(bytesStr)
	if size < len(bytesStr) {
		for !utf8.RuneStart(bytesStr[size]) {
			size--
		}
		trimIndex = size
	}

	return string(bytesStr[:trimIndex])
}
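Each extractor flattens the request into a small JSON object (timestamp, path, method, tenant, request ID, plus endpoint-specific parameters) and then trims the only unbounded field, the PromQL `query` or the `match[]` selectors, so the whole entry fits in one 1000-byte slot without cutting through a multi-byte UTF-8 rune. A test-style sketch of that trimming behaviour, assuming it lives in the same package (not part of the PR):

```go
package request_tracker

import "testing"

// Hypothetical test: trimStringByBytes cuts a string down to at most `size`
// bytes, moving the cut point backwards so it never lands inside a rune.
func TestTrimStringByBytesSketch(t *testing.T) {
	// "é" is two bytes in UTF-8, so a three-byte budget keeps only one "é".
	if got := trimStringByBytes("éé", 3); got != "é" {
		t.Fatalf("expected %q, got %q", "é", got)
	}
	// A budget larger than the string leaves it untouched.
	if got := trimStringByBytes("up", 10); got != "up" {
		t.Fatalf("expected %q, got %q", "up", got)
	}
}
```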
189 changes: 189 additions & 0 deletions pkg/util/request_tracker/request_tracker.go
@@ -0,0 +1,189 @@
package request_tracker

import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"strings"
"time"

"github.com/edsrzf/mmap-go"
)

type RequestTracker struct {
	mmappedFile   []byte
	getNextIndex  chan int
	logger        *slog.Logger
	closer        io.Closer
	maxConcurrent int
}

var _ io.Closer = &RequestTracker{}

const (
	maxEntrySize int = 1000
)

func parseRequestBrokenJSON(brokenJSON []byte) (string, bool) {
	requests := strings.ReplaceAll(string(brokenJSON), "\x00", "")
	if len(requests) > 0 {
		requests = requests[:len(requests)-1] + "]"
	}

	if len(requests) <= 1 {
		return "[]", false
	}

	return requests, true
}
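The file that `Insert` (below) writes always starts with `[`; each fixed-size slot holds one JSON entry, NUL padding, and a `,` in the slot's final byte. What is left after a crash is therefore "broken" JSON, an opening bracket and a trailing comma with no closing bracket, and this function strips the padding and closes the array. A test-style sketch with made-up entries, assuming the same package (not part of the PR):

```go
package request_tracker

import (
	"strings"
	"testing"
)

// Hypothetical test: recover the unfinished-request array from the
// NUL-padded, comma-terminated slots left behind in the mmapped file.
func TestParseRequestBrokenJSONSketch(t *testing.T) {
	raw := []byte(`[{"timestamp_sec":1}` + strings.Repeat("\x00", 4) + `,{"timestamp_sec":2}` + "\x00\x00" + ",")
	requests, ok := parseRequestBrokenJSON(raw)
	if !ok || requests != `[{"timestamp_sec":1},{"timestamp_sec":2}]` {
		t.Fatalf("unexpected recovery: %q %v", requests, ok)
	}
}
```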

func logUnfinishedRequests(filename string, filesize int, logger *slog.Logger) {
	if _, err := os.Stat(filename); err == nil {
		fd, err := os.Open(filename)
		if err != nil {
			logger.Error("Failed to open request log file", "err", err)
			return
		}
		defer fd.Close()

		brokenJSON := make([]byte, filesize)
		_, err = fd.Read(brokenJSON)
		if err != nil {
			logger.Error("Failed to read request log file", "err", err)
			return
		}

		requests, requestsExist := parseRequestBrokenJSON(brokenJSON)
		if !requestsExist {
			return
		}
		logger.Info("These requests didn't finish in cortex's last run:", "requests", requests)
	}
}

type mmappedRequestFile struct {
	f io.Closer
	m mmap.MMap
}

func (f *mmappedRequestFile) Close() error {
	err := f.m.Unmap()
	if err != nil {
		err = fmt.Errorf("mmappedRequestFile: unmapping: %w", err)
	}
	if fErr := f.f.Close(); fErr != nil {
		return errors.Join(fmt.Errorf("close mmappedRequestFile.f: %w", fErr), err)
	}

	return err
}

func getRequestMMappedFile(filename string, filesize int, logger *slog.Logger) ([]byte, io.Closer, error) {
	file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o666)
	if err != nil {
		absPath, pathErr := filepath.Abs(filename)
		if pathErr != nil {
			absPath = filename
		}
		logger.Error("Error opening request log file", "file", absPath, "err", err)
		return nil, nil, err
	}

	err = file.Truncate(int64(filesize))
	if err != nil {
		file.Close()
		logger.Error("Error setting filesize.", "filesize", filesize, "err", err)
		return nil, nil, err
	}

	fileAsBytes, err := mmap.Map(file, mmap.RDWR, 0)
	if err != nil {
		file.Close()
		logger.Error("Failed to mmap", "file", filename, "Attempted size", filesize, "err", err)
		return nil, nil, err
	}

	return fileAsBytes, &mmappedRequestFile{f: file, m: fileAsBytes}, err
}

func NewRequestTracker(localStoragePath string, fileName string, maxConcurrent int, logger *slog.Logger) *RequestTracker {
	if localStoragePath == "" {
		return nil
	}

	err := os.MkdirAll(localStoragePath, 0o777)
	if err != nil {
		logger.Error("Failed to create directory for logging active requests")
		return nil
	}

	filename, filesize := filepath.Join(localStoragePath, fileName), 1+maxConcurrent*maxEntrySize
	logUnfinishedRequests(filename, filesize, logger)

	fileAsBytes, closer, err := getRequestMMappedFile(filename, filesize, logger)
	if err != nil {
		logger.Error("Unable to create mmap-ed active request log", "err", err)
		return nil
	}

	copy(fileAsBytes, "[")
	requestTracker := &RequestTracker{
		mmappedFile:   fileAsBytes,
		closer:        closer,
		getNextIndex:  make(chan int, maxConcurrent),
		logger:        logger,
		maxConcurrent: maxConcurrent,
	}

	requestTracker.generateIndices(maxConcurrent)

	return requestTracker
}

func (tracker *RequestTracker) generateIndices(maxConcurrent int) {
	for i := 0; i < maxConcurrent; i++ {
		tracker.getNextIndex <- 1 + (i * maxEntrySize)
	}
}

func (tracker *RequestTracker) Delete(insertIndex int) {
	copy(tracker.mmappedFile[insertIndex:], strings.Repeat("\x00", maxEntrySize))
	tracker.getNextIndex <- insertIndex
}

func (tracker *RequestTracker) Insert(ctx context.Context, entry []byte) (int, error) {
	if len(entry) > maxEntrySize {
		entry = generateMinEntry()
	}
	select {
	case i := <-tracker.getNextIndex:
		fileBytes := tracker.mmappedFile
		start, end := i, i+maxEntrySize

		copy(fileBytes[start:], entry)
		copy(fileBytes[end-1:], ",")
		return i, nil
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}

func generateMinEntry() []byte {
	entryMap := make(map[string]interface{})
	entryMap["timestamp_sec"] = time.Now().Unix()
	return generateJSONEntry(entryMap)
}

func (tracker *RequestTracker) Close() error {
	if tracker == nil || tracker.closer == nil {
		return nil
	}
	if err := tracker.closer.Close(); err != nil {
		return fmt.Errorf("close RequestTracker.closer: %w", err)
	}
	return nil
}
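Taken together: `NewRequestTracker` mmaps a file of `1 + maxConcurrent*maxEntrySize` bytes, writes the leading `[`, and hands out one fixed 1000-byte slot per concurrent request through the `getNextIndex` channel; `Insert` blocks until a slot is free (or the context is cancelled), `Delete` zeroes the slot and returns its index, and on the next start-up `logUnfinishedRequests` logs whatever was still in flight when the previous process was killed, much like Prometheus's own active query tracker. A minimal usage sketch; the directory path and concurrency value are illustrative, while the file name `"apis.active"` matches the querier wiring above:

```go
package main

import (
	"context"
	"log/slog"
	"os"

	"github.com/cortexproject/cortex/pkg/util/request_tracker"
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	// 8 is an arbitrary example for the maximum number of concurrent requests.
	tracker := request_tracker.NewRequestTracker("./active-request-tracker", "apis.active", 8, logger)
	if tracker == nil {
		return // tracking disabled (empty path) or the mmapped file could not be created
	}
	defer tracker.Close()

	// Reserve a slot for a hypothetical in-flight request...
	idx, err := tracker.Insert(context.Background(), []byte(`{"timestamp_sec":1,"Path":"/api/v1/query"}`))
	if err != nil {
		return
	}
	// ...and release it when the request finishes. If the process is
	// OOMKilled before Delete runs, the entry is logged on the next start.
	tracker.Delete(idx)
}
```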