diff --git a/Cargo.lock b/Cargo.lock
index 1abbf338b6..212ab4860f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3059,29 +3059,6 @@ dependencies = [
  "serde",
 ]

-[[package]]
-name = "elasticsearch"
-version = "9.1.0-alpha.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "12bb303aa6e1d28c0c86b6fbfe484fd0fd3f512629aeed1ac4f6b85f81d9834a"
-dependencies = [
- "base64 0.22.1",
- "bytes",
- "dyn-clone",
- "flate2",
- "lazy_static",
- "parking_lot",
- "percent-encoding",
- "reqwest 0.12.24",
- "rustc_version",
- "serde",
- "serde_json",
- "serde_with",
- "tokio",
- "url",
- "void",
-]
-
 [[package]]
 name = "elliptic-curve"
 version = "0.13.8"
@@ -5275,7 +5252,6 @@ dependencies = [
  "dotenv-build",
  "dotenvy",
  "either",
- "elasticsearch",
  "eyre",
  "futures",
  "futures-util",
diff --git a/Cargo.toml b/Cargo.toml
index af805f52e8..a7a9dbe04b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -77,7 +77,6 @@ dotenv-build = "0.1.1"
 dotenvy = "0.15.7"
 dunce = "1.0.5"
 either = "1.15.0"
-elasticsearch = "9.1.0-alpha.1"
 encoding_rs = "0.8.35"
 enumset = "1.1.10"
 eyre = "0.6.12"
diff --git a/apps/labrinth/Cargo.toml b/apps/labrinth/Cargo.toml
index 0632770a08..9d99dc5007 100644
--- a/apps/labrinth/Cargo.toml
+++ b/apps/labrinth/Cargo.toml
@@ -45,7 +45,6 @@ deadpool-redis.workspace = true
 derive_more = { workspace = true, features = ["deref", "deref_mut"] }
 dotenvy = { workspace = true }
 either = { workspace = true }
-elasticsearch = { workspace = true, features = ["experimental-apis"] }
 eyre = { workspace = true }
 futures = { workspace = true }
 futures-util = { workspace = true }
diff --git a/apps/labrinth/src/env.rs b/apps/labrinth/src/env.rs
index ab5c6165b8..e51add7e3b 100644
--- a/apps/labrinth/src/env.rs
+++ b/apps/labrinth/src/env.rs
@@ -120,42 +120,50 @@ impl FromStr for StringCsv {
 }

 vars! {
-    SENTRY_ENVIRONMENT: String;
-    SENTRY_TRACES_SAMPLE_RATE: f32;
-    SITE_URL: String;
-    CDN_URL: String;
-    LABRINTH_ADMIN_KEY: String;
-    LABRINTH_MEDAL_KEY: String;
-    LABRINTH_EXTERNAL_NOTIFICATION_KEY: String;
-    RATE_LIMIT_IGNORE_KEY: String;
-    DATABASE_URL: String;
-    REDIS_URL: String;
-    BIND_ADDR: String;
-    SELF_ADDR: String;
-
-    LOCAL_INDEX_INTERVAL: u64;
-    VERSION_INDEX_INTERVAL: u64;
-
-    WHITELISTED_MODPACK_DOMAINS: Json<Vec<String>>;
-    ALLOWED_CALLBACK_URLS: Json<Vec<String>>;
-    ANALYTICS_ALLOWED_ORIGINS: Json<Vec<String>>;
+    SENTRY_ENVIRONMENT: String = "development";
+    SENTRY_TRACES_SAMPLE_RATE: f32 = 0.1f32;
+    SITE_URL: String = "http://localhost:3000";
+    CDN_URL: String = "file:///tmp/modrinth";
+    LABRINTH_ADMIN_KEY: String = "feedbeef";
+    LABRINTH_MEDAL_KEY: String = "";
+    LABRINTH_EXTERNAL_NOTIFICATION_KEY: String = "beeffeed";
+    RATE_LIMIT_IGNORE_KEY: String = "feedbeef";
+    DATABASE_URL: String = "postgresql://labrinth:labrinth@localhost/labrinth";
+    REDIS_URL: String = "redis://localhost";
+    BIND_ADDR: String = "0.0.0.0:8000";
+    SELF_ADDR: String = "http://127.0.0.1:8000";
+
+    LOCAL_INDEX_INTERVAL: u64 = 3600u64;
+    VERSION_INDEX_INTERVAL: u64 = 1800u64;
+
+    WHITELISTED_MODPACK_DOMAINS: Json<Vec<String>> = Json(vec![
+        "cdn.modrinth.com".into(),
+        "github.com".into(),
+        "raw.githubusercontent.com".into(),
+    ]);
+    ALLOWED_CALLBACK_URLS: Json<Vec<String>> = Json(vec![
+        "localhost".into(),
+        ".modrinth.com".into(),
+        "127.0.0.1".into(),
+        "[::1]".into(),
+    ]);
+    ANALYTICS_ALLOWED_ORIGINS: Json<Vec<String>> = Json(vec![
+        "http://127.0.0.1:3000".into(),
+        "http://localhost:3000".into(),
+        "https://modrinth.com".into(),
+        "https://www.modrinth.com".into(),
+        "*".into(),
+    ]);

     // search
     SEARCH_BACKEND: crate::search::SearchBackendKind = crate::search::SearchBackendKind::Typesense;
-    MEILISEARCH_READ_ADDR: String;
-    MEILISEARCH_WRITE_ADDRS: StringCsv;
-    MEILISEARCH_KEY: String;
-    ELASTICSEARCH_URL: String;
-    ELASTICSEARCH_INDEX_PREFIX: String;
-    ELASTICSEARCH_USERNAME: String = "";
-    ELASTICSEARCH_PASSWORD: String = "";
     SEARCH_INDEX_CHUNK_SIZE: i64 = 5000i64;
     TYPESENSE_URL: String = "http://localhost:8108";
     TYPESENSE_API_KEY: String = "modrinth";
     TYPESENSE_INDEX_PREFIX: String = "labrinth";

     // storage
-    STORAGE_BACKEND: crate::file_hosting::FileHostKind;
+    STORAGE_BACKEND: crate::file_hosting::FileHostKind = crate::file_hosting::FileHostKind::Local;

     // s3
     S3_PUBLIC_BUCKET_NAME: String = "";
@@ -173,98 +181,98 @@ vars! {
     S3_PRIVATE_SECRET: String = "";

     // local
-    MOCK_FILE_PATH: String = "";
-
-    GITHUB_CLIENT_ID: String;
-    GITHUB_CLIENT_SECRET: String;
-    GITLAB_CLIENT_ID: String;
-    GITLAB_CLIENT_SECRET: String;
-    DISCORD_CLIENT_ID: String;
-    DISCORD_CLIENT_SECRET: String;
-    MICROSOFT_CLIENT_ID: String;
-    MICROSOFT_CLIENT_SECRET: String;
-    GOOGLE_CLIENT_ID: String;
-    GOOGLE_CLIENT_SECRET: String;
-    STEAM_API_KEY: String;
-
-    TREMENDOUS_API_URL: String;
-    TREMENDOUS_API_KEY: String;
-    TREMENDOUS_PRIVATE_KEY: String;
-
-    PAYPAL_API_URL: String;
-    PAYPAL_WEBHOOK_ID: String;
-    PAYPAL_CLIENT_ID: String;
-    PAYPAL_CLIENT_SECRET: String;
-    PAYPAL_NVP_USERNAME: String;
-    PAYPAL_NVP_PASSWORD: String;
-    PAYPAL_NVP_SIGNATURE: String;
+    MOCK_FILE_PATH: String = "/tmp/modrinth";
+
+    GITHUB_CLIENT_ID: String = "none";
+    GITHUB_CLIENT_SECRET: String = "none";
+    GITLAB_CLIENT_ID: String = "none";
+    GITLAB_CLIENT_SECRET: String = "none";
+    DISCORD_CLIENT_ID: String = "none";
+    DISCORD_CLIENT_SECRET: String = "none";
+    MICROSOFT_CLIENT_ID: String = "none";
+    MICROSOFT_CLIENT_SECRET: String = "none";
+    GOOGLE_CLIENT_ID: String = "none";
+    GOOGLE_CLIENT_SECRET: String = "none";
+    STEAM_API_KEY: String = "none";
+
+    TREMENDOUS_API_URL: String = "https://testflight.tremendous.com/api/v2/";
+    TREMENDOUS_API_KEY: String = "none";
+    TREMENDOUS_PRIVATE_KEY: String = "none";
+
+    PAYPAL_API_URL: String = "https://api-m.sandbox.paypal.com/v1/";
+    PAYPAL_WEBHOOK_ID: String = "none";
+    PAYPAL_CLIENT_ID: String = "none";
+    PAYPAL_CLIENT_SECRET: String = "none";
+    PAYPAL_NVP_USERNAME: String = "none";
+    PAYPAL_NVP_PASSWORD: String = "none";
+    PAYPAL_NVP_SIGNATURE: String = "none";

     PAYPAL_BALANCE_ALERT_THRESHOLD: u64 = 0u64;
     BREX_BALANCE_ALERT_THRESHOLD: u64 = 0u64;
     TREMENDOUS_BALANCE_ALERT_THRESHOLD: u64 = 0u64;
     MURAL_BALANCE_ALERT_THRESHOLD: u64 = 0u64;

-    HCAPTCHA_SECRET: String;
+    HCAPTCHA_SECRET: String = "none";

-    SMTP_USERNAME: String;
-    SMTP_PASSWORD: String;
-    SMTP_HOST: String;
-    SMTP_PORT: u16;
-    SMTP_TLS: String;
-    SMTP_FROM_NAME: String;
-    SMTP_FROM_ADDRESS: String;
+    SMTP_USERNAME: String = "";
+    SMTP_PASSWORD: String = "";
+    SMTP_HOST: String = "localhost";
+    SMTP_PORT: u16 = 1025u16;
+    SMTP_TLS: String = "none";
+    SMTP_FROM_NAME: String = "Modrinth";
+    SMTP_FROM_ADDRESS: String = "no-reply@mail.modrinth.com";

-    SITE_VERIFY_EMAIL_PATH: String;
-    SITE_RESET_PASSWORD_PATH: String;
-    SITE_BILLING_PATH: String;
+    SITE_VERIFY_EMAIL_PATH: String = "auth/verify-email";
+    SITE_RESET_PASSWORD_PATH: String = "auth/reset-password";
+    SITE_BILLING_PATH: String = "none";

-    SENDY_URL: String;
-    SENDY_LIST_ID: String;
-    SENDY_API_KEY: String;
+    SENDY_URL: String = "none";
+    SENDY_LIST_ID: String = "none";
+    SENDY_API_KEY: String = "none";

-    CLICKHOUSE_REPLICATED: bool;
-    CLICKHOUSE_URL: String;
-    CLICKHOUSE_USER: String;
-    CLICKHOUSE_PASSWORD: String;
-    CLICKHOUSE_DATABASE: String;
+    CLICKHOUSE_REPLICATED: bool = false;
+    CLICKHOUSE_URL: String = "http://localhost:8123";
+    CLICKHOUSE_USER: String = "default";
+    CLICKHOUSE_PASSWORD: String = "default";
+    CLICKHOUSE_DATABASE: String = "staging_ariadne";

-    FLAME_ANVIL_URL: String;
+    FLAME_ANVIL_URL: String = "none";

-    GOTENBERG_URL: String;
-    GOTENBERG_CALLBACK_BASE: String;
-    GOTENBERG_TIMEOUT: u64;
+    GOTENBERG_URL: String = "http://localhost:13000";
+    GOTENBERG_CALLBACK_BASE: String = "http://host.docker.internal:8000/_internal/gotenberg";
+    GOTENBERG_TIMEOUT: u64 = 30000u64;

-    STRIPE_API_KEY: String;
-    STRIPE_WEBHOOK_SECRET: String;
+    STRIPE_API_KEY: String = "none";
+    STRIPE_WEBHOOK_SECRET: String = "none";

-    ADITUDE_API_KEY: String;
+    ADITUDE_API_KEY: String = "none";

-    PYRO_API_KEY: String;
+    PYRO_API_KEY: String = "none";

-    BREX_API_URL: String;
-    BREX_API_KEY: String;
+    BREX_API_URL: String = "https://platform.brexapis.com/v2/";
+    BREX_API_KEY: String = "none";

-    DELPHI_URL: String;
+    DELPHI_URL: String = "";

-    AVALARA_1099_API_URL: String;
-    AVALARA_1099_API_KEY: String;
-    AVALARA_1099_API_TEAM_ID: String;
-    AVALARA_1099_COMPANY_ID: String;
+    AVALARA_1099_API_URL: String = "https://www.track1099.com/api";
+    AVALARA_1099_API_KEY: String = "none";
+    AVALARA_1099_API_TEAM_ID: String = "none";
+    AVALARA_1099_COMPANY_ID: String = "207337084";

-    ANROK_API_URL: String;
-    ANROK_API_KEY: String;
+    ANROK_API_URL: String = "";
+    ANROK_API_KEY: String = "";

-    PAYOUT_ALERT_SLACK_WEBHOOK: String;
+    PAYOUT_ALERT_SLACK_WEBHOOK: String = "none";

     CLOUDFLARE_INTEGRATION: bool = false;

-    ARCHON_URL: String;
+    ARCHON_URL: String = "";

-    MURALPAY_API_URL: String;
-    MURALPAY_API_KEY: String;
-    MURALPAY_TRANSFER_API_KEY: String;
+    MURALPAY_API_URL: String = "https://api-staging.muralpay.com";
+    MURALPAY_API_KEY: String = "none";
+    MURALPAY_TRANSFER_API_KEY: String = "none";

     MURALPAY_SOURCE_ACCOUNT_ID: muralpay::AccountId = muralpay::AccountId(uuid::Uuid::nil());

-    DEFAULT_AFFILIATE_REVENUE_SPLIT: Decimal;
+    DEFAULT_AFFILIATE_REVENUE_SPLIT: Decimal = Decimal::new(1, 1);

     DATABASE_ACQUIRE_TIMEOUT_MS: u64 = 30000u64;
     DATABASE_MIN_CONNECTIONS: u32 = 0u32;
@@ -286,7 +294,7 @@ vars! {
     MODERATION_SLACK_WEBHOOK: String = "";
     DELPHI_SLACK_WEBHOOK: String = "";

-    TREMENDOUS_CAMPAIGN_ID: String = "";
+    TREMENDOUS_CAMPAIGN_ID: String = "none";

     // server pinging
     SERVER_PING_MAX_CONCURRENT: usize = 16usize;
diff --git a/apps/labrinth/src/search/backend/elasticsearch/mod.rs b/apps/labrinth/src/search/backend/elasticsearch/mod.rs
deleted file mode 100644
index 5c8cb8612e..0000000000
--- a/apps/labrinth/src/search/backend/elasticsearch/mod.rs
+++ /dev/null
@@ -1,1403 +0,0 @@
-use crate::database::PgPool;
-use crate::database::redis::RedisPool;
-use crate::env::ENV;
-use crate::models::ids::VersionId;
-use crate::routes::ApiError;
-use crate::search::backend::{
-    SearchIndex, SearchIndexName, combined_search_filters, parse_search_index,
-    parse_search_request,
-};
-use crate::search::indexing::index_local;
-use crate::search::{
-    ResultSearchProject, SearchBackend, SearchField, SearchRequest,
-    SearchResults, TasksCancelFilter, UploadSearchProject,
-};
-use crate::util::error::Context;
-use ariadne::ids::base62_impl::to_base62;
-use async_trait::async_trait;
-use elasticsearch::auth::Credentials;
-use elasticsearch::http::Url;
-use elasticsearch::http::request::JsonBody;
-use elasticsearch::http::response::Response;
-use elasticsearch::http::transport::{
-    SingleNodeConnectionPool, TransportBuilder,
-};
-use elasticsearch::indices::{
-    IndicesCreateParts, IndicesDeleteParts, IndicesExistsParts,
-    IndicesGetAliasParts, IndicesRefreshParts,
-};
-use elasticsearch::params::Refresh;
-use elasticsearch::tasks::TasksCancelParts;
-use elasticsearch::{
-    BulkParts, DeleteByQueryParts, Elasticsearch as EsClient, SearchParts,
-};
-use eyre::eyre;
-use regex::Regex;
-use reqwest::StatusCode;
-use serde::{Deserialize, Serialize};
-use serde_json::{Value, json};
-use std::borrow::Cow;
-use std::collections::HashMap;
-use std::sync::LazyLock;
-use std::time::Duration;
-
-#[derive(Debug, Clone)]
-pub struct ElasticsearchConfig {
-    pub url: String,
-    pub index_prefix: String,
-    pub meta_namespace: String,
-    pub username: String,
-    pub password: String,
-}
-
-impl ElasticsearchConfig {
-    pub fn new(meta_namespace: Option<String>) -> Self {
-        Self {
-            url: ENV.ELASTICSEARCH_URL.clone(),
-            index_prefix: ENV.ELASTICSEARCH_INDEX_PREFIX.clone(),
-            meta_namespace: meta_namespace.unwrap_or_default(),
-            username: ENV.ELASTICSEARCH_USERNAME.clone(),
-            password: ENV.ELASTICSEARCH_PASSWORD.clone(),
-        }
-    }
-
-    pub fn get_index_name(&self, index: &str) -> String {
-        if self.meta_namespace.is_empty() {
-            format!("{}_{}", self.index_prefix, index)
-        } else {
-            format!("{}_{}_{}", self.meta_namespace, self.index_prefix, index)
-        }
-    }
-}
-
-pub struct Elasticsearch {
-    pub config: ElasticsearchConfig,
-    pub client: EsClient,
-}
-
-#[derive(Serialize, Deserialize, Debug, Clone, Default)]
-pub struct RequestConfig {
-    #[serde(default)]
-    pub multi_match: MultiMatchConfig,
-    #[serde(default)]
-    pub sort: Vec<Value>,
-}
-
-#[derive(Serialize, Deserialize, Debug, Clone)]
-pub struct MultiMatchConfig {
-    #[serde(default = "default_multi_match_type", rename = "type")]
-    pub ty: String,
-    #[serde(default = "default_multi_match_fields")]
-    pub fields: Vec<String>,
-    #[serde(flatten, default)]
-    pub extra: HashMap<String, Value>,
-}
-
-impl Default for MultiMatchConfig {
-    fn default() -> Self {
-        Self {
-            ty: default_multi_match_type(),
-            fields: default_multi_match_fields(),
-            extra: HashMap::new(),
-        }
-    }
-}
-
-fn default_multi_match_type() -> String {
-    "bool_prefix".to_string()
-}
-
-fn default_multi_match_fields() -> Vec<String> {
-    vec![
-        "name^8".to_string(),
-        "name._2gram^8".to_string(),
-        "name._3gram^8".to_string(),
-        "slug^8".to_string(),
-        "slug._2gram^8".to_string(),
-        "slug._3gram^8".to_string(),
-        "author^2".to_string(),
-        "author._2gram^2".to_string(),
-        "author._3gram^2".to_string(),
-        "summary^3".to_string(),
-        "summary._2gram^3".to_string(),
-        "summary._3gram^3".to_string(),
-    ]
-}
-
-pub struct ElasticsearchFieldSpec {
-    pub path: &'static str,
-    pub mapping: Value,
-}
-
-impl SearchField {
-    pub fn elasticsearch_spec(self) -> ElasticsearchFieldSpec {
-        match self {
-            SearchField::Categories => ElasticsearchFieldSpec {
-                path: "categories",
-                mapping: json!({ "type": "keyword" }),
-            },
-            SearchField::Name => ElasticsearchFieldSpec {
-                path: "name",
-                mapping: json!({ "type": "search_as_you_type" }),
-            },
-            SearchField::Author => ElasticsearchFieldSpec {
-                path: "author",
-                mapping: json!({
-                    "type": "search_as_you_type",
-                    "fields": { "keyword": { "type": "keyword" } }
-                }),
-            },
-            SearchField::License => ElasticsearchFieldSpec {
-                path: "license",
-                mapping: json!({ "type": "keyword" }),
-            },
-            SearchField::ProjectTypes => ElasticsearchFieldSpec {
-                path: "project_types",
-                mapping: json!({ "type": "keyword" }),
-            },
-            SearchField::ProjectId => ElasticsearchFieldSpec {
-                path: "project_id",
-                mapping: json!({ "type": "keyword" }),
-            },
-            SearchField::OpenSource => ElasticsearchFieldSpec {
-                path: "open_source",
-                mapping: json!({ "type": "boolean" }),
-            },
-            SearchField::Environment => ElasticsearchFieldSpec {
-                path: "environment",
-                mapping: json!({ "type": "keyword" }),
-            },
-            SearchField::GameVersions => ElasticsearchFieldSpec {
-                path: "game_versions",
-                mapping: json!({ "type": "keyword" }),
-            },
-            SearchField::ClientSide => ElasticsearchFieldSpec {
-                path: "client_side",
-                mapping: json!({ "type": "keyword" }),
-            },
-            SearchField::ServerSide => ElasticsearchFieldSpec {
-                path: "server_side",
-                mapping: json!({ "type": "keyword" }),
-            },
-            SearchField::MinecraftServerRegion => ElasticsearchFieldSpec {
"minecraft_server.region", - mapping: json!({ "type": "keyword" }), - }, - SearchField::MinecraftServerLanguages => ElasticsearchFieldSpec { - path: "minecraft_server.languages", - mapping: json!({ "type": "keyword" }), - }, - SearchField::MinecraftJavaServerContentKind => { - ElasticsearchFieldSpec { - path: "minecraft_java_server.content.kind", - mapping: json!({ "type": "keyword" }), - } - } - SearchField::MinecraftJavaServerContentSupportedGameVersions => { - ElasticsearchFieldSpec { - path: "minecraft_java_server.content.supported_game_versions", - mapping: json!({ "type": "keyword" }), - } - } - SearchField::MinecraftJavaServerPingData => { - ElasticsearchFieldSpec { - path: "minecraft_java_server.ping.data", - mapping: json!({ "type": "object" }), - } - } - } - } -} - -static ELASTICSEARCH_PROPERTIES: LazyLock> = - LazyLock::new(|| { - use strum::IntoEnumIterator; - - let mut properties = serde_json::Map::from_iter([ - ("version_id".to_string(), json!({ "type": "keyword" })), - ( - "slug".to_string(), - json!({ - "type": "search_as_you_type", - "fields": { "keyword": { "type": "keyword" } } - }), - ), - ( - "author".to_string(), - json!({ - "type": "search_as_you_type", - "fields": { "keyword": { "type": "keyword" } } - }), - ), - ("name".to_string(), json!({ "type": "search_as_you_type" })), - ( - "summary".to_string(), - json!({ "type": "search_as_you_type" }), - ), - ( - "display_categories".to_string(), - json!({ "type": "keyword" }), - ), - ("downloads".to_string(), json!({ "type": "integer" })), - ("follows".to_string(), json!({ "type": "integer" })), - ("date_created".to_string(), json!({ "type": "date" })), - ("created_timestamp".to_string(), json!({ "type": "long" })), - ("date_modified".to_string(), json!({ "type": "date" })), - ("modified_timestamp".to_string(), json!({ "type": "long" })), - ( - "version_published_timestamp".to_string(), - json!({ "type": "long" }), - ), - ("license".to_string(), json!({ "type": "keyword" })), - ("loaders".to_string(), json!({ "type": "keyword" })), - ("color".to_string(), json!({ "type": "long" })), - ("environment".to_string(), json!({ "type": "keyword" })), - ("mrpack_loaders".to_string(), json!({ "type": "keyword" })), - ( - "minecraft_server.country".to_string(), - json!({ "type": "keyword" }), - ), - ]); - - for field in SearchField::iter() { - let spec = field.elasticsearch_spec(); - insert_nested_mapping(&mut properties, spec.path, spec.mapping); - } - - insert_nested_mapping( - &mut properties, - "minecraft_java_server.content.recommended_game_version", - json!({ "type": "keyword" }), - ); - insert_nested_mapping( - &mut properties, - "minecraft_java_server.verified_plays_2w", - json!({ "type": "long" }), - ); - insert_nested_mapping( - &mut properties, - "minecraft_java_server.verified_plays_4w", - json!({ "type": "long" }), - ); - insert_nested_mapping( - &mut properties, - "minecraft_java_server.is_online", - json!({ "type": "boolean" }), - ); - insert_nested_mapping( - &mut properties, - "minecraft_java_server.ping.data.players_online", - json!({ "type": "long" }), - ); - - let minecraft_java_server = properties - .remove("minecraft_java_server") - .unwrap_or_else(|| json!({ "properties": {} })); - - properties.insert( - "minecraft_java_server".to_string(), - json!({ - "type": "object", - "properties": merge_object_properties(minecraft_java_server) - }), - ); - - properties - }); - -fn merge_object_properties(value: Value) -> Value { - match value { - Value::Object(mut obj) => { - obj.remove("properties").unwrap_or_else(|| 
-        }
-        _ => json!({}),
-    }
-}
-
-fn insert_nested_mapping(
-    properties: &mut serde_json::Map<String, Value>,
-    path: &str,
-    mapping: Value,
-) {
-    let mut parts = path.split('.');
-    let Some(first) = parts.next() else {
-        return;
-    };
-
-    if let Some(rest) = parts.next() {
-        let remaining = std::iter::once(rest)
-            .chain(parts)
-            .collect::<Vec<_>>()
-            .join(".");
-        let entry = properties
-            .entry(first.to_string())
-            .or_insert_with(|| json!({ "type": "object", "properties": {} }));
-        let obj = entry.as_object_mut().expect("object mapping");
-        let nested = obj
-            .entry("properties".to_string())
-            .or_insert_with(|| json!({}));
-        let nested = nested.as_object_mut().expect("nested properties object");
-        insert_nested_mapping(nested, &remaining, mapping);
-    } else {
-        properties.insert(first.to_string(), mapping);
-    }
-}
-
-impl Elasticsearch {
-    fn multi_match_query(
-        query_text: &str,
-        request_config: &RequestConfig,
-    ) -> Value {
-        let mut multi_match = serde_json::Map::from_iter([
-            ("query".to_string(), Value::String(query_text.to_string())),
-            (
-                "type".to_string(),
-                Value::String(request_config.multi_match.ty.clone()),
-            ),
-            (
-                "fields".to_string(),
-                Value::Array(
-                    request_config
-                        .multi_match
-                        .fields
-                        .clone()
-                        .into_iter()
-                        .map(Value::String)
-                        .collect(),
-                ),
-            ),
-        ]);
-
-        for (key, value) in &request_config.multi_match.extra {
-            if key != "query" {
-                multi_match.insert(key.clone(), value.clone());
-            }
-        }
-
-        json!({ "multi_match": Value::Object(multi_match) })
-    }
-
-    fn escape_query_string_value(value: &str) -> String {
-        const RESERVED: [char; 21] = [
-            '+', '-', '=', '&', '|', '>', '<', '!', '(', ')', '{', '}', '[',
-            ']', '^', '"', '~', '*', '?', ':', '\\',
-        ];
-
-        let mut escaped = String::with_capacity(value.len());
-        for ch in value.chars() {
-            if RESERVED.contains(&ch) || ch == '/' {
-                escaped.push('\\');
-            }
-            escaped.push(ch);
-        }
-        escaped
-    }
-
-    fn normalize_meili_filter_syntax(filters: &str) -> String {
-        static IN_FILTER_RE: LazyLock<Regex> = LazyLock::new(|| {
-            Regex::new(
-                r"(?i)\b([a-zA-Z_][a-zA-Z0-9_]*)\s+(NOT\s+)?IN\s*\[([^\]]*)\]",
-            )
-            .expect("valid regex")
-        });
-
-        IN_FILTER_RE
-            .replace_all(filters, |captures: &regex::Captures<'_>| {
-                let field =
-                    captures.get(1).map(|m| m.as_str()).unwrap_or_default();
-                let is_not = captures.get(2).is_some();
-                let list = captures
-                    .get(3)
-                    .map(|m| m.as_str())
-                    .unwrap_or_default()
-                    .split(',')
-                    .map(str::trim)
-                    .filter(|value| !value.is_empty())
-                    .map(Self::escape_query_string_value)
-                    .collect::<Vec<_>>();
-
-                if list.is_empty() {
-                    captures
-                        .get(0)
-                        .map(|m| m.as_str())
-                        .unwrap_or_default()
-                        .to_string()
-                } else if is_not {
-                    format!("NOT {field}:({})", list.join(" OR "))
-                } else {
-                    format!("{field}:({})", list.join(" OR "))
-                }
-            })
-            .into_owned()
-    }
-
-    fn get_next_index_name(&self, alias_name: &str, next: bool) -> String {
-        if next {
-            format!("{alias_name}__alt")
-        } else {
-            format!("{alias_name}__current")
-        }
-    }
-
-    fn get_index_candidates(&self, alias_name: &str) -> [String; 3] {
-        [
-            alias_name.to_string(),
-            self.get_next_index_name(alias_name, false),
-            self.get_next_index_name(alias_name, true),
-        ]
-    }
-
-    fn parse_condition_query(condition: &str) -> Value {
-        let (field, value, negative) =
-            if let Some((f, v)) = condition.split_once("!=") {
-                (f.trim(), v.trim(), true)
-            } else if let Some((f, v)) = condition.split_once(':') {
-                (f.trim(), v.trim(), false)
-            } else if let Some((f, v)) = condition.split_once('=') {
-                (f.trim(), v.trim(), false)
-            } else {
-                ("", "", false)
-            };
-
-        let field = match field {
-            "project_type" => "project_types",
-            "title" => "name",
-            _ => field,
-        };
-        let clause = match field {
-            // Search text fields are analyzed; phrase matching aligns with
-            // Meilisearch behavior for quoted/multi-word values.
-            "name" | "summary" | "author" | "slug" => json!({
-                "match_phrase": {
-                    field: {
-                        "value": value,
-                        "case_insensitive": true
-                    }
-                }
-            }),
-            "categories" | "license" | "project_types" | "project_id"
-            | "environment" | "game_versions" | "mrpack_loaders"
-            | "client_side" | "server_side" => json!({
-                "term": {
-                    field: {
-                        "value": value,
-                        "case_insensitive": true
-                    }
-                }
-            }),
-            _ => json!({
-                "term": {
-                    field: value
-                }
-            }),
-        };
-
-        if negative {
-            json!({
-                "bool": {
-                    "must_not": [clause]
-                }
-            })
-        } else {
-            clause
-        }
-    }
-
-    fn facets_filter_clauses(
-        facets_json: Option<&str>,
-    ) -> Result<Vec<Value>, ApiError> {
-        let Some(raw_facets) = facets_json else {
-            return Ok(Vec::new());
-        };
-
-        let facets = serde_json::from_str::<Vec<Vec<Value>>>(raw_facets)
-            .wrap_request_err("failed to parse facets")?;
-
-        let facets = facets
-            .into_iter()
-            .map(|facet_group| {
-                facet_group
-                    .into_iter()
-                    .map(|facet| {
-                        if facet.is_array() {
-                            serde_json::from_value::<Vec<String>>(facet)
-                                .unwrap_or_default()
-                        } else {
-                            vec![
-                                serde_json::from_value::<String>(facet)
-                                    .unwrap_or_default(),
-                            ]
-                        }
-                    })
-                    .collect::<Vec<Vec<String>>>()
-            })
-            .collect::<Vec<Vec<Vec<String>>>>();
-
-        let mut clauses = Vec::new();
-        for or_group in facets {
-            let should = or_group
-                .into_iter()
-                .map(|and_group| {
-                    let mut must = Vec::new();
-                    let mut must_not = Vec::new();
-                    for condition in and_group {
-                        let q = Self::parse_condition_query(&condition);
-                        if q.get("bool")
-                            .and_then(|b| b.get("must_not"))
-                            .is_some()
-                        {
-                            if let Some(parts) =
-                                q["bool"]["must_not"].as_array()
-                            {
-                                must_not.extend(parts.iter().cloned());
-                            }
-                        } else {
-                            must.push(q);
-                        }
-                    }
-
-                    json!({
-                        "bool": {
-                            "must": must,
-                            "must_not": must_not
-                        }
-                    })
-                })
-                .collect::<Vec<_>>();
-
-            clauses.push(json!({
-                "bool": {
-                    "should": should,
-                    "minimum_should_match": 1
-                }
-            }));
-        }
-
-        Ok(clauses)
-    }
-
-    pub fn new(meta_namespace: Option<String>) -> eyre::Result<Self> {
-        let config = ElasticsearchConfig::new(meta_namespace);
-        let url = Url::parse(&config.url)
-            .wrap_err("failed to parse Elasticsearch URL")?;
-        let mut builder =
-            TransportBuilder::new(SingleNodeConnectionPool::new(url));
-
-        let has_basic_username = !config.username.trim().is_empty();
-        let has_basic_password = !config.password.trim().is_empty();
-        if has_basic_username || has_basic_password {
-            if !has_basic_username || !has_basic_password {
-                return Err(eyre!(
-                    "Elasticsearch basic auth requires both `ELASTICSEARCH_USERNAME` and `ELASTICSEARCH_PASSWORD`"
-                ));
-            }
-            builder = builder.auth(Credentials::Basic(
-                config.username.clone(),
-                config.password.clone(),
-            ));
-        }
-
-        let transport = builder
-            .build()
-            .wrap_err("failed to create Elasticsearch transport")?;
-        let client = EsClient::new(transport);
-
-        Ok(Self { config, client })
-    }
-
-    fn get_sort_index(
-        &self,
-        index: &str,
-        new_filters: Option<&str>,
-        request_config: &RequestConfig,
-    ) -> Result<(String, Value), ApiError> {
-        let sort = parse_search_index(index, new_filters)?;
-        let index_name = match sort.index_name {
-            SearchIndexName::Projects => self.config.get_index_name("projects"),
-            SearchIndexName::ProjectsFiltered => {
-                self.config.get_index_name("projects_filtered")
-            }
-        };
-
-        let default_sort = match sort.index {
-            SearchIndex::Relevance => (
-                index_name,
-                json!([
-                    { "_score": { "order": "desc" } },
"downloads": { "order": "desc" } }, - { "version_published_timestamp": { "order": "desc" } } - ]), - ), - SearchIndex::Downloads => ( - index_name, - json!([ - { "downloads": { "order": "desc" } }, - { "version_published_timestamp": { "order": "desc" } } - ]), - ), - SearchIndex::Follows => ( - index_name, - json!([ - { "follows": { "order": "desc" } }, - { "version_published_timestamp": { "order": "desc" } } - ]), - ), - SearchIndex::Updated => ( - index_name, - json!([ - { "date_modified": { "order": "desc" } }, - { "version_published_timestamp": { "order": "desc" } } - ]), - ), - SearchIndex::Newest => ( - index_name, - json!([ - { "date_created": { "order": "desc" } }, - { "version_published_timestamp": { "order": "desc" } } - ]), - ), - SearchIndex::MinecraftJavaServerVerifiedPlays2w => ( - index_name, - json!([ - { - "minecraft_java_server.verified_plays_2w": { - "order": "desc" - } - }, - { - "minecraft_java_server.ping.data.players_online": { - "order": "desc" - } - }, - { "version_published_timestamp": { "order": "desc" } } - ]), - ), - SearchIndex::MinecraftJavaServerPlayersOnline => ( - index_name, - json!([ - { - "minecraft_java_server.ping.data.players_online": { - "order": "desc" - } - }, - { "version_published_timestamp": { "order": "desc" } } - ]), - ), - }; - - if request_config.sort.is_empty() { - Ok(default_sort) - } else { - Ok((default_sort.0, Value::Array(request_config.sort.clone()))) - } - } - - async fn ensure_index(&self, index_name: &str) -> Result<(), ApiError> { - let exists = self - .client - .indices() - .exists(IndicesExistsParts::Index(&[index_name])) - .send() - .await - .wrap_internal_err( - "failed to check Elasticsearch index existence", - )?; - - if exists.status_code().is_success() { - return Ok(()); - } - - let response = self - .client - .indices() - .create(IndicesCreateParts::Index(index_name)) - .body(json!({ - "mappings": { - "dynamic": true, - "properties": Value::Object(ELASTICSEARCH_PROPERTIES.clone()) - } - })) - .send() - .await - .wrap_internal_err("failed to create Elasticsearch index")?; - - if response.status_code().is_success() { - Ok(()) - } else { - let body = - response.json::().await.unwrap_or_else(|_| json!({})); - Err(ApiError::Internal(eyre!( - "failed to create Elasticsearch index `{index_name}`: {body}" - ))) - } - } - - async fn delete_index_if_exists( - &self, - index_name: &str, - ) -> Result<(), ApiError> { - let delete = self - .client - .indices() - .delete(IndicesDeleteParts::Index(&[index_name])) - .send() - .await - .wrap_internal_err("failed to delete Elasticsearch index")?; - - let success_or_not_found = delete.status_code().is_success() - || delete.status_code() == StatusCode::NOT_FOUND; - - if !success_or_not_found { - let body = - delete.json::().await.unwrap_or_else(|_| json!({})); - return Err(ApiError::Internal(eyre!( - "failed to delete Elasticsearch index `{index_name}`: {body}" - ))); - } - - Ok(()) - } - - async fn get_alias_target( - &self, - alias_name: &str, - ) -> Result, ApiError> { - let response = self - .client - .indices() - .get_alias(IndicesGetAliasParts::Name(&[alias_name])) - .send() - .await - .wrap_internal_err("failed to get Elasticsearch alias")?; - - if response.status_code() == StatusCode::NOT_FOUND { - return Ok(None); - } - - if !response.status_code().is_success() { - let body = - response.json::().await.unwrap_or_else(|_| json!({})); - return Err(ApiError::Internal(eyre!( - "failed to get Elasticsearch alias `{alias_name}`: {body}" - ))); - } - - let body = 
-        let body = response.json::<Value>().await.wrap_internal_err(
-            "failed to parse Elasticsearch alias response",
-        )?;
-        Ok(body
-            .as_object()
-            .and_then(|x| x.keys().next().cloned())
-            .filter(|x| !x.is_empty()))
-    }
-
-    async fn index_exists(&self, index_name: &str) -> Result<bool, ApiError> {
-        let exists = self
-            .client
-            .indices()
-            .exists(IndicesExistsParts::Index(&[index_name]))
-            .send()
-            .await
-            .wrap_internal_err(
-                "failed to check Elasticsearch index existence",
-            )?;
-        Ok(exists.status_code().is_success())
-    }
-
-    async fn swap_alias(
-        &self,
-        alias_name: &str,
-        next_index: &str,
-        current_index: Option<&str>,
-    ) -> Result<(), ApiError> {
-        let mut actions = vec![json!({
-            "add": {
-                "index": next_index,
-                "alias": alias_name
-            }
-        })];
-        if let Some(current_index) = current_index {
-            actions.push(json!({
-                "remove": {
-                    "index": current_index,
-                    "alias": alias_name
-                }
-            }));
-        }
-
-        let response = self
-            .client
-            .indices()
-            .update_aliases()
-            .body(json!({ "actions": actions }))
-            .send()
-            .await
-            .wrap_internal_err("failed to update Elasticsearch aliases")?;
-
-        if !response.status_code().is_success() {
-            let body =
-                response.json::<Value>().await.unwrap_or_else(|_| json!({}));
-            return Err(ApiError::Internal(eyre!(
-                "failed to swap Elasticsearch alias `{alias_name}`: {body}"
-            )));
-        }
-
-        Ok(())
-    }
-
-    async fn bulk_index_documents(
-        &self,
-        index_name: &str,
-        docs: &[crate::search::UploadSearchProject],
-    ) -> Result<(), ApiError> {
-        if docs.is_empty() {
-            return Ok(());
-        }
-
-        let mut body: Vec<JsonBody<Value>> = Vec::with_capacity(docs.len() * 2);
-        for doc in docs {
-            body.push(json!({"index": {"_id": doc.version_id}}).into());
-            body.push(
-                serde_json::to_value(doc)
-                    .wrap_internal_err("failed to serialize document for Elasticsearch bulk index")?
-                    .into(),
-            );
-        }
-
-        let response = self
-            .client
-            .bulk(BulkParts::Index(index_name))
-            .refresh(Refresh::WaitFor)
-            .body(body)
-            .send()
-            .await
-            .wrap_internal_err(
-                "failed to request bulk index Elasticsearch documents",
-            )?
-            .error_for_status_code()
-            .wrap_internal_err(
-                "failed to bulk index Elasticsearch documents",
-            )?;
-
-        self.ensure_no_errors(response, "bulk index").await
-    }
-
-    async fn ensure_no_errors(
-        &self,
-        resp: Response,
-        action: &str,
-    ) -> Result<(), ApiError> {
-        let body = resp
-            .json::<Value>()
-            .await
-            .wrap_internal_err("failed to parse Elasticsearch response")?;
-        if body.get("errors").and_then(Value::as_bool).unwrap_or(false) {
-            return Err(ApiError::Internal(eyre!(
-                "Elasticsearch `{action}` reported partial failures: {body}"
-            )));
-        }
-
-        Ok(())
-    }
-
-    fn meili_like_filters(info: &SearchRequest) -> Option<Cow<'static, str>> {
-        let raw = combined_search_filters(info)?;
-
-        Some(Self::normalize_meili_filter_syntax(&raw).into())
-    }
-}
-
-#[async_trait]
-impl SearchBackend for Elasticsearch {
-    async fn search_for_project_raw(
-        &self,
-        info: &SearchRequest,
-    ) -> Result<SearchResults, ApiError> {
-        let parsed = parse_search_request(info)?;
-        let request_config = &info.elasticsearch_config;
-        let (index_name, sort) = self.get_sort_index(
-            parsed.index,
-            info.new_filters.as_deref(),
-            request_config,
-        )?;
-        let include_metadata = info.show_metadata;
-
-        let mut must = Vec::new();
-        let query_text = parsed.query.trim();
-        if query_text.is_empty() {
-            must.push(json!({"match_all": {}}));
-        } else {
-            must.push(Self::multi_match_query(query_text, request_config));
-        }
-
-        let mut filter = Self::facets_filter_clauses(info.facets.as_deref())?;
-        if let Some(filter_string) = Self::meili_like_filters(info)
-            && !filter_string.trim().is_empty()
-        {
-            filter.push(json!({
-                "query_string": {
-                    "query": filter_string,
-                    "default_operator": "AND",
-                    "lenient": true
-                }
-            }));
-        }
-
-        let response = self
-            .client
-            .search(SearchParts::Index(&[index_name.as_str()]))
-            .from(parsed.offset as i64)
-            .size(parsed.hits_per_page as i64)
-            .track_total_hits(true)
-            .body(json!({
-                "query": {
-                    "bool": {
-                        "must": must,
-                        "filter": filter
-                    }
-                },
-                "collapse": {
-                    "field": "project_id"
-                },
-                "sort": sort,
-                "track_scores": include_metadata,
-                "explain": include_metadata,
-                "profile": include_metadata,
-                "aggs": {
-                    "unique_projects": {
-                        "cardinality": {
-                            "field": "project_id",
-                        }
-                    }
-                }
-            }))
-            .send()
-            .await
-            .wrap_internal_err("failed to execute Elasticsearch search")?;
-
-        if let Err(err) = response.error_for_status_code_ref() {
-            let err = eyre!(err);
-            let err = match response.json::<Value>().await {
-                Ok(json) => err.wrap_err(eyre!(
-                    "search request failed: {}",
-                    serde_json::to_string_pretty(&json).unwrap()
-                )),
-                Err(_) => err.wrap_err("search request failed"),
-            };
-            return Err(ApiError::Internal(err));
-        }
-
-        let response_body = response.json::<Value>().await.wrap_internal_err(
-            "failed to parse Elasticsearch search response",
-        )?;
-
-        let hits = response_body["hits"]["hits"]
-            .as_array()
-            .cloned()
-            .unwrap_or_default()
-            .into_iter()
-            .map(|hit| -> Result<Option<ResultSearchProject>, ApiError> {
-                let Some(source) = hit.get("_source").cloned() else {
-                    return Ok(None);
-                };
-
-                let metadata = include_metadata.then(|| {
-                    let mut metadata = serde_json::Map::new();
-
-                    if let Some(score) = hit.get("_score") {
-                        metadata.insert("_score".to_string(), score.clone());
-                    }
-                    if let Some(sort) = hit.get("sort") {
-                        metadata.insert("sort".to_string(), sort.clone());
-                    }
-                    if let Some(explanation) = hit.get("_explanation") {
-                        metadata.insert(
-                            "_explanation".to_string(),
-                            explanation.clone(),
-                        );
-                    }
-
-                    Value::Object(metadata)
-                });
-
-                serde_json::from_value::<UploadSearchProject>(source)
-                    .wrap_internal_err(
-                        "failed to deserialize Elasticsearch hit",
-                    )
-                    .map(|project| {
-                        let mut result: ResultSearchProject = project.into();
-                        result.search_metadata = metadata;
-                        Some(result)
-                    })
-            })
-            .collect::<Result<Vec<_>, ApiError>>()?
-            .into_iter()
-            .flatten()
-            .collect::<Vec<_>>();
-
-        let total_hits = response_body.get("aggregations")
-            .and_then(|aggs| aggs.get("unique_projects"))
-            .and_then(|unique| unique.get("value"))
-            .and_then(Value::as_u64)
-            .map(|v| v as usize)
-            .wrap_internal_err("missing `aggregations.unique_projects.value` in Elasticsearch response")?;
-
-        Ok(SearchResults {
-            hits,
-            page: parsed.page,
-            hits_per_page: parsed.hits_per_page,
-            total_hits,
-        })
-    }
-
-    async fn index_projects(
-        &self,
-        ro_pool: PgPool,
-        redis: RedisPool,
-    ) -> eyre::Result<()> {
-        let projects_alias = self.config.get_index_name("projects");
-        let filtered_alias = self.config.get_index_name("projects_filtered");
-
-        let projects_current = self.get_alias_target(&projects_alias).await?;
-        let filtered_current = self.get_alias_target(&filtered_alias).await?;
-        let projects_legacy_current = projects_current.is_none()
-            && self.index_exists(&projects_alias).await?;
-        let filtered_legacy_current = filtered_current.is_none()
-            && self.index_exists(&filtered_alias).await?;
-
-        let projects_next = if projects_current
-            .as_deref()
-            .is_some_and(|x| x.ends_with("__alt"))
-        {
-            self.get_next_index_name(&projects_alias, false)
-        } else {
-            self.get_next_index_name(&projects_alias, true)
-        };
-        let filtered_next = if filtered_current
-            .as_deref()
-            .is_some_and(|x| x.ends_with("__alt"))
-        {
-            self.get_next_index_name(&filtered_alias, false)
-        } else {
-            self.get_next_index_name(&filtered_alias, true)
-        };
-
-        self.delete_index_if_exists(&projects_next).await?;
-        self.delete_index_if_exists(&filtered_next).await?;
-        self.ensure_index(&projects_next).await?;
-        self.ensure_index(&filtered_next).await?;
-        let mut cursor = 0_i64;
-
-        loop {
-            let (uploads, next_cursor) = index_local(
-                &ro_pool,
-                &redis,
-                cursor,
-                ENV.SEARCH_INDEX_CHUNK_SIZE,
-            )
-            .await
-            .wrap_internal_err("failed to index local")?;
-            if uploads.is_empty() {
-                break;
-            }
-
-            self.bulk_index_documents(&projects_next, &uploads).await?;
-            self.bulk_index_documents(&filtered_next, &uploads).await?;
-            cursor = next_cursor;
-        }
-
-        let indices = [projects_next.as_str(), filtered_next.as_str()];
-        self.client
-            .indices()
-            .refresh(IndicesRefreshParts::Index(&indices))
-            .send()
-            .await
-            .wrap_internal_err("failed to refresh Elasticsearch indexes")?;
-
-        if projects_legacy_current {
-            self.delete_index_if_exists(&projects_alias).await?;
-        }
-        if filtered_legacy_current {
-            self.delete_index_if_exists(&filtered_alias).await?;
-        }
-
-        self.swap_alias(
-            &projects_alias,
-            &projects_next,
-            projects_current.as_deref(),
-        )
-        .await?;
-        self.swap_alias(
-            &filtered_alias,
-            &filtered_next,
-            filtered_current.as_deref(),
-        )
-        .await?;
-
-        if let Some(index) = projects_current {
-            self.delete_index_if_exists(&index).await?;
-        }
-        if let Some(index) = filtered_current {
-            self.delete_index_if_exists(&index).await?;
-        }
-
-        Ok(())
-    }
-
-    async fn remove_documents(&self, ids: &[VersionId]) -> eyre::Result<()> {
-        if ids.is_empty() {
-            return Ok(());
-        }
-
-        let ids_base62 =
-            ids.iter().map(|id| to_base62(id.0)).collect::<Vec<_>>();
-        for alias_name in [
-            self.config.get_index_name("projects"),
-            self.config.get_index_name("projects_filtered"),
-        ] {
-            let index_names = self.get_index_candidates(&alias_name);
-            for index_name in index_names {
-                let response = self
-                    .client
-                    .delete_by_query(DeleteByQueryParts::Index(&[
-                        index_name.as_str()
-                    ]))
-                    .refresh(true)
-                    .body(json!({
-                        "query": {
-                            "terms": {
-                                "version_id": ids_base62
-                            }
-                        }
-                    }))
-                    .send()
-                    .await
-                    .wrap_internal_err(
-                        "failed to delete Elasticsearch documents by query",
-                    )?;
-                let status = response.status_code();
-                if status == StatusCode::NOT_FOUND {
-                    continue;
-                }
-                if !status.is_success() {
-                    let body = response
-                        .json::<Value>()
-                        .await
-                        .unwrap_or_else(|_| json!({}));
-                    return Err(eyre!(
-                        "failed to delete documents from index `{index_name}`: {body}"
-                    ));
-                }
-            }
-        }
-
-        Ok(())
-    }
-
-    async fn tasks(&self) -> eyre::Result<Value> {
-        #[derive(Serialize)]
-        struct ElasticTask {
-            uid: u64,
-            status: &'static str,
-            duration: Option<Duration>,
-            enqueued_at: Option<u64>,
-        }
-
-        #[derive(Serialize)]
-        struct TaskList {
-            by_instance: HashMap<String, Vec<ElasticTask>>,
-        }
-
-        let response = self
-            .client
-            .tasks()
-            .list()
-            .detailed(true)
-            .group_by(elasticsearch::params::GroupBy::Nodes)
-            .send()
-            .await
-            .wrap_internal_err("failed to list Elasticsearch tasks")?;
-
-        let body = response
-            .json::<Value>()
-            .await
-            .wrap_internal_err("failed to parse Elasticsearch task response")?;
-
-        let by_instance = body["nodes"]
-            .as_object()
-            .map(|nodes| {
-                nodes
-                    .iter()
-                    .map(|(node_id, node_value)| {
-                        let tasks = node_value["tasks"]
-                            .as_object()
-                            .map(|tasks| {
-                                tasks
-                                    .iter()
-                                    .map(|(task_id, task)| {
-                                        let uid = task_id
-                                            .rsplit(':')
-                                            .next()
-                                            .and_then(|v| v.parse::<u64>().ok())
-                                            .unwrap_or_default();
-                                        let nanos =
-                                            task["running_time_in_nanos"]
-                                                .as_u64();
-                                        ElasticTask {
-                                            uid,
-                                            status: "processing",
-                                            duration: nanos
-                                                .map(Duration::from_nanos),
-                                            enqueued_at: task
-                                                .get("start_time_in_millis")
-                                                .and_then(Value::as_u64),
-                                        }
-                                    })
-                                    .collect::<Vec<_>>()
-                            })
-                            .unwrap_or_default();
-                        (node_id.clone(), tasks)
-                    })
-                    .collect::<HashMap<_, _>>()
-            })
-            .unwrap_or_default();
-
-        let response = serde_json::to_value(TaskList { by_instance })
-            .wrap_internal_err("failed to serialize Elasticsearch tasks")?;
-        Ok(response)
-    }
-
-    async fn tasks_cancel(
-        &self,
-        filter: &TasksCancelFilter,
-    ) -> eyre::Result<()> {
-        match filter {
-            TasksCancelFilter::All | TasksCancelFilter::AllEnqueued => {
-                let response = self
-                    .client
-                    .tasks()
-                    .cancel(TasksCancelParts::None)
-                    .wait_for_completion(true)
-                    .send()
-                    .await
-                    .wrap_internal_err(
-                        "failed to cancel Elasticsearch tasks",
-                    )?;
-                if !response.status_code().is_success() {
-                    let body = response
-                        .json::<Value>()
-                        .await
-                        .unwrap_or_else(|_| json!({}));
-                    return Err(eyre!(
-                        "failed to cancel Elasticsearch tasks: {body}"
-                    ));
-                }
-            }
-            TasksCancelFilter::Indexes { indexes } => {
-                let response = self
-                    .client
-                    .tasks()
-                    .list()
-                    .detailed(true)
-                    .group_by(elasticsearch::params::GroupBy::None)
-                    .send()
-                    .await
-                    .wrap_internal_err("failed to list Elasticsearch tasks")?;
-
-                let body = response.json::<Value>().await.wrap_internal_err(
-                    "failed to parse Elasticsearch tasks list",
-                )?;
-                let tasks =
-                    body["tasks"].as_object().cloned().unwrap_or_default();
-
-                for (task_id, task) in tasks {
-                    let description =
-                        task["description"].as_str().unwrap_or_default();
-                    if indexes.iter().any(|index| description.contains(index)) {
-                        let response = self
-                            .client
-                            .tasks()
-                            .cancel(TasksCancelParts::TaskId(&task_id))
-                            .wait_for_completion(true)
-                            .send()
-                            .await
-                            .wrap_internal_err(
-                                "failed to cancel Elasticsearch task by id",
-                            )?;
-                        if !response.status_code().is_success() {
-                            let body = response
-                                .json::<Value>()
-                                .await
-                                .unwrap_or_else(|_| json!({}));
-                            return Err(eyre!(
-                                "failed to cancel Elasticsearch task `{task_id}`: {body}"
-                            ));
-                        }
-                    }
-                }
-            }
-        }
-        Ok(())
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::Elasticsearch;
-    use crate::search::{
-        SearchRequest,
-        backend::{elasticsearch::RequestConfig, typesense},
-    };
-    use serde_json::json;
-
-    #[test]
-    fn search_regression_not_in_filter_list_query_string() {
-        // failing case:
-        // http://localhost:8000/v2/search?facets=%5B%5B%22client_side%3Aoptional%22%2C%22client_side%3Arequired%22%5D%2C%5B%22project_type%3Amod%22%5D%2C%5B%22versions%3A1.8.9%22%2C%22versions%3A1.12.2%22%2C%22versions%3A1.17.1%22%2C%22versions%3A1.18.2%22%2C%22versions%3A1.19%22%2C%22versions%3A1.19.2%22%2C%22versions%3A1.19.3%22%2C%22versions%3A1.19.4%22%2C%22versions%3A1.20%22%2C%22versions%3A1.20.1%22%2C%22versions%3A1.20.2%22%2C%22versions%3A1.20.4%22%2C%22versions%3A1.20.6%22%2C%22versions%3A1.21%22%2C%22versions%3A1.21.1%22%2C%22versions%3A1.21.3%22%2C%22versions%3A1.21.4%22%2C%22versions%3A1.21.5%22%2C%22versions%3A1.21.7%22%2C%22versions%3A1.21.8%22%2C%22versions%3A1.21.10%22%2C%22versions%3A1.21.11%22%5D%5D&filters=(project_id%20NOT%20IN%20[P7dR8mSH,%20hvFnDODi,%20XaIYsn4W,%20xIEuGYOS,%20kqJFAPU9,%20H8CaAYZC,%203llatzyE,%20JyKlunuD])&index=relevance&limit=20&offset=0
-
-        let facets = "[[\"client_side:optional\",\"client_side:required\"],[\"project_type:mod\"],[\"versions:1.8.9\",\"versions:1.12.2\",\"versions:1.17.1\",\"versions:1.18.2\",\"versions:1.19\",\"versions:1.19.2\",\"versions:1.19.3\",\"versions:1.19.4\",\"versions:1.20\",\"versions:1.20.1\",\"versions:1.20.2\",\"versions:1.20.4\",\"versions:1.20.6\",\"versions:1.21\",\"versions:1.21.1\",\"versions:1.21.3\",\"versions:1.21.4\",\"versions:1.21.5\",\"versions:1.21.7\",\"versions:1.21.8\",\"versions:1.21.10\",\"versions:1.21.11\"]]";
-        let filter_query = "(project_id NOT IN [P7dR8mSH, hvFnDODi, XaIYsn4W, xIEuGYOS, kqJFAPU9, H8CaAYZC, 3llatzyE, JyKlunuD])";
-
-        let info = SearchRequest {
-            query: None,
-            offset: Some("0".to_string()),
-            index: Some("relevance".to_string()),
-            limit: Some("20".to_string()),
-            show_metadata: false,
-            elasticsearch_config: RequestConfig::default(),
-            typesense_config: typesense::RequestConfig::default(),
-            new_filters: None,
-            facets: Some(facets.to_string()),
-            filters: Some(filter_query.to_string()),
-            version: None,
-        };
-
-        let mut filter =
-            Elasticsearch::facets_filter_clauses(info.facets.as_deref())
-                .expect("facets should parse");
-        let filter_string = Elasticsearch::meili_like_filters(&info)
-            .expect("expected filter string");
-        filter.push(json!({
-            "query_string": {
-                "query": filter_string,
-                "default_operator": "AND",
-                "lenient": true
-            }
-        }));
-
-        let query = filter
-            .last()
-            .and_then(|x| x.get("query_string"))
-            .and_then(|x| x.get("query"))
-            .and_then(|x| x.as_str())
-            .expect("expected query_string.query");
-
-        let expected = "(NOT project_id:(P7dR8mSH OR hvFnDODi OR XaIYsn4W OR xIEuGYOS OR kqJFAPU9 OR H8CaAYZC OR 3llatzyE OR JyKlunuD))";
-        assert_eq!(query, expected);
-        assert!(
-            !query.contains("NOT IN ["),
-            "error case: Elasticsearch query_string cannot parse Meilisearch-style `NOT IN [..]` filters"
-        );
-    }
-}
diff --git a/apps/labrinth/src/search/backend/meilisearch/indexing.rs b/apps/labrinth/src/search/backend/meilisearch/indexing.rs
deleted file mode 100644
index 8f2a4bfa74..0000000000
--- a/apps/labrinth/src/search/backend/meilisearch/indexing.rs
+++ /dev/null
@@ -1,701 +0,0 @@
-use std::sync::LazyLock;
-use std::time::Duration;
-
-use crate::database::PgPool;
-use crate::database::redis::RedisPool;
-use crate::env::ENV;
-use crate::search::backend::meilisearch::MeilisearchConfig;
-use crate::search::indexing::index_local;
-use crate::search::{SearchField, UploadSearchProject};
-use crate::util::error::Context;
-use ariadne::ids::base62_impl::to_base62;
-use eyre::{Result, eyre};
-use futures::StreamExt;
-use futures::stream::FuturesOrdered;
-use meilisearch_sdk::client::{Client, SwapIndexes};
-use meilisearch_sdk::indexes::Index;
-use meilisearch_sdk::settings::{PaginationSetting, Settings};
-use meilisearch_sdk::task_info::TaskInfo;
-use tracing::{Instrument, error, info, info_span, instrument};
-
-// // The chunk size for adding projects to the indexing database. If the request size
-// // is too large (>10MiB) then the request fails with an error. This chunk size
-// // assumes a max average size of 4KiB per project to avoid this cap.
-//
-// Set this to 50k for better observability
-const MEILISEARCH_CHUNK_SIZE: usize = 50000; // 10_000_000
-
-fn search_operation_timeout() -> std::time::Duration {
-    std::time::Duration::from_millis(ENV.SEARCH_OPERATION_TIMEOUT)
-}
-
-pub async fn remove_documents(
-    ids: &[crate::models::ids::VersionId],
-    config: &MeilisearchConfig,
-) -> Result<()> {
-    let mut indexes = get_indexes_for_indexing(config, false, false)
-        .await
-        .wrap_err("failed to get current indexes")?;
-    let indexes_next = get_indexes_for_indexing(config, true, false)
-        .await
-        .wrap_err("failed to get next indexes")?;
-
-    for list in &mut indexes {
-        for alt_list in &indexes_next {
-            list.extend(alt_list.iter().cloned());
-        }
-    }
-
-    let client = config
-        .make_batch_client()
-        .wrap_err("failed to create batch client")?;
-    let client = &client;
-
-    let ids_base62 = ids.iter().map(|x| to_base62(x.0)).collect::<Vec<_>>();
-    let mut deletion_tasks = FuturesOrdered::new();
-
-    client.across_all(indexes, |index_list, client| {
-        for index in index_list {
-            let owned_client = client.clone();
-            let ids_base62_ref = &ids_base62;
-            deletion_tasks.push_back(async move {
-                index
-                    .delete_documents(ids_base62_ref)
-                    .await
-                    .wrap_err_with(|| {
-                        eyre!("failed to request to delete documents {ids_base62_ref:?}")
-                    })?
-                    .wait_for_completion(
-                        &owned_client,
-                        None,
-                        Some(Duration::from_secs(15)),
-                    )
-                    .await
-                    .wrap_err_with(|| {
-                        eyre!("failed to delete documents {ids_base62_ref:?}")
-                    })
-            });
-        }
-    });
-
-    while let Some(result) = deletion_tasks.next().await {
-        result?;
-    }
-
-    Ok(())
-}
-
-pub async fn index_projects(
-    ro_pool: PgPool,
-    redis: RedisPool,
-    config: &MeilisearchConfig,
-) -> Result<()> {
-    info!("Indexing projects.");
-
-    info!("Ensuring current indexes exists");
-    // First, ensure current index exists (so no error happens- current index should be worst-case empty, not missing)
-    get_indexes_for_indexing(config, false, false)
-        .await
-        .wrap_err("failed to get indexes for indexing")?;
-
-    info!("Deleting surplus indexes");
-    // Then, delete the next index if it still exists
-    let indices = get_indexes_for_indexing(config, true, false)
-        .await
-        .wrap_err("failed to get next indexes to delete")?;
-    for client_indices in indices {
-        for index in client_indices {
-            index.delete().await.wrap_err("failed to delete an index")?;
-        }
-    }
-
-    info!("Recreating next index");
-    // Recreate the next index for indexing
-    let indices = get_indexes_for_indexing(config, true, true)
-        .await
-        .wrap_internal_err("failed to recreate next index")?;
-
-    let all_loader_fields =
-        crate::database::models::loader_fields::LoaderField::get_fields_all(
-            &ro_pool, &redis,
-        )
-        .await
-        .wrap_internal_err("failed to get all loader fields")?
-        .into_iter()
-        .map(|x| x.field)
-        .collect::<Vec<_>>();
-
-    info!("Gathering local projects");
-
-    let mut cursor = 0;
-    let mut idx = 0;
-    let mut total = 0;
-
-    loop {
-        info!("Gathering index data chunk {idx}");
-        idx += 1;
-
-        let (uploads, next_cursor) =
-            index_local(&ro_pool, &redis, cursor, 10000).await?;
-        total += uploads.len();
-
-        if uploads.is_empty() {
-            info!(
-                "No more projects to index, indexed {total} projects after {idx} chunks"
-            );
-            break;
-        }
-
-        cursor = next_cursor;
-
-        add_projects_batch_client(
-            &indices,
-            uploads,
-            all_loader_fields.clone(),
-            config,
-        )
-        .await?;
-    }
-
-    info!("Swapping indexes");
-
-    // Swap the index
-    swap_index(config, "projects").await?;
-    swap_index(config, "projects_filtered").await?;
-
-    info!("Deleting old indexes");
-
-    // Delete the now-old index
-    for index_list in indices {
-        for index in index_list {
-            index.delete().await?;
-        }
-    }
-
-    info!("Done adding projects.");
-    Ok(())
-}
-
-pub async fn swap_index(
-    config: &MeilisearchConfig,
-    index_name: &str,
-) -> Result<()> {
-    let client = config.make_batch_client()?;
-    let index_name_next = config.get_index_name(index_name, true);
-    let index_name = config.get_index_name(index_name, false);
-    let swap_indices = SwapIndexes {
-        indexes: (index_name_next, index_name),
-        rename: None,
-    };
-
-    let swap_indices_ref = &swap_indices;
-
-    // is it "indexes" or "indices"? who knows! roll a die!
-    client
-        .with_all_clients("swap_indexes", |client| async move {
-            let task = client
-                .swap_indexes([swap_indices_ref])
-                .await
-                .wrap_err("failed to swap indices")?;
-
-            monitor_task(
-                client,
-                task,
-                Duration::from_secs(60 * 10), // 10 minutes
-                Some(Duration::from_secs(1)),
-            )
-            .await?;
-            Ok(())
-        })
-        .await?;
-
-    Ok(())
-}
-
-#[instrument(skip(config))]
-pub async fn get_indexes_for_indexing(
-    config: &MeilisearchConfig,
-    next: bool, // Get the 'next' one
-    update_settings: bool,
-) -> Result<Vec<Vec<Index>>> {
-    let client = config.make_batch_client()?;
-    let project_name = config.get_index_name("projects", next);
-    let project_filtered_name =
-        config.get_index_name("projects_filtered", next);
-
-    let project_name_ref = &project_name;
-    let project_filtered_name_ref = &project_filtered_name;
-
-    let results = client
-        .with_all_clients("get_indexes_for_indexing", |client| async move {
-            let projects_index = create_or_update_index(
-                client,
-                project_name_ref,
-                Some(&[
-                    "words",
-                    "typo",
-                    "proximity",
-                    "attribute",
-                    "exactness",
-                    "sort",
-                ]),
-                update_settings,
-            )
-            .await?;
-            let projects_filtered_index = create_or_update_index(
-                client,
-                project_filtered_name_ref,
-                Some(&[
-                    "sort",
-                    "words",
-                    "typo",
-                    "proximity",
-                    "attribute",
-                    "exactness",
-                ]),
-                update_settings,
-            )
-            .await?;
-
-            Ok(vec![projects_index, projects_filtered_index])
-        })
-        .await?;
-
-    Ok(results)
-}
-
-#[instrument(skip_all, fields(name))]
-async fn create_or_update_index(
-    client: &Client,
-    name: &str,
-    custom_rules: Option<&'static [&'static str]>,
-    update_settings: bool,
-) -> Result<Index> {
-    info!("Updating/creating index");
-
-    match client.get_index(name).await {
-        Ok(index) => {
-            info!("Updating index settings.");
-
-            let mut settings = default_settings();
-
-            if let Some(custom_rules) = custom_rules {
-                settings = settings.with_ranking_rules(custom_rules);
-            }
-
-            if update_settings {
-                info!("Updating index settings");
-                index
-                    .set_settings(&settings)
-                    .await
-                    .inspect_err(|e| {
-                        error!("Error setting index settings: {e:?}")
-                    })?
-                    .wait_for_completion(
-                        client,
-                        None,
-                        Some(search_operation_timeout()),
-                    )
-                    .await
-                    .inspect_err(|e| {
-                        error!(
-                            "Error setting index settings while waiting: {e:?}"
-                        )
-                    })?;
-            }
-            info!("Done performing index settings set.");
-
-            Ok(index)
-        }
-        _ => {
-            info!("Creating index.");
-
-            // Only create index and set settings if the index doesn't already exist
-            let task = client.create_index(name, Some("version_id")).await?;
-            let task = task
-                .wait_for_completion(
-                    client,
-                    None,
-                    Some(search_operation_timeout()),
-                )
-                .await
-                .inspect_err(|e| {
-                    error!("Error creating index while waiting: {e:?}")
-                })?;
-            let index = task
-                .try_make_index(client)
-                .map_err(|x| x.unwrap_failure())?;
-
-            let mut settings = default_settings();
-
-            if let Some(custom_rules) = custom_rules {
-                settings = settings.with_ranking_rules(custom_rules);
-            }
-
-            if update_settings {
-                index
-                    .set_settings(&settings)
-                    .await
-                    .inspect_err(|e| {
-                        error!("Error setting index settings: {e:?}")
-                    })?
-                    .wait_for_completion(
-                        client,
-                        None,
-                        Some(search_operation_timeout()),
-                    )
-                    .await
-                    .inspect_err(|e| {
-                        error!(
-                            "Error setting index settings while waiting: {e:?}"
-                        )
-                    })?;
-            }
-
-            Ok(index)
-        }
-    }
-}
-
-#[instrument(skip_all, fields(%index.uid, mods.len = mods.len()))]
-async fn add_to_index(
-    client: &Client,
-    index: &Index,
-    mods: &[UploadSearchProject],
-) -> Result<()> {
-    for chunk in mods.chunks(MEILISEARCH_CHUNK_SIZE) {
-        info!(
-            "Adding chunk of {} versions starting with version id {}",
-            chunk.len(),
-            chunk[0].version_id
-        );
-
-        let now = std::time::Instant::now();
-
-        let task = index
-            .add_or_replace(chunk, Some("version_id"))
-            .await
-            .inspect_err(|e| error!("Error adding chunk to index: {e:?}"))?;
-
-        monitor_task(
-            client,
-            task,
-            Duration::from_secs(60 * 5), // Timeout after 10 minutes
-            Some(Duration::from_secs(1)), // Poll once every second
-        )
-        .await?;
-
-        info!(
-            "Added chunk of {} projects to index in {:.2} seconds",
-            chunk.len(),
-            now.elapsed().as_secs_f64()
-        );
-    }
-
-    Ok(())
-}
-
-async fn monitor_task(
-    client: &Client,
-    task: TaskInfo,
-    timeout: Duration,
-    poll: Option<Duration>,
-) -> Result<()> {
-    let now = std::time::Instant::now();
-
-    let id = task.get_task_uid();
-    let mut interval = tokio::time::interval(Duration::from_secs(30));
-    interval.reset();
-
-    let wait = task.wait_for_completion(client, poll, Some(timeout));
-
-    tokio::select! {
-        biased;
-
-        result = wait => {
-            info!("Task {id} completed in {:.2} seconds: {result:?}", now.elapsed().as_secs_f64());
-            result?;
-        }
-
-        _ = interval.tick() => {
-            struct Id(u32);
-
-            impl AsRef<u32> for Id {
-                fn as_ref(&self) -> &u32 {
-                    &self.0
-                }
-            }
-
-            // it takes an AsRef but u32 itself doesn't impl it lol
-            if let Ok(task) = client.get_task(Id(id)).await {
-                if task.is_pending() {
-                    info!("Task {id} is still pending after {:.2} seconds", now.elapsed().as_secs_f64());
-                }
-            } else {
-                error!("Error getting task {id}");
-            }
-        }
-    };
-
-    Ok(())
-}
-
-#[instrument(skip_all, fields(index.uid = %index.uid))]
-async fn update_and_add_to_index(
-    client: &Client,
-    index: &Index,
-    projects: &[UploadSearchProject],
-    _additional_fields: &[String],
-) -> Result<()> {
-    // TODO: Uncomment this- hardcoding loader_fields is a band-aid fix, and will be fixed soon
-    // let mut new_filterable_attributes: Vec<String> = index.get_filterable_attributes().await?;
-    // let mut new_displayed_attributes = index.get_displayed_attributes().await?;
-
-    // // Check if any 'additional_fields' are not already in the index
-    // // Only add if they are not already in the index
-    // let new_fields = additional_fields
-    //     .iter()
-    //     .filter(|x| !new_filterable_attributes.contains(x))
-    //     .collect::<Vec<_>>();
-    // if !new_fields.is_empty() {
-    //     info!("Adding new fields to index: {:?}", new_fields);
-    //     new_filterable_attributes.extend(new_fields.iter().map(|s: &&String| s.to_string()));
-    //     new_displayed_attributes.extend(new_fields.iter().map(|s| s.to_string()));
-
-    //     // Adds new fields to the index
-    //     let filterable_task = index
-    //         .set_filterable_attributes(new_filterable_attributes)
-    //         .await?;
-    //     let displayable_task = index
-    //         .set_displayed_attributes(new_displayed_attributes)
-    //         .await?;
-
-    //     // Allow a long timeout for adding new attributes- it only needs to happen the once
-    //     filterable_task
-    //         .wait_for_completion(client, None, Some(search_operation_timeout() * 100))
-    //         .await?;
-    //     displayable_task
-    //         .wait_for_completion(client, None, Some(search_operation_timeout() * 100))
-    //         .await?;
-    // }
-
-    info!("Adding to index.");
-
-    add_to_index(client, index, projects).await?;
-
-    Ok(())
-}
-
-pub async fn add_projects_batch_client(
-    indices: &[Vec<Index>],
-    projects: Vec<UploadSearchProject>,
-    additional_fields: Vec<String>,
-    config: &MeilisearchConfig,
-) -> Result<()> {
-    let client = config.make_batch_client()?;
-
-    let index_references = indices
-        .iter()
-        .map(|x| x.iter().collect())
-        .collect::<Vec<Vec<&Index>>>();
-
-    let mut tasks = FuturesOrdered::new();
-
-    let mut id = 0;
-
-    client.across_all(index_references, |index_list, client| {
-        let span = info_span!("add_projects_batch", client.idx = id);
-        id += 1;
-
-        for index in index_list {
-            let owned_client = client.clone();
-            let projects_ref = &projects;
-            let additional_fields_ref = &additional_fields;
-            tasks.push_back(
-                async move {
-                    update_and_add_to_index(
-                        &owned_client,
-                        index,
-                        projects_ref,
-                        additional_fields_ref,
-                    )
-                    .await
-                }
-                .instrument(span.clone()),
-            );
-        }
-    });
-
-    while let Some(result) = tasks.next().await {
-        result?;
-    }
-
-    Ok(())
-}
-
-fn default_settings() -> Settings {
-    Settings::new()
-        .with_distinct_attribute(Some("project_id"))
-        .with_displayed_attributes(DEFAULT_DISPLAYED_ATTRIBUTES)
-        .with_searchable_attributes(DEFAULT_SEARCHABLE_ATTRIBUTES)
-        .with_sortable_attributes(DEFAULT_SORTABLE_ATTRIBUTES)
-        .with_filterable_attributes(&*MEILI_FILTERABLE_ATTRIBUTES)
-        .with_pagination(PaginationSetting {
-            max_total_hits: 2147483647,
-        })
-}
-
-pub struct MeilisearchFieldSpec {
-    pub path: &'static str,
-    pub filterable: bool,
-}
-
-impl SearchField {
-    pub const fn meilisearch_spec(self) -> MeilisearchFieldSpec {
-        match self {
-            SearchField::Categories => MeilisearchFieldSpec {
-                path: "categories",
-                filterable: true,
-            },
-            SearchField::Name => MeilisearchFieldSpec {
-                path: "name",
-                filterable: true,
-            },
-            SearchField::Author => MeilisearchFieldSpec {
-                path: "author",
-                filterable: true,
-            },
-            SearchField::License => MeilisearchFieldSpec {
-                path: "license",
-                filterable: true,
-            },
-            SearchField::ProjectTypes => MeilisearchFieldSpec {
-                path: "project_types",
-                filterable: true,
-            },
-            SearchField::ProjectId => MeilisearchFieldSpec {
-                path: "project_id",
-                filterable: true,
-            },
-            SearchField::OpenSource => MeilisearchFieldSpec {
-                path: "open_source",
-                filterable: true,
-            },
-            SearchField::Environment => MeilisearchFieldSpec {
-                path: "environment",
-                filterable: true,
-            },
-            SearchField::GameVersions => MeilisearchFieldSpec {
-                path: "game_versions",
-                filterable: true,
-            },
-            SearchField::ClientSide => MeilisearchFieldSpec {
-                path: "client_side",
-                filterable: true,
-            },
-            SearchField::ServerSide => MeilisearchFieldSpec {
-                path: "server_side",
-                filterable: true,
-            },
-            SearchField::MinecraftServerRegion => MeilisearchFieldSpec {
-                path: "minecraft_server.region",
-                filterable: true,
-            },
-            SearchField::MinecraftServerLanguages => MeilisearchFieldSpec {
-                path: "minecraft_server.languages",
-                filterable: true,
-            },
-            SearchField::MinecraftJavaServerContentKind => {
-                MeilisearchFieldSpec {
-                    path: "minecraft_java_server.content.kind",
-                    filterable: true,
-                }
-            }
-            SearchField::MinecraftJavaServerContentSupportedGameVersions => {
-                MeilisearchFieldSpec {
-                    path: "minecraft_java_server.content.supported_game_versions",
-                    filterable: true,
-                }
-            }
-            SearchField::MinecraftJavaServerPingData => MeilisearchFieldSpec {
-                path: "minecraft_java_server.ping.data",
-                filterable: true,
-            },
-        }
-    }
-}
-
-static MEILI_FILTERABLE_ATTRIBUTES: LazyLock<Vec<&'static str>> =
-    LazyLock::new(|| {
-        use strum::IntoEnumIterator;
-
-        SearchField::iter()
-            .filter_map(|field| {
-                let spec = field.meilisearch_spec();
-                spec.filterable.then_some(spec.path)
-            })
-            .collect()
-    });
-
-const DEFAULT_DISPLAYED_ATTRIBUTES: &[&str] = &[
-    "project_id",
-    "version_id",
-    "project_types",
-    "slug",
-    "author",
-    "name",
-    "summary",
-    "categories",
-    "display_categories",
-    "downloads",
-    "follows",
-    "icon_url",
-    "date_created",
-    "date_modified",
-    "latest_version",
-    "license",
-    "gallery",
-    "featured_gallery",
-    "color",
-    // Note: loader fields are not here, but are added on as they are needed (so they can be dynamically added depending on which exist).
-    // TODO: remove these- as they should be automatically populated. This is a band-aid fix.
-    "environment",
-    "game_versions",
-    "mrpack_loaders",
-    // V2 legacy fields for logical consistency
-    "client_side",
-    "server_side",
-    // Non-searchable fields for filling out the Project model.
-    "license_url",
-    "monetization_status",
-    "team_id",
-    "thread_id",
-    "versions",
-    "date_published",
-    "date_queued",
-    "status",
-    "requested_status",
-    "games",
-    "organization_id",
-    "links",
-    "gallery_items",
-    "loaders", // search uses loaders as categories- this is purely for the Project model.
-    "project_loader_fields",
-    "minecraft_mod",
-    "minecraft_server",
-    "minecraft_java_server",
-    "minecraft_bedrock_server",
-];
-
-const DEFAULT_SEARCHABLE_ATTRIBUTES: &[&str] =
-    &["name", "summary", "author", "slug"];
-
-const DEFAULT_SORTABLE_ATTRIBUTES: &[&str] = &[
-    "downloads",
-    "follows",
-    "date_created",
-    "date_modified",
-    "version_published_timestamp",
-    "minecraft_java_server.verified_plays_2w",
-    "minecraft_java_server.ping.data.players_online",
-];
diff --git a/apps/labrinth/src/search/backend/meilisearch/mod.rs b/apps/labrinth/src/search/backend/meilisearch/mod.rs
deleted file mode 100644
index cf693fe48e..0000000000
--- a/apps/labrinth/src/search/backend/meilisearch/mod.rs
+++ /dev/null
@@ -1,489 +0,0 @@
-use crate::database::PgPool;
-use crate::database::redis::RedisPool;
-use crate::env::ENV;
-use crate::models::ids::VersionId;
-use crate::routes::ApiError;
-use crate::search::backend::{
-    SearchIndex, SearchIndexName, combined_search_filters, parse_search_index,
-    parse_search_request,
-};
-use crate::search::{
-    ResultSearchProject, SearchBackend, SearchRequest, SearchResults,
-    TasksCancelFilter,
-};
-use crate::util::error::Context;
-use async_trait::async_trait;
-use eyre::Result;
-use futures::TryStreamExt;
-use futures::stream::FuturesOrdered;
-use itertools::Itertools;
-use meilisearch_sdk::client::Client;
-use meilisearch_sdk::tasks::{Task, TasksCancelQuery};
-use serde::Serialize;
-use serde_json::Value;
-use std::collections::HashMap;
-use std::fmt::Write;
-use std::time::Duration;
-use tracing::{Instrument, info_span};
-
-pub mod indexing;
-
-#[derive(Debug, Clone)]
-pub struct MeilisearchReadClient {
-    pub client: Client,
-}
-
-impl std::ops::Deref for MeilisearchReadClient {
-    type Target = Client;
-
-    fn deref(&self) -> &Self::Target {
-        &self.client
-    }
-}
-
-pub struct BatchClient {
-    pub clients: Vec<Client>,
-}
-
-impl BatchClient {
-    pub fn new(clients: Vec<Client>) -> Self {
-        Self { clients }
-    }
-
-    pub async fn with_all_clients<'a, T, G, Fut>(
-        &'a self,
-        task_name: &str,
-        generator: G,
-    ) -> Result<Vec<T>>
-    where
-        G: Fn(&'a Client) -> Fut,
-        Fut: Future<Output = Result<T>> + 'a,
-    {
-        let mut tasks = FuturesOrdered::new();
-        for (idx, client) in self.clients.iter().enumerate() {
-            tasks.push_back(generator(client).instrument(info_span!(
-                "client_task",
-                task.name = task_name,
-                client.idx = idx,
-            )));
-        }
-
-        let results = tasks.try_collect::<Vec<_>>().await?;
-        Ok(results)
-    }
-
-    pub fn across_all<T, R, F>(&self, data: Vec<T>, mut predicate: F) -> Vec<R>
-    where
-        F: FnMut(T, &Client) -> R,
-    {
-        assert_eq!(
-            data.len(),
-            self.clients.len(),
-            "mismatch between data len and meilisearch client count"
-        );
-        self.clients
-            .iter()
-            .zip(data)
-            .map(|(client, item)| predicate(item, client))
-            .collect()
-    }
-}
-
-#[derive(Debug, Clone)]
-pub struct MeilisearchConfig {
-    pub addresses: Vec<String>,
-    pub read_lb_address: String,
-    pub key: String,
-    pub meta_namespace: String,
-}
-
-impl MeilisearchConfig {
-    pub fn new(meta_namespace: Option<String>) -> Self {
-        Self {
-            addresses: ENV.MEILISEARCH_WRITE_ADDRS.0.clone(),
-            key: ENV.MEILISEARCH_KEY.clone(),
-            meta_namespace: meta_namespace.unwrap_or_default(),
-            read_lb_address: ENV.MEILISEARCH_READ_ADDR.clone(),
-        }
-    }
-
-    pub fn make_loadbalanced_read_client(
-        &self,
-    ) -> Result<MeilisearchReadClient> {
-        Ok(MeilisearchReadClient {
-            client: Client::new(&self.read_lb_address, Some(&self.key))?,
-        })
-    }
-
-    pub fn make_batch_client(
-        &self,
-    ) -> Result<BatchClient> {
-        Ok(BatchClient::new(
-            self.addresses
-                .iter()
-                .map(|address| {
-                    Client::new(address.as_str(), Some(self.key.as_str()))
-                })
-                .collect::<Result<Vec<_>, _>>()?,
-        ))
-    }
-
-    pub fn get_index_name(&self, index: &str, next: bool) -> String {
-        let alt = if next { "_alt" } else { "" };
-        format!("{}_{}_{}", self.meta_namespace, index, alt)
-    }
-}
-
-pub struct Meilisearch {
-    pub config: MeilisearchConfig,
-}
-
-impl Meilisearch {
-    pub fn new(config: MeilisearchConfig) -> Self {
-        Self { config }
-    }
-
-    fn get_sort_index(
-        &self,
-        index: &str,
-        new_filters: Option<&str>,
-    ) -> Result<(String, &'static [&'static str]), ApiError> {
-        let sort = parse_search_index(index, new_filters)?;
-        let index_name = match sort.index_name {
-            SearchIndexName::Projects => {
-                self.config.get_index_name("projects", false)
-            }
-            SearchIndexName::ProjectsFiltered => {
-                self.config.get_index_name("projects_filtered", false)
-            }
-        };
-
-        Ok(match sort.index {
-            SearchIndex::Relevance => (
-                index_name,
-                &["downloads:desc", "version_published_timestamp:desc"],
-            ),
-            SearchIndex::Downloads => (
-                index_name,
-                &["downloads:desc", "version_published_timestamp:desc"],
-            ),
-            SearchIndex::Follows => (
-                index_name,
-                &["follows:desc", "version_published_timestamp:desc"],
-            ),
-            SearchIndex::Updated => (
-                index_name,
-                &["date_modified:desc", "version_published_timestamp:desc"],
-            ),
-            SearchIndex::Newest => (
-                index_name,
-                &["date_created:desc", "version_published_timestamp:desc"],
-            ),
-            SearchIndex::MinecraftJavaServerVerifiedPlays2w => (
-                index_name,
-                &[
-                    "minecraft_java_server.verified_plays_2w:desc",
-                    "minecraft_java_server.ping.data.players_online:desc",
-                    "version_published_timestamp:desc",
-                ],
-            ),
-            SearchIndex::MinecraftJavaServerPlayersOnline => (
-                index_name,
-                &[
-                    "minecraft_java_server.ping.data.players_online:desc",
-                    "version_published_timestamp:desc",
-                ],
-            ),
-        })
-    }
-}
-
-#[async_trait]
-impl SearchBackend for Meilisearch {
-    async fn search_for_project_raw(
-        &self,
-        info: &SearchRequest,
-    ) -> Result<SearchResults, ApiError> {
-        let parsed = parse_search_request(info)?;
-
-        let (index_name, sort_name) =
-            self.get_sort_index(parsed.index, info.new_filters.as_deref())?;
-        let client = self
-            .config
-            .make_loadbalanced_read_client()
-            .wrap_internal_err("failed to make load-balanced read client")?;
-        let meilisearch_index = client
-            .get_index(index_name)
-            .await
-            .wrap_internal_err("failed to get index")?;
-
-        let mut filter_string = String::new();
-
-        let results = {
-            let mut query = meilisearch_index.search();
-            query
-                .with_page(parsed.page)
-                .with_hits_per_page(parsed.hits_per_page)
-                .with_query(parsed.query)
-                .with_sort(sort_name);
-
-            if let Some(new_filters) = info.new_filters.as_deref() {
-                query.with_filter(new_filters);
-            } else {
-                let facets = if let Some(facets) = &info.facets {
-                    let facets =
-                        serde_json::from_str::<Vec<Vec<Value>>>(facets)
-                            .wrap_request_err("failed to parse facets")?;
-                    Some(facets)
-                } else {
-                    None
-                };
-
-                let filters =
-                    combined_search_filters(info).unwrap_or_else(|| "".into());
-
-                if let Some(facets) = facets {
-                    // Normalize each facet entry to a list of strings: a JSON
-                    // array stays a list, a bare string becomes a one-element list.
-                    let facets: Vec<Vec<Vec<String>>> = facets
-                        .into_iter()
-                        .map(|facets| {
-                            facets
-                                .into_iter()
-                                .map(|facet| {
-                                    if facet.is_array() {
-                                        serde_json::from_value::<Vec<String>>(facet)
-                                            .unwrap_or_default()
-                                    } else {
-                                        vec![
-                                            serde_json::from_value::<String>(facet)
-                                                .unwrap_or_default(),
-                                        ]
-                                    }
-                                })
-                                .collect_vec()
-                        })
-                        .collect_vec();
-
-                    // Build the Meilisearch filter expression: top-level groups
-                    // are ANDed together, alternatives within a group are ORed,
-                    // and entries inside one alternative are ANDed.
-                    filter_string.push('(');
-                    for (index, facet_outer_list) in facets.iter().enumerate() {
-                        filter_string.push('(');
-
-                        for (facet_outer_index, facet_inner_list) in
-                            facet_outer_list.iter().enumerate()
-                        {
-                            filter_string.push('(');
-                            for (facet_inner_index, facet) in
-                                facet_inner_list.iter().enumerate()
-                            {
-                                filter_string
-                                    .push_str(&facet.replace(':', " = "));
-                                if facet_inner_index
-                                    != (facet_inner_list.len() - 1)
-                                {
-                                    filter_string.push_str(" AND ")
-                                }
-                            }
-                            filter_string.push(')');
-
-                            if facet_outer_index != (facet_outer_list.len() - 1)
-                            {
-                                filter_string.push_str(" OR ")
-                            }
-                        }
-
-                        filter_string.push(')');
-
-                        if index != (facets.len() - 1) {
-                            filter_string.push_str(" AND ")
-                        }
-                    }
-                    filter_string.push(')');
-
-                    if !filters.is_empty() {
-                        write!(filter_string, " AND ({filters})")
-                            .expect("write should not fail");
-                    }
-                } else {
-                    filter_string.push_str(&filters);
-                }
-
-                if !filter_string.is_empty() {
-                    query.with_filter(&filter_string);
-                }
-            }
-
-            if info.show_metadata {
-                query.with_show_ranking_score(true);
-                query.with_show_ranking_score_details(true);
-                query.execute().await?
-            } else {
-                query.execute::<ResultSearchProject>().await?
-            }
-        };
-
-        if info.show_metadata {
-            let hits = results
-                .hits
-                .into_iter()
-                .map(|hit| {
-                    let metadata = serde_json::to_value(&hit)
-                        .ok()
-                        .and_then(|value| value.as_object().cloned())
-                        .map(|mut value| {
-                            value.remove("_formatted");
-                            value.remove("_matchesPosition");
-                            value.remove("_federation");
-                            let result = value.remove("result");
-                            let metadata = Value::Object(value);
-                            (result, metadata)
-                        });
-
-                    let (result, metadata) =
-                        metadata.unwrap_or((None, Value::Null));
-                    let mut result = result
-                        .and_then(|value| {
-                            serde_json::from_value::<ResultSearchProject>(value)
-                                .ok()
-                        })
-                        .unwrap_or(hit.result);
-
-                    if !metadata.is_null() {
-                        result.search_metadata = Some(metadata);
-                    }
-
-                    result
-                })
-                .collect();
-
-            Ok(SearchResults {
-                hits,
-                page: results.page.unwrap_or_default(),
-                hits_per_page: results.hits_per_page.unwrap_or_default(),
-                total_hits: results.total_hits.unwrap_or_default(),
-            })
-        } else {
-            Ok(SearchResults {
-                hits: results.hits.into_iter().map(|r| r.result).collect(),
-                page: results.page.unwrap_or_default(),
-                hits_per_page: results.hits_per_page.unwrap_or_default(),
-                total_hits: results.total_hits.unwrap_or_default(),
-            })
-        }
-    }
-
-    async fn index_projects(
-        &self,
-        ro_pool: PgPool,
-        redis: RedisPool,
-    ) -> eyre::Result<()> {
-        indexing::index_projects(ro_pool, redis, &self.config).await?;
-        Ok(())
-    }
-
-    async fn remove_documents(&self, ids: &[VersionId]) -> eyre::Result<()> {
-        indexing::remove_documents(ids, &self.config).await?;
-        Ok(())
-    }
-
-    async fn tasks(&self) -> eyre::Result<Value> {
-        let client = self
-            .config
-            .make_batch_client()
-            .wrap_internal_err("failed to make batch client")?;
-        let tasks = client
-            .with_all_clients("get_tasks", async |client| {
-                let tasks = client.get_tasks().await?;
-                Ok(tasks.results)
-            })
-            .await
-            .wrap_internal_err("failed to get tasks")?;
-
-        #[derive(Serialize)]
-        struct MeiliTask