diff --git a/audit-server/audit-dispatcher/dispatcher-app/pom.xml b/audit-server/audit-dispatcher/dispatcher-app/pom.xml
index d18d487e388..161256d3461 100644
--- a/audit-server/audit-dispatcher/dispatcher-app/pom.xml
+++ b/audit-server/audit-dispatcher/dispatcher-app/pom.xml
@@ -85,6 +85,12 @@
${project.version}
provided
+
+ org.apache.ranger
+ audit-dispatcher-opensearch
+ ${project.version}
+ provided
+
org.apache.ranger
audit-dispatcher-solr
diff --git a/audit-server/audit-dispatcher/dispatcher-common/pom.xml b/audit-server/audit-dispatcher/dispatcher-common/pom.xml
index 3b717af732e..2bba5201021 100644
--- a/audit-server/audit-dispatcher/dispatcher-common/pom.xml
+++ b/audit-server/audit-dispatcher/dispatcher-common/pom.xml
@@ -147,6 +147,20 @@
spring-context
${springframework.version}
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ ${junit.jupiter.version}
+ test
+
+
+ org.mockito
+ mockito-core
+ ${mockito.version}
+ test
+
diff --git a/audit-server/audit-dispatcher/dispatcher-common/src/main/java/org/apache/ranger/audit/dispatcher/AuditEventDocMapper.java b/audit-server/audit-dispatcher/dispatcher-common/src/main/java/org/apache/ranger/audit/dispatcher/AuditEventDocMapper.java
new file mode 100644
index 00000000000..9e1eb9385ce
--- /dev/null
+++ b/audit-server/audit-dispatcher/dispatcher-common/src/main/java/org/apache/ranger/audit/dispatcher/AuditEventDocMapper.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.dispatcher;
+
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+/**
+ * Maps {@link AuthzAuditEvent} to an OpenSearch document.
+ */
+public final class AuditEventDocMapper {
+ /** Thread-safe ISO-8601 UTC date formatter. */
+ private static final ThreadLocal DATE_FORMAT =
+ ThreadLocal.withInitial(() -> {
+ SimpleDateFormat format = new SimpleDateFormat(
+ "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+ format.setTimeZone(TimeZone.getTimeZone("UTC"));
+ return format;
+ });
+
+ private AuditEventDocMapper() {
+ }
+
+ /**
+ * Converts an audit event to a document map.
+ *
+ * @param auditEvent the audit event to convert
+ * @return map of field names to values
+ */
+ public static Map toDoc(
+ final AuthzAuditEvent auditEvent) {
+ Map doc = new HashMap<>();
+
+ doc.put("id", auditEvent.getEventId());
+ doc.put("access", auditEvent.getAccessType());
+ doc.put("enforcer", auditEvent.getAclEnforcer());
+ doc.put("agent", auditEvent.getAgentId());
+ doc.put("repo", auditEvent.getRepositoryName());
+ doc.put("sess", auditEvent.getSessionId());
+ doc.put("reqUser", auditEvent.getUser());
+ doc.put("reqData", auditEvent.getRequestData());
+ doc.put("resource", auditEvent.getResourcePath());
+ doc.put("cliIP", auditEvent.getClientIP());
+ doc.put("cliType", auditEvent.getClientType());
+ doc.put("logType", auditEvent.getLogType());
+ doc.put("result", auditEvent.getAccessResult());
+ doc.put("policy", auditEvent.getPolicyId());
+ doc.put("repoType", auditEvent.getRepositoryType());
+ doc.put("resType", auditEvent.getResourceType());
+ doc.put("reason", auditEvent.getResultReason());
+ doc.put("action", auditEvent.getAction());
+
+ Date eventTime = auditEvent.getEventTime();
+ doc.put("evtTime", eventTime != null
+ ? DATE_FORMAT.get().format(eventTime) : null);
+
+ doc.put("seq_num", auditEvent.getSeqNum());
+ doc.put("event_count", auditEvent.getEventCount());
+ doc.put("event_dur_ms",
+ auditEvent.getEventDurationMS());
+ doc.put("tags", auditEvent.getTags());
+ doc.put("datasets", auditEvent.getDatasets());
+ doc.put("projects", auditEvent.getProjects());
+ doc.put("datasetIds", auditEvent.getDatasetIds());
+ doc.put("cluster", auditEvent.getClusterName());
+ doc.put("zoneName", auditEvent.getZoneName());
+ doc.put("agentHost", auditEvent.getAgentHostname());
+ doc.put("policyVersion",
+ auditEvent.getPolicyVersion());
+ doc.put("additionalInfo",
+ auditEvent.getAdditionalInfo());
+
+ return doc;
+ }
+}
diff --git a/audit-server/audit-dispatcher/dispatcher-common/src/test/java/org/apache/ranger/audit/dispatcher/TestAuditEventDocMapper.java b/audit-server/audit-dispatcher/dispatcher-common/src/test/java/org/apache/ranger/audit/dispatcher/TestAuditEventDocMapper.java
new file mode 100644
index 00000000000..6073c86cd26
--- /dev/null
+++ b/audit-server/audit-dispatcher/dispatcher-common/src/test/java/org/apache/ranger/audit/dispatcher/TestAuditEventDocMapper.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.dispatcher;
+
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+import org.junit.jupiter.api.Test;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class TestAuditEventDocMapper {
+
+ @Test
+ void toDoc_mapsAllFields() {
+ Date eventTime = new Date(1700000000000L);
+
+ AuthzAuditEvent event = new AuthzAuditEvent(
+ 1, "dev_hdfs", "testuser", eventTime, "read",
+ "/tmp/test", "path", "read", (short) 1, "hdfs-agent",
+ 42L, "allowed by policy", "ranger-acl", "sess-123",
+ "hive", "192.168.1.1", "/tmp/test", "cl1", "zone1", 5L
+ );
+ event.setEventId("evt-001");
+ event.setSeqNum(10L);
+ event.setEventCount(1L);
+ event.setEventDurationMS(50L);
+ event.setAgentHostname("host1.example.com");
+
+ Set tags = new HashSet<>();
+ tags.add("PII");
+ event.setTags(tags);
+
+ Map doc = AuditEventDocMapper.toDoc(event);
+
+ assertNotNull(doc);
+ assertEquals("evt-001", doc.get("id"));
+ assertEquals("read", doc.get("access"));
+ assertEquals("ranger-acl", doc.get("enforcer"));
+ assertEquals("hdfs-agent", doc.get("agent"));
+ assertEquals("dev_hdfs", doc.get("repo"));
+ assertEquals("sess-123", doc.get("sess"));
+ assertEquals("testuser", doc.get("reqUser"));
+ assertEquals("/tmp/test", doc.get("reqData"));
+ assertEquals("/tmp/test", doc.get("resource"));
+ assertEquals("192.168.1.1", doc.get("cliIP"));
+ assertEquals("hive", doc.get("cliType"));
+ assertEquals((short) 1, doc.get("result"));
+ assertEquals(42L, doc.get("policy"));
+ assertEquals(1, doc.get("repoType"));
+ assertEquals("path", doc.get("resType"));
+ assertEquals("allowed by policy", doc.get("reason"));
+ assertEquals("read", doc.get("action"));
+ assertEquals(10L, doc.get("seq_num"));
+ assertEquals(1L, doc.get("event_count"));
+ assertEquals(50L, doc.get("event_dur_ms"));
+ assertEquals("cl1", doc.get("cluster"));
+ assertEquals("zone1", doc.get("zoneName"));
+ assertEquals("host1.example.com", doc.get("agentHost"));
+ assertEquals(5L, doc.get("policyVersion"));
+
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+ sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+ assertEquals(sdf.format(eventTime), doc.get("evtTime"));
+ }
+
+ @Test
+ void toDoc_handlesNullEventTime() {
+ AuthzAuditEvent event = new AuthzAuditEvent();
+ event.setEventId("evt-002");
+ event.setEventTime(null);
+
+ Map doc = AuditEventDocMapper.toDoc(event);
+
+ assertNotNull(doc);
+ assertEquals("evt-002", doc.get("id"));
+ assertNull(doc.get("evtTime"));
+ }
+
+ @Test
+ void toDoc_handlesNullFields() {
+ AuthzAuditEvent event = new AuthzAuditEvent();
+
+ Map doc = AuditEventDocMapper.toDoc(event);
+
+ assertNotNull(doc);
+ assertNull(doc.get("id"));
+ assertNull(doc.get("access"));
+ assertNull(doc.get("enforcer"));
+ assertNull(doc.get("agent"));
+ assertNull(doc.get("repo"));
+ assertNull(doc.get("reqUser"));
+ assertNull(doc.get("resource"));
+ assertNull(doc.get("cliIP"));
+ assertEquals(0, doc.get("repoType"));
+ assertEquals(0L, doc.get("policy"));
+ }
+
+ @Test
+ void toDoc_mapsDatasets() {
+ AuthzAuditEvent event = new AuthzAuditEvent();
+ event.setEventId("evt-003");
+
+ Set datasets = new HashSet<>();
+ datasets.add("dataset1");
+ datasets.add("dataset2");
+ event.setDatasets(datasets);
+
+ Set projects = new HashSet<>();
+ projects.add("project1");
+ event.setProjects(projects);
+
+ Set datasetIds = new HashSet<>();
+ datasetIds.add(100L);
+ datasetIds.add(200L);
+ event.setDatasetIds(datasetIds);
+
+ Map doc = AuditEventDocMapper.toDoc(event);
+
+ assertEquals(datasets, doc.get("datasets"));
+ assertEquals(projects, doc.get("projects"));
+ assertEquals(datasetIds, doc.get("datasetIds"));
+ }
+}
diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/pom.xml b/audit-server/audit-dispatcher/dispatcher-opensearch/pom.xml
new file mode 100644
index 00000000000..715c4dafb1c
--- /dev/null
+++ b/audit-server/audit-dispatcher/dispatcher-opensearch/pom.xml
@@ -0,0 +1,210 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.ranger
+ ranger
+ 3.0.0-SNAPSHOT
+ ../../..
+
+
+ audit-dispatcher-opensearch
+ jar
+ Ranger Audit Dispatcher OpenSearch
+ Kafka dispatcher service for indexing audits into OpenSearch/ElasticSearch
+
+
+ UTF-8
+
+
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${fasterxml.jackson.version}
+
+
+
+ org.apache.commons
+ commons-lang3
+ ${commons.lang3.version}
+
+
+
+
+ org.apache.httpcomponents
+ httpasyncclient
+ ${httpcomponents.httpasyncclient.version}
+
+
+ commons-logging
+ *
+
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ ${httpcomponents.httpclient.version}
+
+
+ commons-logging
+ *
+
+
+
+
+ org.apache.httpcomponents
+ httpcore
+ ${httpcomponents.httpcore.version}
+
+
+ org.apache.httpcomponents
+ httpcore-nio
+ ${httpcomponents.httpcore.version}
+
+
+
+
+ org.apache.ranger
+ ranger-audit-dest-es
+ ${project.version}
+
+
+ org.apache.hadoop
+ hadoop-client-api
+
+
+
+
+
+
+ org.apache.kafka
+ kafka-clients
+ ${kafka.version}
+ provided
+
+
+ log4j
+ *
+
+
+ org.slf4j
+ *
+
+
+
+
+ org.apache.ranger
+ ranger-audit-core
+ ${project.version}
+ provided
+
+
+ org.apache.hadoop
+ hadoop-client-api
+
+
+
+
+ org.apache.ranger
+ ranger-audit-dispatcher-common
+ ${project.version}
+ provided
+
+
+ org.apache.hadoop
+ hadoop-client-api
+
+
+
+
+ org.apache.ranger
+ ranger-audit-server-common
+ ${project.version}
+ provided
+
+
+ org.slf4j
+ slf4j-api
+ ${slf4j.version}
+ provided
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ ${junit.jupiter.version}
+ test
+
+
+ org.mockito
+ mockito-core
+ ${mockito.version}
+ test
+
+
+ org.mockito
+ mockito-junit-jupiter
+ ${mockito.version}
+ test
+
+
+
+
+ audit-dispatcher-opensearch-${project.version}
+
+
+ true
+ src/main/resources
+
+
+
+
+ org.apache.maven.plugins
+ maven-pmd-plugin
+
+
+ ${project.parent.basedir}/dev-support/ranger-pmd-ruleset.xml
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+ copy-dependencies
+
+ copy-dependencies
+
+ package
+
+ ${project.build.directory}/lib
+ runtime
+
+
+
+
+
+
+
diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/OpenSearchDispatcherManager.java b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/OpenSearchDispatcherManager.java
new file mode 100644
index 00000000000..e292e489b03
--- /dev/null
+++ b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/OpenSearchDispatcherManager.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.dispatcher;
+
+import org.apache.ranger.audit.dispatcher.kafka.AuditDispatcher;
+import org.apache.ranger.audit.dispatcher.kafka.AuditDispatcherTracker;
+import org.apache.ranger.audit.provider.MiscUtil;
+import org.apache.ranger.audit.server.AuditServerConstants;
+import org.apache.ranger.audit.utils.AuditServerLogFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Manages the lifecycle of the OpenSearch dispatcher.
+ */
+public final class OpenSearchDispatcherManager {
+ /** Class logger. */
+ private static final Logger LOG =
+ LoggerFactory.getLogger(
+ OpenSearchDispatcherManager.class);
+ /** System property for dispatcher type selection. */
+ private static final String CONFIG_DISPATCHER_TYPE =
+ AuditServerConstants.PROP_DISPATCHER_TYPE;
+ /** Dispatcher type identifier. */
+ private static final String TYPE_OPENSEARCH =
+ "opensearch";
+ /** Property controlling OpenSearch destination. */
+ private static final String ES_DEST_PROP =
+ "xasecure.audit.destination.elasticsearch";
+ /** Maximum initialization retry attempts. */
+ private static final int MAX_INIT_ATTEMPTS = 5;
+ /** Base delay between initialization retries. */
+ private static final long INIT_RETRY_MS = 5000L;
+ /** Maximum wait for dispatcher thread shutdown. */
+ private static final long SHUTDOWN_WAIT_MS = 10000L;
+
+ /** Tracks active dispatchers for health reporting. */
+ private final AuditDispatcherTracker tracker =
+ AuditDispatcherTracker.getInstance();
+ /** The active OpenSearch dispatcher instance. */
+ private AuditDispatcher dispatcher;
+ /** Thread running the dispatcher's consume loop. */
+ private Thread dispatcherThread;
+
+ /**
+ * Initializes the OpenSearch dispatcher from properties.
+ *
+ * @param props configuration properties
+ */
+ public void init(final Properties props) {
+ LOG.info("==> OpenSearchDispatcherManager.init()");
+
+ String dispatcherType =
+ System.getProperty(CONFIG_DISPATCHER_TYPE);
+ if (dispatcherType != null
+ && !dispatcherType.equalsIgnoreCase(
+ TYPE_OPENSEARCH)) {
+ LOG.info("Skipping OpenSearchDispatcherManager"
+ + " initialization since dispatcher"
+ + " type is {}", dispatcherType);
+ return;
+ }
+
+ try {
+ if (props == null) {
+ LOG.error("Configuration properties are null");
+ throw new RuntimeException(
+ "Failed to load configuration");
+ }
+
+ boolean isEnabled = MiscUtil.getBooleanProperty(
+ props, ES_DEST_PROP, false);
+ if (!isEnabled) {
+ String clsName = MiscUtil.getStringProperty(
+ props,
+ AuditServerConstants.PROP_DISPATCHER_CLASS);
+ if (clsName != null && clsName.contains(
+ "AuditOpenSearchDispatcher")) {
+ isEnabled = true;
+ }
+ }
+
+ if (!isEnabled) {
+ LOG.warn("OpenSearch destination is disabled"
+ + " ({}=false). No dispatchers"
+ + " will be created.", ES_DEST_PROP);
+ return;
+ }
+
+ initializeDispatcher(props,
+ AuditServerConstants.PROP_DISPATCHER_PREFIX);
+
+ if (dispatcher == null) {
+ throw new RuntimeException(
+ "No OpenSearch dispatcher was created."
+ + " Verify that " + ES_DEST_PROP + "=true"
+ + " and classes are configured"
+ + " correctly.");
+ } else {
+ LOG.info("Created OpenSearch dispatcher");
+
+ Runtime.getRuntime().addShutdownHook(
+ new Thread(() -> {
+ LOG.info("JVM shutdown detected,"
+ + " stopping"
+ + " OpenSearchDispatcherManager");
+ shutdown();
+ }, "OpenSearchDispatcher-ShutdownHook"));
+
+ startDispatcher();
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to initialize"
+ + " OpenSearchDispatcherManager", e);
+ throw new RuntimeException(
+ "Failed to initialize"
+ + " OpenSearchDispatcherManager", e);
+ }
+
+ LOG.info(
+ "<== OpenSearchDispatcherManager.init()");
+ }
+
+ private void initializeDispatcher(
+ final Properties props,
+ final String propPrefix) {
+ LOG.info("==> OpenSearchDispatcherManager"
+ + ".initializeDispatcher()");
+
+ String clsStr = MiscUtil.getStringProperty(
+ props,
+ AuditServerConstants.PROP_DISPATCHER_CLASS,
+ "org.apache.ranger.audit.dispatcher"
+ + ".kafka.AuditOpenSearchDispatcher");
+
+ String className = clsStr.split(",")[0].trim();
+ if (className.isEmpty()) {
+ LOG.error("Dispatcher class name is empty");
+ return;
+ }
+
+ long retryDelay = INIT_RETRY_MS;
+
+ for (int attempt = 1;
+ attempt <= MAX_INIT_ATTEMPTS; attempt++) {
+ try {
+ Class> cls =
+ Class.forName(className);
+ dispatcher = (AuditDispatcher)
+ cls.getConstructor(
+ Properties.class, String.class)
+ .newInstance(props, propPrefix);
+ tracker.addActiveDispatcher(
+ TYPE_OPENSEARCH, dispatcher);
+ LOG.info("Successfully initialized"
+ + " dispatcher class: {}",
+ cls.getName());
+ break;
+ } catch (ClassNotFoundException e) {
+ LOG.error("Dispatcher class not found: {}."
+ + " Ensure the class is on the"
+ + " classpath.", className, e);
+ break;
+ } catch (Exception e) {
+ if (attempt < MAX_INIT_ATTEMPTS) {
+ LOG.warn("Dispatcher init attempt"
+ + " {}/{} failed, retrying"
+ + " in {}ms...",
+ attempt, MAX_INIT_ATTEMPTS,
+ retryDelay, e);
+ try {
+ Thread.sleep(retryDelay);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ retryDelay *= 2;
+ } else {
+ LOG.error("Error initializing"
+ + " dispatcher class after {}"
+ + " attempts: {}",
+ MAX_INIT_ATTEMPTS, className, e);
+ }
+ }
+ }
+
+ LOG.info("<== OpenSearchDispatcherManager"
+ + ".initializeDispatcher()");
+ }
+
+ private void startDispatcher() {
+ LOG.info("==> OpenSearchDispatcherManager"
+ + ".startDispatcher()");
+
+ logStartupBanner();
+
+ if (dispatcher != null) {
+ try {
+ String name =
+ dispatcher.getClass().getSimpleName();
+ dispatcherThread =
+ new Thread(dispatcher, name);
+ dispatcherThread.setDaemon(true);
+ dispatcherThread.start();
+
+ LOG.info("Started {} thread"
+ + " [Thread-ID: {},"
+ + " Thread-Name: '{}']",
+ name,
+ dispatcherThread.getId(),
+ dispatcherThread.getName());
+ } catch (Exception e) {
+ LOG.error("Error starting dispatcher: {}",
+ dispatcher.getClass().getSimpleName(),
+ e);
+ }
+ }
+
+ LOG.info("<== OpenSearchDispatcherManager"
+ + ".startDispatcher()");
+ }
+
+ private void logStartupBanner() {
+ LOG.info("########## OPENSEARCH DISPATCHER"
+ + " SERVICE STARTUP ##########");
+
+ if (dispatcher == null) {
+ LOG.warn("WARNING: No OpenSearch dispatchers"
+ + " are enabled!");
+ LOG.warn("Verify: {}=true in configuration",
+ ES_DEST_PROP);
+ } else {
+ AuditServerLogFormatter.LogBuilder builder =
+ AuditServerLogFormatter.builder(
+ "OpenSearch Dispatcher Status");
+ String type =
+ dispatcher.getClass().getSimpleName();
+ builder.add(type, "ENABLED");
+ builder.add("Topic", dispatcher.getTopicName());
+ builder.logInfo(LOG);
+
+ LOG.info("Starting OpenSearch dispatcher"
+ + " thread...");
+ }
+ LOG.info("####################################"
+ + "############################");
+ }
+
+ /** Shuts down the dispatcher and waits for thread exit. */
+ public void shutdown() {
+ LOG.info("==> OpenSearchDispatcherManager"
+ + ".shutdown()");
+
+ if (dispatcher != null) {
+ try {
+ LOG.info("Shutting down dispatcher: {}",
+ dispatcher.getClass().getSimpleName());
+ dispatcher.shutdown();
+ LOG.info("Dispatcher shutdown completed: {}",
+ dispatcher.getClass().getSimpleName());
+ } catch (Exception e) {
+ LOG.error(
+ "Error shutting down dispatcher: {}",
+ dispatcher.getClass().getSimpleName(),
+ e);
+ }
+ }
+
+ if (dispatcherThread != null
+ && dispatcherThread.isAlive()) {
+ try {
+ LOG.info("Waiting for thread"
+ + " to terminate: {}",
+ dispatcherThread.getName());
+ dispatcherThread.join(SHUTDOWN_WAIT_MS);
+ if (dispatcherThread.isAlive()) {
+ LOG.warn("Thread did not terminate"
+ + " within {}ms: {}",
+ SHUTDOWN_WAIT_MS,
+ dispatcherThread.getName());
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting for"
+ + " thread to terminate: {}",
+ dispatcherThread.getName(), e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ dispatcher = null;
+ dispatcherThread = null;
+ tracker.clearActiveDispatcher(TYPE_OPENSEARCH);
+
+ LOG.info("<== OpenSearchDispatcherManager"
+ + ".shutdown() - OpenSearch dispatcher"
+ + " stopped");
+ }
+}
diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/kafka/AuditOpenSearchDispatcher.java b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/kafka/AuditOpenSearchDispatcher.java
new file mode 100644
index 00000000000..118530a8869
--- /dev/null
+++ b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/kafka/AuditOpenSearchDispatcher.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.dispatcher.kafka;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpStatus;
+import org.apache.http.auth.AuthSchemeProvider;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.AuthSchemes;
+import org.apache.http.config.Lookup;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.impl.auth.SPNegoSchemeFactory;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.http.util.EntityUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.ranger.audit.dispatcher.AuditEventDocMapper;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+import org.apache.ranger.audit.provider.MiscUtil;
+import org.apache.ranger.audit.server.AuditServerConstants;
+import org.apache.ranger.audit.utils.AuditServerLogFormatter;
+import org.apache.ranger.authorization.credutils.CredentialsProviderUtil;
+import org.apache.ranger.authorization.credutils.kerberos.KerberosCredentialsProvider;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+public class AuditOpenSearchDispatcher
+ extends AuditDispatcherBase {
+ /** Class logger. */
+ private static final Logger LOG =
+ LoggerFactory.getLogger(
+ AuditOpenSearchDispatcher.class);
+
+ /** Kafka consumer group for this dispatcher. */
+ private static final String DEFAULT_GROUP =
+ "ranger_audit_opensearch_dispatcher_group";
+ /** Property prefix for OpenSearch destination config. */
+ private static final String ES_DEST_PREFIX =
+ "xasecure.audit.destination.elasticsearch";
+ /** Default OpenSearch index name. */
+ private static final String DEFAULT_INDEX =
+ "ranger_audits";
+ /** Sleep duration between retries on batch failure. */
+ private static final long RETRY_SLEEP_MS = 5000L;
+ /** Default OpenSearch HTTP port. */
+ private static final int DEFAULT_PORT = 9200;
+ /** Shared JSON serializer. */
+ private static final ObjectMapper OBJECT_MAPPER =
+ new ObjectMapper();
+ /** Type reference for bulk response parsing. */
+ private static final TypeReference