Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
public static final Pattern REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/flow/remote-process-groups/[a-f0-9\\-]{36}/status/history");
public static final Pattern CONNECTION_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/flow/connections/[a-f0-9\\-]{36}/status/history");
public static final Pattern NODE_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/controller/status/history");
public static final Pattern CONNECTOR_PROCESSOR_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/connectors/[a-f0-9\\-]{36}/processors/[a-f0-9\\-]{36}/status/history");
public static final Pattern CONNECTOR_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/connectors/[a-f0-9\\-]{36}/process-groups/[a-f0-9\\-]{36}/status/history");
public static final Pattern CONNECTOR_REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN =
Pattern.compile("/nifi-api/connectors/[a-f0-9\\-]{36}/remote-process-groups/[a-f0-9\\-]{36}/status/history");
public static final Pattern CONNECTOR_CONNECTION_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/connectors/[a-f0-9\\-]{36}/connections/[a-f0-9\\-]{36}/status/history");

private final long componentStatusSnapshotMillis;

Expand All @@ -69,19 +74,19 @@ private Map<String, MetricDescriptor<?>> getStandardMetricDescriptors(final URI

final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>();

if (PROCESSOR_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) {
if (PROCESSOR_STATUS_HISTORY_URI_PATTERN.matcher(path).matches() || CONNECTOR_PROCESSOR_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) {
for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) {
metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor());
}
} else if (PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) {
} else if (PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(path).matches() || CONNECTOR_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) {
for (final ProcessGroupStatusDescriptor descriptor : ProcessGroupStatusDescriptor.values()) {
metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor());
}
} else if (REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) {
} else if (REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(path).matches() || CONNECTOR_REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) {
for (final RemoteProcessGroupStatusDescriptor descriptor : RemoteProcessGroupStatusDescriptor.values()) {
metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor());
}
} else if (CONNECTION_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) {
} else if (CONNECTION_STATUS_HISTORY_URI_PATTERN.matcher(path).matches() || CONNECTOR_CONNECTION_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) {
for (final ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) {
metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,34 @@

import org.junit.jupiter.api.Test;

import java.net.URI;
import java.util.Date;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestStatusHistoryEndpointMerger {

private static final String UUID = "12345678-1234-1234-1234-123456789012";

@Test
public void testCanHandleConnectorStatusHistory() {
final StatusHistoryEndpointMerger merger = new StatusHistoryEndpointMerger(300000);

assertTrue(merger.canHandle(URI.create("/nifi-api/connectors/" + UUID + "/processors/" + UUID + "/status/history"), "GET"));
assertTrue(merger.canHandle(URI.create("/nifi-api/connectors/" + UUID + "/connections/" + UUID + "/status/history"), "GET"));
assertTrue(merger.canHandle(URI.create("/nifi-api/connectors/" + UUID + "/process-groups/" + UUID + "/status/history"), "GET"));
assertTrue(merger.canHandle(URI.create("/nifi-api/connectors/" + UUID + "/remote-process-groups/" + UUID + "/status/history"), "GET"));

// The top-level flow endpoints continue to be handled alongside the connector endpoints.
assertTrue(merger.canHandle(URI.create("/nifi-api/flow/processors/" + UUID + "/status/history"), "GET"));

// Non-GET methods and unrelated connector paths are not handled.
assertFalse(merger.canHandle(URI.create("/nifi-api/connectors/" + UUID + "/processors/" + UUID + "/status/history"), "POST"));
assertFalse(merger.canHandle(URI.create("/nifi-api/connectors/" + UUID + "/status"), "GET"));
}

@Test
public void testNormalizedStatusSnapshotDate() {
final Date date1 = new Date(1388538000000L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,46 @@ public interface NiFiServiceFacade {

ProcessGroupStatusEntity getConnectorProcessGroupStatus(String id, Boolean recursive);

/**
* Gets the status history for a Processor inside the Connector's managed flow. The status history is available
* regardless of whether the Connector is in Troubleshooting mode.
*
* @param connectorId the connector id
* @param processorId the processor id within the connector's managed flow
* @return the status history
*/
StatusHistoryEntity getConnectorProcessorStatusHistory(String connectorId, String processorId);

/**
* Gets the status history for a Connection inside the Connector's managed flow. The status history is available
* regardless of whether the Connector is in Troubleshooting mode.
*
* @param connectorId the connector id
* @param connectionId the connection id within the connector's managed flow
* @return the status history
*/
StatusHistoryEntity getConnectorConnectionStatusHistory(String connectorId, String connectionId);

/**
* Gets the status history for a Process Group inside the Connector's managed flow. The status history is available
* regardless of whether the Connector is in Troubleshooting mode.
*
* @param connectorId the connector id
* @param processGroupId the process group id within the connector's managed flow
* @return the status history
*/
StatusHistoryEntity getConnectorProcessGroupStatusHistory(String connectorId, String processGroupId);

/**
* Gets the status history for a Remote Process Group inside the Connector's managed flow. The status history is
* available regardless of whether the Connector is in Troubleshooting mode.
*
* @param connectorId the connector id
* @param remoteProcessGroupId the remote process group id within the connector's managed flow
* @return the status history
*/
StatusHistoryEntity getConnectorRemoteProcessGroupStatusHistory(String connectorId, String remoteProcessGroupId);

Set<ControllerServiceEntity> getConnectorControllerServices(String connectorId, String processGroupId, boolean includeAncestorGroups,
boolean includeDescendantGroups, boolean includeReferencingComponents);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4043,6 +4043,59 @@ public ProcessGroupStatusEntity getConnectorProcessGroupStatus(final String id,
return entityFactory.createProcessGroupStatusEntity(dto, permissions);
}

@Override
public StatusHistoryEntity getConnectorProcessorStatusHistory(final String connectorId, final String processorId) {
final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY);
final ProcessGroup managedGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup();
if (managedGroup.findProcessor(processorId) == null) {
throw new ResourceNotFoundException("Unable to find processor with id '%s' within connector '%s'.".formatted(processorId, connectorId));
}

final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connectorNode);
final StatusHistoryDTO dto = controllerFacade.getConnectorProcessorStatusHistory(processorId);
return entityFactory.createStatusHistoryEntity(dto, permissions);
}

@Override
public StatusHistoryEntity getConnectorConnectionStatusHistory(final String connectorId, final String connectionId) {
final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY);
final ProcessGroup managedGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup();
if (managedGroup.findConnection(connectionId) == null) {
throw new ResourceNotFoundException("Unable to find connection with id '%s' within connector '%s'.".formatted(connectionId, connectorId));
}

final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connectorNode);
final StatusHistoryDTO dto = controllerFacade.getConnectorConnectionStatusHistory(connectionId);
return entityFactory.createStatusHistoryEntity(dto, permissions);
}

@Override
public StatusHistoryEntity getConnectorProcessGroupStatusHistory(final String connectorId, final String processGroupId) {
final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY);
final ProcessGroup managedGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup();
final ProcessGroup processGroup = managedGroup.getIdentifier().equals(processGroupId) ? managedGroup : managedGroup.findProcessGroup(processGroupId);
if (processGroup == null) {
throw new ResourceNotFoundException("Unable to find process group with id '%s' within connector '%s'.".formatted(processGroupId, connectorId));
}

final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connectorNode);
final StatusHistoryDTO dto = controllerFacade.getConnectorProcessGroupStatusHistory(processGroupId);
return entityFactory.createStatusHistoryEntity(dto, permissions);
}

@Override
public StatusHistoryEntity getConnectorRemoteProcessGroupStatusHistory(final String connectorId, final String remoteProcessGroupId) {
final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY);
final ProcessGroup managedGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup();
if (managedGroup.findRemoteProcessGroup(remoteProcessGroupId) == null) {
throw new ResourceNotFoundException("Unable to find remote process group with id '%s' within connector '%s'.".formatted(remoteProcessGroupId, connectorId));
}

final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connectorNode);
final StatusHistoryDTO dto = controllerFacade.getConnectorRemoteProcessGroupStatusHistory(remoteProcessGroupId);
return entityFactory.createStatusHistoryEntity(dto, permissions);
}

@Override
public Set<ControllerServiceEntity> getConnectorControllerServices(final String connectorId, final String processGroupId,
final boolean includeAncestorGroups, final boolean includeDescendantGroups, final boolean includeReferencingComponents) {
Expand Down
Loading
Loading