diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 081b1e83a1c..9050ca418d4 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -1695,6 +1695,240 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.subrange-ttl [subrange_ttl: | default = 24h] + parquet_row_ranges_cache: + # The parquet row ranges cache backend type. Single or Multiple cache + # backend can be provided. Supported values in single cache: memcached, + # redis, inmemory, and '' (disable). Supported values in multi level + # cache: a comma-separated list of (inmemory, memcached, redis) + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.backend + [backend: | default = ""] + + inmemory: + # Maximum size in bytes of in-memory parquet-row-ranges cache used + # (shared between all tenants). + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.inmemory.max-size-bytes + [max_size_bytes: | default = 1073741824] + + memcached: + # Comma separated list of memcached addresses. Supported prefixes are: + # dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV + # query, dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup + # made after that). + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.addresses + [addresses: | default = ""] + + # The socket read/write timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.timeout + [timeout: | default = 100ms] + + # The maximum number of idle connections that will be maintained per + # address. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-idle-connections + [max_idle_connections: | default = 16] + + # The maximum number of concurrent asynchronous operations can occur. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # The maximum number of concurrent connections running get operations. + # If set to 0, concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-get-multi-concurrency + [max_get_multi_concurrency: | default = 100] + + # The maximum number of keys a single underlying get operation should + # run. If more keys are specified, internally keys are split into + # multiple batches and fetched concurrently, honoring the max + # concurrency. If set to 0, the max batch size is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-get-multi-batch-size + [max_get_multi_batch_size: | default = 0] + + # The maximum size of an item stored in memcached. Bigger items are not + # stored. If set to 0, no maximum size is enforced. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-item-size + [max_item_size: | default = 1048576] + + # Use memcached auto-discovery mechanism provided by some cloud provider + # like GCP and AWS + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.auto-discovery + [auto_discovery: | default = false] + + set_async_circuit_breaker_config: + # If true, enable circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.enabled + [enabled: | default = false] + + # Maximum number of requests allowed to pass through when the circuit + # breaker is half-open. If set to 0, by default it allows 1 request. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.half-open-max-requests + [half_open_max_requests: | default = 10] + + # Period of the open state after which the state of the circuit + # breaker becomes half-open. If set to 0, by default open duration is + # 60 seconds. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.open-duration + [open_duration: | default = 5s] + + # Minimal requests to trigger the circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.min-requests + [min_requests: | default = 50] + + # Consecutive failures to determine if the circuit breaker should + # open. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.consecutive-failures + [consecutive_failures: | default = 5] + + # Failure percentage to determine if the circuit breaker should open. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.failure-percent + [failure_percent: | default = 0.05] + + redis: + # Comma separated list of redis addresses. Supported prefixes are: dns+ + # (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query, + # dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup made after + # that). + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.addresses + [addresses: | default = ""] + + # Redis username. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.username + [username: | default = ""] + + # Redis password. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.password + [password: | default = ""] + + # Database to be selected after connecting to the server. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.db + [db: | default = 0] + + # Specifies the master's name. 
Must be not empty for Redis Sentinel. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.master-name + [master_name: | default = ""] + + # The maximum number of concurrent GetMulti() operations. If set to 0, + # concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-get-multi-concurrency + [max_get_multi_concurrency: | default = 100] + + # The maximum size per batch for mget. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.get-multi-batch-size + [get_multi_batch_size: | default = 100] + + # The maximum number of concurrent SetMulti() operations. If set to 0, + # concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-set-multi-concurrency + [max_set_multi_concurrency: | default = 100] + + # The maximum size per batch for pipeline set. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-multi-batch-size + [set_multi_batch_size: | default = 100] + + # The maximum number of concurrent asynchronous operations can occur. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # Client dial timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.dial-timeout + [dial_timeout: | default = 5s] + + # Client read timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.read-timeout + [read_timeout: | default = 3s] + + # Client write timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.write-timeout + [write_timeout: | default = 3s] + + # Whether to enable tls for redis connection. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-enabled + [tls_enabled: | default = false] + + # Path to the client certificate file, which will be used for + # authenticating with the server. Also requires the key path to be + # configured. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-cert-path + [tls_cert_path: | default = ""] + + # Path to the key file for the client certificate. Also requires the + # client certificate to be configured. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-key-path + [tls_key_path: | default = ""] + + # Path to the CA certificates file to validate server certificate + # against. If not set, the host's root CA certificates are used. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-ca-path + [tls_ca_path: | default = ""] + + # Override the expected name on the server certificate. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-server-name + [tls_server_name: | default = ""] + + # Skip validating server certificate. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-insecure-skip-verify + [tls_insecure_skip_verify: | default = false] + + # If not zero then client-side caching is enabled. Client-side caching + # is when data is stored in memory instead of fetching data each time. + # See https://redis.io/docs/manual/client-side-caching/ for more info. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.cache-size + [cache_size: | default = 0] + + set_async_circuit_breaker_config: + # If true, enable circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.enabled + [enabled: | default = false] + + # Maximum number of requests allowed to pass through when the circuit + # breaker is half-open. If set to 0, by default it allows 1 request. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.half-open-max-requests + [half_open_max_requests: | default = 10] + + # Period of the open state after which the state of the circuit + # breaker becomes half-open. If set to 0, by default open duration is + # 60 seconds. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.open-duration + [open_duration: | default = 5s] + + # Minimal requests to trigger the circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.min-requests + [min_requests: | default = 50] + + # Consecutive failures to determine if the circuit breaker should + # open. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.consecutive-failures + [consecutive_failures: | default = 5] + + # Failure percentage to determine if the circuit breaker should open. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.failure-percent + [failure_percent: | default = 0.05] + + multilevel: + # The maximum number of concurrent asynchronous operations can occur + # when backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed when + # backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # The maximum number of items to backfill per asynchronous operation. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-backfill-items + [max_backfill_items: | default = 10000] + + # TTL for caching parquet row ranges. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.ttl + [ttl: | default = 10m] + # Maximum number of entries in the regex matchers cache. 0 to disable. # CLI flag: -blocks-storage.bucket-store.matchers-cache-max-items [matchers_cache_max_items: | default = 0] diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 7235af218c5..e218ff35f5d 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -1753,6 +1753,240 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.subrange-ttl [subrange_ttl: | default = 24h] + parquet_row_ranges_cache: + # The parquet row ranges cache backend type. Single or Multiple cache + # backend can be provided. Supported values in single cache: memcached, + # redis, inmemory, and '' (disable). Supported values in multi level + # cache: a comma-separated list of (inmemory, memcached, redis) + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.backend + [backend: | default = ""] + + inmemory: + # Maximum size in bytes of in-memory parquet-row-ranges cache used + # (shared between all tenants). + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.inmemory.max-size-bytes + [max_size_bytes: | default = 1073741824] + + memcached: + # Comma separated list of memcached addresses. Supported prefixes are: + # dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV + # query, dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup + # made after that). + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.addresses + [addresses: | default = ""] + + # The socket read/write timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.timeout + [timeout: | default = 100ms] + + # The maximum number of idle connections that will be maintained per + # address. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-idle-connections + [max_idle_connections: | default = 16] + + # The maximum number of concurrent asynchronous operations can occur. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # The maximum number of concurrent connections running get operations. + # If set to 0, concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-get-multi-concurrency + [max_get_multi_concurrency: | default = 100] + + # The maximum number of keys a single underlying get operation should + # run. If more keys are specified, internally keys are split into + # multiple batches and fetched concurrently, honoring the max + # concurrency. If set to 0, the max batch size is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-get-multi-batch-size + [max_get_multi_batch_size: | default = 0] + + # The maximum size of an item stored in memcached. Bigger items are not + # stored. If set to 0, no maximum size is enforced. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-item-size + [max_item_size: | default = 1048576] + + # Use memcached auto-discovery mechanism provided by some cloud provider + # like GCP and AWS + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.auto-discovery + [auto_discovery: | default = false] + + set_async_circuit_breaker_config: + # If true, enable circuit breaker. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.enabled + [enabled: | default = false] + + # Maximum number of requests allowed to pass through when the circuit + # breaker is half-open. If set to 0, by default it allows 1 request. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.half-open-max-requests + [half_open_max_requests: | default = 10] + + # Period of the open state after which the state of the circuit + # breaker becomes half-open. If set to 0, by default open duration is + # 60 seconds. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.open-duration + [open_duration: | default = 5s] + + # Minimal requests to trigger the circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.min-requests + [min_requests: | default = 50] + + # Consecutive failures to determine if the circuit breaker should + # open. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.consecutive-failures + [consecutive_failures: | default = 5] + + # Failure percentage to determine if the circuit breaker should open. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.failure-percent + [failure_percent: | default = 0.05] + + redis: + # Comma separated list of redis addresses. Supported prefixes are: dns+ + # (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query, + # dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup made after + # that). + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.addresses + [addresses: | default = ""] + + # Redis username. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.username + [username: | default = ""] + + # Redis password. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.password + [password: | default = ""] + + # Database to be selected after connecting to the server. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.db + [db: | default = 0] + + # Specifies the master's name. Must be not empty for Redis Sentinel. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.master-name + [master_name: | default = ""] + + # The maximum number of concurrent GetMulti() operations. If set to 0, + # concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-get-multi-concurrency + [max_get_multi_concurrency: | default = 100] + + # The maximum size per batch for mget. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.get-multi-batch-size + [get_multi_batch_size: | default = 100] + + # The maximum number of concurrent SetMulti() operations. If set to 0, + # concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-set-multi-concurrency + [max_set_multi_concurrency: | default = 100] + + # The maximum size per batch for pipeline set. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-multi-batch-size + [set_multi_batch_size: | default = 100] + + # The maximum number of concurrent asynchronous operations can occur. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # Client dial timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.dial-timeout + [dial_timeout: | default = 5s] + + # Client read timeout. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.read-timeout + [read_timeout: | default = 3s] + + # Client write timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.write-timeout + [write_timeout: | default = 3s] + + # Whether to enable tls for redis connection. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-enabled + [tls_enabled: | default = false] + + # Path to the client certificate file, which will be used for + # authenticating with the server. Also requires the key path to be + # configured. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-cert-path + [tls_cert_path: | default = ""] + + # Path to the key file for the client certificate. Also requires the + # client certificate to be configured. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-key-path + [tls_key_path: | default = ""] + + # Path to the CA certificates file to validate server certificate + # against. If not set, the host's root CA certificates are used. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-ca-path + [tls_ca_path: | default = ""] + + # Override the expected name on the server certificate. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-server-name + [tls_server_name: | default = ""] + + # Skip validating server certificate. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-insecure-skip-verify + [tls_insecure_skip_verify: | default = false] + + # If not zero then client-side caching is enabled. Client-side caching + # is when data is stored in memory instead of fetching data each time. + # See https://redis.io/docs/manual/client-side-caching/ for more info. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.cache-size + [cache_size: | default = 0] + + set_async_circuit_breaker_config: + # If true, enable circuit breaker. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.enabled + [enabled: | default = false] + + # Maximum number of requests allowed to pass through when the circuit + # breaker is half-open. If set to 0, by default it allows 1 request. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.half-open-max-requests + [half_open_max_requests: | default = 10] + + # Period of the open state after which the state of the circuit + # breaker becomes half-open. If set to 0, by default open duration is + # 60 seconds. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.open-duration + [open_duration: | default = 5s] + + # Minimal requests to trigger the circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.min-requests + [min_requests: | default = 50] + + # Consecutive failures to determine if the circuit breaker should + # open. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.consecutive-failures + [consecutive_failures: | default = 5] + + # Failure percentage to determine if the circuit breaker should open. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.failure-percent + [failure_percent: | default = 0.05] + + multilevel: + # The maximum number of concurrent asynchronous operations can occur + # when backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed when + # backfilling cache items. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # The maximum number of items to backfill per asynchronous operation. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-backfill-items + [max_backfill_items: | default = 10000] + + # TTL for caching parquet row ranges. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.ttl + [ttl: | default = 10m] + # Maximum number of entries in the regex matchers cache. 0 to disable. # CLI flag: -blocks-storage.bucket-store.matchers-cache-max-items [matchers_cache_max_items: | default = 0] diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index f9c206ac537..7f14bf289f0 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2376,6 +2376,238 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.subrange-ttl [subrange_ttl: | default = 24h] + parquet_row_ranges_cache: + # The parquet row ranges cache backend type. Single or Multiple cache + # backend can be provided. Supported values in single cache: memcached, + # redis, inmemory, and '' (disable). Supported values in multi level cache: + # a comma-separated list of (inmemory, memcached, redis) + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.backend + [backend: | default = ""] + + inmemory: + # Maximum size in bytes of in-memory parquet-row-ranges cache used (shared + # between all tenants). + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.inmemory.max-size-bytes + [max_size_bytes: | default = 1073741824] + + memcached: + # Comma separated list of memcached addresses. Supported prefixes are: + # dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query, + # dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup made after + # that). 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.addresses + [addresses: | default = ""] + + # The socket read/write timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.timeout + [timeout: | default = 100ms] + + # The maximum number of idle connections that will be maintained per + # address. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-idle-connections + [max_idle_connections: | default = 16] + + # The maximum number of concurrent asynchronous operations can occur. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # The maximum number of concurrent connections running get operations. If + # set to 0, concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-get-multi-concurrency + [max_get_multi_concurrency: | default = 100] + + # The maximum number of keys a single underlying get operation should run. + # If more keys are specified, internally keys are split into multiple + # batches and fetched concurrently, honoring the max concurrency. If set + # to 0, the max batch size is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-get-multi-batch-size + [max_get_multi_batch_size: | default = 0] + + # The maximum size of an item stored in memcached. Bigger items are not + # stored. If set to 0, no maximum size is enforced. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-item-size + [max_item_size: | default = 1048576] + + # Use memcached auto-discovery mechanism provided by some cloud provider + # like GCP and AWS + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.auto-discovery + [auto_discovery: | default = false] + + set_async_circuit_breaker_config: + # If true, enable circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.enabled + [enabled: | default = false] + + # Maximum number of requests allowed to pass through when the circuit + # breaker is half-open. If set to 0, by default it allows 1 request. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.half-open-max-requests + [half_open_max_requests: | default = 10] + + # Period of the open state after which the state of the circuit breaker + # becomes half-open. If set to 0, by default open duration is 60 + # seconds. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.open-duration + [open_duration: | default = 5s] + + # Minimal requests to trigger the circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.min-requests + [min_requests: | default = 50] + + # Consecutive failures to determine if the circuit breaker should open. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.consecutive-failures + [consecutive_failures: | default = 5] + + # Failure percentage to determine if the circuit breaker should open. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.failure-percent + [failure_percent: | default = 0.05] + + redis: + # Comma separated list of redis addresses. 
Supported prefixes are: dns+ + # (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query, + # dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup made after + # that). + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.addresses + [addresses: | default = ""] + + # Redis username. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.username + [username: | default = ""] + + # Redis password. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.password + [password: | default = ""] + + # Database to be selected after connecting to the server. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.db + [db: | default = 0] + + # Specifies the master's name. Must be not empty for Redis Sentinel. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.master-name + [master_name: | default = ""] + + # The maximum number of concurrent GetMulti() operations. If set to 0, + # concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-get-multi-concurrency + [max_get_multi_concurrency: | default = 100] + + # The maximum size per batch for mget. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.get-multi-batch-size + [get_multi_batch_size: | default = 100] + + # The maximum number of concurrent SetMulti() operations. If set to 0, + # concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-set-multi-concurrency + [max_set_multi_concurrency: | default = 100] + + # The maximum size per batch for pipeline set. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-multi-batch-size + [set_multi_batch_size: | default = 100] + + # The maximum number of concurrent asynchronous operations can occur. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # Client dial timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.dial-timeout + [dial_timeout: | default = 5s] + + # Client read timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.read-timeout + [read_timeout: | default = 3s] + + # Client write timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.write-timeout + [write_timeout: | default = 3s] + + # Whether to enable tls for redis connection. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-enabled + [tls_enabled: | default = false] + + # Path to the client certificate file, which will be used for + # authenticating with the server. Also requires the key path to be + # configured. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-cert-path + [tls_cert_path: | default = ""] + + # Path to the key file for the client certificate. Also requires the + # client certificate to be configured. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-key-path + [tls_key_path: | default = ""] + + # Path to the CA certificates file to validate server certificate against. + # If not set, the host's root CA certificates are used. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-ca-path + [tls_ca_path: | default = ""] + + # Override the expected name on the server certificate. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-server-name + [tls_server_name: | default = ""] + + # Skip validating server certificate. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-insecure-skip-verify + [tls_insecure_skip_verify: | default = false] + + # If not zero then client-side caching is enabled. Client-side caching is + # when data is stored in memory instead of fetching data each time. See + # https://redis.io/docs/manual/client-side-caching/ for more info. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.cache-size + [cache_size: | default = 0] + + set_async_circuit_breaker_config: + # If true, enable circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.enabled + [enabled: | default = false] + + # Maximum number of requests allowed to pass through when the circuit + # breaker is half-open. If set to 0, by default it allows 1 request. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.half-open-max-requests + [half_open_max_requests: | default = 10] + + # Period of the open state after which the state of the circuit breaker + # becomes half-open. If set to 0, by default open duration is 60 + # seconds. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.open-duration + [open_duration: | default = 5s] + + # Minimal requests to trigger the circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.min-requests + [min_requests: | default = 50] + + # Consecutive failures to determine if the circuit breaker should open. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.consecutive-failures + [consecutive_failures: | default = 5] + + # Failure percentage to determine if the circuit breaker should open. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.failure-percent + [failure_percent: | default = 0.05] + + multilevel: + # The maximum number of concurrent asynchronous operations can occur when + # backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed when + # backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # The maximum number of items to backfill per asynchronous operation. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-backfill-items + [max_backfill_items: | default = 10000] + + # TTL for caching parquet row ranges. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.ttl + [ttl: | default = 10m] + # Maximum number of entries in the regex matchers cache. 0 to disable. 
# CLI flag: -blocks-storage.bucket-store.matchers-cache-max-items [matchers_cache_max_items: | default = 0] diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 559f1f1c533..33eba950616 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -142,6 +142,19 @@ func NewParquetQueryable( return nil, err } + rowRangesCacheBackend, err := cortex_tsdb.CreateParquetRowRangesCache(storageCfg.BucketStore.ParquetRowRangesCache, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg)) + if err != nil { + return nil, err + } + rowRangesCache := parquetutil.NewRowRangesCache(rowRangesCacheBackend, "parquet-row-ranges", storageCfg.BucketStore.ParquetRowRangesCache.TTL, extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg)) + + var constraintCacheFunc queryable.ConstraintCacheFunction + if rowRangesCache != nil { + constraintCacheFunc = func(context.Context) (search.RowRangesForConstraintsCache, error) { + return rowRangesCache, nil + } + } + cDecoder := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool()) parquetQueryableOpts := []queryable.QueryableOpts{ @@ -272,7 +285,7 @@ func NewParquetQueryable( } return shards, errGroup.Wait() - }, nil, cDecoder, parquetQueryableOpts...) + }, constraintCacheFunc, cDecoder, parquetQueryableOpts...) 
p := &parquetQueryableWithFallback{ subservices: manager, diff --git a/pkg/storage/tsdb/caching_bucket.go b/pkg/storage/tsdb/caching_bucket.go index d998c6ff53e..a5c9b15b6ac 100644 --- a/pkg/storage/tsdb/caching_bucket.go +++ b/pkg/storage/tsdb/caching_bucket.go @@ -218,6 +218,37 @@ func (cfg *ParquetLabelsCacheConfig) Validate() error { return cfg.BucketCacheBackend.Validate() } +type ParquetRowRangesCacheConfig struct { + BucketCacheBackend `yaml:",inline"` + + TTL time.Duration `yaml:"ttl"` +} + +func (cfg *ParquetRowRangesCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { + f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("The parquet row ranges cache backend type. Single or Multiple cache backend can be provided. "+ + "Supported values in single cache: %s, %s, %s, and '' (disable). "+ + "Supported values in multi level cache: a comma-separated list of (%s)", CacheBackendMemcached, CacheBackendRedis, CacheBackendInMemory, strings.Join(supportedBucketCacheBackends, ", "))) + + cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.") + cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.") + cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.", "parquet-row-ranges") + cfg.MultiLevel.RegisterFlagsWithPrefix(f, prefix+"multilevel.") + + f.DurationVar(&cfg.TTL, prefix+"ttl", 10*time.Minute, "TTL for caching parquet row ranges.") + + // In the multi level parquet row ranges cache, backfill TTL follows the row ranges TTL. 
+ cfg.MultiLevel.BackFillTTL = cfg.TTL +} + +func (cfg *ParquetRowRangesCacheConfig) Validate() error { + return cfg.BucketCacheBackend.Validate() +} + +func CreateParquetRowRangesCache(cfg ParquetRowRangesCacheConfig, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) { + cfg.MultiLevel.BackFillTTL = cfg.TTL + return createBucketCache("parquet-row-ranges-cache", &cfg.BucketCacheBackend, logger, reg) +} + func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, parquetLabelsConfig ParquetLabelsCacheConfig, matchers Matchers, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { cfg := cache.NewCachingBucketConfig() cachingConfigured := false diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index fa6f7b1c938..c06ac643a9f 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -277,25 +277,26 @@ func (cfg *TSDBConfig) IsBlocksShippingEnabled() bool { // BucketStoreConfig holds the config information for Bucket Stores used by the querier and store-gateway. 
type BucketStoreConfig struct { - SyncDir string `yaml:"sync_dir"` - SyncInterval time.Duration `yaml:"sync_interval"` - MaxConcurrent int `yaml:"max_concurrent"` - MaxInflightRequests int `yaml:"max_inflight_requests"` - TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"` - BlockSyncConcurrency int `yaml:"block_sync_concurrency"` - MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` - ConsistencyDelay time.Duration `yaml:"consistency_delay"` - IndexCache IndexCacheConfig `yaml:"index_cache"` - ChunksCache ChunksCacheConfig `yaml:"chunks_cache"` - MetadataCache MetadataCacheConfig `yaml:"metadata_cache"` - ParquetLabelsCache ParquetLabelsCacheConfig `yaml:"parquet_labels_cache"` - MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"` - IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"` - IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"` - IgnoreBlocksBefore time.Duration `yaml:"ignore_blocks_before"` - BucketIndex BucketIndexConfig `yaml:"bucket_index"` - BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"` - BucketStoreType string `yaml:"bucket_store_type"` + SyncDir string `yaml:"sync_dir"` + SyncInterval time.Duration `yaml:"sync_interval"` + MaxConcurrent int `yaml:"max_concurrent"` + MaxInflightRequests int `yaml:"max_inflight_requests"` + TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"` + BlockSyncConcurrency int `yaml:"block_sync_concurrency"` + MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` + ConsistencyDelay time.Duration `yaml:"consistency_delay"` + IndexCache IndexCacheConfig `yaml:"index_cache"` + ChunksCache ChunksCacheConfig `yaml:"chunks_cache"` + MetadataCache MetadataCacheConfig `yaml:"metadata_cache"` + ParquetLabelsCache ParquetLabelsCacheConfig `yaml:"parquet_labels_cache"` + ParquetRowRangesCache ParquetRowRangesCacheConfig `yaml:"parquet_row_ranges_cache"` + MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"` + IgnoreDeletionMarksDelay time.Duration 
`yaml:"ignore_deletion_mark_delay"` + IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"` + IgnoreBlocksBefore time.Duration `yaml:"ignore_blocks_before"` + BucketIndex BucketIndexConfig `yaml:"bucket_index"` + BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"` + BucketStoreType string `yaml:"bucket_store_type"` // Chunk pool. MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"` @@ -356,6 +357,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { cfg.ChunksCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.chunks-cache.") cfg.MetadataCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.metadata-cache.") cfg.ParquetLabelsCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.parquet-labels-cache.") + cfg.ParquetRowRangesCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.parquet-row-ranges-cache.") cfg.BucketIndex.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.bucket-index.") f.StringVar(&cfg.SyncDir, "blocks-storage.bucket-store.sync-dir", "tsdb-sync", "Directory to store synchronized TSDB index headers.") @@ -417,6 +419,10 @@ func (cfg *BucketStoreConfig) Validate() error { if err != nil { return errors.Wrap(err, "parquet-labels-cache configuration") } + err = cfg.ParquetRowRangesCache.Validate() + if err != nil { + return errors.Wrap(err, "parquet-row-ranges-cache configuration") + } if !slices.Contains(supportedBlockDiscoveryStrategies, cfg.BlockDiscoveryStrategy) { return ErrInvalidBucketIndexBlockDiscoveryStrategy } diff --git a/pkg/storage/tsdb/multilevel_bucket_cache.go b/pkg/storage/tsdb/multilevel_bucket_cache.go index f9e2b4fbfd1..66c6c432918 100644 --- a/pkg/storage/tsdb/multilevel_bucket_cache.go +++ b/pkg/storage/tsdb/multilevel_bucket_cache.go @@ -71,6 +71,9 @@ func newMultiLevelBucketCache(name string, cfg MultiLevelBucketCacheConfig, reg case "parquet-labels-cache": itemName = "parquet_labels_cache" metricHelpText = "parquet labels cache" + case 
"parquet-row-ranges-cache": + itemName = "parquet_row_ranges_cache" + metricHelpText = "parquet row ranges cache" default: itemName = name } diff --git a/pkg/util/parquetutil/row_ranges_cache.go b/pkg/util/parquetutil/row_ranges_cache.go new file mode 100644 index 00000000000..5434667be79 --- /dev/null +++ b/pkg/util/parquetutil/row_ranges_cache.go @@ -0,0 +1,182 @@ +package parquetutil + +import ( + "context" + "crypto/sha256" + "encoding/binary" + "encoding/hex" + "fmt" + "hash" + "strconv" + "time" + + "github.com/prometheus-community/parquet-common/search" + parquet_storage "github.com/prometheus-community/parquet-common/storage" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/thanos/pkg/cache" + + "github.com/cortexproject/cortex/pkg/util/users" +) + +const ( + // 4 bytes uint32 count. + rowRangesHeaderSize = 4 + + // int64 From + int64 Count. + rowRangeEntrySize = 16 +) + +type rowRangesCacheMetrics struct { + hits *prometheus.CounterVec + misses *prometheus.CounterVec + decodeErrors *prometheus.CounterVec +} + +func newRowRangesCacheMetrics(r prometheus.Registerer) *rowRangesCacheMetrics { + return &rowRangesCacheMetrics{ + hits: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_parquet_row_ranges_cache_hits_total", + Help: "Total number of parquet row ranges cache hits.", + }, []string{"name"}), + misses: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_parquet_row_ranges_cache_misses_total", + Help: "Total number of parquet row ranges cache misses.", + }, []string{"name"}), + decodeErrors: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_parquet_row_ranges_cache_decode_errors_total", + Help: "Total number of parquet row ranges cache decode errors.", + }, []string{"name"}), + } +} + +type RowRangesCache struct { + cache cache.Cache + name string + metrics *rowRangesCacheMetrics + ttl time.Duration +} + +func 
NewRowRangesCache(cache cache.Cache, name string, ttl time.Duration, reg prometheus.Registerer) *RowRangesCache { + if cache == nil { + return nil + } + + return &RowRangesCache{ + cache: cache, + name: name, + metrics: newRowRangesCacheMetrics(reg), + ttl: ttl, + } +} + +func (c *RowRangesCache) Get(ctx context.Context, shard parquet_storage.ParquetShard, rgIdx int, cs []search.Constraint) ([]search.RowRange, bool) { + key, ok := rowRangesCacheKey(c.name, ctx, shard, rgIdx, cs) + if !ok { + c.metrics.misses.WithLabelValues(c.name).Inc() + return nil, false + } + + hits := c.cache.Fetch(ctx, []string{key}) + buf, ok := hits[key] + if !ok { + c.metrics.misses.WithLabelValues(c.name).Inc() + return nil, false + } + + rowRanges, err := decodeRowRanges(buf) + if err != nil { + c.metrics.decodeErrors.WithLabelValues(c.name).Inc() + c.metrics.misses.WithLabelValues(c.name).Inc() + return nil, false + } + + c.metrics.hits.WithLabelValues(c.name).Inc() + return rowRanges, true +} + +func (c *RowRangesCache) Set(ctx context.Context, shard parquet_storage.ParquetShard, rgIdx int, cs []search.Constraint, rr []search.RowRange) error { + key, ok := rowRangesCacheKey(c.name, ctx, shard, rgIdx, cs) + if !ok { + return nil + } + + c.cache.Store(map[string][]byte{key: encodeRowRanges(rr)}, c.ttl) + return nil +} + +func (c *RowRangesCache) Delete(context.Context, parquet_storage.ParquetShard, int, []search.Constraint) error { + // thanos/pkg/cache.Cache doesn't expose deletion. + return nil +} + +func (c *RowRangesCache) Close() error { + // thanos/pkg/cache.Cache doesn't expose Close. 
+ return nil +} + +func rowRangesCacheKey(name string, ctx context.Context, shard parquet_storage.ParquetShard, rgIdx int, cs []search.Constraint) (string, bool) { + userID, err := users.TenantID(ctx) + if err != nil { + return "", false + } + + //gomemcache rejects whitespace or keys over 250 bytes, so hash the key + h := sha256.New() + writeCacheKeyPart(h, userID) + writeCacheKeyPart(h, shard.Name()) + writeCacheKeyPart(h, strconv.Itoa(shard.ShardIdx())) + writeCacheKeyPart(h, strconv.Itoa(rgIdx)) + for _, c := range cs { + writeCacheKeyPart(h, c.String()) + } + + return name + ":" + hex.EncodeToString(h.Sum(nil)), true +} + +func writeCacheKeyPart(h hash.Hash, part string) { + _, _ = fmt.Fprintf(h, "%d:", len(part)) + _, _ = h.Write([]byte(part)) +} + +func encodeRowRanges(rowRanges []search.RowRange) []byte { + buf := make([]byte, rowRangesHeaderSize+len(rowRanges)*rowRangeEntrySize) + binary.LittleEndian.PutUint32(buf[:rowRangesHeaderSize], uint32(len(rowRanges))) + + offset := rowRangesHeaderSize + for _, rowRange := range rowRanges { + binary.LittleEndian.PutUint64(buf[offset:offset+8], uint64(rowRange.From)) + binary.LittleEndian.PutUint64(buf[offset+8:offset+rowRangeEntrySize], uint64(rowRange.Count)) + offset += rowRangeEntrySize + } + + return buf +} + +func decodeRowRanges(buf []byte) ([]search.RowRange, error) { + if len(buf) < rowRangesHeaderSize { + return nil, fmt.Errorf("invalid row ranges cache entry: got %d bytes, expected at least %d", len(buf), rowRangesHeaderSize) + } + + count := binary.LittleEndian.Uint32(buf[:rowRangesHeaderSize]) + if count > uint32((len(buf)-rowRangesHeaderSize)/rowRangeEntrySize) { + return nil, fmt.Errorf("invalid row ranges cache entry: got %d bytes for %d row ranges", len(buf), count) + } + + expectedSize := rowRangesHeaderSize + int(count)*rowRangeEntrySize + if len(buf) != expectedSize { + return nil, fmt.Errorf("invalid row ranges cache entry: got %d bytes, expected %d", len(buf), expectedSize) + } + + rowRanges := 
make([]search.RowRange, int(count)) + offset := rowRangesHeaderSize + for i := range rowRanges { + rowRanges[i] = search.RowRange{ + From: int64(binary.LittleEndian.Uint64(buf[offset : offset+8])), + Count: int64(binary.LittleEndian.Uint64(buf[offset+8 : offset+rowRangeEntrySize])), + } + offset += rowRangeEntrySize + } + + return rowRanges, nil +} diff --git a/pkg/util/parquetutil/row_ranges_cache_test.go b/pkg/util/parquetutil/row_ranges_cache_test.go new file mode 100644 index 00000000000..3e18b0f326c --- /dev/null +++ b/pkg/util/parquetutil/row_ranges_cache_test.go @@ -0,0 +1,300 @@ +package parquetutil + +import ( + "bytes" + "context" + "maps" + "testing" + "time" + + "github.com/parquet-go/parquet-go" + "github.com/prometheus-community/parquet-common/schema" + "github.com/prometheus-community/parquet-common/search" + parquet_storage "github.com/prometheus-community/parquet-common/storage" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" +) + +func TestRowRanges_EncodeDecode(t *testing.T) { + tests := []struct { + name string + rowRanges []search.RowRange + encoded []byte + }{ + { + name: "empty", + rowRanges: []search.RowRange{}, + encoded: []byte{ + 0, 0, 0, 0, + }, + }, + { + name: "multiple row ranges", + rowRanges: []search.RowRange{ + {From: 1, Count: 2}, + {From: 256, Count: 513}, + }, + encoded: []byte{ + 2, 0, 0, 0, + 1, 0, 0, 0, 0, 0, 0, 0, + 2, 0, 0, 0, 0, 0, 0, 0, + 0, 1, 0, 0, 0, 0, 0, 0, + 1, 2, 0, 0, 0, 0, 0, 0, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + encoded := encodeRowRanges(tt.rowRanges) + require.Equal(t, tt.encoded, encoded) + + decoded, err := decodeRowRanges(encoded) + require.NoError(t, err) + require.Equal(t, tt.rowRanges, decoded) + }) + } +} + +func TestRowRanges_InvalidDecode(t *testing.T) { + tests := []struct { + name string + buf []byte + 
errContains string + }{ + { + name: "short header", + buf: []byte{1, 0}, + errContains: "invalid row ranges cache entry: got 2 bytes, expected at least 4", + }, + { + name: "truncated row range", + buf: []byte{1, 0, 0, 0}, + errContains: "invalid row ranges cache entry: got 4 bytes for 1 row ranges", + }, + { + name: "trailing bytes", + buf: []byte{0, 0, 0, 0, 0}, + errContains: "invalid row ranges cache entry: got 5 bytes, expected 4", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + decoded, err := decodeRowRanges(tt.buf) + require.Nil(t, decoded) + require.ErrorContains(t, err, tt.errContains) + }) + } +} + +func TestRowRangesCacheKey(t *testing.T) { + cacheName := "parquet-row-ranges" + shard := fakeParquetShard{name: "block-01/labels.parquet", shardIdx: 7} + ctx := user.InjectOrgID(context.Background(), "tenant-a") + constraints := []search.Constraint{ + equalConstraint("labels.foo", "bar baz:qux"), + equalConstraint("labels.zone", "us-east"), + } + + key, ok := rowRangesCacheKey(cacheName, ctx, shard, 4, constraints) + require.True(t, ok) + require.Regexp(t, `^parquet-row-ranges:[0-9a-f]{64}$`, key) + require.Len(t, key, len(cacheName)+1+64) + require.NotContains(t, key, " ") + require.NotContains(t, key, "bar baz:qux") + + sameKey, ok := rowRangesCacheKey(cacheName, ctx, shard, 4, constraints) + require.True(t, ok) + require.Equal(t, key, sameKey) + + differentConstraintKey, ok := rowRangesCacheKey(cacheName, ctx, shard, 4, []search.Constraint{ + equalConstraint("labels.foo", "different"), + equalConstraint("labels.zone", "us-east"), + }) + require.True(t, ok) + require.NotEqual(t, key, differentConstraintKey) + + noConstraintKey, ok := rowRangesCacheKey(cacheName, ctx, shard, 4, nil) + require.True(t, ok) + require.Regexp(t, `^parquet-row-ranges:[0-9a-f]{64}$`, noConstraintKey) + require.NotEqual(t, key, noConstraintKey) +} + +func TestRowRangesCacheKey_MissingTenant(t *testing.T) { + shard := fakeParquetShard{name: 
"block-01/labels.parquet", shardIdx: 7} + + key, ok := rowRangesCacheKey("parquet-row-ranges", context.Background(), shard, 3, nil) + require.False(t, ok) + require.Empty(t, key) +} + +func TestRowRangesCache_GetSet(t *testing.T) { + env := newRowRangesCacheTestEnv(t) + rowRanges := []search.RowRange{ + {From: 10, Count: 3}, + {From: 30, Count: 4}, + } + + got, ok := env.cache.Get(env.ctx, env.shard, 2, env.constraints) + require.False(t, ok) + require.Nil(t, got) + + require.NoError(t, env.cache.Set(env.ctx, env.shard, 2, env.constraints, rowRanges)) + require.Equal(t, time.Hour, env.backingCache.lastTTL) + require.Len(t, env.backingCache.entries, 1) + + got, ok = env.cache.Get(env.ctx, env.shard, 2, env.constraints) + require.True(t, ok) + require.Equal(t, rowRanges, got) + + require.NoError(t, testutil.GatherAndCompare(env.reg, bytes.NewBufferString(` + # HELP cortex_parquet_row_ranges_cache_hits_total Total number of parquet row ranges cache hits. + # TYPE cortex_parquet_row_ranges_cache_hits_total counter + cortex_parquet_row_ranges_cache_hits_total{name="test"} 1 + # HELP cortex_parquet_row_ranges_cache_misses_total Total number of parquet row ranges cache misses. + # TYPE cortex_parquet_row_ranges_cache_misses_total counter + cortex_parquet_row_ranges_cache_misses_total{name="test"} 1 + `), + "cortex_parquet_row_ranges_cache_hits_total", + "cortex_parquet_row_ranges_cache_misses_total", + )) +} + +func TestRowRangesCache_GetCorruptEntry(t *testing.T) { + env := newRowRangesCacheTestEnv(t) + key, ok := rowRangesCacheKey("test", env.ctx, env.shard, 2, env.constraints) + require.True(t, ok) + env.backingCache.entries[key] = []byte{1, 0, 0, 0} + + got, ok := env.cache.Get(env.ctx, env.shard, 2, env.constraints) + require.False(t, ok) + require.Nil(t, got) + + require.NoError(t, testutil.GatherAndCompare(env.reg, bytes.NewBufferString(` + # HELP cortex_parquet_row_ranges_cache_decode_errors_total Total number of parquet row ranges cache decode errors. 
+ # TYPE cortex_parquet_row_ranges_cache_decode_errors_total counter + cortex_parquet_row_ranges_cache_decode_errors_total{name="test"} 1 + # HELP cortex_parquet_row_ranges_cache_misses_total Total number of parquet row ranges cache misses. + # TYPE cortex_parquet_row_ranges_cache_misses_total counter + cortex_parquet_row_ranges_cache_misses_total{name="test"} 1 + `), + "cortex_parquet_row_ranges_cache_decode_errors_total", + "cortex_parquet_row_ranges_cache_misses_total", + )) +} + +func TestRowRangesCache_MissingTenant(t *testing.T) { + env := newRowRangesCacheTestEnv(t) + rowRanges := []search.RowRange{{From: 10, Count: 3}} + + require.NoError(t, env.cache.Set(context.Background(), env.shard, 2, env.constraints, rowRanges)) + require.Empty(t, env.backingCache.entries) + + got, ok := env.cache.Get(context.Background(), env.shard, 2, env.constraints) + require.False(t, ok) + require.Nil(t, got) + require.Equal(t, 0, env.backingCache.fetchCalls) + + require.NoError(t, testutil.GatherAndCompare(env.reg, bytes.NewBufferString(` + # HELP cortex_parquet_row_ranges_cache_misses_total Total number of parquet row ranges cache misses. 
+ # TYPE cortex_parquet_row_ranges_cache_misses_total counter + cortex_parquet_row_ranges_cache_misses_total{name="test"} 1 + `), + "cortex_parquet_row_ranges_cache_misses_total", + )) +} + +func TestNewRowRangesCache_NilBackend(t *testing.T) { + require.Nil(t, NewRowRangesCache(nil, "test", time.Hour, prometheus.NewRegistry())) +} + +func equalConstraint(path, value string) search.Constraint { + return search.Equal(path, parquet.ValueOf(value)) +} + +type rowRangesCacheTestEnv struct { + reg *prometheus.Registry + backingCache *fakeThanosCache + cache *RowRangesCache + ctx context.Context + shard fakeParquetShard + constraints []search.Constraint +} + +func newRowRangesCacheTestEnv(t *testing.T) rowRangesCacheTestEnv { + t.Helper() + + reg := prometheus.NewRegistry() + backingCache := newFakeThanosCache() + + return rowRangesCacheTestEnv{ + reg: reg, + backingCache: backingCache, + cache: NewRowRangesCache(backingCache, "test", time.Hour, reg), + ctx: user.InjectOrgID(context.Background(), "tenant-a"), + shard: fakeParquetShard{name: "block-01/labels.parquet", shardIdx: 7}, + constraints: []search.Constraint{equalConstraint("labels.foo", "bar")}, + } +} + +type fakeThanosCache struct { + entries map[string][]byte + lastTTL time.Duration + fetchCalls int +} + +func newFakeThanosCache() *fakeThanosCache { + return &fakeThanosCache{ + entries: map[string][]byte{}, + } +} + +func (c *fakeThanosCache) Store(data map[string][]byte, ttl time.Duration) { + c.lastTTL = ttl + maps.Copy(c.entries, data) +} + +func (c *fakeThanosCache) Fetch(_ context.Context, keys []string) map[string][]byte { + c.fetchCalls++ + + hits := map[string][]byte{} + for _, key := range keys { + value, ok := c.entries[key] + if ok { + hits[key] = value + } + } + return hits +} + +func (c *fakeThanosCache) Name() string { + return "fake" +} + +type fakeParquetShard struct { + name string + shardIdx int +} + +func (s fakeParquetShard) Name() string { + return s.name +} + +func (s fakeParquetShard) 
ShardIdx() int { + return s.shardIdx +} + +func (s fakeParquetShard) LabelsFile() parquet_storage.ParquetFileView { + return nil +} + +func (s fakeParquetShard) ChunksFile() parquet_storage.ParquetFileView { + return nil +} + +func (s fakeParquetShard) TSDBSchema() (*schema.TSDBSchema, error) { + return nil, nil +} diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 50b9a88bb71..654dfdb739b 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -2606,6 +2606,326 @@ }, "type": "object" }, + "parquet_row_ranges_cache": { + "properties": { + "backend": { + "description": "The parquet row ranges cache backend type. Single or Multiple cache backend can be provided. Supported values in single cache: memcached, redis, inmemory, and '' (disable). Supported values in multi level cache: a comma-separated list of (inmemory, memcached, redis)", + "type": "string", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.backend" + }, + "inmemory": { + "properties": { + "max_size_bytes": { + "default": 1073741824, + "description": "Maximum size in bytes of in-memory parquet-row-ranges cache used (shared between all tenants).", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.inmemory.max-size-bytes" + } + }, + "type": "object" + }, + "memcached": { + "properties": { + "addresses": { + "description": "Comma separated list of memcached addresses. 
Supported prefixes are: dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query, dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup made after that).", + "type": "string", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.addresses" + }, + "auto_discovery": { + "default": false, + "description": "Use memcached auto-discovery mechanism provided by some cloud provider like GCP and AWS", + "type": "boolean", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.auto-discovery" + }, + "max_async_buffer_size": { + "default": 10000, + "description": "The maximum number of enqueued asynchronous operations allowed.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-async-buffer-size" + }, + "max_async_concurrency": { + "default": 3, + "description": "The maximum number of concurrent asynchronous operations can occur.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-async-concurrency" + }, + "max_get_multi_batch_size": { + "default": 0, + "description": "The maximum number of keys a single underlying get operation should run. If more keys are specified, internally keys are split into multiple batches and fetched concurrently, honoring the max concurrency. If set to 0, the max batch size is unlimited.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-get-multi-batch-size" + }, + "max_get_multi_concurrency": { + "default": 100, + "description": "The maximum number of concurrent connections running get operations. 
If set to 0, concurrency is unlimited.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-get-multi-concurrency" + }, + "max_idle_connections": { + "default": 16, + "description": "The maximum number of idle connections that will be maintained per address.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-idle-connections" + }, + "max_item_size": { + "default": 1048576, + "description": "The maximum size of an item stored in memcached. Bigger items are not stored. If set to 0, no maximum size is enforced.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-item-size" + }, + "set_async_circuit_breaker_config": { + "properties": { + "consecutive_failures": { + "default": 5, + "description": "Consecutive failures to determine if the circuit breaker should open.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.consecutive-failures" + }, + "enabled": { + "default": false, + "description": "If true, enable circuit breaker.", + "type": "boolean", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.enabled" + }, + "failure_percent": { + "default": 0.05, + "description": "Failure percentage to determine if the circuit breaker should open.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.failure-percent" + }, + "half_open_max_requests": { + "default": 10, + "description": "Maximum number of requests allowed to pass through when the circuit breaker is half-open. 
If set to 0, by default it allows 1 request.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.half-open-max-requests" + }, + "min_requests": { + "default": 50, + "description": "Minimal requests to trigger the circuit breaker.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.min-requests" + }, + "open_duration": { + "default": "5s", + "description": "Period of the open state after which the state of the circuit breaker becomes half-open. If set to 0, by default open duration is 60 seconds.", + "type": "string", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.open-duration", + "x-format": "duration" + } + }, + "type": "object" + }, + "timeout": { + "default": "100ms", + "description": "The socket read/write timeout.", + "type": "string", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.timeout", + "x-format": "duration" + } + }, + "type": "object" + }, + "multilevel": { + "properties": { + "max_async_buffer_size": { + "default": 10000, + "description": "The maximum number of enqueued asynchronous operations allowed when backfilling cache items.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-async-buffer-size" + }, + "max_async_concurrency": { + "default": 3, + "description": "The maximum number of concurrent asynchronous operations can occur when backfilling cache items.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-async-concurrency" + }, + "max_backfill_items": { + "default": 10000, + "description": "The maximum number of items to backfill per asynchronous operation.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-backfill-items" + } + }, + 
"type": "object" + }, + "redis": { + "properties": { + "addresses": { + "description": "Comma separated list of redis addresses. Supported prefixes are: dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query), dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup made after that).", + "type": "string", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.addresses" + }, + "cache_size": { + "default": 0, + "description": "If not zero then client-side caching is enabled. Client-side caching is when data is stored in memory instead of fetching data each time. See https://redis.io/docs/manual/client-side-caching/ for more info.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.cache-size" + }, + "db": { + "default": 0, + "description": "Database to be selected after connecting to the server.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.db" + }, + "dial_timeout": { + "default": "5s", + "description": "Client dial timeout.", + "type": "string", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.dial-timeout", + "x-format": "duration" + }, + "get_multi_batch_size": { + "default": 100, + "description": "The maximum size per batch for mget.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.get-multi-batch-size" + }, + "master_name": { + "description": "Specifies the master's name. 
Must not be empty for Redis Sentinel.", + "type": "string", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.master-name" + }, + "max_async_buffer_size": { + "default": 10000, + "description": "The maximum number of enqueued asynchronous operations allowed.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-async-buffer-size" + }, + "max_async_concurrency": { + "default": 3, + "description": "The maximum number of concurrent asynchronous operations that can occur.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-async-concurrency" + }, + "max_get_multi_concurrency": { + "default": 100, + "description": "The maximum number of concurrent GetMulti() operations. If set to 0, concurrency is unlimited.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-get-multi-concurrency" + }, + "max_set_multi_concurrency": { + "default": 100, + "description": "The maximum number of concurrent SetMulti() operations. 
If set to 0, concurrency is unlimited.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-set-multi-concurrency" + }, + "password": { + "description": "Redis password.", + "type": "string", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.password" + }, + "read_timeout": { + "default": "3s", + "description": "Client read timeout.", + "type": "string", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.read-timeout", + "x-format": "duration" + }, + "set_async_circuit_breaker_config": { + "properties": { + "consecutive_failures": { + "default": 5, + "description": "Consecutive failures to determine if the circuit breaker should open.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.consecutive-failures" + }, + "enabled": { + "default": false, + "description": "If true, enable circuit breaker.", + "type": "boolean", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.enabled" + }, + "failure_percent": { + "default": 0.05, + "description": "Failure percentage to determine if the circuit breaker should open.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.failure-percent" + }, + "half_open_max_requests": { + "default": 10, + "description": "Maximum number of requests allowed to pass through when the circuit breaker is half-open. 
If set to 0, by default it allows 1 request.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.half-open-max-requests" + }, + "min_requests": { + "default": 50, + "description": "Minimal requests to trigger the circuit breaker.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.min-requests" + }, + "open_duration": { + "default": "5s", + "description": "Period of the open state after which the state of the circuit breaker becomes half-open. If set to 0, by default open duration is 60 seconds.", + "type": "string", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.open-duration", + "x-format": "duration" + } + }, + "type": "object" + }, + "set_multi_batch_size": { + "default": 100, + "description": "The maximum size per batch for pipeline set.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-multi-batch-size" + }, + "tls_ca_path": { + "description": "Path to the CA certificates file to validate server certificate against. If not set, the host's root CA certificates are used.", + "type": "string", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-ca-path" + }, + "tls_cert_path": { + "description": "Path to the client certificate file, which will be used for authenticating with the server. 
Also requires the key path to be configured.", + "type": "string", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-cert-path" + }, + "tls_enabled": { + "default": false, + "description": "Whether to enable tls for redis connection.", + "type": "boolean", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-enabled" + }, + "tls_insecure_skip_verify": { + "default": false, + "description": "Skip validating server certificate.", + "type": "boolean", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-insecure-skip-verify" + }, + "tls_key_path": { + "description": "Path to the key file for the client certificate. Also requires the client certificate to be configured.", + "type": "string", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-key-path" + }, + "tls_server_name": { + "description": "Override the expected name on the server certificate.", + "type": "string", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-server-name" + }, + "username": { + "description": "Redis username.", + "type": "string", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.username" + }, + "write_timeout": { + "default": "3s", + "description": "Client write timeout.", + "type": "string", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.write-timeout", + "x-format": "duration" + } + }, + "type": "object" + }, + "ttl": { + "default": "10m0s", + "description": "TTL for caching parquet row ranges.", + "type": "string", + "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.ttl", + "x-format": "duration" + } + }, + "type": "object" + }, "parquet_shard_cache_size": { "default": 512, "description": "[Experimental] Maximum size of the Parquet shard cache. 0 to disable.",