diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index c1421ae064f06..b702226f8c60d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -17,8 +17,10 @@ package org.apache.ignite.internal.managers.discovery; +import org.apache.ignite.internal.codegen.DiscoveryDataPacketSerializer; import org.apache.ignite.internal.codegen.InetAddressMessageSerializer; import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer; +import org.apache.ignite.internal.codegen.NodeSpecificDataSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryAuthFailedMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryCacheMetricsMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer; @@ -34,6 +36,7 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryMetricsUpdateMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryNodeAddFinishedMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryNodeFullMetricsMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryNodeLeftMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryNodeMetricsMessageSerializer; @@ -42,8 +45,10 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; +import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage; import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage; +import org.apache.ignite.spi.discovery.tcp.messages.NodeSpecificData; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCacheMetricsMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage; @@ -59,6 +64,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage; @@ -70,6 +76,8 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { /** {@inheritDoc} */ @Override public void registerAll(MessageFactory factory) { + factory.register((short)-107, NodeSpecificData::new, new NodeSpecificDataSerializer()); + factory.register((short)-106, DiscoveryDataPacket::new, new DiscoveryDataPacketSerializer()); factory.register((short)-105, TcpDiscoveryNodeFullMetricsMessage::new, new TcpDiscoveryNodeFullMetricsMessageSerializer()); factory.register((short)-104, TcpDiscoveryClientNodesMetricsMessage::new, new TcpDiscoveryClientNodesMetricsMessageSerializer()); @@ -95,5 +103,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)14, TcpDiscoveryMetricsUpdateMessage::new, new TcpDiscoveryMetricsUpdateMessageSerializer()); factory.register((short)15, TcpDiscoveryClientAckResponse::new, new TcpDiscoveryClientAckResponseSerializer()); factory.register((short)16, TcpDiscoveryNodeLeftMessage::new, new TcpDiscoveryNodeLeftMessageSerializer()); + factory.register((short)17, TcpDiscoveryNodeAddFinishedMessage::new, new TcpDiscoveryNodeAddFinishedMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java index 5dff4b089b8fe..eb60eb222e813 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java @@ -28,10 +28,13 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridComponent; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.DiscoveryDataBag; +import org.apache.ignite.spi.discovery.tcp.messages.NodeSpecificData; import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC; @@ -39,31 +42,40 @@ * Carries discovery data in marshalled form * and allows convenient way of converting it to and from {@link DiscoveryDataBag} objects. */ -public class DiscoveryDataPacket implements Serializable { - /** Local file header signature(read as a little-endian number). */ - private static int ZIP_HEADER_SIGNATURE = 0x04034b50; +public class DiscoveryDataPacket implements Serializable, Message { + /** Local file header signature (read as a little-endian number). */ + private static final int ZIP_HEADER_SIGNATURE = 0x04034b50; /** */ private static final long serialVersionUID = 0L; /** */ - private final UUID joiningNodeId; + @Order(0) + private UUID joiningNodeId; /** */ + @Order(1) private Map joiningNodeData = new HashMap<>(); /** */ private transient Map unmarshalledJoiningNodeData; /** */ + @Order(2) private Map commonData = new HashMap<>(); /** */ - private Map> nodeSpecificData = new LinkedHashMap<>(); + @Order(3) + private Map nodeSpecificData = new LinkedHashMap<>(); /** */ private transient boolean joiningNodeClient; + /** Constructor. */ + public DiscoveryDataPacket() { + // No-op. + } + /** * @param joiningNodeId Joining node id. */ @@ -78,6 +90,55 @@ public UUID joiningNodeId() { return joiningNodeId; } + /** + * @param joiningNodeId Joining node ID. + */ + public void joiningNodeId(UUID joiningNodeId) { + this.joiningNodeId = joiningNodeId; + } + + /** + * @return Joining node data. + */ + public Map joiningNodeData() { + return joiningNodeData; + } + + /** + * @param joiningNodeData Joining node data. + */ + public void joiningNodeData(Map joiningNodeData) { + this.joiningNodeData = joiningNodeData; + } + + /** + * @return Common data. + */ + public Map commonData() { + return commonData; + } + + /** + * @param commonData Common data. + */ + public void commonData(Map commonData) { + this.commonData = commonData; + } + + /** + * @return Node specific data. + */ + public Map nodeSpecificData() { + return nodeSpecificData; + } + + /** + * @param nodeSpecificData New node specific data. + */ + public void nodeSpecificData(Map nodeSpecificData) { + this.nodeSpecificData = nodeSpecificData; + } + /** * @param bag Bag. * @param nodeId Node id. @@ -98,7 +159,7 @@ public void marshalGridNodeData(DiscoveryDataBag bag, UUID nodeId, Marshaller ma filterDuplicatedData(marshLocNodeSpecificData); if (!marshLocNodeSpecificData.isEmpty()) - nodeSpecificData.put(nodeId, marshLocNodeSpecificData); + nodeSpecificData.put(nodeId, new NodeSpecificData(marshLocNodeSpecificData)); } } @@ -132,8 +193,11 @@ public DiscoveryDataBag unmarshalGridData( if (nodeSpecificData != null && !nodeSpecificData.isEmpty()) { Map> unmarshNodeSpecData = U.newLinkedHashMap(nodeSpecificData.size()); - for (Map.Entry> nodeBinEntry : nodeSpecificData.entrySet()) { - Map nodeBinData = nodeBinEntry.getValue(); + for (Map.Entry nodeBinEntry : nodeSpecificData.entrySet()) { + if (nodeBinEntry.getValue() == null) + continue; + + Map nodeBinData = nodeBinEntry.getValue().nodeSpecificData(); if (nodeBinData == null || nodeBinData.isEmpty()) continue; @@ -260,12 +324,17 @@ public boolean mergeDataFrom( } if (nodeSpecificData.size() != mrgdSpecifDataKeys.size()) { - for (Map.Entry> e : nodeSpecificData.entrySet()) { + for (Map.Entry e : nodeSpecificData.entrySet()) { if (!mrgdSpecifDataKeys.contains(e.getKey())) { - Map data = existingDataPacket.nodeSpecificData.get(e.getKey()); + NodeSpecificData dataMsg = existingDataPacket.nodeSpecificData.get(e.getKey()); - if (data != null && mapsEqual(e.getValue(), data)) { - e.setValue(data); + if (dataMsg == null) + continue; + + Map data = dataMsg.nodeSpecificData(); + + if (data != null && mapsEqual(e.getValue().nodeSpecificData(), data)) { + e.setValue(new NodeSpecificData(data)); boolean add = mrgdSpecifDataKeys.add(e.getKey()); @@ -310,7 +379,7 @@ private boolean mapsEqual(Map m1, Map m2) { * @param clientNode Client node. * @param log Logger. * @param panic Throw unmarshalling if {@code true}. - * @throws IgniteCheckedException If {@code panic} is {@true} and unmarshalling failed. + * @throws IgniteCheckedException If {@code panic} is {@code True} and unmarshalling failed. */ private Map unmarshalData( Map src, @@ -358,11 +427,11 @@ else if (binEntry.getKey() < GridComponent.DiscoveryDataExchangeType.VALUES.leng } /** - * @param value Value to check. + * @param val Value to check. * @return {@code true} if value is zipped. */ - private boolean isZipped(byte[] value) { - return value != null && value.length > 3 && makeInt(value) == ZIP_HEADER_SIGNATURE; + private boolean isZipped(byte[] val) { + return val != null && val.length > 3 && makeInt(val) == ZIP_HEADER_SIGNATURE; } /** @@ -391,7 +460,7 @@ private void marshalData( int compressionLevel, IgniteLogger log ) { - //may happen if nothing was collected from components, + // may happen if nothing was collected from components, // corresponding map (for common data or for node specific data) left null if (src == null) return; @@ -411,13 +480,15 @@ private void marshalData( * TODO https://issues.apache.org/jira/browse/IGNITE-4435 */ private void filterDuplicatedData(Map discoData) { - for (Map existingData : nodeSpecificData.values()) { + for (NodeSpecificData existingData : nodeSpecificData.values()) { Iterator> it = discoData.entrySet().iterator(); while (it.hasNext()) { Map.Entry discoDataEntry = it.next(); - byte[] curData = existingData.get(discoDataEntry.getKey()); + byte[] curData = (existingData == null || existingData.nodeSpecificData() == null) + ? null + : existingData.nodeSpecificData().get(discoDataEntry.getKey()); if (Arrays.equals(curData, discoDataEntry.getValue())) it.remove(); @@ -454,4 +525,9 @@ public void joiningNodeClient(boolean joiningNodeClient) { public void clearUnmarshalledJoiningNodeData() { unmarshalledJoiningNodeData = null; } + + /** {@inheritDoc} */ + @Override public short directType() { + return -106; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/NodeSpecificData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/NodeSpecificData.java new file mode 100644 index 0000000000000..d02d2ba3630db --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/NodeSpecificData.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import java.io.Serializable; +import java.util.Map; +import java.util.Objects; +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** */ +public class NodeSpecificData implements Message, Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @Order(0) + private Map nodeSpecificData; + + /** */ + public NodeSpecificData() { + // No-op. + } + + /** + * @param nodeSpecificData Node specific data. + */ + public NodeSpecificData(Map nodeSpecificData) { + this.nodeSpecificData = nodeSpecificData; + } + + /** + * @return Node specific data. + */ + public Map nodeSpecificData() { + return nodeSpecificData; + } + + /** + * @param nodeSpecificData New node specific data. + */ + public void nodeSpecificData(Map nodeSpecificData) { + this.nodeSpecificData = nodeSpecificData; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -107; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + NodeSpecificData that = (NodeSpecificData)o; + + return Objects.equals(nodeSpecificData, that.nodeSpecificData); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return Objects.hashCode(nodeSpecificData); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java index fbdf3660947ff..a0760ed22862a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java @@ -19,26 +19,37 @@ import java.util.Map; import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.marshaller.Marshallers.jdk; /** * Sent by coordinator across the ring to finish node add process. */ @TcpDiscoveryEnsureDelivery @TcpDiscoveryRedirectToClient -public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractTraceableMessage { +public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractTraceableMessage implements Message { /** */ private static final long serialVersionUID = 0L; /** Added node ID. */ - private final UUID nodeId; + @Order(6) + private UUID nodeId; /** * Client node can not get discovery data from TcpDiscoveryNodeAddedMessage, we have to pass discovery data in - * TcpDiscoveryNodeAddFinishedMessage + * TcpDiscoveryNodeAddFinishedMessage. */ + @Order(7) @GridToStringExclude private DiscoveryDataPacket clientDiscoData; @@ -46,6 +57,16 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractTrac @GridToStringExclude private Map clientNodeAttrs; + /** Serialized client node attributes. */ + @Order(value = 8, method = "clientNodeAttributesBytes") + @SuppressWarnings("unused") + private @Nullable byte[] clientNodeAttrsBytesHolder; + + /** Constructor. */ + public TcpDiscoveryNodeAddFinishedMessage() { + // No-op. + } + /** * Constructor. * @@ -78,6 +99,13 @@ public UUID nodeId() { return nodeId; } + /** + * @param nodeId ID of the node added. + */ + public void nodeId(UUID nodeId) { + this.nodeId = nodeId; + } + /** * @return Discovery data for joined client. */ @@ -108,8 +136,44 @@ public void clientNodeAttributes(Map clientNodeAttrs) { this.clientNodeAttrs = clientNodeAttrs; } + /** + * @return Serialized client node attributes. + */ + public byte[] clientNodeAttributesBytes() { + if (clientNodeAttrs == null) + return null; + + try { + return U.marshal(jdk(), clientNodeAttrs); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** + * @param clientNodeAttrsBytes Serialized client node attributes. + */ + public void clientNodeAttributesBytes(byte[] clientNodeAttrsBytes) { + if (F.isEmpty(clientNodeAttrsBytes)) + clientNodeAttrs = null; + else { + try { + clientNodeAttrs = U.unmarshal(jdk(), clientNodeAttrsBytes, U.gridClassLoader()); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryNodeAddFinishedMessage.class, this, "super", super.toString()); } + + /** {@inheritDoc} */ + @Override public short directType() { + return 17; + } } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 3b729cb19ea77..999c7c747c35e 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -79,6 +79,7 @@ import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryStatistics; import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.discovery.tcp.messages.NodeSpecificData; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; @@ -2518,7 +2519,7 @@ private static class TestDiscoveryDataDuplicateSpi extends TcpDiscoverySpi { DiscoveryDataPacket dataPacket = ((TcpDiscoveryNodeAddedMessage)msg).gridDiscoveryData(); if (dataPacket != null) { - Map> discoData = U.field(dataPacket, "nodeSpecificData"); + Map discoData = U.field(dataPacket, "nodeSpecificData"); checkDiscoData(discoData, msg); } @@ -2527,7 +2528,7 @@ else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { DiscoveryDataPacket dataPacket = ((TcpDiscoveryNodeAddFinishedMessage)msg).clientDiscoData(); if (dataPacket != null) { - Map> discoData = U.field(dataPacket, "nodeSpecificData"); + Map discoData = U.field(dataPacket, "nodeSpecificData"); checkDiscoData(discoData, msg); } @@ -2540,12 +2541,12 @@ else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { * @param discoData Discovery data. * @param msg Message. */ - private void checkDiscoData(Map> discoData, TcpDiscoveryAbstractMessage msg) { + private void checkDiscoData(Map discoData, TcpDiscoveryAbstractMessage msg) { if (discoData != null && discoData.size() > 1) { int cnt = 0; - for (Map map : discoData.values()) { - if (map.containsKey(GridComponent.DiscoveryDataExchangeType.CACHE_PROC.ordinal())) + for (NodeSpecificData data : discoData.values()) { + if (data.nodeSpecificData().containsKey(GridComponent.DiscoveryDataExchangeType.CACHE_PROC.ordinal())) cnt++; }