diff --git a/audit-server/audit-dispatcher/dispatcher-app/pom.xml b/audit-server/audit-dispatcher/dispatcher-app/pom.xml index d18d487e388..161256d3461 100644 --- a/audit-server/audit-dispatcher/dispatcher-app/pom.xml +++ b/audit-server/audit-dispatcher/dispatcher-app/pom.xml @@ -85,6 +85,12 @@ ${project.version} provided + + org.apache.ranger + audit-dispatcher-opensearch + ${project.version} + provided + org.apache.ranger audit-dispatcher-solr diff --git a/audit-server/audit-dispatcher/dispatcher-common/pom.xml b/audit-server/audit-dispatcher/dispatcher-common/pom.xml index 3b717af732e..2bba5201021 100644 --- a/audit-server/audit-dispatcher/dispatcher-common/pom.xml +++ b/audit-server/audit-dispatcher/dispatcher-common/pom.xml @@ -147,6 +147,20 @@ spring-context ${springframework.version} + + + + org.junit.jupiter + junit-jupiter + ${junit.jupiter.version} + test + + + org.mockito + mockito-core + ${mockito.version} + test + diff --git a/audit-server/audit-dispatcher/dispatcher-common/src/main/java/org/apache/ranger/audit/dispatcher/AuditEventDocMapper.java b/audit-server/audit-dispatcher/dispatcher-common/src/main/java/org/apache/ranger/audit/dispatcher/AuditEventDocMapper.java new file mode 100644 index 00000000000..9e1eb9385ce --- /dev/null +++ b/audit-server/audit-dispatcher/dispatcher-common/src/main/java/org/apache/ranger/audit/dispatcher/AuditEventDocMapper.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.dispatcher; + +import org.apache.ranger.audit.model.AuthzAuditEvent; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.TimeZone; + +/** + * Maps {@link AuthzAuditEvent} to an OpenSearch document. + */ +public final class AuditEventDocMapper { + /** Thread-safe ISO-8601 UTC date formatter. */ + private static final ThreadLocal DATE_FORMAT = + ThreadLocal.withInitial(() -> { + SimpleDateFormat format = new SimpleDateFormat( + "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); + format.setTimeZone(TimeZone.getTimeZone("UTC")); + return format; + }); + + private AuditEventDocMapper() { + } + + /** + * Converts an audit event to a document map. + * + * @param auditEvent the audit event to convert + * @return map of field names to values + */ + public static Map toDoc( + final AuthzAuditEvent auditEvent) { + Map doc = new HashMap<>(); + + doc.put("id", auditEvent.getEventId()); + doc.put("access", auditEvent.getAccessType()); + doc.put("enforcer", auditEvent.getAclEnforcer()); + doc.put("agent", auditEvent.getAgentId()); + doc.put("repo", auditEvent.getRepositoryName()); + doc.put("sess", auditEvent.getSessionId()); + doc.put("reqUser", auditEvent.getUser()); + doc.put("reqData", auditEvent.getRequestData()); + doc.put("resource", auditEvent.getResourcePath()); + doc.put("cliIP", auditEvent.getClientIP()); + doc.put("cliType", auditEvent.getClientType()); + doc.put("logType", auditEvent.getLogType()); + doc.put("result", auditEvent.getAccessResult()); + doc.put("policy", auditEvent.getPolicyId()); + doc.put("repoType", auditEvent.getRepositoryType()); + doc.put("resType", auditEvent.getResourceType()); + doc.put("reason", auditEvent.getResultReason()); + doc.put("action", auditEvent.getAction()); + + Date eventTime = auditEvent.getEventTime(); + doc.put("evtTime", eventTime != null + ? DATE_FORMAT.get().format(eventTime) : null); + + doc.put("seq_num", auditEvent.getSeqNum()); + doc.put("event_count", auditEvent.getEventCount()); + doc.put("event_dur_ms", + auditEvent.getEventDurationMS()); + doc.put("tags", auditEvent.getTags()); + doc.put("datasets", auditEvent.getDatasets()); + doc.put("projects", auditEvent.getProjects()); + doc.put("datasetIds", auditEvent.getDatasetIds()); + doc.put("cluster", auditEvent.getClusterName()); + doc.put("zoneName", auditEvent.getZoneName()); + doc.put("agentHost", auditEvent.getAgentHostname()); + doc.put("policyVersion", + auditEvent.getPolicyVersion()); + doc.put("additionalInfo", + auditEvent.getAdditionalInfo()); + + return doc; + } +} diff --git a/audit-server/audit-dispatcher/dispatcher-common/src/test/java/org/apache/ranger/audit/dispatcher/TestAuditEventDocMapper.java b/audit-server/audit-dispatcher/dispatcher-common/src/test/java/org/apache/ranger/audit/dispatcher/TestAuditEventDocMapper.java new file mode 100644 index 00000000000..6073c86cd26 --- /dev/null +++ b/audit-server/audit-dispatcher/dispatcher-common/src/test/java/org/apache/ranger/audit/dispatcher/TestAuditEventDocMapper.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.dispatcher; + +import org.apache.ranger.audit.model.AuthzAuditEvent; +import org.junit.jupiter.api.Test; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class TestAuditEventDocMapper { + + @Test + void toDoc_mapsAllFields() { + Date eventTime = new Date(1700000000000L); + + AuthzAuditEvent event = new AuthzAuditEvent( + 1, "dev_hdfs", "testuser", eventTime, "read", + "/tmp/test", "path", "read", (short) 1, "hdfs-agent", + 42L, "allowed by policy", "ranger-acl", "sess-123", + "hive", "192.168.1.1", "/tmp/test", "cl1", "zone1", 5L + ); + event.setEventId("evt-001"); + event.setSeqNum(10L); + event.setEventCount(1L); + event.setEventDurationMS(50L); + event.setAgentHostname("host1.example.com"); + + Set tags = new HashSet<>(); + tags.add("PII"); + event.setTags(tags); + + Map doc = AuditEventDocMapper.toDoc(event); + + assertNotNull(doc); + assertEquals("evt-001", doc.get("id")); + assertEquals("read", doc.get("access")); + assertEquals("ranger-acl", doc.get("enforcer")); + assertEquals("hdfs-agent", doc.get("agent")); + assertEquals("dev_hdfs", doc.get("repo")); + assertEquals("sess-123", doc.get("sess")); + assertEquals("testuser", doc.get("reqUser")); + assertEquals("/tmp/test", doc.get("reqData")); + assertEquals("/tmp/test", doc.get("resource")); + assertEquals("192.168.1.1", doc.get("cliIP")); + assertEquals("hive", doc.get("cliType")); + assertEquals((short) 1, doc.get("result")); + assertEquals(42L, doc.get("policy")); + assertEquals(1, doc.get("repoType")); + assertEquals("path", doc.get("resType")); + assertEquals("allowed by policy", doc.get("reason")); + assertEquals("read", doc.get("action")); + assertEquals(10L, doc.get("seq_num")); + assertEquals(1L, doc.get("event_count")); + assertEquals(50L, doc.get("event_dur_ms")); + assertEquals("cl1", doc.get("cluster")); + assertEquals("zone1", doc.get("zoneName")); + assertEquals("host1.example.com", doc.get("agentHost")); + assertEquals(5L, doc.get("policyVersion")); + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); + sdf.setTimeZone(TimeZone.getTimeZone("UTC")); + assertEquals(sdf.format(eventTime), doc.get("evtTime")); + } + + @Test + void toDoc_handlesNullEventTime() { + AuthzAuditEvent event = new AuthzAuditEvent(); + event.setEventId("evt-002"); + event.setEventTime(null); + + Map doc = AuditEventDocMapper.toDoc(event); + + assertNotNull(doc); + assertEquals("evt-002", doc.get("id")); + assertNull(doc.get("evtTime")); + } + + @Test + void toDoc_handlesNullFields() { + AuthzAuditEvent event = new AuthzAuditEvent(); + + Map doc = AuditEventDocMapper.toDoc(event); + + assertNotNull(doc); + assertNull(doc.get("id")); + assertNull(doc.get("access")); + assertNull(doc.get("enforcer")); + assertNull(doc.get("agent")); + assertNull(doc.get("repo")); + assertNull(doc.get("reqUser")); + assertNull(doc.get("resource")); + assertNull(doc.get("cliIP")); + assertEquals(0, doc.get("repoType")); + assertEquals(0L, doc.get("policy")); + } + + @Test + void toDoc_mapsDatasets() { + AuthzAuditEvent event = new AuthzAuditEvent(); + event.setEventId("evt-003"); + + Set datasets = new HashSet<>(); + datasets.add("dataset1"); + datasets.add("dataset2"); + event.setDatasets(datasets); + + Set projects = new HashSet<>(); + projects.add("project1"); + event.setProjects(projects); + + Set datasetIds = new HashSet<>(); + datasetIds.add(100L); + datasetIds.add(200L); + event.setDatasetIds(datasetIds); + + Map doc = AuditEventDocMapper.toDoc(event); + + assertEquals(datasets, doc.get("datasets")); + assertEquals(projects, doc.get("projects")); + assertEquals(datasetIds, doc.get("datasetIds")); + } +} diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/pom.xml b/audit-server/audit-dispatcher/dispatcher-opensearch/pom.xml new file mode 100644 index 00000000000..715c4dafb1c --- /dev/null +++ b/audit-server/audit-dispatcher/dispatcher-opensearch/pom.xml @@ -0,0 +1,210 @@ + + + + 4.0.0 + + + org.apache.ranger + ranger + 3.0.0-SNAPSHOT + ../../.. + + + audit-dispatcher-opensearch + jar + Ranger Audit Dispatcher OpenSearch + Kafka dispatcher service for indexing audits into OpenSearch/ElasticSearch + + + UTF-8 + + + + + + com.fasterxml.jackson.core + jackson-databind + ${fasterxml.jackson.version} + + + + org.apache.commons + commons-lang3 + ${commons.lang3.version} + + + + + org.apache.httpcomponents + httpasyncclient + ${httpcomponents.httpasyncclient.version} + + + commons-logging + * + + + + + org.apache.httpcomponents + httpclient + ${httpcomponents.httpclient.version} + + + commons-logging + * + + + + + org.apache.httpcomponents + httpcore + ${httpcomponents.httpcore.version} + + + org.apache.httpcomponents + httpcore-nio + ${httpcomponents.httpcore.version} + + + + + org.apache.ranger + ranger-audit-dest-es + ${project.version} + + + org.apache.hadoop + hadoop-client-api + + + + + + + org.apache.kafka + kafka-clients + ${kafka.version} + provided + + + log4j + * + + + org.slf4j + * + + + + + org.apache.ranger + ranger-audit-core + ${project.version} + provided + + + org.apache.hadoop + hadoop-client-api + + + + + org.apache.ranger + ranger-audit-dispatcher-common + ${project.version} + provided + + + org.apache.hadoop + hadoop-client-api + + + + + org.apache.ranger + ranger-audit-server-common + ${project.version} + provided + + + org.slf4j + slf4j-api + ${slf4j.version} + provided + + + + + org.junit.jupiter + junit-jupiter + ${junit.jupiter.version} + test + + + org.mockito + mockito-core + ${mockito.version} + test + + + org.mockito + mockito-junit-jupiter + ${mockito.version} + test + + + + + audit-dispatcher-opensearch-${project.version} + + + true + src/main/resources + + + + + org.apache.maven.plugins + maven-pmd-plugin + + + ${project.parent.basedir}/dev-support/ranger-pmd-ruleset.xml + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + + copy-dependencies + + package + + ${project.build.directory}/lib + runtime + + + + + + + diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/OpenSearchDispatcherManager.java b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/OpenSearchDispatcherManager.java new file mode 100644 index 00000000000..e292e489b03 --- /dev/null +++ b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/OpenSearchDispatcherManager.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.dispatcher; + +import org.apache.ranger.audit.dispatcher.kafka.AuditDispatcher; +import org.apache.ranger.audit.dispatcher.kafka.AuditDispatcherTracker; +import org.apache.ranger.audit.provider.MiscUtil; +import org.apache.ranger.audit.server.AuditServerConstants; +import org.apache.ranger.audit.utils.AuditServerLogFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +/** + * Manages the lifecycle of the OpenSearch dispatcher. + */ +public final class OpenSearchDispatcherManager { + /** Class logger. */ + private static final Logger LOG = + LoggerFactory.getLogger( + OpenSearchDispatcherManager.class); + /** System property for dispatcher type selection. */ + private static final String CONFIG_DISPATCHER_TYPE = + AuditServerConstants.PROP_DISPATCHER_TYPE; + /** Dispatcher type identifier. */ + private static final String TYPE_OPENSEARCH = + "opensearch"; + /** Property controlling OpenSearch destination. */ + private static final String ES_DEST_PROP = + "xasecure.audit.destination.elasticsearch"; + /** Maximum initialization retry attempts. */ + private static final int MAX_INIT_ATTEMPTS = 5; + /** Base delay between initialization retries. */ + private static final long INIT_RETRY_MS = 5000L; + /** Maximum wait for dispatcher thread shutdown. */ + private static final long SHUTDOWN_WAIT_MS = 10000L; + + /** Tracks active dispatchers for health reporting. */ + private final AuditDispatcherTracker tracker = + AuditDispatcherTracker.getInstance(); + /** The active OpenSearch dispatcher instance. */ + private AuditDispatcher dispatcher; + /** Thread running the dispatcher's consume loop. */ + private Thread dispatcherThread; + + /** + * Initializes the OpenSearch dispatcher from properties. + * + * @param props configuration properties + */ + public void init(final Properties props) { + LOG.info("==> OpenSearchDispatcherManager.init()"); + + String dispatcherType = + System.getProperty(CONFIG_DISPATCHER_TYPE); + if (dispatcherType != null + && !dispatcherType.equalsIgnoreCase( + TYPE_OPENSEARCH)) { + LOG.info("Skipping OpenSearchDispatcherManager" + + " initialization since dispatcher" + + " type is {}", dispatcherType); + return; + } + + try { + if (props == null) { + LOG.error("Configuration properties are null"); + throw new RuntimeException( + "Failed to load configuration"); + } + + boolean isEnabled = MiscUtil.getBooleanProperty( + props, ES_DEST_PROP, false); + if (!isEnabled) { + String clsName = MiscUtil.getStringProperty( + props, + AuditServerConstants.PROP_DISPATCHER_CLASS); + if (clsName != null && clsName.contains( + "AuditOpenSearchDispatcher")) { + isEnabled = true; + } + } + + if (!isEnabled) { + LOG.warn("OpenSearch destination is disabled" + + " ({}=false). No dispatchers" + + " will be created.", ES_DEST_PROP); + return; + } + + initializeDispatcher(props, + AuditServerConstants.PROP_DISPATCHER_PREFIX); + + if (dispatcher == null) { + throw new RuntimeException( + "No OpenSearch dispatcher was created." + + " Verify that " + ES_DEST_PROP + "=true" + + " and classes are configured" + + " correctly."); + } else { + LOG.info("Created OpenSearch dispatcher"); + + Runtime.getRuntime().addShutdownHook( + new Thread(() -> { + LOG.info("JVM shutdown detected," + + " stopping" + + " OpenSearchDispatcherManager"); + shutdown(); + }, "OpenSearchDispatcher-ShutdownHook")); + + startDispatcher(); + } + } catch (Exception e) { + LOG.error("Failed to initialize" + + " OpenSearchDispatcherManager", e); + throw new RuntimeException( + "Failed to initialize" + + " OpenSearchDispatcherManager", e); + } + + LOG.info( + "<== OpenSearchDispatcherManager.init()"); + } + + private void initializeDispatcher( + final Properties props, + final String propPrefix) { + LOG.info("==> OpenSearchDispatcherManager" + + ".initializeDispatcher()"); + + String clsStr = MiscUtil.getStringProperty( + props, + AuditServerConstants.PROP_DISPATCHER_CLASS, + "org.apache.ranger.audit.dispatcher" + + ".kafka.AuditOpenSearchDispatcher"); + + String className = clsStr.split(",")[0].trim(); + if (className.isEmpty()) { + LOG.error("Dispatcher class name is empty"); + return; + } + + long retryDelay = INIT_RETRY_MS; + + for (int attempt = 1; + attempt <= MAX_INIT_ATTEMPTS; attempt++) { + try { + Class cls = + Class.forName(className); + dispatcher = (AuditDispatcher) + cls.getConstructor( + Properties.class, String.class) + .newInstance(props, propPrefix); + tracker.addActiveDispatcher( + TYPE_OPENSEARCH, dispatcher); + LOG.info("Successfully initialized" + + " dispatcher class: {}", + cls.getName()); + break; + } catch (ClassNotFoundException e) { + LOG.error("Dispatcher class not found: {}." + + " Ensure the class is on the" + + " classpath.", className, e); + break; + } catch (Exception e) { + if (attempt < MAX_INIT_ATTEMPTS) { + LOG.warn("Dispatcher init attempt" + + " {}/{} failed, retrying" + + " in {}ms...", + attempt, MAX_INIT_ATTEMPTS, + retryDelay, e); + try { + Thread.sleep(retryDelay); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + retryDelay *= 2; + } else { + LOG.error("Error initializing" + + " dispatcher class after {}" + + " attempts: {}", + MAX_INIT_ATTEMPTS, className, e); + } + } + } + + LOG.info("<== OpenSearchDispatcherManager" + + ".initializeDispatcher()"); + } + + private void startDispatcher() { + LOG.info("==> OpenSearchDispatcherManager" + + ".startDispatcher()"); + + logStartupBanner(); + + if (dispatcher != null) { + try { + String name = + dispatcher.getClass().getSimpleName(); + dispatcherThread = + new Thread(dispatcher, name); + dispatcherThread.setDaemon(true); + dispatcherThread.start(); + + LOG.info("Started {} thread" + + " [Thread-ID: {}," + + " Thread-Name: '{}']", + name, + dispatcherThread.getId(), + dispatcherThread.getName()); + } catch (Exception e) { + LOG.error("Error starting dispatcher: {}", + dispatcher.getClass().getSimpleName(), + e); + } + } + + LOG.info("<== OpenSearchDispatcherManager" + + ".startDispatcher()"); + } + + private void logStartupBanner() { + LOG.info("########## OPENSEARCH DISPATCHER" + + " SERVICE STARTUP ##########"); + + if (dispatcher == null) { + LOG.warn("WARNING: No OpenSearch dispatchers" + + " are enabled!"); + LOG.warn("Verify: {}=true in configuration", + ES_DEST_PROP); + } else { + AuditServerLogFormatter.LogBuilder builder = + AuditServerLogFormatter.builder( + "OpenSearch Dispatcher Status"); + String type = + dispatcher.getClass().getSimpleName(); + builder.add(type, "ENABLED"); + builder.add("Topic", dispatcher.getTopicName()); + builder.logInfo(LOG); + + LOG.info("Starting OpenSearch dispatcher" + + " thread..."); + } + LOG.info("####################################" + + "############################"); + } + + /** Shuts down the dispatcher and waits for thread exit. */ + public void shutdown() { + LOG.info("==> OpenSearchDispatcherManager" + + ".shutdown()"); + + if (dispatcher != null) { + try { + LOG.info("Shutting down dispatcher: {}", + dispatcher.getClass().getSimpleName()); + dispatcher.shutdown(); + LOG.info("Dispatcher shutdown completed: {}", + dispatcher.getClass().getSimpleName()); + } catch (Exception e) { + LOG.error( + "Error shutting down dispatcher: {}", + dispatcher.getClass().getSimpleName(), + e); + } + } + + if (dispatcherThread != null + && dispatcherThread.isAlive()) { + try { + LOG.info("Waiting for thread" + + " to terminate: {}", + dispatcherThread.getName()); + dispatcherThread.join(SHUTDOWN_WAIT_MS); + if (dispatcherThread.isAlive()) { + LOG.warn("Thread did not terminate" + + " within {}ms: {}", + SHUTDOWN_WAIT_MS, + dispatcherThread.getName()); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for" + + " thread to terminate: {}", + dispatcherThread.getName(), e); + Thread.currentThread().interrupt(); + } + } + + dispatcher = null; + dispatcherThread = null; + tracker.clearActiveDispatcher(TYPE_OPENSEARCH); + + LOG.info("<== OpenSearchDispatcherManager" + + ".shutdown() - OpenSearch dispatcher" + + " stopped"); + } +} diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/kafka/AuditOpenSearchDispatcher.java b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/kafka/AuditOpenSearchDispatcher.java new file mode 100644 index 00000000000..118530a8869 --- /dev/null +++ b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/kafka/AuditOpenSearchDispatcher.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.dispatcher.kafka; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpHost; +import org.apache.http.HttpStatus; +import org.apache.http.auth.AuthSchemeProvider; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.config.AuthSchemes; +import org.apache.http.config.Lookup; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.impl.auth.SPNegoSchemeFactory; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.nio.entity.NStringEntity; +import org.apache.http.util.EntityUtils; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.ranger.audit.dispatcher.AuditEventDocMapper; +import org.apache.ranger.audit.model.AuthzAuditEvent; +import org.apache.ranger.audit.provider.MiscUtil; +import org.apache.ranger.audit.server.AuditServerConstants; +import org.apache.ranger.audit.utils.AuditServerLogFormatter; +import org.apache.ranger.authorization.credutils.CredentialsProviderUtil; +import org.apache.ranger.authorization.credutils.kerberos.KerberosCredentialsProvider; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +public class AuditOpenSearchDispatcher + extends AuditDispatcherBase { + /** Class logger. */ + private static final Logger LOG = + LoggerFactory.getLogger( + AuditOpenSearchDispatcher.class); + + /** Kafka consumer group for this dispatcher. */ + private static final String DEFAULT_GROUP = + "ranger_audit_opensearch_dispatcher_group"; + /** Property prefix for OpenSearch destination config. */ + private static final String ES_DEST_PREFIX = + "xasecure.audit.destination.elasticsearch"; + /** Default OpenSearch index name. */ + private static final String DEFAULT_INDEX = + "ranger_audits"; + /** Sleep duration between retries on batch failure. */ + private static final long RETRY_SLEEP_MS = 5000L; + /** Default OpenSearch HTTP port. */ + private static final int DEFAULT_PORT = 9200; + /** Shared JSON serializer. */ + private static final ObjectMapper OBJECT_MAPPER = + new ObjectMapper(); + /** Type reference for bulk response parsing. */ + private static final TypeReference> + MAP_TYPE = new TypeReference>() { + }; + + /** OpenSearch REST client. */ + private RestClient openSearchClient; + /** Target index for audit documents. */ + private String openSearchIndex; + + /** + * Creates and initializes the OpenSearch dispatcher. + * + * @param props configuration properties + * @param propPrefix property key prefix + * @throws Exception if initialization fails + */ + public AuditOpenSearchDispatcher( + final Properties props, + final String propPrefix) throws Exception { + super(props, propPrefix, DEFAULT_GROUP); + + init(props, propPrefix); + } + + @Override + protected final String getDispatcherName() { + return "OPENSEARCH"; + } + + @Override + protected final DispatcherWorker createDispatcherWorker( + final String workerId, + final List assignedPartitions) { + return new OpenSearchDispatcherWorker( + workerId, assignedPartitions); + } + + @Override + protected final void shutdownDestination() { + if (openSearchClient != null) { + try { + openSearchClient.close(); + } catch (Exception e) { + LOG.error( + "Error shutting down OpenSearch REST client", + e); + } + } + } + + private void init( + final Properties props, + final String propPrefix) throws Exception { + LOG.info("==> AuditOpenSearchDispatcher.init()"); + + String pfx = propPrefix + "."; + + this.openSearchIndex = MiscUtil.getStringProperty( + props, ES_DEST_PREFIX + ".index", DEFAULT_INDEX); + this.openSearchClient = createOpenSearchClient(props); + + this.dispatcherThreadCount = MiscUtil.getIntProperty( + props, + pfx + AuditServerConstants.PROP_DISPATCHER_THREAD_COUNT, + 1); + this.offsetCommitStrategy = MiscUtil.getStringProperty( + props, + pfx + AuditServerConstants + .PROP_DISPATCHER_OFFSET_COMMIT_STRATEGY, + AuditServerConstants.DEFAULT_OFFSET_COMMIT_STRATEGY); + this.offsetCommitInterval = MiscUtil.getLongProperty( + props, + pfx + AuditServerConstants + .PROP_DISPATCHER_OFFSET_COMMIT_INTERVAL, + AuditServerConstants + .DEFAULT_OFFSET_COMMIT_INTERVAL_MS); + + AuditServerLogFormatter + .builder("AuditOpenSearchDispatcher Configuration") + .add("Index", openSearchIndex) + .add("Thread Count", dispatcherThreadCount) + .add("Commit Strategy", offsetCommitStrategy) + .add("Commit Interval (ms)", + offsetCommitInterval + " (manual mode only)") + .logInfo(LOG); + + LOG.info("<== AuditOpenSearchDispatcher.init()"); + } + + /** + * Sends a batch of audit JSON strings to OpenSearch. + * + * @param audits collection of JSON audit event strings + * @throws Exception if the bulk request fails + */ + public final void processMessageBatch( + final Collection audits) throws Exception { + if (audits == null || audits.isEmpty()) { + throw new Exception( + "Failure in sending audits into OpenSearch"); + } + + StringBuilder bulkBody = new StringBuilder(); + + for (String audit : audits) { + AuthzAuditEvent auditEvent = + MiscUtil.fromJson(audit, AuthzAuditEvent.class); + String id = auditEvent.getEventId(); + Map doc = + AuditEventDocMapper.toDoc(auditEvent); + + if (id == null || id.trim().isEmpty()) { + id = UUID.randomUUID().toString(); + doc.put("id", id); + } + + Map indexProperties = + new HashMap<>(); + indexProperties.put("_index", openSearchIndex); + indexProperties.put("_id", id); + + Map indexMeta = + Collections.singletonMap("index", + indexProperties); + bulkBody.append( + OBJECT_MAPPER.writeValueAsString(indexMeta)) + .append('\n') + .append(OBJECT_MAPPER.writeValueAsString(doc)) + .append('\n'); + } + + Request request = new Request("POST", "/_bulk"); + request.setEntity(new NStringEntity( + bulkBody.toString(), + ContentType.create( + "application/x-ndjson", + StandardCharsets.UTF_8))); + + Response response = + openSearchClient.performRequest(request); + int status = + response.getStatusLine().getStatusCode(); + + if (status >= HttpStatus.SC_BAD_REQUEST) { + throw new Exception( + "OpenSearch bulk request failed with HTTP " + + status); + } + + String responseBody = response.getEntity() != null + ? EntityUtils.toString(response.getEntity()) + : "{}"; + Map responseMap = + OBJECT_MAPPER.readValue(responseBody, MAP_TYPE); + Object hasErrors = responseMap.get("errors"); + + if (Boolean.TRUE.equals(hasErrors)) { + throw new Exception( + "OpenSearch bulk request returned item errors: " + + responseBody); + } + } + + private RestClient createOpenSearchClient( + final Properties props) { + String protocol = MiscUtil.getStringProperty( + props, ES_DEST_PREFIX + ".protocol", "http"); + String urls = MiscUtil.getStringProperty( + props, ES_DEST_PREFIX + ".urls", "localhost"); + int port = MiscUtil.getIntProperty( + props, ES_DEST_PREFIX + ".port", DEFAULT_PORT); + String user = MiscUtil.getStringProperty( + props, ES_DEST_PREFIX + ".user", ""); + String password = MiscUtil.getStringProperty( + props, ES_DEST_PREFIX + ".password", ""); + + HttpHost[] hosts = + MiscUtil.toArray(urls, ",").stream() + .map(h -> new HttpHost(h, port, protocol)) + .toArray(HttpHost[]::new); + + LOG.info("Connecting to OpenSearch: {}://{}:{}/{}", + protocol, urls, port, openSearchIndex); + + RestClientBuilder builder = + RestClient.builder(hosts); + + if (isCredentialConfigured(user) + && isCredentialConfigured(password)) { + if (password.contains("keytab") + && new File(password).exists()) { + KerberosCredentialsProvider creds = + CredentialsProviderUtil + .getKerberosCredentials(user, password); + Lookup authRegistry = + RegistryBuilder.create() + .register(AuthSchemes.SPNEGO, + new SPNegoSchemeFactory()) + .build(); + builder.setHttpClientConfigCallback( + httpClientBuilder -> httpClientBuilder + .setDefaultCredentialsProvider(creds) + .setDefaultAuthSchemeRegistry(authRegistry)); + LOG.info("OpenSearch client configured with" + + " Kerberos credentials for user: {}", user); + } else { + CredentialsProvider creds = + new BasicCredentialsProvider(); + creds.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials( + user, password)); + builder.setHttpClientConfigCallback( + httpClientBuilder -> httpClientBuilder + .setDefaultCredentialsProvider(creds)); + LOG.info("OpenSearch client configured with" + + " basic auth for user: {}", user); + } + } + + return builder.build(); + } + + private boolean isCredentialConfigured(final String value) { + return StringUtils.isNotBlank(value) + && !"NONE".equalsIgnoreCase(value.trim()); + } + + private class OpenSearchDispatcherWorker + extends DispatcherWorker { + OpenSearchDispatcherWorker( + final String workerId, + final List assignedPartitions) { + super(workerId, assignedPartitions); + } + + @Override + protected void processRecordBatch( + final ConsumerRecords + records) { + List auditBatch = new ArrayList<>(); + List> + recordList = new ArrayList<>(); + + for (ConsumerRecord rec + : records) { + LOG.debug( + "Worker '{}' consumed:" + + " partition={}, key={}, offset={}", + workerId, rec.partition(), + rec.key(), rec.offset()); + + auditBatch.add(rec.value()); + recordList.add(rec); + } + + try { + if (!auditBatch.isEmpty()) { + processMessageBatch(auditBatch); + + for (ConsumerRecord rec + : recordList) { + TopicPartition tp = + new TopicPartition( + rec.topic(), + rec.partition()); + pendingOffsets.put(tp, + new OffsetAndMetadata( + rec.offset() + 1)); + messagesProcessedSinceLastCommit + .incrementAndGet(); + } + } + } catch (Exception e) { + LOG.error( + "Error processing batch in" + + " worker '{}', batch size: {}", + workerId, auditBatch.size(), e); + + if (!recordList.isEmpty()) { + seekToFirstOffsets(recordList); + } + } + } + + private void seekToFirstOffsets( + final List> + recordList) { + Map firstOffsets = + new HashMap<>(); + + for (ConsumerRecord rec + : recordList) { + TopicPartition tp = new TopicPartition( + rec.topic(), rec.partition()); + firstOffsets.putIfAbsent( + tp, rec.offset()); + } + + for (Map.Entry entry + : firstOffsets.entrySet()) { + TopicPartition tp = entry.getKey(); + Long offset = entry.getValue(); + + pendingOffsets.put(tp, + new OffsetAndMetadata(offset)); + + try { + workerDispatcher.seek(tp, offset); + } catch (Exception seekEx) { + LOG.error( + "Failed to seek to offset {}" + + " for partition {} after" + + " OpenSearch batch error", + offset, tp, seekEx); + } + } + + try { + Thread.sleep(RETRY_SLEEP_MS); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + } +} diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/resources/conf/logback.xml b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/resources/conf/logback.xml new file mode 100644 index 00000000000..29b33e86ad9 --- /dev/null +++ b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/resources/conf/logback.xml @@ -0,0 +1,55 @@ + + + + + + + + + ${LOG_DIR}/${LOG_FILE} + + + ${LOG_DIR}/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + + 100MB + 30 + 2GB + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/resources/conf/ranger-audit-dispatcher-opensearch-site.xml b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/resources/conf/ranger-audit-dispatcher-opensearch-site.xml new file mode 100644 index 00000000000..42b6e1843f5 --- /dev/null +++ b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/resources/conf/ranger-audit-dispatcher-opensearch-site.xml @@ -0,0 +1,186 @@ + + + + log.dir + ${audit.dispatcher.opensearch.log.dir} + Log directory for OpenSearch dispatcher service + + + + + ranger.audit.dispatcher.host + ranger-audit-dispatcher-opensearch.rangernw + + - In Docker: Use full service name with domain (e.g., ranger-audit-server.rangernw) + - In VM: Use the actual FQDN (e.g., ranger.example.com) + + + + + ranger.audit.dispatcher.service.kerberos.principal + rangerauditserver/_HOST@EXAMPLE.COM + + rangerauditserver user kerberos principal for authentication into kafka + + + + + ranger.audit.dispatcher.service.kerberos.keytab + /etc/keytabs/rangerauditserver.keytab + + keytab of the rangerauditserver principal + + + + + + ranger.audit.dispatcher.kafka.bootstrap.servers + ranger-kafka.rangernw:9092 + + + + ranger.audit.dispatcher.kafka.topic.name + ranger_audits + + + + ranger.audit.dispatcher.kafka.security.protocol + SASL_PLAINTEXT + + + + ranger.audit.dispatcher.kafka.sasl.mechanism + GSSAPI + + + + + ranger.audit.dispatcher.thread.count + 5 + Number of OpenSearch dispatcher worker threads (higher for indexing throughput) + + + + + ranger.audit.dispatcher.offset.commit.strategy + batch + batch or manual + + + + ranger.audit.dispatcher.offset.commit.interval.ms + 30000 + Used only if strategy is 'manual' + + + + ranger.audit.dispatcher.max.poll.records + 500 + Maximum records per poll for batch processing + + + + + ranger.audit.dispatcher.class + org.apache.ranger.audit.dispatcher.kafka.AuditOpenSearchDispatcher + + + + + xasecure.audit.destination.elasticsearch + true + Enable the Elasticsearch-compatible audit destination used for OpenSearch writes + + + + xasecure.audit.destination.elasticsearch.urls + ranger-opensearch + + OpenSearch host + + + + + xasecure.audit.destination.elasticsearch.port + 9200 + + + + xasecure.audit.destination.elasticsearch.protocol + http + + + + xasecure.audit.destination.elasticsearch.index + ranger_audits + + + + xasecure.audit.destination.elasticsearch.user + + + Username for OpenSearch Basic Auth, or Kerberos principal when password is a keytab path. Use NONE or empty for unauthenticated OpenSearch. + + + + + xasecure.audit.destination.elasticsearch.password + + + Password for OpenSearch Basic Auth, or path to keytab for Kerberos. Use NONE or empty for unauthenticated OpenSearch. + + + + + + ranger.audit.dispatcher.type + opensearch + + + + ranger.audit.dispatcher.war.file + ranger-audit-dispatcher.war + + + + ranger.audit.dispatcher.launcher.class + org.apache.ranger.audit.dispatcher.AuditDispatcherLauncher + + + + ranger.audit.dispatcher.main.class + org.apache.ranger.audit.dispatcher.AuditDispatcherApplication + + + + ranger.audit.dispatcher.http.port + 7090 + + + + ranger.audit.dispatcher.contextName + / + + + + ranger.audit.dispatcher.kafka.group.id + ranger_audit_opensearch_dispatcher_group + + + + ranger.audit.dispatcher.opensearch.class + org.apache.ranger.audit.dispatcher.OpenSearchDispatcherManager + + \ No newline at end of file diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/src/test/java/org/apache/ranger/audit/dispatcher/TestOpenSearchDispatcherManager.java b/audit-server/audit-dispatcher/dispatcher-opensearch/src/test/java/org/apache/ranger/audit/dispatcher/TestOpenSearchDispatcherManager.java new file mode 100644 index 00000000000..9257e65e1d3 --- /dev/null +++ b/audit-server/audit-dispatcher/dispatcher-opensearch/src/test/java/org/apache/ranger/audit/dispatcher/TestOpenSearchDispatcherManager.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.dispatcher; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class TestOpenSearchDispatcherManager { + + @AfterEach + void clearSystemProperty() { + System.clearProperty("ranger.audit.dispatcher.type"); + } + + @Test + void init_skipsWhenDispatcherTypeIsNotOpenSearch() { + System.setProperty("ranger.audit.dispatcher.type", "solr"); + + OpenSearchDispatcherManager manager = new OpenSearchDispatcherManager(); + Properties props = new Properties(); + + assertDoesNotThrow(() -> manager.init(props)); + } + + @Test + void init_throwsWhenPropsAreNull() { + OpenSearchDispatcherManager manager = new OpenSearchDispatcherManager(); + + assertThrows(RuntimeException.class, () -> manager.init(null)); + } + + @Test + void init_skipsWhenOpenSearchDestinationDisabled() { + OpenSearchDispatcherManager manager = new OpenSearchDispatcherManager(); + Properties props = new Properties(); + props.setProperty("xasecure.audit.destination.elasticsearch", "false"); + + assertDoesNotThrow(() -> manager.init(props)); + } + + @Test + void shutdown_handlesNullDispatcherGracefully() { + OpenSearchDispatcherManager manager = new OpenSearchDispatcherManager(); + + assertDoesNotThrow(manager::shutdown); + } + + @Test + void init_throwsWhenEnabledDispatcherClassCannotBeCreated() { + OpenSearchDispatcherManager manager = new OpenSearchDispatcherManager(); + Properties props = new Properties(); + props.setProperty("xasecure.audit.destination.elasticsearch", "true"); + props.setProperty("ranger.audit.dispatcher.class", "com.nonexistent.FakeDispatcher"); + props.setProperty("ranger.audit.dispatcher.kafka.bootstrap.servers", "localhost:9092"); + props.setProperty("ranger.audit.dispatcher.kafka.topic", "ranger_audits"); + + assertThrows(RuntimeException.class, () -> manager.init(props)); + } +} diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/src/test/java/org/apache/ranger/audit/dispatcher/kafka/TestAuditOpenSearchDispatcher.java b/audit-server/audit-dispatcher/dispatcher-opensearch/src/test/java/org/apache/ranger/audit/dispatcher/kafka/TestAuditOpenSearchDispatcher.java new file mode 100644 index 00000000000..846d59db033 --- /dev/null +++ b/audit-server/audit-dispatcher/dispatcher-opensearch/src/test/java/org/apache/ranger/audit/dispatcher/kafka/TestAuditOpenSearchDispatcher.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.dispatcher.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.HttpEntity; +import org.apache.http.StatusLine; +import org.apache.ranger.audit.model.AuthzAuditEvent; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Date; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class TestAuditOpenSearchDispatcher { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @Mock + private RestClient mockRestClient; + + @Mock + private Response mockResponse; + + @Mock + private StatusLine mockStatusLine; + + private AuditOpenSearchDispatcher dispatcher; + + @BeforeEach + void setUp() throws Exception { + Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); + unsafeField.setAccessible(true); + sun.misc.Unsafe unsafe = (sun.misc.Unsafe) unsafeField.get(null); + dispatcher = (AuditOpenSearchDispatcher) unsafe.allocateInstance(AuditOpenSearchDispatcher.class); + + setField(dispatcher, "openSearchClient", mockRestClient); + setField(dispatcher, "openSearchIndex", "ranger_audits"); + } + + @Test + void processMessageBatch_sendsBulkRequestWithDocumentId() throws Exception { + AuthzAuditEvent event = new AuthzAuditEvent(); + event.setEventId("test-id-001"); + event.setAccessType("read"); + event.setUser("testuser"); + event.setRepositoryName("dev_hdfs"); + event.setEventTime(new Date(1700000000000L)); + + String auditJson = MAPPER.writeValueAsString(event); + setupSuccessResponse(); + + dispatcher.processMessageBatch(Collections.singletonList(auditJson)); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(Request.class); + verify(mockRestClient).performRequest(requestCaptor.capture()); + + Request request = requestCaptor.getValue(); + assertEquals("POST", request.getMethod()); + assertEquals("/_bulk", request.getEndpoint()); + + String[] lines = requestBody(request).split("\n"); + Map indexMeta = indexMetadata(lines[0]); + Map doc = MAPPER.readValue(lines[1], Map.class); + + assertEquals("ranger_audits", indexMeta.get("_index")); + assertEquals("test-id-001", indexMeta.get("_id")); + assertEquals("test-id-001", doc.get("id")); + assertEquals("read", doc.get("access")); + assertEquals("testuser", doc.get("reqUser")); + assertEquals("dev_hdfs", doc.get("repo")); + } + + @Test + void processMessageBatch_generatesDocumentIdWhenAuditIdMissing() throws Exception { + AuthzAuditEvent event = new AuthzAuditEvent(); + event.setAccessType("read"); + event.setUser("testuser"); + + String auditJson = MAPPER.writeValueAsString(event); + setupSuccessResponse(); + + dispatcher.processMessageBatch(Collections.singletonList(auditJson)); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(Request.class); + verify(mockRestClient).performRequest(requestCaptor.capture()); + + String[] lines = requestBody(requestCaptor.getValue()).split("\n"); + Map indexMeta = indexMetadata(lines[0]); + Map doc = MAPPER.readValue(lines[1], Map.class); + Object generatedId = indexMeta.get("_id"); + + assertNotNull(generatedId); + assertFalse(generatedId.toString().trim().isEmpty()); + assertEquals(generatedId, doc.get("id")); + } + + @Test + void processMessageBatch_throwsOnNullInput() { + assertThrows(Exception.class, () -> dispatcher.processMessageBatch(null)); + } + + @Test + void processMessageBatch_throwsOnEmptyInput() { + assertThrows(Exception.class, () -> dispatcher.processMessageBatch(Collections.emptyList())); + } + + @Test + void processMessageBatch_throwsOnHttpError() throws Exception { + AuthzAuditEvent event = new AuthzAuditEvent(); + event.setEventId("test-id-002"); + event.setUser("testuser"); + + String auditJson = MAPPER.writeValueAsString(event); + + when(mockRestClient.performRequest(any(Request.class))).thenReturn(mockResponse); + when(mockResponse.getStatusLine()).thenReturn(mockStatusLine); + when(mockStatusLine.getStatusCode()).thenReturn(500); + + assertThrows(Exception.class, () -> dispatcher.processMessageBatch(Collections.singletonList(auditJson))); + } + + @Test + void processMessageBatch_throwsOnBulkItemErrors() throws Exception { + AuthzAuditEvent event = new AuthzAuditEvent(); + event.setEventId("test-id-003"); + event.setUser("testuser"); + + String auditJson = MAPPER.writeValueAsString(event); + String errorResponse = "{\"errors\":true,\"items\":[{\"index\":{\"status\":400,\"error\":\"mapping error\"}}]}"; + HttpEntity entity = mock(HttpEntity.class); + when(entity.getContent()).thenReturn(new ByteArrayInputStream(errorResponse.getBytes(StandardCharsets.UTF_8))); + when(entity.getContentLength()).thenReturn((long) errorResponse.length()); + + when(mockRestClient.performRequest(any(Request.class))).thenReturn(mockResponse); + when(mockResponse.getStatusLine()).thenReturn(mockStatusLine); + when(mockStatusLine.getStatusCode()).thenReturn(200); + when(mockResponse.getEntity()).thenReturn(entity); + + assertThrows(Exception.class, () -> dispatcher.processMessageBatch(Collections.singletonList(auditJson))); + } + + private void setupSuccessResponse() throws Exception { + String successBody = "{\"errors\":false,\"items\":[]}"; + HttpEntity entity = mock(HttpEntity.class); + when(entity.getContent()).thenReturn(new ByteArrayInputStream(successBody.getBytes(StandardCharsets.UTF_8))); + when(entity.getContentLength()).thenReturn((long) successBody.length()); + + when(mockRestClient.performRequest(any(Request.class))).thenReturn(mockResponse); + when(mockResponse.getStatusLine()).thenReturn(mockStatusLine); + when(mockStatusLine.getStatusCode()).thenReturn(200); + when(mockResponse.getEntity()).thenReturn(entity); + } + + private String requestBody(Request request) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + request.getEntity().writeTo(out); + + return out.toString(StandardCharsets.UTF_8.name()); + } + + @SuppressWarnings("unchecked") + private Map indexMetadata(String line) throws Exception { + Map meta = MAPPER.readValue(line, Map.class); + + return (Map) meta.get("index"); + } + + private void setField(Object target, String fieldName, Object value) throws Exception { + Class clazz = target.getClass(); + while (clazz != null) { + try { + Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + return; + } catch (NoSuchFieldException e) { + clazz = clazz.getSuperclass(); + } + } + throw new NoSuchFieldException(fieldName + " not found in class hierarchy"); + } +} diff --git a/audit-server/audit-dispatcher/pom.xml b/audit-server/audit-dispatcher/pom.xml index a306cc1674e..83a2f3d3b6c 100644 --- a/audit-server/audit-dispatcher/pom.xml +++ b/audit-server/audit-dispatcher/pom.xml @@ -34,6 +34,7 @@ dispatcher-app dispatcher-common dispatcher-hdfs + dispatcher-opensearch dispatcher-solr diff --git a/audit-server/audit-dispatcher/scripts/start-audit-dispatcher.sh b/audit-server/audit-dispatcher/scripts/start-audit-dispatcher.sh index ad72e71b73a..ae7100d40ed 100755 --- a/audit-server/audit-dispatcher/scripts/start-audit-dispatcher.sh +++ b/audit-server/audit-dispatcher/scripts/start-audit-dispatcher.sh @@ -36,6 +36,10 @@ if [ -z "${DISPATCHER_TYPE}" ]; then fi CONF_FILE="${AUDIT_DISPATCHER_CONF_DIR}/ranger-audit-dispatcher-${DISPATCHER_TYPE}-site.xml" +LOGBACK_CONFIG_FILE="${AUDIT_DISPATCHER_CONF_DIR}/logback.xml" +if [ -n "${DISPATCHER_TYPE}" ] && [ -f "${AUDIT_DISPATCHER_CONF_DIR}/logback-${DISPATCHER_TYPE}.xml" ]; then + LOGBACK_CONFIG_FILE="${AUDIT_DISPATCHER_CONF_DIR}/logback-${DISPATCHER_TYPE}.xml" +fi # Set default heap size if not set if [ -z "${AUDIT_DISPATCHER_HEAP}" ]; then @@ -44,7 +48,7 @@ fi # Set default Java options if not set if [ -z "${AUDIT_DISPATCHER_OPTS}" ]; then - AUDIT_DISPATCHER_OPTS="-Dlogback.configurationFile=${AUDIT_DISPATCHER_CONF_DIR}/logback.xml \ + AUDIT_DISPATCHER_OPTS="-Dlogback.configurationFile=${LOGBACK_CONFIG_FILE} \ -Daudit.dispatcher.log.dir=${AUDIT_DISPATCHER_LOG_DIR} \ -Daudit.dispatcher.log.file=ranger-audit-dispatcher.log \ -Djava.net.preferIPv4Stack=true \ diff --git a/dev-support/ranger-docker/Dockerfile.ranger-kdc b/dev-support/ranger-docker/Dockerfile.ranger-kdc index f3ce89d182f..a81119bfffd 100644 --- a/dev-support/ranger-docker/Dockerfile.ranger-kdc +++ b/dev-support/ranger-docker/Dockerfile.ranger-kdc @@ -36,10 +36,13 @@ COPY ./scripts/kdc/kdc.conf /etc/krb5kdc/kdc.conf COPY ./scripts/kdc/kadm5.acl /etc/krb5kdc/kadm5.acl COPY ./scripts/kdc/entrypoint.sh /entrypoint.sh -RUN chmod +x /entrypoint.sh +RUN chmod +x /entrypoint.sh && apt-get update && apt-get install -y netcat-openbsd && rm -rf /var/lib/apt/lists/* VOLUME /etc/keytabs EXPOSE 88/tcp 88/udp 749/tcp +HEALTHCHECK --interval=5s --timeout=3s --start-period=120s --retries=30 \ + CMD test -f /etc/keytabs/.provisioned && echo | nc -w 1 localhost 88 || exit 1 + ENTRYPOINT ["/entrypoint.sh"] diff --git a/dev-support/ranger-docker/README.md b/dev-support/ranger-docker/README.md index 3f8090d6eb6..b8b7fff684b 100644 --- a/dev-support/ranger-docker/README.md +++ b/dev-support/ranger-docker/README.md @@ -106,6 +106,79 @@ docker compose -f docker-compose.ranger.yml -f docker-compose.ranger-trino.yml u ~~~ docker compose -f docker-compose.ranger.yml -f docker-compose.ranger-opensearch.yml up -d ~~~ + +#### OpenSearch audit flow (replace Solr for access audits) + +OpenSearch can replace Solr for **audit storage and UI queries**. Ranger Admin reads audits via +`audit_store=elasticsearch` (compatible with OpenSearch 1.x using the ES 7 REST client). + +**Write path:** access audits flow through audit-server ingestor, Kafka, and the Java +`ranger-audit-dispatcher-opensearch` service into the OpenSearch `ranger_audits` index. +Ranger Admin policy/admin transaction audits remain DB-backed; this is the same boundary +used by the Solr audit path. + +##### Quick start + +~~~ +# Prerequisites: build the audit-dispatcher tarball and download archives +mvn clean package -DskipTests -pl distro -am +cp target/ranger-*-audit-dispatcher.tar.gz dev-support/ranger-docker/dist/ +cd dev-support/ranger-docker +./download-archives.sh kafka opensearch hadoop + +# Run the E2E test (starts stack, tests, tears down automatically) +./scripts/audit/e2e-audit-opensearch.sh + +# Or keep the stack running after the test for debugging +./scripts/audit/e2e-audit-opensearch.sh --no-teardown + +# Re-run just the test against an already-running stack +./scripts/audit/e2e-audit-opensearch.sh --skip-start +~~~ + +##### Manual setup (advanced) + +For finer control, the individual steps can be run manually: + +~~~ +export RANGER_DB_TYPE=postgres + +# 1. Start core stack (Ranger Admin, Kafka, Hadoop) +docker compose -f docker-compose.ranger.yml -f docker-compose.ranger-kafka.yml \ + -f docker-compose.ranger-hadoop.yml up -d ranger ranger-kafka ranger-hadoop + +# 2. Pre-create Kafka topic (avoids auto-create with wrong partition count) +./scripts/kafka/create-ranger-audit-topic.sh + +# 3. Start audit ingestor +docker compose -f docker-compose.ranger.yml -f docker-compose.ranger-kafka.yml \ + -f docker-compose.ranger-audit-server.yml -f docker-compose.ranger-hadoop.yml \ + up -d ranger-audit-ingestor + +# 4. Start OpenSearch +docker compose -f docker-compose.ranger.yml -f docker-compose.ranger-opensearch.yml \ + up -d ranger-opensearch + +# 5. Create ranger_audits index (wait for OpenSearch to be ready first) +./scripts/opensearch/create-ranger-audit-index.sh + +# 6. Build and start the OpenSearch dispatcher +docker compose -f docker-compose.ranger.yml -f docker-compose.ranger-opensearch.yml \ + -f docker-compose.ranger-kafka.yml \ + -f docker-compose.ranger-audit-dispatcher-opensearch.yml up -d --build + +# 7. (Optional) Stop Solr dispatcher to avoid duplicate indexing +docker stop ranger-audit-dispatcher-solr 2>/dev/null || true + +# 8. Run end-to-end audit test (--skip-start since stack is already up) +./scripts/audit/e2e-audit-opensearch.sh --skip-start +~~~ + +For **fresh Ranger installs** using OpenSearch for audits, mount +`scripts/admin/ranger-admin-install-postgres-opensearch.properties` as install.properties. + +For **existing Solr-based installs**, switching stores requires updating `ranger.audit.source.type` +and elasticsearch settings in `ranger-admin-site.xml`, then restarting Ranger Admin. Similarly, check the `depends` section of the `docker-compose.ranger-service.yaml` file and add docker-compose files for these services when trying to bring up the `service` container. #### Bring up all containers diff --git a/dev-support/ranger-docker/docker-compose.ranger-audit-dispatcher-opensearch.yml b/dev-support/ranger-docker/docker-compose.ranger-audit-dispatcher-opensearch.yml new file mode 100644 index 00000000000..b99a03c6d93 --- /dev/null +++ b/dev-support/ranger-docker/docker-compose.ranger-audit-dispatcher-opensearch.yml @@ -0,0 +1,70 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file to You +# under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +services: + ranger-audit-dispatcher-opensearch: + build: + context: . + dockerfile: Dockerfile.ranger-audit-dispatcher + args: + - RANGER_BASE_IMAGE=${RANGER_BASE_IMAGE} + - RANGER_BASE_VERSION=${RANGER_BASE_VERSION} + - RANGER_VERSION=${RANGER_VERSION} + - KERBEROS_ENABLED=${KERBEROS_ENABLED} + image: ranger-audit-dispatcher-opensearch:latest + container_name: ranger-audit-dispatcher-opensearch + hostname: ranger-audit-dispatcher-opensearch.rangernw + command: ["opensearch"] + stdin_open: true + tty: true + depends_on: + ranger-kdc: + condition: service_healthy + ranger-kafka: + condition: service_started + ranger-opensearch: + condition: service_started + ports: + - "7093:7090" + environment: + JAVA_HOME: /opt/java/openjdk + AUDIT_DISPATCHER_HOME_DIR: /opt/ranger/audit-dispatcher + AUDIT_DISPATCHER_CONF_DIR: /opt/ranger/audit-dispatcher/conf + AUDIT_DISPATCHER_LOG_DIR: /var/log/ranger/audit-dispatcher/opensearch + AUDIT_DISPATCHER_HEAP: "-Xms512m -Xmx2g" + RANGER_VERSION: ${RANGER_VERSION} + KERBEROS_ENABLED: ${KERBEROS_ENABLED} + KAFKA_BOOTSTRAP_SERVERS: ranger-kafka.rangernw:9092 + OPENSEARCH_URL: http://ranger-opensearch.rangernw:9200 + networks: + - ranger + volumes: + - ./dist/keytabs/ranger-audit-dispatcher-opensearch:/etc/keytabs + - ranger-audit-dispatcher-opensearch-logs:/var/log/ranger/audit-dispatcher/opensearch + - ./scripts/audit-dispatcher/ranger-audit-dispatcher-opensearch-site.xml:/opt/ranger/audit-dispatcher/conf/ranger-audit-dispatcher-opensearch-site.xml + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:7090/api/health/ping"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 60s + +volumes: + ranger-audit-dispatcher-opensearch-logs: + +networks: + ranger: + name: rangernw diff --git a/dev-support/ranger-docker/docker-compose.ranger.yml b/dev-support/ranger-docker/docker-compose.ranger.yml index e37469c1098..9605e0bcf41 100644 --- a/dev-support/ranger-docker/docker-compose.ranger.yml +++ b/dev-support/ranger-docker/docker-compose.ranger.yml @@ -81,6 +81,9 @@ services: image: ranger-zk container_name: ranger-zk hostname: ranger-zk.rangernw + depends_on: + ranger-kdc: + condition: service_healthy volumes: - ./dist/keytabs/ranger-zk:/etc/keytabs - ./scripts/kdc/krb5.conf:/etc/krb5.conf:ro diff --git a/dev-support/ranger-docker/download-archives.sh b/dev-support/ranger-docker/download-archives.sh index 52220741d1b..a93476fdd55 100755 --- a/dev-support/ranger-docker/download-archives.sh +++ b/dev-support/ranger-docker/download-archives.sh @@ -22,9 +22,15 @@ # # -# source .env file to get versions to download +# Load .env file to get versions to download. Do not source it directly: +# values like JAVA_OPTS contain spaces and are valid for docker compose .env +# files, but not valid shell assignments unless quoted. # -source .env +while IFS='=' read -r key value; do + if [[ "${key}" =~ ^[A-Za-z_][A-Za-z0-9_]*$ ]]; then + export "${key}=${value}" + fi +done < .env downloadIfNotPresent() { diff --git a/dev-support/ranger-docker/scripts/admin/ranger-admin-install-postgres-opensearch.properties b/dev-support/ranger-docker/scripts/admin/ranger-admin-install-postgres-opensearch.properties new file mode 100644 index 00000000000..b284d4a60bf --- /dev/null +++ b/dev-support/ranger-docker/scripts/admin/ranger-admin-install-postgres-opensearch.properties @@ -0,0 +1,99 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# +# Ranger Admin install.properties — OpenSearch audit store +# +# Use this file instead of ranger-admin-install-postgres.properties when +# bootstrapping a fresh Ranger container for OpenSearch-based audits: +# +# -f docker-compose.ranger.yml with volume mount override: +# ./scripts/admin/ranger-admin-install-postgres-opensearch.properties +# + +PYTHON_COMMAND_INVOKER=python3 +RANGER_ADMIN_LOG_DIR=/var/log/ranger +RANGER_PID_DIR_PATH=/var/run/ranger +DB_FLAVOR=POSTGRES +SQL_CONNECTOR_JAR=/usr/share/java/postgresql.jar +RANGER_ADMIN_LOGBACK_CONF_FILE=/opt/ranger/admin/ews/webapp/WEB-INF/classes/conf/logback.xml + +db_root_user=postgres +db_root_password=rangerR0cks! +db_host=ranger-db + +db_name=ranger +db_user=rangeradmin +db_password=rangerR0cks! + +postgres_core_file=db/postgres/optimized/current/ranger_core_db_postgres.sql +postgres_audit_file=db/postgres/xa_audit_db_postgres.sql +mysql_core_file=db/mysql/optimized/current/ranger_core_db_mysql.sql +mysql_audit_file=db/mysql/xa_audit_db.sql + +rangerAdmin_password=rangerR0cks! +rangerTagsync_password=rangerR0cks! +rangerUsersync_password=rangerR0cks! +keyadmin_password=rangerR0cks! + +# OpenSearch replaces Solr for audit storage (Ranger uses audit_store=elasticsearch) +audit_store=elasticsearch +audit_elasticsearch_urls=ranger-opensearch +audit_elasticsearch_port=9200 +audit_elasticsearch_protocol=http +audit_elasticsearch_user=NONE +audit_elasticsearch_password=NONE +audit_elasticsearch_index=ranger_audits +audit_elasticsearch_bootstrap_enabled=true + +# Solr disabled for OpenSearch profile +# audit_store=solr +# audit_solr_urls=http://ranger-solr:8983/solr/ranger_audits + +policymgr_external_url=http://ranger-admin:6080 +policymgr_http_enabled=true + +unix_user=ranger +unix_user_pwd=ranger +unix_group=ranger + +oracle_core_file= +sqlserver_core_file= +sqlanywhere_core_file= +cred_keystore_filename= + +XAPOLICYMGR_DIR=$PWD +app_home=$PWD/ews/webapp +TMPFILE=$PWD/.fi_tmp +LOGFILE=$PWD/logfile +LOGFILES="$LOGFILE" + +JAVA_BIN='java' +JAVA_VERSION_REQUIRED='1.8' + +ranger_admin_max_heap_size=1g +PATCH_RETRY_INTERVAL=120 +STALE_PATCH_ENTRY_HOLD_TIME=10 + +hadoop_conf= +authentication_method=UNIX + +spnego_principal=HTTP/ranger.rangernw@EXAMPLE.COM +spnego_keytab=/etc/keytabs/HTTP.keytab +token_valid=30 +admin_principal=rangeradmin/ranger.rangernw@EXAMPLE.COM +admin_keytab=/etc/keytabs/rangeradmin.keytab +lookup_principal=rangerlookup/ranger.rangernw@EXAMPLE.COM +lookup_keytab=/etc/keytabs/rangerlookup.keytab diff --git a/dev-support/ranger-docker/scripts/audit-dispatcher/ranger-audit-dispatcher-opensearch-site.xml b/dev-support/ranger-docker/scripts/audit-dispatcher/ranger-audit-dispatcher-opensearch-site.xml new file mode 100644 index 00000000000..718d9b3cd67 --- /dev/null +++ b/dev-support/ranger-docker/scripts/audit-dispatcher/ranger-audit-dispatcher-opensearch-site.xml @@ -0,0 +1,173 @@ + + + + ranger.audit.dispatcher.opensearch.class + org.apache.ranger.audit.dispatcher.OpenSearchDispatcherManager + + + + log.dir + ${audit.dispatcher.opensearch.log.dir} + Log directory for OpenSearch dispatcher service + + + + ranger.audit.dispatcher.host + ranger-audit-dispatcher-opensearch.rangernw + + + + ranger.audit.dispatcher.service.kerberos.principal + rangerauditserver/_HOST@EXAMPLE.COM + + + + ranger.audit.dispatcher.service.kerberos.keytab + /etc/keytabs/rangerauditserver.keytab + + + + + ranger.audit.dispatcher.kafka.bootstrap.servers + ranger-kafka.rangernw:9092 + + + + ranger.audit.dispatcher.kafka.topic.name + ranger_audits + + + + ranger.audit.dispatcher.kafka.security.protocol + SASL_PLAINTEXT + + + + ranger.audit.dispatcher.kafka.sasl.mechanism + GSSAPI + + + + ranger.audit.dispatcher.thread.count + 1 + + + + ranger.audit.dispatcher.offset.commit.strategy + batch + + + + ranger.audit.dispatcher.offset.commit.interval.ms + 30000 + + + + ranger.audit.dispatcher.max.poll.records + 500 + + + + ranger.audit.dispatcher.class + org.apache.ranger.audit.dispatcher.kafka.AuditOpenSearchDispatcher + + + + + xasecure.audit.destination.elasticsearch + true + + + + xasecure.audit.destination.elasticsearch.urls + ranger-opensearch.rangernw + + + + xasecure.audit.destination.elasticsearch.port + 9200 + + + + xasecure.audit.destination.elasticsearch.protocol + http + + + + xasecure.audit.destination.elasticsearch.index + ranger_audits + + + + xasecure.audit.destination.elasticsearch.user + + + Username for OpenSearch Basic Auth, or Kerberos principal when password is a keytab path. + Use NONE or empty for unauthenticated OpenSearch (dev only). + Production: configure user/password or Kerberos keytab. + + + + + xasecure.audit.destination.elasticsearch.password + + + Password for OpenSearch Basic Auth, or path to keytab for Kerberos. + Use NONE or empty for unauthenticated OpenSearch (dev only). + Production: configure user/password or Kerberos keytab. + + + + + + ranger.audit.dispatcher.type + opensearch + + + + ranger.audit.dispatcher.war.file + ranger-audit-dispatcher.war + + + + ranger.audit.dispatcher.launcher.class + org.apache.ranger.audit.dispatcher.AuditDispatcherLauncher + + + + ranger.audit.dispatcher.main.class + org.apache.ranger.audit.dispatcher.AuditDispatcherApplication + + + + ranger.audit.dispatcher.http.port + 7090 + + + + ranger.audit.dispatcher.contextName + / + + + + ranger.audit.dispatcher.kafka.group.id + ranger_audit_opensearch_dispatcher_group + + diff --git a/dev-support/ranger-docker/scripts/audit/e2e-audit-opensearch.sh b/dev-support/ranger-docker/scripts/audit/e2e-audit-opensearch.sh new file mode 100755 index 00000000000..9deaa0dc6c5 --- /dev/null +++ b/dev-support/ranger-docker/scripts/audit/e2e-audit-opensearch.sh @@ -0,0 +1,389 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# +# End-to-end OpenSearch audit dispatcher test. +# +# Starts the full Docker stack, validates the pipeline end-to-end, and tears +# down on exit. Pipeline: Plugin -> Ingestor -> Kafka -> Dispatcher -> OpenSearch +# +# Usage (from dev-support/ranger-docker/): +# ./scripts/audit/e2e-audit-opensearch.sh # full lifecycle +# ./scripts/audit/e2e-audit-opensearch.sh --build # force rebuild tarball, then test +# ./scripts/audit/e2e-audit-opensearch.sh --no-teardown # keep stack for debugging +# ./scripts/audit/e2e-audit-opensearch.sh --skip-start # test against running stack +# +# Prerequisites: +# - Docker running +# - dist/ranger-*-audit-dispatcher.tar.gz present (or use --build) +# + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +RD_HOME="$(cd "${SCRIPT_DIR}/../.." && pwd)" + +# ─── Flags ─────────────────────────────────────────────────────────────────── + +SKIP_START=false +NO_TEARDOWN=false +DO_BUILD=false + +for arg in "$@"; do + case "${arg}" in + --build) DO_BUILD=true ;; + --skip-start) SKIP_START=true ;; + --no-teardown) NO_TEARDOWN=true ;; + --help|-h) + sed -n '/^# Usage/,/^#$/p' "$0" | sed 's/^# \?//' + exit 0 ;; + *) echo "Unknown flag: ${arg}" >&2; exit 1 ;; + esac +done + +# ─── Configuration ─────────────────────────────────────────────────────────── + +export RANGER_DB_TYPE="${RANGER_DB_TYPE:-postgres}" +WAIT_TIMEOUT="${WAIT_TIMEOUT:-120}" + +OPENSEARCH_URL="${OPENSEARCH_URL:-http://localhost:9200}" +OPENSEARCH_INDEX="${OPENSEARCH_INDEX:-ranger_audits}" +INGESTOR_URL="${INGESTOR_URL:-http://localhost:7081}" +DISPATCHER_URL="${DISPATCHER_URL:-http://localhost:7093}" +RANGER_URL="${RANGER_URL:-http://localhost:6080}" +RANGER_USER="${RANGER_USER:-admin}" +RANGER_PASSWORD="${RANGER_PASSWORD:-rangerR0cks!}" +SERVICE_NAME="${SERVICE_NAME:-dev_hdfs}" +WAIT_SECONDS="${WAIT_SECONDS:-90}" + +COMPOSE_RANGER="docker-compose.ranger.yml" +COMPOSE_KAFKA="docker-compose.ranger-kafka.yml" +COMPOSE_HADOOP="docker-compose.ranger-hadoop.yml" +COMPOSE_AUDIT_SERVER="docker-compose.ranger-audit-server.yml" +COMPOSE_OPENSEARCH="docker-compose.ranger-opensearch.yml" +COMPOSE_DISPATCHER="docker-compose.ranger-audit-dispatcher-opensearch.yml" + +COMPOSE_BASE="-f ${COMPOSE_RANGER} -f ${COMPOSE_KAFKA} -f ${COMPOSE_HADOOP}" +COMPOSE_ALL="${COMPOSE_BASE} -f ${COMPOSE_AUDIT_SERVER} -f ${COMPOSE_OPENSEARCH} -f ${COMPOSE_DISPATCHER}" + +# ─── Helpers ───────────────────────────────────────────────────────────────── + +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' + +pass() { echo -e "${GREEN}[PASS]${NC} $*"; } +fail() { echo -e "${RED}[FAIL]${NC} $*"; exit 1; } +info() { echo -e "${YELLOW}[INFO]${NC} $*"; } + +wait_for_url() { + local name="$1" url="$2" max_wait="${3:-${WAIT_TIMEOUT}}" + local elapsed=0 + info "Waiting for ${name} at ${url} (up to ${max_wait}s)..." + while ! curl -sf -m 2 "${url}" >/dev/null 2>&1; do + elapsed=$((elapsed + 3)) + if [ "${elapsed}" -ge "${max_wait}" ]; then + fail "${name} not ready after ${max_wait}s" + fi + sleep 3 + done + pass "${name} is ready" +} + +wait_for_container_health() { + local name="$1" container="$2" max_wait="${3:-${WAIT_TIMEOUT}}" + local elapsed=0 + info "Waiting for ${name} container health (up to ${max_wait}s)..." + while true; do + local status + status="$(docker inspect --format='{{.State.Health.Status}}' "${container}" 2>/dev/null || echo "not_found")" + if [ "${status}" = "healthy" ]; then + pass "${name} is healthy" + return 0 + fi + elapsed=$((elapsed + 3)) + if [ "${elapsed}" -ge "${max_wait}" ]; then + fail "${name} not healthy after ${max_wait}s (status: ${status})" + fi + sleep 3 + done +} + +get_os_count() { + curl -sf "${OPENSEARCH_URL}/${OPENSEARCH_INDEX}/_count" \ + | python3 -c "import sys,json; print(json.load(sys.stdin).get('count',0))" +} + +teardown() { + if [ "${NO_TEARDOWN}" = true ]; then + info "Skipping teardown (--no-teardown). Stack is still running." + return + fi + echo "" + info "Tearing down OpenSearch audit stack..." + cd "${RD_HOME}" + docker compose ${COMPOSE_ALL} down -v 2>/dev/null || true + pass "All containers stopped and volumes removed." +} + +# ─── Teardown trap ─────────────────────────────────────────────────────────── + +if [ "${SKIP_START}" = false ]; then + trap teardown EXIT +fi + +# ══════════════════════════════════════════════════════════════════════════════ +# Phase 1: Start Stack +# ══════════════════════════════════════════════════════════════════════════════ + +if [ "${SKIP_START}" = false ]; then + echo "========================================" + echo " OpenSearch Audit Stack — Startup" + echo "========================================" + echo "" + + cd "${RD_HOME}" + + # Prerequisites + info "Checking prerequisites..." + if ! docker info >/dev/null 2>&1; then + fail "Docker is not running" + fi + + # Populate dependency archives exactly as dev-support expects; this is + # idempotent and skips files already present in downloads/. + info "Ensuring required Docker build archives are present..." + mkdir -p "${RD_HOME}/downloads" + chmod +x "${RD_HOME}/download-archives.sh" + "${RD_HOME}/download-archives.sh" kafka opensearch hadoop + pass "Required archives are available" + + TARBALL="$(find dist/ -name 'ranger-*-audit-dispatcher.tar.gz' 2>/dev/null | head -1)" + + # Build tarball if requested or absent (skips kylin plugin which has a corrupt dependency) + if [ "${DO_BUILD}" = true ] || [ -z "${TARBALL}" ]; then + info "Building audit-dispatcher tarball (excluding kylin plugin)..." + RANGER_ROOT="$(cd "${RD_HOME}/../.." && pwd)" + mkdir -p "${RD_HOME}/dist" + (cd "${RANGER_ROOT}" && mvn clean package -DskipTests \ + -pl 'distro,!plugin-kylin,!ranger-kylin-plugin-shim' -am \ + -Dcheckstyle.skip=true -q) + cp "${RANGER_ROOT}/target/ranger-"*"-audit-dispatcher.tar.gz" "${RD_HOME}/dist/" + pass "Tarball built and copied to dist/" + fi + + TARBALL="$(find dist/ -name 'ranger-*-audit-dispatcher.tar.gz' 2>/dev/null | head -1)" + if [ -z "${TARBALL}" ]; then + fail "No audit-dispatcher tarball found in dist/. Manual build command: mvn clean package -DskipTests -pl 'distro,!plugin-kylin,!ranger-kylin-plugin-shim' -am" + fi + pass "Found tarball: ${TARBALL}" + + # Ensure the Docker network exists (some compose files declare it external) + docker network create rangernw 2>/dev/null || true + + # Build all images first (avoids container recreation during phased startup) + info "Building images..." + docker compose ${COMPOSE_ALL} build + + # Core stack + info "Starting core stack (Ranger Admin, KDC, ZK, Kafka, Hadoop)..." + docker compose ${COMPOSE_ALL} up -d ranger ranger-kafka ranger-hadoop + + info "Waiting for Kafka to accept connections (up to 120s)..." + kafka_elapsed=0 + while ! docker exec ranger-kafka bash -c "echo > /dev/tcp/localhost/9092" 2>/dev/null; do + kafka_elapsed=$((kafka_elapsed + 3)) + if [ "${kafka_elapsed}" -ge 120 ]; then + fail "Kafka not ready after 120s" + fi + sleep 3 + done + pass "Kafka is ready" + + wait_for_url "Ranger Admin" "http://localhost:6080/login.jsp" 180 + + # Kafka topic + info "Pre-creating Kafka topic..." + chmod +x "${RD_HOME}/scripts/kafka/create-ranger-audit-topic.sh" + "${RD_HOME}/scripts/kafka/create-ranger-audit-topic.sh" + pass "Kafka topic ready" + + # Audit ingestor + info "Starting audit ingestor..." + docker compose ${COMPOSE_ALL} up -d ranger-audit-ingestor + + wait_for_url "Audit Ingestor" "http://localhost:7081/api/audit/health" 90 + + # OpenSearch + info "Starting OpenSearch..." + docker compose ${COMPOSE_ALL} up -d ranger-opensearch + + wait_for_url "OpenSearch" "http://localhost:9200/_cluster/health" 90 + + # OpenSearch index + info "Creating ranger_audits index..." + chmod +x "${RD_HOME}/scripts/opensearch/create-ranger-audit-index.sh" + "${RD_HOME}/scripts/opensearch/create-ranger-audit-index.sh" + pass "OpenSearch index ready" + + # Dispatcher + info "Starting OpenSearch dispatcher..." + docker compose ${COMPOSE_ALL} up -d ranger-audit-dispatcher-opensearch + + wait_for_url "OpenSearch Dispatcher" "http://localhost:7093/api/health/ping" 120 + + echo "" + pass "OpenSearch audit stack is ready!" + echo "" +fi + +# ══════════════════════════════════════════════════════════════════════════════ +# Phase 2: E2E Test +# ══════════════════════════════════════════════════════════════════════════════ + +echo "========================================" +echo " OpenSearch Audit Dispatcher — E2E Test" +echo "========================================" +echo "" + +cd "${RD_HOME}" + +# Health checks +info "Checking OpenSearch..." +curl -sf "${OPENSEARCH_URL}/_cluster/health" | grep -q '"status"' || fail "OpenSearch unreachable" +pass "OpenSearch is up" + +info "Ensuring audit index exists..." +chmod +x "${RD_HOME}/scripts/opensearch/create-ranger-audit-index.sh" +"${RD_HOME}/scripts/opensearch/create-ranger-audit-index.sh" +pass "Index ${OPENSEARCH_INDEX} ready" + +info "Checking audit ingestor..." +curl -sf "${INGESTOR_URL}/api/audit/health" | grep -q '"status":"UP"' || fail "Audit ingestor down" +pass "Audit ingestor healthy" + +dispatcher_running="$(docker ps --format '{{.Names}}' | grep -c '^ranger-audit-dispatcher-opensearch$' || true)" +if [ "${dispatcher_running}" -eq 0 ]; then + fail "ranger-audit-dispatcher-opensearch is not running" +else + pass "OpenSearch dispatcher container is running" +fi + +info "Checking OpenSearch dispatcher health..." +curl -sf "${DISPATCHER_URL}/api/health/ping" >/dev/null || fail "OpenSearch dispatcher health check failed at ${DISPATCHER_URL}" +pass "OpenSearch dispatcher is healthy" + +# Post test audit via ingestor +before_count="$(get_os_count)" +marker="e2e-opensearch-$(date +%s)" +info "OpenSearch doc count before: ${before_count}" + +info "Posting test audit to ingestor..." +http_code="$(docker exec ranger-hadoop bash -c " + kinit -kt /etc/keytabs/hdfs.keytab hdfs/ranger-hadoop.rangernw@EXAMPLE.COM 2>/dev/null + curl -s -o /tmp/os_audit_resp.json -w '%{http_code}' \ + --negotiate -u : \ + -H 'Content-Type: application/json' \ + -X POST 'http://ranger-audit-ingestor.rangernw:7081/api/audit/access?serviceName=${SERVICE_NAME}' \ + -d '[{ + \"id\": \"${marker}\", + \"access\": \"read\", + \"enforcer\": \"ranger-acl\", + \"agent\": \"hdfs\", + \"repo\": \"${SERVICE_NAME}\", + \"repoType\": 1, + \"reqUser\": \"testuser1\", + \"reqData\": \"${marker}\", + \"resource\": \"/tmp/opensearch_audit_test\", + \"resType\": \"path\", + \"result\": 0, + \"policy\": -1, + \"reason\": \"OpenSearch audit dispatcher test\", + \"action\": \"read\", + \"cliIP\": \"127.0.0.1\", + \"agentHost\": \"ranger-hadoop.rangernw\" + }]' +")" + +if [[ "${http_code}" != "200" && "${http_code}" != "202" ]]; then + docker exec ranger-hadoop cat /tmp/os_audit_resp.json 2>/dev/null || true + fail "Ingestor returned HTTP ${http_code}" +fi +pass "Audit accepted by ingestor (HTTP ${http_code})" + +# Wait for OpenSearch indexing via dispatcher +info "Waiting up to ${WAIT_SECONDS}s for document in OpenSearch..." +elapsed=0 +found=0 +while [ "${elapsed}" -lt "${WAIT_SECONDS}" ]; do + result="$(curl -sf "${OPENSEARCH_URL}/${OPENSEARCH_INDEX}/_search" \ + -H 'Content-Type: application/json' \ + -d "{\"query\":{\"term\":{\"id\":\"${marker}\"}},\"size\":1}" 2>/dev/null \ + | python3 -c "import sys,json; print(json.load(sys.stdin)['hits']['total']['value'])" 2>/dev/null || echo 0)" + if [ "${result}" -gt 0 ]; then + found=1 + break + fi + sleep 3 + elapsed=$((elapsed + 3)) +done + +[ "${found}" -eq 1 ] || fail "Timed out waiting for marker '${marker}' in OpenSearch" +after_count="$(get_os_count)" +pass "Document indexed in OpenSearch (${before_count} -> ${after_count} docs)" + +# Query sample document +curl -sf "${OPENSEARCH_URL}/${OPENSEARCH_INDEX}/_search?q=id:${marker}&pretty" | head -30 +echo "" + +# Ranger UI (only if configured for elasticsearch store) +audit_source="$(curl -sf -u "${RANGER_USER}:${RANGER_PASSWORD}" \ + "${RANGER_URL}/service/assets/accessAudit?pageSize=1" 2>/dev/null \ + | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('totalCount','?'))" 2>/dev/null || echo "skip")" + +if [ "${audit_source}" != "skip" ]; then + info "Ranger UI audit API returned totalCount=${audit_source}" + ui_match="$(curl -sf -u "${RANGER_USER}:${RANGER_PASSWORD}" \ + "${RANGER_URL}/service/assets/accessAudit?pageSize=10&sortBy=eventTime&sortType=desc" 2>/dev/null \ + | python3 -c " +import sys, json +d = json.load(sys.stdin) +marker = '${marker}' +hits = [a for a in d.get('vXAccessAudits', []) if marker in (a.get('requestData') or '')] +print(len(hits)) +" 2>/dev/null || echo 0)" + if [ "${ui_match}" -gt 0 ]; then + pass "Audit visible in Ranger UI (elasticsearch store active)" + else + info "Audit not in Ranger UI yet — switch audit_store to elasticsearch and restart Ranger:" + info " Use scripts/admin/ranger-admin-install-postgres-opensearch.properties for fresh installs" + fi +fi + +# ══════════════════════════════════════════════════════════════════════════════ +# Done +# ══════════════════════════════════════════════════════════════════════════════ + +echo "" +echo "========================================" +pass "OpenSearch audit dispatcher E2E test completed successfully" +echo " OpenSearch: ${OPENSEARCH_URL}/${OPENSEARCH_INDEX}/_search" +echo " Dispatcher: ${DISPATCHER_URL}/api/health/ping" +echo " Ranger UI: ${RANGER_URL} -> Audit -> Access (requires elasticsearch audit_store)" +echo "========================================" diff --git a/dev-support/ranger-docker/scripts/kafka/create-ranger-audit-topic.sh b/dev-support/ranger-docker/scripts/kafka/create-ranger-audit-topic.sh new file mode 100755 index 00000000000..5a5125e8a8d --- /dev/null +++ b/dev-support/ranger-docker/scripts/kafka/create-ranger-audit-topic.sh @@ -0,0 +1,71 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# +# Creates the ranger_audits Kafka topic with the correct partition count. +# Idempotent: skips creation if the topic already exists. +# +# Usage: ./scripts/kafka/create-ranger-audit-topic.sh +# +# Environment variables: +# TOPIC_NAME - Topic name (default: ranger_audits) +# PARTITIONS - Number of partitions (default: 10) +# REPLICATION - Replication factor (default: 1) +# KAFKA_CONTAINER - Kafka container name (default: ranger-kafka) +# + +set -euo pipefail + +TOPIC_NAME="${TOPIC_NAME:-ranger_audits}" +PARTITIONS="${PARTITIONS:-10}" +REPLICATION="${REPLICATION:-1}" +KAFKA_CONTAINER="${KAFKA_CONTAINER:-ranger-kafka}" + +BOOTSTRAP_SERVER="ranger-kafka.rangernw:9092" +KEYTAB="/etc/keytabs/kafka.keytab" +PRINCIPAL="kafka/ranger-kafka.rangernw@EXAMPLE.COM" + +echo "Kafka topic : ${TOPIC_NAME}" +echo "Partitions : ${PARTITIONS}" +echo "Replication : ${REPLICATION}" +echo "Container : ${KAFKA_CONTAINER}" + +if ! docker ps --format '{{.Names}}' | grep -q "^${KAFKA_CONTAINER}$"; then + echo "ERROR: Container '${KAFKA_CONTAINER}' is not running" >&2 + exit 1 +fi + +SASL_CONFIG="security.protocol=SASL_PLAINTEXT +sasl.mechanism=GSSAPI +sasl.kerberos.service.name=kafka +sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"${KEYTAB}\" principal=\"${PRINCIPAL}\";" + +docker exec "${KAFKA_CONTAINER}" bash -c " + kinit -kt ${KEYTAB} ${PRINCIPAL} 2>/dev/null + + /opt/kafka/bin/kafka-topics.sh --create \ + --if-not-exists \ + --topic '${TOPIC_NAME}' \ + --partitions ${PARTITIONS} \ + --replication-factor ${REPLICATION} \ + --bootstrap-server ${BOOTSTRAP_SERVER} \ + --command-config <(echo '${SASL_CONFIG}') +" 2>&1 | grep -v "WARN.*TGT renewal" + +echo "" +echo "Topic '${TOPIC_NAME}' is ready." diff --git a/dev-support/ranger-docker/scripts/kdc/entrypoint.sh b/dev-support/ranger-docker/scripts/kdc/entrypoint.sh index 4e35f545724..d74a6135c5f 100644 --- a/dev-support/ranger-docker/scripts/kdc/entrypoint.sh +++ b/dev-support/ranger-docker/scripts/kdc/entrypoint.sh @@ -84,6 +84,9 @@ function create_keytabs() { create_principal_and_keytab HTTP ranger-audit-dispatcher-hdfs create_principal_and_keytab rangerauditserver ranger-audit-dispatcher-hdfs + create_principal_and_keytab HTTP ranger-audit-dispatcher-opensearch + create_principal_and_keytab rangerauditserver ranger-audit-dispatcher-opensearch + create_principal_and_keytab rangertagsync ranger-tagsync create_principal_and_keytab rangerusersync ranger-usersync @@ -145,7 +148,10 @@ if [ ! -f $DB_DIR/principal ]; then echo "Database initialized" create_keytabs - create_testusers ranger ranger-usersync ranger-tagsync ranger-pdp ranger-audit-ingestor ranger-audit-dispatcher-solr ranger-audit-dispatcher-hdfs ranger-hadoop ranger-hive ranger-hbase ranger-kafka ranger-solr ranger-knox ranger-kms ranger-ozone ranger-trino ranger-opensearch + create_testusers ranger ranger-usersync ranger-tagsync ranger-pdp ranger-audit-ingestor ranger-audit-dispatcher-solr ranger-audit-dispatcher-hdfs ranger-audit-dispatcher-opensearch ranger-hadoop ranger-hive ranger-hbase ranger-kafka ranger-solr ranger-knox ranger-kms ranger-ozone ranger-trino ranger-opensearch + + touch /etc/keytabs/.provisioned + echo "All keytabs provisioned" else echo "KDC DB already exists; skipping create" fi diff --git a/dev-support/ranger-docker/scripts/opensearch/create-ranger-audit-index.sh b/dev-support/ranger-docker/scripts/opensearch/create-ranger-audit-index.sh new file mode 100755 index 00000000000..724a879e45e --- /dev/null +++ b/dev-support/ranger-docker/scripts/opensearch/create-ranger-audit-index.sh @@ -0,0 +1,70 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# +# Creates the ranger_audits index in OpenSearch with the same field mapping +# used by Ranger's elasticsearch audit store (see security-admin/contrib/ +# elasticsearch_for_audit_setup/conf/ranger_es_schema.json). +# + +set -euo pipefail + +OPENSEARCH_URL="${OPENSEARCH_URL:-http://localhost:9200}" +INDEX_NAME="${INDEX_NAME:-ranger_audits}" +SCHEMA_FILE="${SCHEMA_FILE:-$(dirname "$0")/ranger_es_schema.json}" + +if [ ! -f "${SCHEMA_FILE}" ]; then + SCHEMA_FILE="$(cd "$(dirname "$0")/../../../.." && pwd)/security-admin/contrib/elasticsearch_for_audit_setup/conf/ranger_es_schema.json" +fi + +echo "OpenSearch URL : ${OPENSEARCH_URL}" +echo "Index name : ${INDEX_NAME}" +echo "Schema file : ${SCHEMA_FILE}" + +if ! curl -sf "${OPENSEARCH_URL}/_cluster/health" >/dev/null; then + echo "ERROR: OpenSearch is not reachable at ${OPENSEARCH_URL}" >&2 + exit 1 +fi + +if curl -sf -o /dev/null -w "%{http_code}" "${OPENSEARCH_URL}/${INDEX_NAME}" | grep -qE '200|404'; then + status="$(curl -sf -o /dev/null -w "%{http_code}" "${OPENSEARCH_URL}/${INDEX_NAME}")" + if [ "${status}" = "200" ]; then + echo "Index '${INDEX_NAME}' already exists — skipping create." + exit 0 + fi +fi + +echo "Creating index '${INDEX_NAME}'..." +python3 - </dev/null || true diff --git a/dev-support/ranger-docker/scripts/opensearch/opensearch.yml b/dev-support/ranger-docker/scripts/opensearch/opensearch.yml index d63405043b7..42b0e8898fa 100644 --- a/dev-support/ranger-docker/scripts/opensearch/opensearch.yml +++ b/dev-support/ranger-docker/scripts/opensearch/opensearch.yml @@ -48,3 +48,6 @@ http.cors.allow-headers: "X-Requested-With, Content-Type, Content-Length, Author # See opensearch-jaas.conf for Kerberos principal and keytab settings # JVM is configured with: -Djava.security.auth.login.config and -Djava.security.krb5.conf +# Wire-compatible with ES 7 REST client used by Ranger Admin +compatibility.override_main_response_version: true + diff --git a/dev-support/ranger-docker/scripts/opensearch/ranger_es_schema.json b/dev-support/ranger-docker/scripts/opensearch/ranger_es_schema.json new file mode 100644 index 00000000000..f8f3bbf2d70 --- /dev/null +++ b/dev-support/ranger-docker/scripts/opensearch/ranger_es_schema.json @@ -0,0 +1,136 @@ +{ + "properties": { + "_expire_at_": { + "type": "date", + "store": true, + "doc_values": true + }, + "_ttl_": { + "type": "text", + "store": true + }, + "_version_": { + "type": "long", + "store": true, + "index": false + }, + "access": { + "type": "keyword" + }, + "action": { + "type": "keyword" + }, + "agent": { + "type": "keyword" + }, + "agentHost": { + "type": "keyword" + }, + "cliIP": { + "type": "keyword" + }, + "cliType": { + "type": "keyword" + }, + "cluster": { + "type": "keyword" + }, + "reqContext": { + "type": "keyword" + }, + "enforcer": { + "type": "keyword" + }, + "event_count": { + "type": "long", + "doc_values": true + }, + "event_dur_ms": { + "type": "long", + "doc_values": true + }, + "evtTime": { + "type": "date", + "doc_values": true + }, + "id": { + "type": "keyword", + "store": true + }, + "logType": { + "type": "keyword" + }, + "policy": { + "type": "long", + "doc_values": true + }, + "proxyUsers": { + "type": "keyword" + }, + "reason": { + "type": "text" + }, + "repo": { + "type": "keyword" + }, + "repoType": { + "type": "integer", + "doc_values": true + }, + "req_caller_id": { + "type": "keyword" + }, + "req_self_id": { + "type": "keyword" + }, + "reqData": { + "type": "text" + }, + "reqUser": { + "type": "keyword" + }, + "resType": { + "type": "keyword" + }, + "resource": { + "type": "keyword" + }, + "result": { + "type": "integer" + }, + "seq_num": { + "type": "long", + "doc_values": true + }, + "sess": { + "type": "keyword" + }, + "tags": { + "type": "keyword" + }, + "tags_str": { + "type": "text" + }, + "datasets": { + "type": "keyword" + }, + "projects": { + "type": "keyword" + }, + "datasetIds": { + "type": "long" + }, + "text": { + "type": "text" + }, + "zoneName": { + "type": "keyword" + }, + "policyVersion": { + "type": "long" + }, + "additionalInfo": { + "type": "text" + } + } +} diff --git a/distro/pom.xml b/distro/pom.xml index d9d5c20d177..a4c30451164 100644 --- a/distro/pom.xml +++ b/distro/pom.xml @@ -34,6 +34,11 @@ audit-dispatcher-hdfs ${project.version} + + org.apache.ranger + audit-dispatcher-opensearch + ${project.version} + org.apache.ranger audit-dispatcher-solr diff --git a/distro/src/main/assembly/audit-dispatcher.xml b/distro/src/main/assembly/audit-dispatcher.xml index 34af9d79c53..c8fe1bbbdee 100644 --- a/distro/src/main/assembly/audit-dispatcher.xml +++ b/distro/src/main/assembly/audit-dispatcher.xml @@ -56,6 +56,15 @@ 644 + + conf + ${project.parent.basedir}/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/resources/conf + + ranger-audit-dispatcher-opensearch-site.xml + + 644 + + @@ -98,6 +107,22 @@ + + + lib/dispatchers/opensearch + ${project.parent.basedir}/audit-server/audit-dispatcher/dispatcher-opensearch/target/lib + + *.jar + + + + lib/dispatchers/opensearch + ${project.parent.basedir}/audit-server/audit-dispatcher/dispatcher-opensearch/target + + audit-dispatcher-opensearch-${project.version}.jar + + + lib/dispatchers/solr @@ -116,6 +141,13 @@ + + conf + ${project.parent.basedir}/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/resources/conf/logback.xml + logback-opensearch.xml + 644 + + webapp diff --git a/security-admin/src/main/java/org/apache/ranger/elasticsearch/ElasticSearchMgr.java b/security-admin/src/main/java/org/apache/ranger/elasticsearch/ElasticSearchMgr.java index 4b264daf639..2a2db5a48f0 100644 --- a/security-admin/src/main/java/org/apache/ranger/elasticsearch/ElasticSearchMgr.java +++ b/security-admin/src/main/java/org/apache/ranger/elasticsearch/ElasticSearchMgr.java @@ -168,6 +168,7 @@ synchronized RestHighLevelClient connect() { RestClientBuilder restClientBuilder = getRestClientBuilder(urls, protocol, user, password, port); client = new RestHighLevelClient(restClientBuilder); + me = client; } catch (Throwable t) { logger.error("Can't connect to ElasticSearch: {}", parameterString, t); }