diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java index f1befc3b5dcb..729d662cbd1d 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java @@ -57,6 +57,11 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger { public static final Pattern REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/flow/remote-process-groups/[a-f0-9\\-]{36}/status/history"); public static final Pattern CONNECTION_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/flow/connections/[a-f0-9\\-]{36}/status/history"); public static final Pattern NODE_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/controller/status/history"); + public static final Pattern CONNECTOR_PROCESSOR_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/connectors/[a-f0-9\\-]{36}/processors/[a-f0-9\\-]{36}/status/history"); + public static final Pattern CONNECTOR_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/connectors/[a-f0-9\\-]{36}/process-groups/[a-f0-9\\-]{36}/status/history"); + public static final Pattern CONNECTOR_REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = + Pattern.compile("/nifi-api/connectors/[a-f0-9\\-]{36}/remote-process-groups/[a-f0-9\\-]{36}/status/history"); + public static final Pattern CONNECTOR_CONNECTION_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/connectors/[a-f0-9\\-]{36}/connections/[a-f0-9\\-]{36}/status/history"); private final long componentStatusSnapshotMillis; @@ -69,19 +74,19 @@ private Map> getStandardMetricDescriptors(final URI final Map> metricDescriptors = new HashMap<>(); - if (PROCESSOR_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) { + if (PROCESSOR_STATUS_HISTORY_URI_PATTERN.matcher(path).matches() || CONNECTOR_PROCESSOR_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) { for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) { metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor()); } - } else if (PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) { + } else if (PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(path).matches() || CONNECTOR_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) { for (final ProcessGroupStatusDescriptor descriptor : ProcessGroupStatusDescriptor.values()) { metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor()); } - } else if (REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) { + } else if (REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(path).matches() || CONNECTOR_REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) { for (final RemoteProcessGroupStatusDescriptor descriptor : RemoteProcessGroupStatusDescriptor.values()) { metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor()); } - } else if (CONNECTION_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) { + } else if (CONNECTION_STATUS_HISTORY_URI_PATTERN.matcher(path).matches() || CONNECTOR_CONNECTION_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) { for (final ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) { metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor()); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestStatusHistoryEndpointMerger.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestStatusHistoryEndpointMerger.java index 3423b12194a5..f1d85555f517 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestStatusHistoryEndpointMerger.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestStatusHistoryEndpointMerger.java @@ -19,11 +19,34 @@ import org.junit.jupiter.api.Test; +import java.net.URI; import java.util.Date; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class TestStatusHistoryEndpointMerger { + + private static final String UUID = "12345678-1234-1234-1234-123456789012"; + + @Test + public void testCanHandleConnectorStatusHistory() { + final StatusHistoryEndpointMerger merger = new StatusHistoryEndpointMerger(300000); + + assertTrue(merger.canHandle(URI.create("/nifi-api/connectors/" + UUID + "/processors/" + UUID + "/status/history"), "GET")); + assertTrue(merger.canHandle(URI.create("/nifi-api/connectors/" + UUID + "/connections/" + UUID + "/status/history"), "GET")); + assertTrue(merger.canHandle(URI.create("/nifi-api/connectors/" + UUID + "/process-groups/" + UUID + "/status/history"), "GET")); + assertTrue(merger.canHandle(URI.create("/nifi-api/connectors/" + UUID + "/remote-process-groups/" + UUID + "/status/history"), "GET")); + + // The top-level flow endpoints continue to be handled alongside the connector endpoints. + assertTrue(merger.canHandle(URI.create("/nifi-api/flow/processors/" + UUID + "/status/history"), "GET")); + + // Non-GET methods and unrelated connector paths are not handled. + assertFalse(merger.canHandle(URI.create("/nifi-api/connectors/" + UUID + "/processors/" + UUID + "/status/history"), "POST")); + assertFalse(merger.canHandle(URI.create("/nifi-api/connectors/" + UUID + "/status"), "GET")); + } + @Test public void testNormalizedStatusSnapshotDate() { final Date date1 = new Date(1388538000000L); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 99d5992083d1..df1be3ce68cf 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -242,6 +242,46 @@ public interface NiFiServiceFacade { ProcessGroupStatusEntity getConnectorProcessGroupStatus(String id, Boolean recursive); + /** + * Gets the status history for a Processor inside the Connector's managed flow. The status history is available + * regardless of whether the Connector is in Troubleshooting mode. + * + * @param connectorId the connector id + * @param processorId the processor id within the connector's managed flow + * @return the status history + */ + StatusHistoryEntity getConnectorProcessorStatusHistory(String connectorId, String processorId); + + /** + * Gets the status history for a Connection inside the Connector's managed flow. The status history is available + * regardless of whether the Connector is in Troubleshooting mode. + * + * @param connectorId the connector id + * @param connectionId the connection id within the connector's managed flow + * @return the status history + */ + StatusHistoryEntity getConnectorConnectionStatusHistory(String connectorId, String connectionId); + + /** + * Gets the status history for a Process Group inside the Connector's managed flow. The status history is available + * regardless of whether the Connector is in Troubleshooting mode. + * + * @param connectorId the connector id + * @param processGroupId the process group id within the connector's managed flow + * @return the status history + */ + StatusHistoryEntity getConnectorProcessGroupStatusHistory(String connectorId, String processGroupId); + + /** + * Gets the status history for a Remote Process Group inside the Connector's managed flow. The status history is + * available regardless of whether the Connector is in Troubleshooting mode. + * + * @param connectorId the connector id + * @param remoteProcessGroupId the remote process group id within the connector's managed flow + * @return the status history + */ + StatusHistoryEntity getConnectorRemoteProcessGroupStatusHistory(String connectorId, String remoteProcessGroupId); + Set getConnectorControllerServices(String connectorId, String processGroupId, boolean includeAncestorGroups, boolean includeDescendantGroups, boolean includeReferencingComponents); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 77b18e862d09..f92d6c6aad36 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -4043,6 +4043,59 @@ public ProcessGroupStatusEntity getConnectorProcessGroupStatus(final String id, return entityFactory.createProcessGroupStatusEntity(dto, permissions); } + @Override + public StatusHistoryEntity getConnectorProcessorStatusHistory(final String connectorId, final String processorId) { + final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY); + final ProcessGroup managedGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup(); + if (managedGroup.findProcessor(processorId) == null) { + throw new ResourceNotFoundException("Unable to find processor with id '%s' within connector '%s'.".formatted(processorId, connectorId)); + } + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connectorNode); + final StatusHistoryDTO dto = controllerFacade.getConnectorProcessorStatusHistory(processorId); + return entityFactory.createStatusHistoryEntity(dto, permissions); + } + + @Override + public StatusHistoryEntity getConnectorConnectionStatusHistory(final String connectorId, final String connectionId) { + final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY); + final ProcessGroup managedGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup(); + if (managedGroup.findConnection(connectionId) == null) { + throw new ResourceNotFoundException("Unable to find connection with id '%s' within connector '%s'.".formatted(connectionId, connectorId)); + } + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connectorNode); + final StatusHistoryDTO dto = controllerFacade.getConnectorConnectionStatusHistory(connectionId); + return entityFactory.createStatusHistoryEntity(dto, permissions); + } + + @Override + public StatusHistoryEntity getConnectorProcessGroupStatusHistory(final String connectorId, final String processGroupId) { + final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY); + final ProcessGroup managedGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup(); + final ProcessGroup processGroup = managedGroup.getIdentifier().equals(processGroupId) ? managedGroup : managedGroup.findProcessGroup(processGroupId); + if (processGroup == null) { + throw new ResourceNotFoundException("Unable to find process group with id '%s' within connector '%s'.".formatted(processGroupId, connectorId)); + } + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connectorNode); + final StatusHistoryDTO dto = controllerFacade.getConnectorProcessGroupStatusHistory(processGroupId); + return entityFactory.createStatusHistoryEntity(dto, permissions); + } + + @Override + public StatusHistoryEntity getConnectorRemoteProcessGroupStatusHistory(final String connectorId, final String remoteProcessGroupId) { + final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY); + final ProcessGroup managedGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup(); + if (managedGroup.findRemoteProcessGroup(remoteProcessGroupId) == null) { + throw new ResourceNotFoundException("Unable to find remote process group with id '%s' within connector '%s'.".formatted(remoteProcessGroupId, connectorId)); + } + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connectorNode); + final StatusHistoryDTO dto = controllerFacade.getConnectorRemoteProcessGroupStatusHistory(remoteProcessGroupId); + return entityFactory.createStatusHistoryEntity(dto, permissions); + } + @Override public Set getConnectorControllerServices(final String connectorId, final String processGroupId, final boolean includeAncestorGroups, final boolean includeDescendantGroups, final boolean includeReferencingComponents) { diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java index 1b3685a72f92..dd8cd557597d 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java @@ -98,6 +98,7 @@ import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; import org.apache.nifi.web.api.entity.SearchResultsEntity; import org.apache.nifi.web.api.entity.SecretsEntity; +import org.apache.nifi.web.api.entity.StatusHistoryEntity; import org.apache.nifi.web.api.entity.VerifyConnectorConfigStepRequestEntity; import org.apache.nifi.web.api.request.ClientIdParameter; import org.apache.nifi.web.api.request.LongParameter; @@ -2164,6 +2165,194 @@ public Response getConnectorStatus( return generateOkResponse(entity).build(); } + // ----------------- + // Status History + // ----------------- + + /** + * Gets the status history for a Processor inside the Connector's managed flow. The status history is available + * regardless of whether the Connector is in Troubleshooting mode. + * + * @param connectorId the connector id + * @param processorId the processor id within the connector's managed flow + * @return A statusHistoryEntity. + * @throws InterruptedException if interrupted + */ + @GET + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("{id}/processors/{processorId}/status/history") + @Operation( + summary = "Gets the status history for a processor within a connector", + responses = { + @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = StatusHistoryEntity.class))), + @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), + @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), + @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), + @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") + }, + security = { + @SecurityRequirement(name = "Read - /connectors/{uuid}") + } + ) + public Response getConnectorProcessorStatusHistory( + @Parameter(description = "The connector id.", required = true) + @PathParam("id") final String connectorId, + @Parameter(description = "The processor id.", required = true) + @PathParam("processorId") final String processorId) throws InterruptedException { + + if (isReplicateRequest()) { + return replicate(HttpMethod.GET); + } + + serviceFacade.authorizeAccess(lookup -> { + final Authorizable connector = lookup.getConnector(connectorId); + connector.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }); + + final StatusHistoryEntity entity = serviceFacade.getConnectorProcessorStatusHistory(connectorId, processorId); + return generateOkResponse(entity).build(); + } + + /** + * Gets the status history for a Connection inside the Connector's managed flow. The status history is available + * regardless of whether the Connector is in Troubleshooting mode. + * + * @param connectorId the connector id + * @param connectionId the connection id within the connector's managed flow + * @return A statusHistoryEntity. + * @throws InterruptedException if interrupted + */ + @GET + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("{id}/connections/{connectionId}/status/history") + @Operation( + summary = "Gets the status history for a connection within a connector", + responses = { + @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = StatusHistoryEntity.class))), + @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), + @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), + @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), + @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") + }, + security = { + @SecurityRequirement(name = "Read - /connectors/{uuid}") + } + ) + public Response getConnectorConnectionStatusHistory( + @Parameter(description = "The connector id.", required = true) + @PathParam("id") final String connectorId, + @Parameter(description = "The connection id.", required = true) + @PathParam("connectionId") final String connectionId) throws InterruptedException { + + if (isReplicateRequest()) { + return replicate(HttpMethod.GET); + } + + serviceFacade.authorizeAccess(lookup -> { + final Authorizable connector = lookup.getConnector(connectorId); + connector.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }); + + final StatusHistoryEntity entity = serviceFacade.getConnectorConnectionStatusHistory(connectorId, connectionId); + return generateOkResponse(entity).build(); + } + + /** + * Gets the status history for a Process Group inside the Connector's managed flow. The status history is available + * regardless of whether the Connector is in Troubleshooting mode. + * + * @param connectorId the connector id + * @param processGroupId the process group id within the connector's managed flow + * @return A statusHistoryEntity. + * @throws InterruptedException if interrupted + */ + @GET + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("{id}/process-groups/{processGroupId}/status/history") + @Operation( + summary = "Gets the status history for a process group within a connector", + responses = { + @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = StatusHistoryEntity.class))), + @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), + @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), + @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), + @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") + }, + security = { + @SecurityRequirement(name = "Read - /connectors/{uuid}") + } + ) + public Response getConnectorProcessGroupStatusHistory( + @Parameter(description = "The connector id.", required = true) + @PathParam("id") final String connectorId, + @Parameter(description = "The process group id.", required = true) + @PathParam("processGroupId") final String processGroupId) throws InterruptedException { + + if (isReplicateRequest()) { + return replicate(HttpMethod.GET); + } + + serviceFacade.authorizeAccess(lookup -> { + final Authorizable connector = lookup.getConnector(connectorId); + connector.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }); + + final StatusHistoryEntity entity = serviceFacade.getConnectorProcessGroupStatusHistory(connectorId, processGroupId); + return generateOkResponse(entity).build(); + } + + /** + * Gets the status history for a Remote Process Group inside the Connector's managed flow. The status history is + * available regardless of whether the Connector is in Troubleshooting mode. + * + * @param connectorId the connector id + * @param remoteProcessGroupId the remote process group id within the connector's managed flow + * @return A statusHistoryEntity. + * @throws InterruptedException if interrupted + */ + @GET + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("{id}/remote-process-groups/{remoteProcessGroupId}/status/history") + @Operation( + summary = "Gets the status history for a remote process group within a connector", + responses = { + @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = StatusHistoryEntity.class))), + @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), + @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), + @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), + @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") + }, + security = { + @SecurityRequirement(name = "Read - /connectors/{uuid}") + } + ) + public Response getConnectorRemoteProcessGroupStatusHistory( + @Parameter(description = "The connector id.", required = true) + @PathParam("id") final String connectorId, + @Parameter(description = "The remote process group id.", required = true) + @PathParam("remoteProcessGroupId") final String remoteProcessGroupId) throws InterruptedException { + + if (isReplicateRequest()) { + return replicate(HttpMethod.GET); + } + + serviceFacade.authorizeAccess(lookup -> { + final Authorizable connector = lookup.getConnector(connectorId); + connector.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }); + + final StatusHistoryEntity entity = serviceFacade.getConnectorRemoteProcessGroupStatusHistory(connectorId, remoteProcessGroupId); + return generateOkResponse(entity).build(); + } + @POST @Consumes(MediaType.APPLICATION_OCTET_STREAM) @Produces(MediaType.APPLICATION_JSON) diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index 080b9fb3ae25..3cd15e3219d6 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -433,6 +433,54 @@ public StatusHistoryDTO getRemoteProcessGroupStatusHistory(final String remotePr return statusHistory; } + /** + * Returns the status history for a Processor inside a Connector's managed flow. The component is resolved and + * authorized at the Connector level before this method is invoked, so the status history is read directly from the + * repository by id without consulting the root flow hierarchy or applying per-component authorization masking. + * + * @param processorId processor id + * @return status history + */ + public StatusHistoryDTO getConnectorProcessorStatusHistory(final String processorId) { + return flowController.getProcessorStatusHistory(processorId, true); + } + + /** + * Returns the status history for a Connection inside a Connector's managed flow. The component is resolved and + * authorized at the Connector level before this method is invoked, so the status history is read directly from the + * repository by id without consulting the root flow hierarchy or applying per-component authorization masking. + * + * @param connectionId connection id + * @return status history + */ + public StatusHistoryDTO getConnectorConnectionStatusHistory(final String connectionId) { + return flowController.getConnectionStatusHistory(connectionId); + } + + /** + * Returns the status history for a Process Group inside a Connector's managed flow. The component is resolved and + * authorized at the Connector level before this method is invoked, so the status history is read directly from the + * repository by id without consulting the root flow hierarchy or applying per-component authorization masking. + * + * @param processGroupId process group id + * @return status history + */ + public StatusHistoryDTO getConnectorProcessGroupStatusHistory(final String processGroupId) { + return flowController.getProcessGroupStatusHistory(processGroupId); + } + + /** + * Returns the status history for a Remote Process Group inside a Connector's managed flow. The component is resolved + * and authorized at the Connector level before this method is invoked, so the status history is read directly from + * the repository by id without consulting the root flow hierarchy or applying per-component authorization masking. + * + * @param remoteProcessGroupId remote process group id + * @return status history + */ + public StatusHistoryDTO getConnectorRemoteProcessGroupStatusHistory(final String remoteProcessGroupId) { + return flowController.getRemoteProcessGroupStatusHistory(remoteProcessGroupId); + } + /** * Get the node id of this controller. * diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorStatusHistoryIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorStatusHistoryIT.java new file mode 100644 index 000000000000..6e4ab99899ec --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorStatusHistoryIT.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.tests.system.connectors; + +import jakarta.ws.rs.WebApplicationException; +import org.apache.nifi.components.connector.ConnectorState; +import org.apache.nifi.controller.status.history.StatusHistoryRepository; +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.client.ConnectorClient; +import org.apache.nifi.toolkit.client.NiFiClientException; +import org.apache.nifi.web.api.dto.flow.FlowDTO; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.ConnectorEntity; +import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.apache.nifi.web.api.entity.StatusHistoryEntity; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * System tests that verify the Status History REST endpoints for components inside a Connector's managed flow. + */ +public class ConnectorStatusHistoryIT extends NiFiSystemIT { + + @Override + protected Map getNifiPropertiesOverrides() { + // Capture component status snapshots once per second so the test does not have to wait the default one minute + // for the first snapshot to be recorded. + return Map.of("nifi.components.status.snapshot.frequency", "1 sec"); + } + + /** + * Status history for components inside a Connector's managed flow must be retrievable whether the Connector is + * RUNNING or in Troubleshooting mode. The standard component endpoints reject requests with a 409 when the + * Connector is not in Troubleshooting; the Connector status history endpoints must not be subject to that gate. + */ + @Test + public void testStatusHistoryAvailableRegardlessOfTroubleshooting() throws NiFiClientException, IOException, InterruptedException { + final ConnectorEntity connector = getClientUtil().createConnector("ComponentLifecycleConnector"); + final String connectorId = connector.getId(); + + getClientUtil().applyConnectorUpdate(connector); + getClientUtil().waitForValidConnector(connectorId); + + final String managedGroupId = getNifiClient().getConnectorClient().getConnector(connectorId).getComponent().getManagedProcessGroupId(); + assertNotNull(managedGroupId); + + final String processorId = findFirstProcessorId(connectorId); + final String connectionId = findFirstConnectionId(connectorId); + assertNotNull(processorId); + assertNotNull(connectionId); + + getClientUtil().startConnector(connectorId); + assertConnectorState(connectorId, ConnectorState.RUNNING); + + // While the Connector is RUNNING (not in Troubleshooting), the standard component endpoint is gated with a 409, + // but the Connector status history endpoints must still return data. + assertConflict("GET processor outside Troubleshooting", () -> getNifiClient().getProcessorClient().getProcessor(processorId)); + + assertStatusHistoryAvailable(connectorId, processorId, connectionId, managedGroupId); + + getClientUtil().enterTroubleshooting(connectorId); + assertConnectorState(connectorId, ConnectorState.TROUBLESHOOTING); + + // The same component status history must remain available once the Connector enters Troubleshooting. + assertStatusHistoryAvailable(connectorId, processorId, connectionId, managedGroupId); + } + + /** + * Unknown component ids, and component ids that exist elsewhere in NiFi but not inside the target Connector's + * managed flow, must produce a 404 Not Found for each component type. The latter verifies that the endpoints are + * scoped to the Connector rather than resolving any component id from the repository. + */ + @Test + public void testStatusHistoryNotFoundForUnknownComponent() throws NiFiClientException, IOException, InterruptedException { + final ConnectorEntity connector = getClientUtil().createConnector("ComponentLifecycleConnector"); + final String connectorId = connector.getId(); + + getClientUtil().applyConnectorUpdate(connector); + getClientUtil().waitForValidConnector(connectorId); + + final ConnectorClient connectorClient = getNifiClient().getConnectorClient(); + final String unknownId = UUID.randomUUID().toString(); + + assertNotFound("processor status history", () -> connectorClient.getProcessorStatusHistory(connectorId, unknownId)); + assertNotFound("connection status history", () -> connectorClient.getConnectionStatusHistory(connectorId, unknownId)); + assertNotFound("process group status history", () -> connectorClient.getProcessGroupStatusHistory(connectorId, unknownId)); + assertNotFound("remote process group status history", () -> connectorClient.getRemoteProcessGroupStatusHistory(connectorId, unknownId)); + + // A Processor that exists in the root flow, outside any Connector, must not be resolvable through the Connector + // status history endpoint. + final String rootProcessorId = getClientUtil().createProcessor("CountEvents").getId(); + assertNotFound("processor status history for component outside the connector", + () -> connectorClient.getProcessorStatusHistory(connectorId, rootProcessorId)); + } + + private void assertStatusHistoryAvailable(final String connectorId, final String processorId, final String connectionId, final String processGroupId) + throws InterruptedException { + + final ConnectorClient connectorClient = getNifiClient().getConnectorClient(); + + final StatusHistoryEntity processorHistory = waitForStatusHistory(() -> connectorClient.getProcessorStatusHistory(connectorId, processorId)); + assertComponentHistory(processorHistory, processorId); + + final StatusHistoryEntity connectionHistory = waitForStatusHistory(() -> connectorClient.getConnectionStatusHistory(connectorId, connectionId)); + assertComponentHistory(connectionHistory, connectionId); + + final StatusHistoryEntity processGroupHistory = waitForStatusHistory(() -> connectorClient.getProcessGroupStatusHistory(connectorId, processGroupId)); + assertComponentHistory(processGroupHistory, processGroupId); + } + + private void assertComponentHistory(final StatusHistoryEntity entity, final String componentId) { + assertNotNull(entity); + assertTrue(entity.getCanRead()); + assertNotNull(entity.getStatusHistory()); + assertFalse(entity.getStatusHistory().getAggregateSnapshots().isEmpty()); + assertEquals(componentId, entity.getStatusHistory().getComponentDetails().get(StatusHistoryRepository.COMPONENT_DETAIL_ID)); + } + + private StatusHistoryEntity waitForStatusHistory(final Callable supplier) throws InterruptedException { + final AtomicReference holder = new AtomicReference<>(); + waitFor(() -> { + final StatusHistoryEntity entity = supplier.call(); + if (entity != null && entity.getStatusHistory() != null && !entity.getStatusHistory().getAggregateSnapshots().isEmpty()) { + holder.set(entity); + return true; + } + + return false; + }); + return holder.get(); + } + + private String findFirstProcessorId(final String connectorId) throws NiFiClientException, IOException { + final List processors = new ArrayList<>(); + collectProcessors(connectorId, null, processors); + return processors.isEmpty() ? null : processors.getFirst().getId(); + } + + private void collectProcessors(final String connectorId, final String groupId, final List collected) throws NiFiClientException, IOException { + final ProcessGroupFlowEntity entity = (groupId == null) + ? getNifiClient().getConnectorClient().getFlow(connectorId) + : getNifiClient().getConnectorClient().getFlow(connectorId, groupId); + final FlowDTO flow = entity.getProcessGroupFlow().getFlow(); + collected.addAll(flow.getProcessors()); + + for (final ProcessGroupEntity child : flow.getProcessGroups()) { + collectProcessors(connectorId, child.getId(), collected); + } + } + + private String findFirstConnectionId(final String connectorId) throws NiFiClientException, IOException { + return collectFirstConnectionId(connectorId, null); + } + + private String collectFirstConnectionId(final String connectorId, final String groupId) throws NiFiClientException, IOException { + final ProcessGroupFlowEntity entity = (groupId == null) + ? getNifiClient().getConnectorClient().getFlow(connectorId) + : getNifiClient().getConnectorClient().getFlow(connectorId, groupId); + final FlowDTO flow = entity.getProcessGroupFlow().getFlow(); + for (final ConnectionEntity connection : flow.getConnections()) { + return connection.getId(); + } + + for (final ProcessGroupEntity child : flow.getProcessGroups()) { + final String childConnectionId = collectFirstConnectionId(connectorId, child.getId()); + if (childConnectionId != null) { + return childConnectionId; + } + } + + return null; + } + + private void assertConnectorState(final String connectorId, final ConnectorState expected) throws NiFiClientException, IOException { + final ConnectorEntity entity = getNifiClient().getConnectorClient().getConnector(connectorId); + assertEquals(expected.name(), entity.getComponent().getState()); + } + + private void assertNotFound(final String description, final Callable call) { + assertExpectedStatus(description, call, 404); + } + + private void assertConflict(final String description, final Callable call) { + assertExpectedStatus(description, call, 409); + } + + private void assertExpectedStatus(final String description, final Callable call, final int expectedStatus) { + try { + call.call(); + fail("Expected HTTP " + expectedStatus + " for " + description + " but request succeeded"); + } catch (final NiFiClientException e) { + final Throwable cause = e.getCause(); + if (cause instanceof WebApplicationException wae) { + assertEquals(expectedStatus, wae.getResponse().getStatus(), "Unexpected status for " + description); + } else { + fail("Expected WebApplicationException " + expectedStatus + " for " + description + ", got: " + cause); + } + } catch (final Exception e) { + fail("Unexpected exception while invoking " + description + ": " + e.getMessage()); + } + } +} diff --git a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java index 14a909fc3bc2..76f49df4b07c 100644 --- a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java +++ b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java @@ -26,6 +26,7 @@ import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity; import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; +import org.apache.nifi.web.api.entity.StatusHistoryEntity; import org.apache.nifi.web.api.entity.VerifyConnectorConfigStepRequestEntity; import java.io.File; @@ -323,6 +324,50 @@ VerifyConnectorConfigStepRequestEntity deleteConfigStepVerificationRequest(Strin */ ProcessGroupStatusEntity getStatus(String connectorId, boolean recursive) throws NiFiClientException, IOException; + /** + * Gets the status history for a processor within a connector's managed flow. + * + * @param connectorId the connector ID + * @param processorId the processor ID within the connector's managed flow + * @return the status history entity + * @throws NiFiClientException if an error occurs during the request + * @throws IOException if an I/O error occurs + */ + StatusHistoryEntity getProcessorStatusHistory(String connectorId, String processorId) throws NiFiClientException, IOException; + + /** + * Gets the status history for a connection within a connector's managed flow. + * + * @param connectorId the connector ID + * @param connectionId the connection ID within the connector's managed flow + * @return the status history entity + * @throws NiFiClientException if an error occurs during the request + * @throws IOException if an I/O error occurs + */ + StatusHistoryEntity getConnectionStatusHistory(String connectorId, String connectionId) throws NiFiClientException, IOException; + + /** + * Gets the status history for a process group within a connector's managed flow. + * + * @param connectorId the connector ID + * @param processGroupId the process group ID within the connector's managed flow + * @return the status history entity + * @throws NiFiClientException if an error occurs during the request + * @throws IOException if an I/O error occurs + */ + StatusHistoryEntity getProcessGroupStatusHistory(String connectorId, String processGroupId) throws NiFiClientException, IOException; + + /** + * Gets the status history for a remote process group within a connector's managed flow. + * + * @param connectorId the connector ID + * @param remoteProcessGroupId the remote process group ID within the connector's managed flow + * @return the status history entity + * @throws NiFiClientException if an error occurs during the request + * @throws IOException if an I/O error occurs + */ + StatusHistoryEntity getRemoteProcessGroupStatusHistory(String connectorId, String remoteProcessGroupId) throws NiFiClientException, IOException; + /** * Creates an asset in the given connector. * diff --git a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java index d972f4891591..915091d08e58 100644 --- a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java +++ b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java @@ -36,6 +36,7 @@ import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity; import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; +import org.apache.nifi.web.api.entity.StatusHistoryEntity; import org.apache.nifi.web.api.entity.VerifyConnectorConfigStepRequestEntity; import java.io.File; @@ -527,6 +528,82 @@ public ProcessGroupStatusEntity getStatus(final String connectorId, final boolea }); } + @Override + public StatusHistoryEntity getProcessorStatusHistory(final String connectorId, final String processorId) throws NiFiClientException, IOException { + if (StringUtils.isBlank(connectorId)) { + throw new IllegalArgumentException("Connector id cannot be null or blank"); + } + if (StringUtils.isBlank(processorId)) { + throw new IllegalArgumentException("Processor id cannot be null or blank"); + } + + return executeAction("Error retrieving processor status history for Connector " + connectorId, () -> { + final WebTarget target = connectorTarget + .path("/processors/{processorId}/status/history") + .resolveTemplate("id", connectorId) + .resolveTemplate("processorId", processorId); + + return getRequestBuilder(target).get(StatusHistoryEntity.class); + }); + } + + @Override + public StatusHistoryEntity getConnectionStatusHistory(final String connectorId, final String connectionId) throws NiFiClientException, IOException { + if (StringUtils.isBlank(connectorId)) { + throw new IllegalArgumentException("Connector id cannot be null or blank"); + } + if (StringUtils.isBlank(connectionId)) { + throw new IllegalArgumentException("Connection id cannot be null or blank"); + } + + return executeAction("Error retrieving connection status history for Connector " + connectorId, () -> { + final WebTarget target = connectorTarget + .path("/connections/{connectionId}/status/history") + .resolveTemplate("id", connectorId) + .resolveTemplate("connectionId", connectionId); + + return getRequestBuilder(target).get(StatusHistoryEntity.class); + }); + } + + @Override + public StatusHistoryEntity getProcessGroupStatusHistory(final String connectorId, final String processGroupId) throws NiFiClientException, IOException { + if (StringUtils.isBlank(connectorId)) { + throw new IllegalArgumentException("Connector id cannot be null or blank"); + } + if (StringUtils.isBlank(processGroupId)) { + throw new IllegalArgumentException("Process group id cannot be null or blank"); + } + + return executeAction("Error retrieving process group status history for Connector " + connectorId, () -> { + final WebTarget target = connectorTarget + .path("/process-groups/{processGroupId}/status/history") + .resolveTemplate("id", connectorId) + .resolveTemplate("processGroupId", processGroupId); + + return getRequestBuilder(target).get(StatusHistoryEntity.class); + }); + } + + @Override + public StatusHistoryEntity getRemoteProcessGroupStatusHistory(final String connectorId, final String remoteProcessGroupId) throws NiFiClientException, IOException { + if (StringUtils.isBlank(connectorId)) { + throw new IllegalArgumentException("Connector id cannot be null or blank"); + } + if (StringUtils.isBlank(remoteProcessGroupId)) { + throw new IllegalArgumentException("Remote process group id cannot be null or blank"); + } + + return executeAction("Error retrieving remote process group status history for Connector " + connectorId, () -> { + final WebTarget target = connectorTarget + .path("/remote-process-groups/{remoteProcessGroupId}/status/history") + .resolveTemplate("id", connectorId) + .resolveTemplate("remoteProcessGroupId", remoteProcessGroupId); + + return getRequestBuilder(target).get(StatusHistoryEntity.class); + }); + } + @Override public AssetEntity createAsset(final String connectorId, final String assetName, final File file) throws NiFiClientException, IOException { if (StringUtils.isBlank(connectorId)) {