diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java index 6953d8b853891..4a8f556781cf7 100644 --- a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java @@ -322,7 +322,7 @@ private static class AttributeValueHolder { } /** Allows to change multiple attribute values in a single update operation and skip updates that changes nothing. */ - private static class ContextUpdater { + static class ContextUpdater { /** */ private static final int INIT_UPDATES_CAPACITY = 3; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index 9da592635d229..f6b51fabbcde3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -666,6 +666,10 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(PartitionHashRecord.class); withNoSchema(TransactionsHashRecord.class); + // [13400 - 13500]: Operation context messages. + msgIdx = 13400; + withNoSchema(DistributedOperationContextMessage.class); + assert msgIdx <= MAX_MESSAGE_ID; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java new file mode 100644 index 0000000000000..0277784044272 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.util.List; +import org.apache.ignite.internal.thread.context.OperationContext; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** Transport for {@link OperationContext} distributed attributes. */ +public class DistributedOperationContextMessage implements Message { + /** Values of operation context attributes. */ + @Order(0) + public List vals; + + /** Bitmask of effective attributes ids. */ + @Order(1) + public byte idBitmap; + + /** Empty constructor for serialization purposes. */ + public DistributedOperationContextMessage() { + // No-op. + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 3acf503561ad8..bdbc375fc9ff3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -102,6 +102,7 @@ import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings; import org.apache.ignite.internal.processors.tracing.Span; import org.apache.ignite.internal.processors.tracing.SpanTags; +import org.apache.ignite.internal.thread.context.DistributedOperationContextManager; import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet; import org.apache.ignite.internal.util.IgniteUtils; @@ -455,10 +456,14 @@ public void resetMetrics() { ioMetric.register(RCVD_BYTES_CNT, spi::getReceivedBytesCount, "Received bytes count."); - getSpi().setListener(commLsnr = new CommunicationListenerEx() { + getSpi().setListener(commLsnr = new CommunicationListenerEx<>() { @Override public void onMessage(UUID nodeId, Object msg, IgniteRunnable msgC) { try { - onMessage0(nodeId, (GridIoMessage)msg, msgC); + GridIoMessage msg0 = (GridIoMessage)msg; + + try (Scope ignored = DistributedOperationContextManager.instance().restoreDistributedAttributes(msg0.opCtxMsg)) { + onMessage0(nodeId, msg0, msgC); + } } catch (ClassCastException ignored) { U.error(log, "Communication manager received message of unknown type (will ignore): " + @@ -2037,16 +2042,22 @@ private long getInverseConnectionWaitTimeout() { long timeout, boolean skipOnTimeout ) { + GridIoMessage res; + if (ctx.security().enabled()) { UUID secSubjId = null; if (!ctx.security().isDefaultContext()) secSubjId = ctx.security().securityContext().subject().id(); - return new GridIoSecurityAwareMessage(secSubjId, plc, topic, msg, ordered, timeout, skipOnTimeout); + res = new GridIoSecurityAwareMessage(secSubjId, plc, topic, msg, ordered, timeout, skipOnTimeout); } + else + res = new GridIoMessage(plc, topic, msg, ordered, timeout, skipOnTimeout); + + res.opCtxMsg = DistributedOperationContextManager.instance().collectDistributedAttributes(); - return new GridIoMessage(plc, topic, msg, ordered, timeout, skipOnTimeout); + return res; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java index 8cc6c106cf22e..cb09122415def 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.managers.communication; +import org.apache.ignite.internal.DistributedOperationContextMessage; import org.apache.ignite.internal.ExecutorAwareMessage; import org.apache.ignite.internal.GridTopicMessage; import org.apache.ignite.internal.Order; @@ -64,6 +65,11 @@ public class GridIoMessage implements Message, SpanTransport { @Order(6) byte[] span; + /** Effective operation context attributes. */ + @Order(7) + @GridToStringInclude + public @Nullable DistributedOperationContextMessage opCtxMsg; + /** * Default constructor. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java new file mode 100644 index 0000000000000..a95e48880353f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.thread.context; + +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.DistributedOperationContextMessage; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.jetbrains.annotations.Nullable; + +/** */ +public class DistributedOperationContextManager { + /** */ + static final byte MAX_DISTRIBUTED_ATTR_CNT = 7; + + /** */ + private static final DistributedOperationContextManager INSTANCE = new DistributedOperationContextManager(); + + /** Attributes by their id. */ + private final Map> attrs = new ConcurrentSkipListMap<>(); + + /** */ + public static DistributedOperationContextManager instance() { + return INSTANCE; + } + + /** */ + public OperationContextAttribute createDistributedAttriubte(byte id, @Nullable T initVal) { + assert id >= 0; + + if (attrs.size() == OperationContextAttribute.MAX_ATTR_CNT) { + throw new IgniteException("Maximum number of distributed attributes is exceeded [max=" + + OperationContextAttribute.MAX_ATTR_CNT + "]."); + } + + return (OperationContextAttribute)attrs.compute(id, (id0, attr0) -> { + if (attr0 != null) + throw new IgniteException("Duplicated distributed attribute id [id=" + id + "]."); + + return OperationContextAttribute.newInstance(initVal); + }); + } + + /** */ + public @Nullable DistributedOperationContextMessage collectDistributedAttributes() { + DistributedOperationContextMessage res = null; + + for (Map.Entry> e : attrs.entrySet()) { + OperationContextAttribute attr = e.getValue(); + + Message curVal = OperationContext.get(attr); + + assert attr.initialValue() == null || curVal == null || curVal.getClass().isAssignableFrom(attr.initialValue().getClass()); + + if (curVal != attr.initialValue()) { + if (res == null) { + res = new DistributedOperationContextMessage(); + + res.vals = new ArrayList<>(MAX_DISTRIBUTED_ATTR_CNT / 2); + } + + byte mask = (byte)(1 << e.getKey()); + + assert (res.idBitmap & mask) == 0; + + res.vals.add(curVal); + res.idBitmap |= mask; + } + } + + return res; + } + + /** */ + public Scope restoreDistributedAttributes(@Nullable DistributedOperationContextMessage msg) { + if (msg == null) + return Scope.NOOP_SCOPE; + + assert msg.idBitmap != 0; + assert !F.isEmpty(msg.vals); + assert msg.vals.size() <= MAX_DISTRIBUTED_ATTR_CNT; + + OperationContext.ContextUpdater updater = OperationContext.ContextUpdater.create(); + + for (byte valIdx = 0, maskIdx = 0; valIdx < msg.vals.size(); ++valIdx) { + Message curVal = msg.vals.get(valIdx); + + while ((msg.idBitmap & (1 << maskIdx)) == 0) { + assert maskIdx <= MAX_DISTRIBUTED_ATTR_CNT; + + ++maskIdx; + } + + updater.set((OperationContextAttribute)attrs.get(maskIdx++), curVal); + } + + return updater.apply(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 3078e20a0e9fc..f9ab18727eada 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -616,7 +616,7 @@ private void dumpInfo(StringBuilder sb, UUID dstNodeId) { ctxInitLatch, client, igniteExSupplier, - new CommunicationListener() { + new CommunicationListener<>() { @Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) { notifyListener(nodeId, msg, msgC); } @@ -651,7 +651,7 @@ private void dumpInfo(StringBuilder sb, UUID dstNodeId) { getWorkersRegistry(ignite), ignite instanceof IgniteEx ? ((IgniteEx)ignite).context().metric() : null, this::createTcpClient, - new CommunicationListenerEx() { + new CommunicationListenerEx<>() { @Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) { notifyListener(nodeId, msg, msgC); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index a0e1a20048786..ec153176d64b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -70,6 +70,8 @@ import org.apache.ignite.internal.processors.tracing.messages.SpanContainer; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable; +import org.apache.ignite.internal.thread.context.DistributedOperationContextManager; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; @@ -1310,6 +1312,8 @@ private class SocketWriter extends IgniteSpiThread { * @param msg Message. */ private void sendMessage(TcpDiscoveryAbstractMessage msg) { + msg.opCtxMsg = DistributedOperationContextManager.instance().collectDistributedAttributes(); + synchronized (mux) { queue.add(msg); @@ -2001,7 +2005,11 @@ else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage) } } - processDiscoveryMessage((TcpDiscoveryAbstractMessage)msg); + TcpDiscoveryAbstractMessage msg0 = (TcpDiscoveryAbstractMessage)msg; + + try (Scope ignored = DistributedOperationContextManager.instance().restoreDistributedAttributes(msg0.opCtxMsg)) { + processDiscoveryMessage(msg0); + } } } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 82c012c2a1a94..a8a4f2f0a473e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -95,6 +95,8 @@ import org.apache.ignite.internal.processors.tracing.messages.SpanContainer; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable; +import org.apache.ignite.internal.thread.context.DistributedOperationContextManager; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor; import org.apache.ignite.internal.util.GridBoundedLinkedHashSet; import org.apache.ignite.internal.util.GridConcurrentHashSet; @@ -3046,8 +3048,10 @@ void addMessage(TcpDiscoveryAbstractMessage msg, boolean ignoreHighPriority, boo return; } - if (msg instanceof TraceableMessage) { - TraceableMessage tMsg = (TraceableMessage)msg; + if (!fromSocket) + msg.opCtxMsg = DistributedOperationContextManager.instance().collectDistributedAttributes(); + + if (msg instanceof TraceableMessage tMsg) { // If we read this message from socket. if (fromSocket) @@ -3173,11 +3177,8 @@ protected void runTasks() { task.run(); } - /** {@inheritDoc} */ - @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { - if (msg == WAKEUP) - return; - + /** */ + private void processMessage0(TcpDiscoveryAbstractMessage msg) { notifiedDiscovery.set(false); if (msg instanceof TraceableMessage) { @@ -3315,6 +3316,16 @@ else if (msg instanceof TcpDiscoveryAuthFailedMessage) } } + /** {@inheritDoc} */ + @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { + if (msg == WAKEUP) + return; + + try (Scope ignored = DistributedOperationContextManager.instance().restoreDistributedAttributes(msg.opCtxMsg)) { + processMessage0(msg); + } + } + /** * Processes authentication failed message. * diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java index f23e36f200d27..d76279fb28082 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java @@ -52,7 +52,6 @@ public int port() { return port; } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(InetSocketAddressMessage.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index 7a97763c36b25..9dda990c7d020 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -21,6 +21,7 @@ import java.util.HashSet; import java.util.Set; import java.util.UUID; +import org.apache.ignite.internal.DistributedOperationContextMessage; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -76,6 +77,11 @@ public abstract class TcpDiscoveryAbstractMessage implements Message { @Order(4) Set failedNodes; + /** Operation context attributes message. */ + @GridToStringInclude + @Order(5) + public @Nullable DistributedOperationContextMessage opCtxMsg; + /** * Default no-arg constructor for {@link Externalizable} interface. */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java index 9de906b27290c..7bef37a872d43 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java @@ -17,10 +17,12 @@ package org.apache.ignite.internal.thread.context; +import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -36,8 +38,17 @@ import java.util.function.Function; import java.util.function.Supplier; import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.managers.communication.GridMessageListener; +import org.apache.ignite.internal.managers.communication.IgniteIoTestMessage; +import org.apache.ignite.internal.managers.discovery.CustomEventListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture; @@ -48,6 +59,7 @@ import org.apache.ignite.internal.thread.pool.IgniteStripedThreadPoolExecutor; import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.queue.IgniteAsyncObjectHandler; import org.apache.ignite.internal.util.worker.queue.IgniteDelayedObjectHandler; @@ -56,6 +68,7 @@ import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.thread.IgniteThread; import org.junit.Test; @@ -64,6 +77,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause; import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; /** */ public class OperationContextAttributesTest extends GridCommonAbstractTest { @@ -98,6 +112,8 @@ public class OperationContextAttributesTest extends GridCommonAbstractTest { @Override protected void afterTest() throws Exception { super.afterTest(); + stopAllGrids(); + if (poolToShutdownAfterTest != null) poolToShutdownAfterTest.shutdownNow(); @@ -808,6 +824,206 @@ public void testContextAwareDelayQueue() throws Exception { } } + /** */ + @Test + public void testSendAttributesByDiscovery() throws Exception { + byte attrId1 = 0; + byte attrId2 = DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT; + + InetSocketAddressMessage dfltDistAttr1Val = new InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80); + GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1); + + // Local attribute 1. + OperationContextAttribute.newInstance(1000); + + // Distributed attribute 1. + OperationContextAttribute dAttr1 = DistributedOperationContextManager.instance() + .createDistributedAttriubte(attrId1, dfltDistAttr1Val); + + // Local attribute 2. + OperationContextAttribute.newInstance("locaAttr2"); + + // Distributed attribute 2. + OperationContextAttribute dAttr2 = DistributedOperationContextManager.instance() + .createDistributedAttriubte(attrId2, dfltDistrAttr2Val); + + startGrids(2); + startClientGrid(2); + + CountDownLatch coordLatch = new CountDownLatch(3); + CountDownLatch srvrLatch = new CountDownLatch(3); + CountDownLatch clientLatch = new CountDownLatch(3); + + InetSocketAddressMessage valToSend1 = new InetSocketAddressMessage(dfltDistAttr1Val.address(), 443); + GridCacheVersion valToSend2 = new GridCacheVersion(2, 2, 2); + + for (int i = 0; i < G.allGrids().size(); ++i) { + int i0 = i; + + grid(i).context().discovery().setCustomEventListener( + DynamicCacheChangeBatch.class, new CustomEventListener<>() { + @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, + DynamicCacheChangeBatch msg) { + + InetSocketAddressMessage receivedVal1 = OperationContext.get(dAttr1); + GridCacheVersion receivedVal2 = OperationContext.get(dAttr2); + + assertNotNull(receivedVal1); + assertNotNull(receivedVal2); + + assertFalse(dfltDistAttr1Val.port() == receivedVal1.port()); + assertEquals(receivedVal1.port(), valToSend1.port()); + assertEquals(receivedVal1.address(), valToSend1.address()); + + assertFalse(dfltDistrAttr2Val.equals(receivedVal2)); + assertTrue(valToSend2.equals(receivedVal2)); + + if (grid(i0).localNode().isClient()) + clientLatch.countDown(); + else if (grid(i0).localNode().order() == 1) + coordLatch.countDown(); + else + srvrLatch.countDown(); + } + }); + } + + // Send from the coordinator. + try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) { + grid(0).createCache(defaultCacheConfiguration()); + } + + assertTrue(waitForCondition(() -> coordLatch.getCount() == 2, getTestTimeout())); + assertTrue(waitForCondition(() -> srvrLatch.getCount() == 2, getTestTimeout())); + assertTrue(waitForCondition(() -> clientLatch.getCount() == 2, getTestTimeout())); + + // Send from a server. + try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) { + grid(1).destroyCache(DEFAULT_CACHE_NAME); + } + + assertTrue(waitForCondition(() -> coordLatch.getCount() == 1, getTestTimeout())); + assertTrue(waitForCondition(() -> srvrLatch.getCount() == 1, getTestTimeout())); + assertTrue(waitForCondition(() -> clientLatch.getCount() == 1, getTestTimeout())); + + // Send from a client. + try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) { + grid(2).createCache(defaultCacheConfiguration()); + } + + assertTrue(coordLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); + assertTrue(srvrLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); + assertTrue(clientLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); + } + + /** */ + @Test + public void testSendAttributesByCommunication() throws Exception { + byte attrId1 = 0; + byte attrId2 = DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT; + + InetSocketAddressMessage dfltDistrAttr1Val = new InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80); + GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1); + + // Local attribute 1. + OperationContextAttribute.newInstance(1000); + + // Distributed attribute 1. + OperationContextAttribute dAttr1 = DistributedOperationContextManager.instance() + .createDistributedAttriubte(attrId1, dfltDistrAttr1Val); + + // Local attribute 2. + OperationContextAttribute.newInstance("locaAttr2"); + + // Distributed attribute 2. + OperationContextAttribute dAttr2 = DistributedOperationContextManager.instance() + .createDistributedAttriubte(attrId2, dfltDistrAttr2Val); + + startGrids(2); + startClientGrid(2); + + CountDownLatch coordLatch = new CountDownLatch(2); + CountDownLatch srvrLatch = new CountDownLatch(4); + CountDownLatch clientLatch = new CountDownLatch(2); + + InetSocketAddressMessage valToSend1 = new InetSocketAddressMessage(dfltDistrAttr1Val.address(), 443); + GridCacheVersion valToSend2 = new GridCacheVersion(2, 2, 2); + + for (int i = 0; i < G.allGrids().size(); ++i) { + int i0 = i; + + grid(i).context().io().addMessageListener(GridTopic.TOPIC_IO_TEST, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { + if (msg instanceof IgniteIoTestMessage) { + InetSocketAddressMessage receivedVal1 = OperationContext.get(dAttr1); + GridCacheVersion receivedVal2 = OperationContext.get(dAttr2); + + assertNotNull(receivedVal1); + assertNotNull(receivedVal2); + + assertFalse(dfltDistrAttr1Val.port() == receivedVal1.port()); + assertEquals(receivedVal1.port(), valToSend1.port()); + assertEquals(receivedVal1.address(), valToSend1.address()); + + assertEquals(receivedVal2, valToSend2); + + if (grid(i0).localNode().isClient()) + clientLatch.countDown(); + else if (grid(i0).localNode().order() == 1) + coordLatch.countDown(); + else + srvrLatch.countDown(); + } + } + }); + } + + assertFalse(valToSend1.equals(dfltDistrAttr1Val)); + assertFalse(valToSend2.equals(dfltDistrAttr2Val)); + + // From the coordinator to a server. + try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) { + io(0).sendIoTest(node(1), null, false); + io(0).sendIoTest(node(1), null, true); + } + + assertTrue(waitForCondition(() -> coordLatch.getCount() == 2, getTestTimeout())); + + // From a server to the coordinator. + try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) { + io(1).sendIoTest(node(0), null, false); + io(1).sendIoTest(node(0), null, true); + } + + assertTrue(coordLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); + + // From a client to a server. + try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) { + io(2).sendIoTest(node(1), null, false); + io(2).sendIoTest(node(1), null, true); + } + + assertTrue(srvrLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); + + // From a server to a client. + try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) { + io(1).sendIoTest(node(2), null, false); + io(1).sendIoTest(node(2), null, true); + } + + assertTrue(clientLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); + } + + /** @return a {@link ClusterNode} with {@link ClusterNode#isLocal()} == {@code false} to avoid some asserts/checks. */ + private ClusterNode node(int nodeIdx) { + return grid(0).cluster().node(grid(nodeIdx).localNode().id()); + } + + /** */ + private GridIoManager io(int nodeIdx) { + return grid(nodeIdx).context().io(); + } + /** */ private void doContextAwareExecutorServiceTest(ExecutorService pool) throws Exception { CountDownLatch poolUnblockedLatch = blockPool(pool); @@ -923,9 +1139,8 @@ public AttributeValueChecker(String expStrAttrVal, Integer expIntAttrVal) { /** */ static void assertAllCreatedChecksPassed() throws Exception { - for (AttributeValueChecker check : CHECKS) { + for (AttributeValueChecker check : CHECKS) check.get(5_000, MILLISECONDS); - } } /** */