From 1740273da20568067a1a524dab000c79f8068d3d Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 11 Jun 2026 12:23:26 +0300 Subject: [PATCH 01/17] impl --- .../internal/managers/communication/UnknownMessageException.java | 0 .../apache/ignite/plugin/extensions/communication/Message.java | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename modules/{core => commons}/src/main/java/org/apache/ignite/internal/managers/communication/UnknownMessageException.java (100%) rename modules/{core => commons}/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java (100%) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/UnknownMessageException.java b/modules/commons/src/main/java/org/apache/ignite/internal/managers/communication/UnknownMessageException.java similarity index 100% rename from modules/core/src/main/java/org/apache/ignite/internal/managers/communication/UnknownMessageException.java rename to modules/commons/src/main/java/org/apache/ignite/internal/managers/communication/UnknownMessageException.java diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java b/modules/commons/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java similarity index 100% rename from modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java rename to modules/commons/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java From baa147367a3876e08e30661b87fa6251cdd61290 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 12 Jun 2026 12:43:24 +0300 Subject: [PATCH 02/17] in-progress --- ...utedOperationContextAttributeRegistry.java | 83 +++++++++++++++++++ .../thread/context/OperationContext.java | 2 +- .../context/OperationContextAttribute.java | 1 + .../ignite/spi/discovery/tcp/ClientImpl.java | 14 +++- .../ignite/spi/discovery/tcp/ServerImpl.java | 26 ++++-- .../messages/InetSocketAddressMessage.java | 1 - .../messages/TcpDiscoveryAbstractMessage.java | 6 ++ .../OperationContextAttributesTest.java | 76 ++++++++++++++++- 8 files changed, 199 insertions(+), 10 deletions(-) create mode 100644 modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java new file mode 100644 index 0000000000000..93c4ddb621c34 --- /dev/null +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.thread.context; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.jetbrains.annotations.Nullable; + +/** */ +public class DistributedOperationContextAttributeRegistry { + /** */ + private static final DistributedOperationContextAttributeRegistry INSTANCE = new DistributedOperationContextAttributeRegistry(); + + /** Attributes by their id. */ + private final Map> attributes = new ConcurrentHashMap<>(); + + /** */ + public static DistributedOperationContextAttributeRegistry instance() { + return INSTANCE; + } + + /** */ + public void register(byte id, OperationContextAttribute attr) { + assert id >= 0; + + if(attributes.size() == OperationContextAttribute.MAX_ATTR_CNT) + throw new IgniteException("Maximum number of attributes is exceeded [" + OperationContextAttribute.MAX_ATTR_CNT + "]."); + + if (attributes.putIfAbsent(id, attr) != null) + throw new IgniteException("Duplicated attribute id: " + id); + } + + /** @return Values for all registered operation context attributes. */ + public @Nullable Map collectContext() { + Map res = null; + + for (Map.Entry> e : attributes.entrySet()) { + OperationContextAttribute attr = e.getValue(); + + Message curVal = OperationContext.get(attr); + + if (!Objects.equals(attr.initialValue(), curVal)) { + if (res == null) + res = new HashMap<>(attributes.size(), 1.0f); + + res.put(e.getKey(), curVal); + } + } + + return res; + } + + /** */ + public Scope restoreContext(Map res) { + if (F.isEmpty(res)) + return Scope.NOOP_SCOPE; + + OperationContext.ContextUpdater updater = OperationContext.ContextUpdater.create(); + + res.forEach((id, attr) -> updater.set((OperationContextAttribute)attributes.get(id), attr)); + + return updater.apply(); + } +} diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java index 6953d8b853891..4a8f556781cf7 100644 --- a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java @@ -322,7 +322,7 @@ private static class AttributeValueHolder { } /** Allows to change multiple attribute values in a single update operation and skip updates that changes nothing. */ - private static class ContextUpdater { + static class ContextUpdater { /** */ private static final int INIT_UPDATES_CAPACITY = 3; diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java index 499d241d9ccba..f5f20066a3d2f 100644 --- a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.thread.context; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index a0e1a20048786..3476ba8785843 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -70,6 +70,8 @@ import org.apache.ignite.internal.processors.tracing.messages.SpanContainer; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable; +import org.apache.ignite.internal.thread.context.DistributedOperationContextAttributeRegistry; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; @@ -1310,6 +1312,8 @@ private class SocketWriter extends IgniteSpiThread { * @param msg Message. */ private void sendMessage(TcpDiscoveryAbstractMessage msg) { + msg.opCtxAttrs = DistributedOperationContextAttributeRegistry.instance().collectContext(); + synchronized (mux) { queue.add(msg); @@ -2001,7 +2005,15 @@ else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage) } } - processDiscoveryMessage((TcpDiscoveryAbstractMessage)msg); + TcpDiscoveryAbstractMessage msg0 = (TcpDiscoveryAbstractMessage)msg; + + if (F.isEmpty(msg0.opCtxAttrs)) + processDiscoveryMessage(msg0); + else { + try (Scope ignored = DistributedOperationContextAttributeRegistry.instance().restoreContext(msg0.opCtxAttrs)) { + processDiscoveryMessage(msg0); + } + } } } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 82c012c2a1a94..a32bf95fd1bc5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -95,6 +95,8 @@ import org.apache.ignite.internal.processors.tracing.messages.SpanContainer; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable; +import org.apache.ignite.internal.thread.context.DistributedOperationContextAttributeRegistry; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor; import org.apache.ignite.internal.util.GridBoundedLinkedHashSet; import org.apache.ignite.internal.util.GridConcurrentHashSet; @@ -3046,6 +3048,9 @@ void addMessage(TcpDiscoveryAbstractMessage msg, boolean ignoreHighPriority, boo return; } + if (!fromSocket) + msg.opCtxAttrs = DistributedOperationContextAttributeRegistry.instance().collectContext(); + if (msg instanceof TraceableMessage) { TraceableMessage tMsg = (TraceableMessage)msg; @@ -3173,11 +3178,8 @@ protected void runTasks() { task.run(); } - /** {@inheritDoc} */ - @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { - if (msg == WAKEUP) - return; - + /** */ + private void processMessage0(TcpDiscoveryAbstractMessage msg) { notifiedDiscovery.set(false); if (msg instanceof TraceableMessage) { @@ -3315,6 +3317,20 @@ else if (msg instanceof TcpDiscoveryAuthFailedMessage) } } + /** {@inheritDoc} */ + @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { + if (msg == WAKEUP) + return; + + if (F.isEmpty(msg.opCtxAttrs)) + processMessage0(msg); + else { + try (Scope ignored = DistributedOperationContextAttributeRegistry.instance().restoreContext(msg.opCtxAttrs)) { + processMessage0(msg); + } + } + } + /** * Processes authentication failed message. * diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java index f23e36f200d27..d76279fb28082 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java @@ -52,7 +52,6 @@ public int port() { return port; } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(InetSocketAddressMessage.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index 7a97763c36b25..e04f1d856c0ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -19,6 +19,7 @@ import java.io.Externalizable; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.ignite.internal.Order; @@ -76,6 +77,11 @@ public abstract class TcpDiscoveryAbstractMessage implements Message { @Order(4) Set failedNodes; + /** Operation context attributes: id -> attribute value. */ + @GridToStringInclude + @Order(5) + public @Nullable Map opCtxAttrs; + /** * Default no-arg constructor for {@link Externalizable} interface. */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java index 9de906b27290c..e6454be489cc7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.thread.context; +import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; @@ -35,9 +36,17 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.managers.discovery.CustomEventListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture; @@ -48,20 +57,24 @@ import org.apache.ignite.internal.thread.pool.IgniteStripedThreadPoolExecutor; import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.queue.IgniteAsyncObjectHandler; import org.apache.ignite.internal.util.worker.queue.IgniteDelayedObjectHandler; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteOutClosure; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.thread.IgniteThread; import org.junit.Test; import org.springframework.lang.NonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause; import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause; @@ -85,6 +98,9 @@ public class OperationContextAttributesTest extends GridCommonAbstractTest { /** */ private int beforeTestReservedAttrIds; + /** */ + private IgnitePredicate evtLsnr; + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); @@ -98,6 +114,8 @@ public class OperationContextAttributesTest extends GridCommonAbstractTest { @Override protected void afterTest() throws Exception { super.afterTest(); + stopAllGrids(); + if (poolToShutdownAfterTest != null) poolToShutdownAfterTest.shutdownNow(); @@ -105,6 +123,16 @@ public class OperationContextAttributesTest extends GridCommonAbstractTest { OperationContextAttribute.ID_GEN.set(beforeTestReservedAttrIds); } + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + if (evtLsnr != null) + cfg.setLocalEventListeners(Collections.singletonMap(evtLsnr, new int[] {EVT_DISCOVERY_CUSTOM_EVT})); + + return cfg; + } + /** */ @Test public void testNotAttachedAttribute() { @@ -808,6 +836,51 @@ public void testContextAwareDelayQueue() throws Exception { } } + /** */ + @Test + public void testSendAttributesByDiscovery() throws Exception { + byte attrId = (byte)(OperationContextAttribute.MAX_ATTR_CNT + 1); + + InetSocketAddressMessage dfltAttrVal = new InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80); + + OperationContextAttribute attr = OperationContextAttribute.newInstance(); + + DistributedOperationContextAttributeRegistry.instance().register(attrId, attr); + + startGrids(2); + Ignite cli = startClientGrid(); + + CountDownLatch clientLatch = new CountDownLatch(1); + CountDownLatch srvrLatch = new CountDownLatch(1); + + for (int i = 1; i < G.allGrids().size(); ++i) { + int i0 = i; + + grid(i).context().discovery().setCustomEventListener( + DynamicCacheChangeBatch.class, new CustomEventListener<>() { + @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, + DynamicCacheChangeBatch msg) { + + if (grid(i0).localNode().isClient()) + clientLatch.countDown(); + else + srvrLatch.countDown(); + } + }); + } + + InetSocketAddressMessage newAttrVal = new InetSocketAddressMessage(dfltAttrVal.address(), 443); + + assertFalse(newAttrVal.equals(dfltAttrVal)); + + try (Scope ignored = OperationContext.set(attr, newAttrVal)) { + grid(0).createCache(defaultCacheConfiguration()); + } + + assertTrue(clientLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); + assertTrue(srvrLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); + } + /** */ private void doContextAwareExecutorServiceTest(ExecutorService pool) throws Exception { CountDownLatch poolUnblockedLatch = blockPool(pool); @@ -923,9 +996,8 @@ public AttributeValueChecker(String expStrAttrVal, Integer expIntAttrVal) { /** */ static void assertAllCreatedChecksPassed() throws Exception { - for (AttributeValueChecker check : CHECKS) { + for (AttributeValueChecker check : CHECKS) check.get(5_000, MILLISECONDS); - } } /** */ From 9f8e09d8a01167e5b8bb7a6b576353850cc792a4 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Sun, 14 Jun 2026 20:39:02 +0300 Subject: [PATCH 03/17] raw --- ...utedOperationContextAttributeRegistry.java | 2 +- .../context/OperationContextAttribute.java | 1 - .../OperationContextAttributesTest.java | 59 +++++++++++++++---- 3 files changed, 50 insertions(+), 12 deletions(-) diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java index 93c4ddb621c34..4d9e9c0fd6698 100644 --- a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java @@ -42,7 +42,7 @@ public static DistributedOperationContextAttributeRegistry instance() { public void register(byte id, OperationContextAttribute attr) { assert id >= 0; - if(attributes.size() == OperationContextAttribute.MAX_ATTR_CNT) + if (attributes.size() == OperationContextAttribute.MAX_ATTR_CNT) throw new IgniteException("Maximum number of attributes is exceeded [" + OperationContextAttribute.MAX_ATTR_CNT + "]."); if (attributes.putIfAbsent(id, attr) != null) diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java index f5f20066a3d2f..499d241d9ccba 100644 --- a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.thread.context; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java index e6454be489cc7..e82721c10b114 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java @@ -36,12 +36,10 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; -import org.apache.ignite.Ignite; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.Event; -import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.discovery.CustomEventListener; @@ -77,6 +75,7 @@ import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause; import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; /** */ public class OperationContextAttributesTest extends GridCommonAbstractTest { @@ -848,12 +847,15 @@ public void testSendAttributesByDiscovery() throws Exception { DistributedOperationContextAttributeRegistry.instance().register(attrId, attr); startGrids(2); - Ignite cli = startClientGrid(); + startClientGrid(2); - CountDownLatch clientLatch = new CountDownLatch(1); - CountDownLatch srvrLatch = new CountDownLatch(1); + CountDownLatch coordLatch = new CountDownLatch(3); + CountDownLatch srvrLatch = new CountDownLatch(3); + CountDownLatch clientLatch = new CountDownLatch(3); - for (int i = 1; i < G.allGrids().size(); ++i) { + InetSocketAddressMessage valToSend = new InetSocketAddressMessage(dfltAttrVal.address(), 443); + + for (int i = 0; i < G.allGrids().size(); ++i) { int i0 = i; grid(i).context().discovery().setCustomEventListener( @@ -861,24 +863,61 @@ public void testSendAttributesByDiscovery() throws Exception { @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, DynamicCacheChangeBatch msg) { + InetSocketAddressMessage receivedVal = OperationContext.get(attr); + + assertNotNull(receivedVal); + + assertFalse(dfltAttrVal.port() == receivedVal.port()); + + assertEquals(receivedVal.port(), valToSend.port()); + assertEquals(receivedVal.address(), valToSend.address()); + if (grid(i0).localNode().isClient()) clientLatch.countDown(); + else if (grid(i0).localNode().order() == 1) + coordLatch.countDown(); else srvrLatch.countDown(); } }); } - InetSocketAddressMessage newAttrVal = new InetSocketAddressMessage(dfltAttrVal.address(), 443); + assertFalse(valToSend.equals(dfltAttrVal)); - assertFalse(newAttrVal.equals(dfltAttrVal)); + assertNull(OperationContext.get(attr)); - try (Scope ignored = OperationContext.set(attr, newAttrVal)) { + // Send from a coordinator. + try (Scope ignored = OperationContext.set(attr, valToSend)) { grid(0).createCache(defaultCacheConfiguration()); } - assertTrue(clientLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); + assertTrue(waitForCondition(() -> coordLatch.getCount() == 2, getTestTimeout())); + assertTrue(waitForCondition(() -> srvrLatch.getCount() == 2, getTestTimeout())); + assertTrue(waitForCondition(() -> clientLatch.getCount() == 2, getTestTimeout())); + + assertNull(OperationContext.get(attr)); + + // Send from a server. + try (Scope ignored = OperationContext.set(attr, valToSend)) { + grid(1).destroyCache(DEFAULT_CACHE_NAME); + } + + assertTrue(waitForCondition(() -> coordLatch.getCount() == 1, getTestTimeout())); + assertTrue(waitForCondition(() -> srvrLatch.getCount() == 1, getTestTimeout())); + assertTrue(waitForCondition(() -> clientLatch.getCount() == 1, getTestTimeout())); + + assertNull(OperationContext.get(attr)); + + // Send from a client. + try (Scope ignored = OperationContext.set(attr, valToSend)) { + grid(2).createCache(defaultCacheConfiguration()); + } + + assertNull(OperationContext.get(attr)); + + assertTrue(coordLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); assertTrue(srvrLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); + assertTrue(clientLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); } /** */ From 9f4ccecbea9382da2a1da57cd5d595d6c9966045 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 15 Jun 2026 20:42:00 +0300 Subject: [PATCH 04/17] impl --- ...utedOperationContextAttributeRegistry.java | 21 +++++++++---------- .../UnknownMessageException.java | 0 .../extensions/communication/Message.java | 0 3 files changed, 10 insertions(+), 11 deletions(-) rename modules/{commons => core}/src/main/java/org/apache/ignite/internal/managers/communication/UnknownMessageException.java (100%) rename modules/{commons => core}/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java (100%) diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java index 4d9e9c0fd6698..d81b0d1d321cb 100644 --- a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java @@ -22,7 +22,6 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** */ @@ -31,7 +30,7 @@ public class DistributedOperationContextAttributeRegistry { private static final DistributedOperationContextAttributeRegistry INSTANCE = new DistributedOperationContextAttributeRegistry(); /** Attributes by their id. */ - private final Map> attributes = new ConcurrentHashMap<>(); + private final Map> attributes = new ConcurrentHashMap<>(); /** */ public static DistributedOperationContextAttributeRegistry instance() { @@ -39,7 +38,7 @@ public static DistributedOperationContextAttributeRegistry instance() { } /** */ - public void register(byte id, OperationContextAttribute attr) { + public void register(byte id, OperationContextAttribute attr) { assert id >= 0; if (attributes.size() == OperationContextAttribute.MAX_ATTR_CNT) @@ -50,19 +49,19 @@ public void register(byte id, OperationContextAttribute a } /** @return Values for all registered operation context attributes. */ - public @Nullable Map collectContext() { - Map res = null; + public @Nullable Map collectContext() { + Map res = null; - for (Map.Entry> e : attributes.entrySet()) { - OperationContextAttribute attr = e.getValue(); + for (Map.Entry> e : attributes.entrySet()) { + OperationContextAttribute attr = e.getValue(); - Message curVal = OperationContext.get(attr); + Object curVal = OperationContext.get(attr); if (!Objects.equals(attr.initialValue(), curVal)) { if (res == null) res = new HashMap<>(attributes.size(), 1.0f); - res.put(e.getKey(), curVal); + res.put(e.getKey(), (T)curVal); } } @@ -70,13 +69,13 @@ public void register(byte id, OperationContextAttribute a } /** */ - public Scope restoreContext(Map res) { + public Scope restoreContext(Map res) { if (F.isEmpty(res)) return Scope.NOOP_SCOPE; OperationContext.ContextUpdater updater = OperationContext.ContextUpdater.create(); - res.forEach((id, attr) -> updater.set((OperationContextAttribute)attributes.get(id), attr)); + res.forEach((id, attr) -> updater.set((OperationContextAttribute)attributes.get(id), attr)); return updater.apply(); } diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/managers/communication/UnknownMessageException.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/UnknownMessageException.java similarity index 100% rename from modules/commons/src/main/java/org/apache/ignite/internal/managers/communication/UnknownMessageException.java rename to modules/core/src/main/java/org/apache/ignite/internal/managers/communication/UnknownMessageException.java diff --git a/modules/commons/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java similarity index 100% rename from modules/commons/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java rename to modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java From 1987030a0b85fa0ce730b60afd2639c4c661be84 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 17 Jun 2026 13:50:35 +0300 Subject: [PATCH 05/17] review fixes --- ...utedOperationContextAttributeRegistry.java | 32 ++++++++++++---- .../context/OperationContextAttribute.java | 2 +- .../ignite/internal/CoreMessagesProvider.java | 4 ++ .../internal/OperationContexMessage.java | 37 +++++++++++++++++++ .../ignite/spi/discovery/tcp/ClientImpl.java | 7 ++-- .../ignite/spi/discovery/tcp/ServerImpl.java | 9 +++-- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 24 ++++++++++++ .../messages/TcpDiscoveryAbstractMessage.java | 6 +-- .../OperationContextAttributesTest.java | 2 +- 9 files changed, 105 insertions(+), 18 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/OperationContexMessage.java diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java index d81b0d1d321cb..9208fb5add00d 100644 --- a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java @@ -16,6 +16,7 @@ */ package org.apache.ignite.internal.thread.context; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -48,17 +49,25 @@ public void register(byte id, OperationContextAttribute attr) { throw new IgniteException("Duplicated attribute id: " + id); } - /** @return Values for all registered operation context attributes. */ - public @Nullable Map collectContext() { - Map res = null; + /** + * TODO : Declare distributed attributes as 'extends Message' after https://issues.apache.org/jira/browse/IGNITE-28766 + * @return Values for all registered operation context attributes. + * */ + public Map collectContext(@Nullable Class checkValuesType) { + Map res = Collections.emptyMap(); for (Map.Entry> e : attributes.entrySet()) { OperationContextAttribute attr = e.getValue(); Object curVal = OperationContext.get(attr); + if (curVal != null && checkValuesType != null && !checkValuesType.isAssignableFrom(curVal.getClass())) { + throw new IgniteException("To distribute operation context attributes they have to be a " + + checkValuesType.getSimpleName()); + } + if (!Objects.equals(attr.initialValue(), curVal)) { - if (res == null) + if (res == Collections.EMPTY_MAP) res = new HashMap<>(attributes.size(), 1.0f); res.put(e.getKey(), (T)curVal); @@ -69,13 +78,22 @@ public void register(byte id, OperationContextAttribute attr) { } /** */ - public Scope restoreContext(Map res) { - if (F.isEmpty(res)) + public Scope restoreContext(int idBitmask, Object[] values) { + if (F.isEmpty(values) || idBitmask == 0) return Scope.NOOP_SCOPE; OperationContext.ContextUpdater updater = OperationContext.ContextUpdater.create(); - res.forEach((id, attr) -> updater.set((OperationContextAttribute)attributes.get(id), attr)); + for (byte attrId = 0; attrId < OperationContextAttribute.MAX_ATTR_CNT; attrId++) { + assert attrId < Integer.SIZE; + + int mask = 1 << attrId; + + if ((mask & idBitmask) == 0) + continue; + + updater.set((OperationContextAttribute)attributes.get(attrId), values[attrId]); + } return updater.apply(); } diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java index 499d241d9ccba..373a72ee3b278 100644 --- a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java @@ -32,7 +32,7 @@ public class OperationContextAttribute { static final AtomicInteger ID_GEN = new AtomicInteger(); /** */ - static final int MAX_ATTR_CNT = Integer.SIZE; + public static final int MAX_ATTR_CNT = Integer.SIZE; /** */ private final int bitmask; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index 9da592635d229..4d4caff06d126 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -666,6 +666,10 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(PartitionHashRecord.class); withNoSchema(TransactionsHashRecord.class); + // [13400 - 13600]: Operation context messages. + msgIdx = 13400; + withNoSchema(OperationContexMessage.class); + assert msgIdx <= MAX_MESSAGE_ID; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/OperationContexMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/OperationContexMessage.java new file mode 100644 index 0000000000000..d45bf3a578602 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/OperationContexMessage.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.internal.thread.context.OperationContext; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** ransport for {@link OperationContext} attributes. */ +public class OperationContexMessage implements Message { + /** Values of operation context attributes. */ + @Order(0) + public Message[] vals; + + /** Bitmask of effective attributes ids. */ + @Order(1) + public int idBitmask; + + /** Empty constructor for serialization purposes. */ + public OperationContexMessage() { + // No-op. + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 3476ba8785843..ce2911a83392e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -1312,7 +1312,7 @@ private class SocketWriter extends IgniteSpiThread { * @param msg Message. */ private void sendMessage(TcpDiscoveryAbstractMessage msg) { - msg.opCtxAttrs = DistributedOperationContextAttributeRegistry.instance().collectContext(); + fillOperationContextAttributes(msg); synchronized (mux) { queue.add(msg); @@ -2007,10 +2007,11 @@ else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage) TcpDiscoveryAbstractMessage msg0 = (TcpDiscoveryAbstractMessage)msg; - if (F.isEmpty(msg0.opCtxAttrs)) + if (msg0.opCtxMsg == null) processDiscoveryMessage(msg0); else { - try (Scope ignored = DistributedOperationContextAttributeRegistry.instance().restoreContext(msg0.opCtxAttrs)) { + try (Scope ignored = DistributedOperationContextAttributeRegistry.instance() + .restoreContext(msg0.opCtxMsg.idBitmask, msg0.opCtxMsg.vals)) { processDiscoveryMessage(msg0); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index a32bf95fd1bc5..e27de173ec6c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -81,6 +81,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.OperationContexMessage; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.communication.UnknownMessageException; import org.apache.ignite.internal.managers.discovery.DiscoveryServerOnlyCustomMessage; @@ -3049,7 +3050,7 @@ void addMessage(TcpDiscoveryAbstractMessage msg, boolean ignoreHighPriority, boo } if (!fromSocket) - msg.opCtxAttrs = DistributedOperationContextAttributeRegistry.instance().collectContext(); + fillOperationContextAttributes(msg); if (msg instanceof TraceableMessage) { TraceableMessage tMsg = (TraceableMessage)msg; @@ -3322,10 +3323,12 @@ else if (msg instanceof TcpDiscoveryAuthFailedMessage) if (msg == WAKEUP) return; - if (F.isEmpty(msg.opCtxAttrs)) + if (msg.opCtxMsg == null) processMessage0(msg); else { - try (Scope ignored = DistributedOperationContextAttributeRegistry.instance().restoreContext(msg.opCtxAttrs)) { + OperationContexMessage cm = msg.opCtxMsg; + + try (Scope ignored = DistributedOperationContextAttributeRegistry.instance().restoreContext(cm.idBitmask, cm.vals)) { processMessage0(msg); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index 789f3d0adb107..097a9a496c7eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -37,15 +37,19 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.OperationContexMessage; import org.apache.ignite.internal.processors.cache.CacheMetricsSnapshot; import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage; import org.apache.ignite.internal.processors.cluster.NodeFullMetricsMessage; import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage; import org.apache.ignite.internal.processors.tracing.NoopTracing; import org.apache.ignite.internal.processors.tracing.Tracing; +import org.apache.ignite.internal.thread.context.DistributedOperationContextAttributeRegistry; +import org.apache.ignite.internal.thread.context.OperationContextAttribute; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiContext; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.IgniteSpiThread; @@ -450,6 +454,26 @@ public void processCacheMetricsMessage(TcpDiscoveryMetricsUpdateMessage msg, lon } } + /** */ + protected static void fillOperationContextAttributes(TcpDiscoveryAbstractMessage msg) { + DistributedOperationContextAttributeRegistry.instance().collectContext(Message.class).forEach((attrId, msgVal) -> { + assert attrId >= 0 && attrId < OperationContextAttribute.MAX_ATTR_CNT; + + if (msg.opCtxMsg == null) { + msg.opCtxMsg = new OperationContexMessage(); + + msg.opCtxMsg.vals = new Message[OperationContextAttribute.MAX_ATTR_CNT]; + } + + int mask = 1 << attrId; + + assert (msg.opCtxMsg.idBitmask & mask) == 0; + + msg.opCtxMsg.idBitmask |= mask; + msg.opCtxMsg.vals[attrId] = msgVal; + }); + } + /** * @param addrs Addresses. */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index e04f1d856c0ec..1f624f7775af8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -19,9 +19,9 @@ import java.io.Externalizable; import java.util.HashSet; -import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.ignite.internal.OperationContexMessage; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -77,10 +77,10 @@ public abstract class TcpDiscoveryAbstractMessage implements Message { @Order(4) Set failedNodes; - /** Operation context attributes: id -> attribute value. */ + /** Operation context attributes message. */ @GridToStringInclude @Order(5) - public @Nullable Map opCtxAttrs; + public @Nullable OperationContexMessage opCtxMsg; /** * Default no-arg constructor for {@link Externalizable} interface. diff --git a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java index e82721c10b114..e42fe31c83f07 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java @@ -838,7 +838,7 @@ public void testContextAwareDelayQueue() throws Exception { /** */ @Test public void testSendAttributesByDiscovery() throws Exception { - byte attrId = (byte)(OperationContextAttribute.MAX_ATTR_CNT + 1); + byte attrId = (byte)(OperationContextAttribute.MAX_ATTR_CNT - 1); InetSocketAddressMessage dfltAttrVal = new InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80); From d293eb032a9b0c0f0969bbf927c9cb98f61fc4d2 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 17 Jun 2026 16:03:40 +0300 Subject: [PATCH 06/17] fix --- ...ibutedOperationContextAttributeRegistry.java | 3 ++- .../context/OperationContextAttributesTest.java | 17 ----------------- 2 files changed, 2 insertions(+), 18 deletions(-) diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java index 9208fb5add00d..a6fb4d41de982 100644 --- a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java @@ -51,8 +51,9 @@ public void register(byte id, OperationContextAttribute attr) { /** * TODO : Declare distributed attributes as 'extends Message' after https://issues.apache.org/jira/browse/IGNITE-28766 + * * @return Values for all registered operation context attributes. - * */ + */ public Map collectContext(@Nullable Class checkValuesType) { Map res = Collections.emptyMap(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java index e42fe31c83f07..19b3a5d39a636 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java @@ -38,8 +38,6 @@ import java.util.function.Supplier; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.discovery.CustomEventListener; @@ -62,7 +60,6 @@ import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteOutClosure; -import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage; @@ -72,7 +69,6 @@ import org.springframework.lang.NonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause; import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause; import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; @@ -97,9 +93,6 @@ public class OperationContextAttributesTest extends GridCommonAbstractTest { /** */ private int beforeTestReservedAttrIds; - /** */ - private IgnitePredicate evtLsnr; - /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); @@ -122,16 +115,6 @@ public class OperationContextAttributesTest extends GridCommonAbstractTest { OperationContextAttribute.ID_GEN.set(beforeTestReservedAttrIds); } - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - - if (evtLsnr != null) - cfg.setLocalEventListeners(Collections.singletonMap(evtLsnr, new int[] {EVT_DISCOVERY_CUSTOM_EVT})); - - return cfg; - } - /** */ @Test public void testNotAttachedAttribute() { From 1395cafed69003dc49d235837e0d010c68d729a7 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 17 Jun 2026 16:40:08 +0300 Subject: [PATCH 07/17] review fixes --- ...utedOperationContextAttributeRegistry.java | 15 +++++++----- .../internal/OperationContexMessage.java | 2 +- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 24 ++++++++++++------- .../OperationContextAttributesTest.java | 8 ++----- 4 files changed, 28 insertions(+), 21 deletions(-) diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java index a6fb4d41de982..c347d486f6603 100644 --- a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java @@ -27,6 +27,9 @@ /** */ public class DistributedOperationContextAttributeRegistry { + /** */ + public static final byte MAX_DISTRIBUTED_ATTR_ID = 7; + /** */ private static final DistributedOperationContextAttributeRegistry INSTANCE = new DistributedOperationContextAttributeRegistry(); @@ -79,21 +82,21 @@ public Map collectContext(@Nullable Class checkValuesType) { } /** */ - public Scope restoreContext(int idBitmask, Object[] values) { + public Scope restoreContext(byte idBitmask, Object[] values) { if (F.isEmpty(values) || idBitmask == 0) return Scope.NOOP_SCOPE; - OperationContext.ContextUpdater updater = OperationContext.ContextUpdater.create(); + assert values.length <= MAX_DISTRIBUTED_ATTR_ID; - for (byte attrId = 0; attrId < OperationContextAttribute.MAX_ATTR_CNT; attrId++) { - assert attrId < Integer.SIZE; + OperationContext.ContextUpdater updater = OperationContext.ContextUpdater.create(); - int mask = 1 << attrId; + for (byte attrId = 0, idx = 0; attrId < Byte.SIZE; ++attrId) { + byte mask = (byte)(1 << attrId); if ((mask & idBitmask) == 0) continue; - updater.set((OperationContextAttribute)attributes.get(attrId), values[attrId]); + updater.set((OperationContextAttribute)attributes.get(attrId), values[idx++]); } return updater.apply(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/OperationContexMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/OperationContexMessage.java index d45bf3a578602..4435ed7ec38d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/OperationContexMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/OperationContexMessage.java @@ -28,7 +28,7 @@ public class OperationContexMessage implements Message { /** Bitmask of effective attributes ids. */ @Order(1) - public int idBitmask; + public byte idBitmask; /** Empty constructor for serialization purposes. */ public OperationContexMessage() { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index 097a9a496c7eb..849bc71c3b1ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -45,7 +45,6 @@ import org.apache.ignite.internal.processors.tracing.NoopTracing; import org.apache.ignite.internal.processors.tracing.Tracing; import org.apache.ignite.internal.thread.context.DistributedOperationContextAttributeRegistry; -import org.apache.ignite.internal.thread.context.OperationContextAttribute; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; @@ -456,22 +455,31 @@ public void processCacheMetricsMessage(TcpDiscoveryMetricsUpdateMessage msg, lon /** */ protected static void fillOperationContextAttributes(TcpDiscoveryAbstractMessage msg) { - DistributedOperationContextAttributeRegistry.instance().collectContext(Message.class).forEach((attrId, msgVal) -> { - assert attrId >= 0 && attrId < OperationContextAttribute.MAX_ATTR_CNT; + Map attrs = DistributedOperationContextAttributeRegistry.instance().collectContext(Message.class); + + if(F.isEmpty(attrs)) + return; + + int idx = 0; + + for (Map.Entry e : attrs.entrySet()) { + byte attrId = e.getKey(); + Message msgVal = e.getValue(); + + assert attrId >= 0 && attrId <= DistributedOperationContextAttributeRegistry.MAX_DISTRIBUTED_ATTR_ID; if (msg.opCtxMsg == null) { msg.opCtxMsg = new OperationContexMessage(); - - msg.opCtxMsg.vals = new Message[OperationContextAttribute.MAX_ATTR_CNT]; + msg.opCtxMsg.vals = new Message[attrs.size()]; } - int mask = 1 << attrId; + byte mask = (byte)(1 << attrId); assert (msg.opCtxMsg.idBitmask & mask) == 0; msg.opCtxMsg.idBitmask |= mask; - msg.opCtxMsg.vals[attrId] = msgVal; - }); + msg.opCtxMsg.vals[idx++] = msgVal; + } } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java index 19b3a5d39a636..20125f1773c23 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java @@ -821,7 +821,7 @@ public void testContextAwareDelayQueue() throws Exception { /** */ @Test public void testSendAttributesByDiscovery() throws Exception { - byte attrId = (byte)(OperationContextAttribute.MAX_ATTR_CNT - 1); + byte attrId = DistributedOperationContextAttributeRegistry.MAX_DISTRIBUTED_ATTR_ID; InetSocketAddressMessage dfltAttrVal = new InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80); @@ -866,10 +866,9 @@ else if (grid(i0).localNode().order() == 1) } assertFalse(valToSend.equals(dfltAttrVal)); - assertNull(OperationContext.get(attr)); - // Send from a coordinator. + // Send from the coordinator. try (Scope ignored = OperationContext.set(attr, valToSend)) { grid(0).createCache(defaultCacheConfiguration()); } @@ -877,7 +876,6 @@ else if (grid(i0).localNode().order() == 1) assertTrue(waitForCondition(() -> coordLatch.getCount() == 2, getTestTimeout())); assertTrue(waitForCondition(() -> srvrLatch.getCount() == 2, getTestTimeout())); assertTrue(waitForCondition(() -> clientLatch.getCount() == 2, getTestTimeout())); - assertNull(OperationContext.get(attr)); // Send from a server. @@ -888,7 +886,6 @@ else if (grid(i0).localNode().order() == 1) assertTrue(waitForCondition(() -> coordLatch.getCount() == 1, getTestTimeout())); assertTrue(waitForCondition(() -> srvrLatch.getCount() == 1, getTestTimeout())); assertTrue(waitForCondition(() -> clientLatch.getCount() == 1, getTestTimeout())); - assertNull(OperationContext.get(attr)); // Send from a client. @@ -897,7 +894,6 @@ else if (grid(i0).localNode().order() == 1) } assertNull(OperationContext.get(attr)); - assertTrue(coordLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); assertTrue(srvrLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); assertTrue(clientLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); From 79dab0b2bafc06a4881157fcfcd15cf9f3597632 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 18 Jun 2026 17:21:26 +0300 Subject: [PATCH 08/17] reimpol --- ...utedOperationContextAttributeRegistry.java | 104 ---------------- .../ignite/internal/CoreMessagesProvider.java | 2 +- ...tedOperationContextAttributesMessage.java} | 9 +- .../DistributedOperationAttributeManager.java | 117 ++++++++++++++++++ .../ignite/spi/discovery/tcp/ClientImpl.java | 11 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 16 +-- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 32 ----- .../messages/TcpDiscoveryAbstractMessage.java | 4 +- .../OperationContextAttributesTest.java | 13 +- 9 files changed, 135 insertions(+), 173 deletions(-) delete mode 100644 modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java rename modules/core/src/main/java/org/apache/ignite/internal/{OperationContexMessage.java => DistributedOperationContextAttributesMessage.java} (82%) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationAttributeManager.java diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java deleted file mode 100644 index c347d486f6603..0000000000000 --- a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeRegistry.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ignite.internal.thread.context; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.util.typedef.F; -import org.jetbrains.annotations.Nullable; - -/** */ -public class DistributedOperationContextAttributeRegistry { - /** */ - public static final byte MAX_DISTRIBUTED_ATTR_ID = 7; - - /** */ - private static final DistributedOperationContextAttributeRegistry INSTANCE = new DistributedOperationContextAttributeRegistry(); - - /** Attributes by their id. */ - private final Map> attributes = new ConcurrentHashMap<>(); - - /** */ - public static DistributedOperationContextAttributeRegistry instance() { - return INSTANCE; - } - - /** */ - public void register(byte id, OperationContextAttribute attr) { - assert id >= 0; - - if (attributes.size() == OperationContextAttribute.MAX_ATTR_CNT) - throw new IgniteException("Maximum number of attributes is exceeded [" + OperationContextAttribute.MAX_ATTR_CNT + "]."); - - if (attributes.putIfAbsent(id, attr) != null) - throw new IgniteException("Duplicated attribute id: " + id); - } - - /** - * TODO : Declare distributed attributes as 'extends Message' after https://issues.apache.org/jira/browse/IGNITE-28766 - * - * @return Values for all registered operation context attributes. - */ - public Map collectContext(@Nullable Class checkValuesType) { - Map res = Collections.emptyMap(); - - for (Map.Entry> e : attributes.entrySet()) { - OperationContextAttribute attr = e.getValue(); - - Object curVal = OperationContext.get(attr); - - if (curVal != null && checkValuesType != null && !checkValuesType.isAssignableFrom(curVal.getClass())) { - throw new IgniteException("To distribute operation context attributes they have to be a " - + checkValuesType.getSimpleName()); - } - - if (!Objects.equals(attr.initialValue(), curVal)) { - if (res == Collections.EMPTY_MAP) - res = new HashMap<>(attributes.size(), 1.0f); - - res.put(e.getKey(), (T)curVal); - } - } - - return res; - } - - /** */ - public Scope restoreContext(byte idBitmask, Object[] values) { - if (F.isEmpty(values) || idBitmask == 0) - return Scope.NOOP_SCOPE; - - assert values.length <= MAX_DISTRIBUTED_ATTR_ID; - - OperationContext.ContextUpdater updater = OperationContext.ContextUpdater.create(); - - for (byte attrId = 0, idx = 0; attrId < Byte.SIZE; ++attrId) { - byte mask = (byte)(1 << attrId); - - if ((mask & idBitmask) == 0) - continue; - - updater.set((OperationContextAttribute)attributes.get(attrId), values[idx++]); - } - - return updater.apply(); - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index 4d4caff06d126..0b655b98f48c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -668,7 +668,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C // [13400 - 13600]: Operation context messages. msgIdx = 13400; - withNoSchema(OperationContexMessage.class); + withNoSchema(DistributedOperationContextAttributesMessage.class); assert msgIdx <= MAX_MESSAGE_ID; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/OperationContexMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextAttributesMessage.java similarity index 82% rename from modules/core/src/main/java/org/apache/ignite/internal/OperationContexMessage.java rename to modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextAttributesMessage.java index 4435ed7ec38d5..a319ab9bb759b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/OperationContexMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextAttributesMessage.java @@ -17,21 +17,22 @@ package org.apache.ignite.internal; +import java.util.List; import org.apache.ignite.internal.thread.context.OperationContext; import org.apache.ignite.plugin.extensions.communication.Message; -/** ransport for {@link OperationContext} attributes. */ -public class OperationContexMessage implements Message { +/** Transport for {@link OperationContext} distibuted attributes. */ +public class DistributedOperationContextAttributesMessage implements Message { /** Values of operation context attributes. */ @Order(0) - public Message[] vals; + public List vals; /** Bitmask of effective attributes ids. */ @Order(1) public byte idBitmask; /** Empty constructor for serialization purposes. */ - public OperationContexMessage() { + public DistributedOperationContextAttributesMessage() { // No-op. } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationAttributeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationAttributeManager.java new file mode 100644 index 0000000000000..cae1fa957be74 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationAttributeManager.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.thread.context; + +import java.util.ArrayList; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.DistributedOperationContextAttributesMessage; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.jetbrains.annotations.Nullable; + +/** */ +public class DistributedOperationAttributeManager { + /** */ + public static final byte MAX_DISTRIBUTED_ATTR_ID = 7; + + /** */ + private static final DistributedOperationAttributeManager INSTANCE = new DistributedOperationAttributeManager(); + + /** Attributes by their id. */ + private final Map> attrs = new ConcurrentHashMap<>(); + + /** */ + public static DistributedOperationAttributeManager instance() { + return INSTANCE; + } + + /** */ + public OperationContextAttribute createDistributedAttriubte(byte id, @Nullable T initVal) { + assert id >= 0; + + if (attrs.size() == OperationContextAttribute.MAX_ATTR_CNT) + throw new IgniteException("Maximum number of ributed attributes is exceeded [" + OperationContextAttribute.MAX_ATTR_CNT + "]."); + + attrs.compute(id, (id0, attr0) -> { + if (attr0 != null) + throw new IgniteException("Duplicated distributed attribute id: " + id); + + return OperationContextAttribute.newInstance(initVal); + }); + + return (OperationContextAttribute)attrs.get(id); + } + + /** */ + public @Nullable DistributedOperationContextAttributesMessage collectDistributedAttributes() { + DistributedOperationContextAttributesMessage res = null; + + for (Map.Entry> e : attrs.entrySet()) { + OperationContextAttribute attr = e.getValue(); + + Message curVal = OperationContext.get(attr); + + assert attr.initialValue() == null || curVal == null || curVal.getClass().isAssignableFrom(attr.initialValue().getClass()); + + if (!Objects.equals(curVal, attr.initialValue())) { + if (res == null) { + res = new DistributedOperationContextAttributesMessage(); + + res.vals = new ArrayList<>(MAX_DISTRIBUTED_ATTR_ID / 2); + } + + byte mask = (byte)(1 << e.getKey()); + + assert (res.idBitmask & mask) == 0; + + res.vals.add(curVal); + res.idBitmask |= mask; + } + } + + return res; + } + + /** */ + public Scope restoreDistributedAttributes(@Nullable DistributedOperationContextAttributesMessage msg) { + if (msg == null) + return Scope.NOOP_SCOPE; + + assert msg.idBitmask != 0; + assert !F.isEmpty(msg.vals); + assert msg.vals.size() <= MAX_DISTRIBUTED_ATTR_ID; + + OperationContext.ContextUpdater updater = OperationContext.ContextUpdater.create(); + + for (byte valIdx = 0, maskIdx = -1; valIdx < msg.vals.size(); ++valIdx) { + Message curVal = msg.vals.get(valIdx); + + while (maskIdx < 0 || (msg.idBitmask & (1 << maskIdx)) == 0) { + assert maskIdx <= MAX_DISTRIBUTED_ATTR_ID; + + ++maskIdx; + } + + updater.set((OperationContextAttribute)attrs.get(maskIdx++), curVal); + } + + return updater.apply(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index ce2911a83392e..da38209518d63 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -70,7 +70,7 @@ import org.apache.ignite.internal.processors.tracing.messages.SpanContainer; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable; -import org.apache.ignite.internal.thread.context.DistributedOperationContextAttributeRegistry; +import org.apache.ignite.internal.thread.context.DistributedOperationAttributeManager; import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -1312,7 +1312,7 @@ private class SocketWriter extends IgniteSpiThread { * @param msg Message. */ private void sendMessage(TcpDiscoveryAbstractMessage msg) { - fillOperationContextAttributes(msg); + msg.opCtxMsg = DistributedOperationAttributeManager.instance().collectDistributedAttributes(); synchronized (mux) { queue.add(msg); @@ -2007,13 +2007,8 @@ else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage) TcpDiscoveryAbstractMessage msg0 = (TcpDiscoveryAbstractMessage)msg; - if (msg0.opCtxMsg == null) + try (Scope ignored = DistributedOperationAttributeManager.instance().restoreDistributedAttributes(msg0.opCtxMsg)) { processDiscoveryMessage(msg0); - else { - try (Scope ignored = DistributedOperationContextAttributeRegistry.instance() - .restoreContext(msg0.opCtxMsg.idBitmask, msg0.opCtxMsg.vals)) { - processDiscoveryMessage(msg0); - } } } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index e27de173ec6c2..c8f45ecc8ccbf 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -81,7 +81,6 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.IgnitionEx; -import org.apache.ignite.internal.OperationContexMessage; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.communication.UnknownMessageException; import org.apache.ignite.internal.managers.discovery.DiscoveryServerOnlyCustomMessage; @@ -96,7 +95,7 @@ import org.apache.ignite.internal.processors.tracing.messages.SpanContainer; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable; -import org.apache.ignite.internal.thread.context.DistributedOperationContextAttributeRegistry; +import org.apache.ignite.internal.thread.context.DistributedOperationAttributeManager; import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor; import org.apache.ignite.internal.util.GridBoundedLinkedHashSet; @@ -3050,10 +3049,9 @@ void addMessage(TcpDiscoveryAbstractMessage msg, boolean ignoreHighPriority, boo } if (!fromSocket) - fillOperationContextAttributes(msg); + msg.opCtxMsg = DistributedOperationAttributeManager.instance().collectDistributedAttributes(); - if (msg instanceof TraceableMessage) { - TraceableMessage tMsg = (TraceableMessage)msg; + if (msg instanceof TraceableMessage tMsg) { // If we read this message from socket. if (fromSocket) @@ -3323,14 +3321,8 @@ else if (msg instanceof TcpDiscoveryAuthFailedMessage) if (msg == WAKEUP) return; - if (msg.opCtxMsg == null) + try (Scope ignored = DistributedOperationAttributeManager.instance().restoreDistributedAttributes(msg.opCtxMsg)) { processMessage0(msg); - else { - OperationContexMessage cm = msg.opCtxMsg; - - try (Scope ignored = DistributedOperationContextAttributeRegistry.instance().restoreContext(cm.idBitmask, cm.vals)) { - processMessage0(msg); - } } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index 849bc71c3b1ee..789f3d0adb107 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -37,18 +37,15 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.OperationContexMessage; import org.apache.ignite.internal.processors.cache.CacheMetricsSnapshot; import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage; import org.apache.ignite.internal.processors.cluster.NodeFullMetricsMessage; import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage; import org.apache.ignite.internal.processors.tracing.NoopTracing; import org.apache.ignite.internal.processors.tracing.Tracing; -import org.apache.ignite.internal.thread.context.DistributedOperationContextAttributeRegistry; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiContext; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.IgniteSpiThread; @@ -453,35 +450,6 @@ public void processCacheMetricsMessage(TcpDiscoveryMetricsUpdateMessage msg, lon } } - /** */ - protected static void fillOperationContextAttributes(TcpDiscoveryAbstractMessage msg) { - Map attrs = DistributedOperationContextAttributeRegistry.instance().collectContext(Message.class); - - if(F.isEmpty(attrs)) - return; - - int idx = 0; - - for (Map.Entry e : attrs.entrySet()) { - byte attrId = e.getKey(); - Message msgVal = e.getValue(); - - assert attrId >= 0 && attrId <= DistributedOperationContextAttributeRegistry.MAX_DISTRIBUTED_ATTR_ID; - - if (msg.opCtxMsg == null) { - msg.opCtxMsg = new OperationContexMessage(); - msg.opCtxMsg.vals = new Message[attrs.size()]; - } - - byte mask = (byte)(1 << attrId); - - assert (msg.opCtxMsg.idBitmask & mask) == 0; - - msg.opCtxMsg.idBitmask |= mask; - msg.opCtxMsg.vals[idx++] = msgVal; - } - } - /** * @param addrs Addresses. */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index 1f624f7775af8..bbee4c33e8cd9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -21,7 +21,7 @@ import java.util.HashSet; import java.util.Set; import java.util.UUID; -import org.apache.ignite.internal.OperationContexMessage; +import org.apache.ignite.internal.DistributedOperationContextAttributesMessage; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -80,7 +80,7 @@ public abstract class TcpDiscoveryAbstractMessage implements Message { /** Operation context attributes message. */ @GridToStringInclude @Order(5) - public @Nullable OperationContexMessage opCtxMsg; + public @Nullable DistributedOperationContextAttributesMessage opCtxMsg; /** * Default no-arg constructor for {@link Externalizable} interface. diff --git a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java index 20125f1773c23..6f571a984204e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java @@ -821,13 +821,12 @@ public void testContextAwareDelayQueue() throws Exception { /** */ @Test public void testSendAttributesByDiscovery() throws Exception { - byte attrId = DistributedOperationContextAttributeRegistry.MAX_DISTRIBUTED_ATTR_ID; + byte attrId = DistributedOperationAttributeManager.MAX_DISTRIBUTED_ATTR_ID; InetSocketAddressMessage dfltAttrVal = new InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80); - OperationContextAttribute attr = OperationContextAttribute.newInstance(); - - DistributedOperationContextAttributeRegistry.instance().register(attrId, attr); + OperationContextAttribute attr = DistributedOperationAttributeManager.instance() + .createDistributedAttriubte(attrId, dfltAttrVal); startGrids(2); startClientGrid(2); @@ -865,9 +864,6 @@ else if (grid(i0).localNode().order() == 1) }); } - assertFalse(valToSend.equals(dfltAttrVal)); - assertNull(OperationContext.get(attr)); - // Send from the coordinator. try (Scope ignored = OperationContext.set(attr, valToSend)) { grid(0).createCache(defaultCacheConfiguration()); @@ -876,7 +872,6 @@ else if (grid(i0).localNode().order() == 1) assertTrue(waitForCondition(() -> coordLatch.getCount() == 2, getTestTimeout())); assertTrue(waitForCondition(() -> srvrLatch.getCount() == 2, getTestTimeout())); assertTrue(waitForCondition(() -> clientLatch.getCount() == 2, getTestTimeout())); - assertNull(OperationContext.get(attr)); // Send from a server. try (Scope ignored = OperationContext.set(attr, valToSend)) { @@ -886,14 +881,12 @@ else if (grid(i0).localNode().order() == 1) assertTrue(waitForCondition(() -> coordLatch.getCount() == 1, getTestTimeout())); assertTrue(waitForCondition(() -> srvrLatch.getCount() == 1, getTestTimeout())); assertTrue(waitForCondition(() -> clientLatch.getCount() == 1, getTestTimeout())); - assertNull(OperationContext.get(attr)); // Send from a client. try (Scope ignored = OperationContext.set(attr, valToSend)) { grid(2).createCache(defaultCacheConfiguration()); } - assertNull(OperationContext.get(attr)); assertTrue(coordLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); assertTrue(srvrLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); assertTrue(clientLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); From 695f18d30cde1ea0cf3b5fb85879c0f0811c45bb Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 19 Jun 2026 12:54:56 +0300 Subject: [PATCH 09/17] review fixes --- .../context/OperationContextAttribute.java | 2 +- ...utedOperationContextAttributesMessage.java | 4 +- ...utedOperationContextAttributeManager.java} | 53 ++++++++++--------- .../ignite/spi/discovery/tcp/ClientImpl.java | 7 +-- .../ignite/spi/discovery/tcp/ServerImpl.java | 6 +-- .../OperationContextAttributesTest.java | 4 +- 6 files changed, 41 insertions(+), 35 deletions(-) rename modules/core/src/main/java/org/apache/ignite/internal/thread/context/{DistributedOperationAttributeManager.java => DistributedOperationContextAttributeManager.java} (70%) diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java index 373a72ee3b278..499d241d9ccba 100644 --- a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java @@ -32,7 +32,7 @@ public class OperationContextAttribute { static final AtomicInteger ID_GEN = new AtomicInteger(); /** */ - public static final int MAX_ATTR_CNT = Integer.SIZE; + static final int MAX_ATTR_CNT = Integer.SIZE; /** */ private final int bitmask; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextAttributesMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextAttributesMessage.java index a319ab9bb759b..937536d1b24ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextAttributesMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextAttributesMessage.java @@ -21,7 +21,7 @@ import org.apache.ignite.internal.thread.context.OperationContext; import org.apache.ignite.plugin.extensions.communication.Message; -/** Transport for {@link OperationContext} distibuted attributes. */ +/** Transport for {@link OperationContext} distributed attributes. */ public class DistributedOperationContextAttributesMessage implements Message { /** Values of operation context attributes. */ @Order(0) @@ -29,7 +29,7 @@ public class DistributedOperationContextAttributesMessage implements Message { /** Bitmask of effective attributes ids. */ @Order(1) - public byte idBitmask; + public byte idBitmap; /** Empty constructor for serialization purposes. */ public DistributedOperationContextAttributesMessage() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationAttributeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeManager.java similarity index 70% rename from modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationAttributeManager.java rename to modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeManager.java index cae1fa957be74..ff76eb99ea4e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationAttributeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeManager.java @@ -18,8 +18,7 @@ import java.util.ArrayList; import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.DistributedOperationContextAttributesMessage; import org.apache.ignite.internal.util.typedef.F; @@ -27,18 +26,18 @@ import org.jetbrains.annotations.Nullable; /** */ -public class DistributedOperationAttributeManager { +public class DistributedOperationContextAttributeManager { /** */ - public static final byte MAX_DISTRIBUTED_ATTR_ID = 7; + static final byte MAX_DISTRIBUTED_ATTR_CNT = 7; /** */ - private static final DistributedOperationAttributeManager INSTANCE = new DistributedOperationAttributeManager(); + private static final DistributedOperationContextAttributeManager INSTANCE = new DistributedOperationContextAttributeManager(); /** Attributes by their id. */ - private final Map> attrs = new ConcurrentHashMap<>(); + private final Map> attrs = new ConcurrentSkipListMap<>(); /** */ - public static DistributedOperationAttributeManager instance() { + public static DistributedOperationContextAttributeManager instance() { return INSTANCE; } @@ -46,17 +45,23 @@ public static DistributedOperationAttributeManager instance() { public OperationContextAttribute createDistributedAttriubte(byte id, @Nullable T initVal) { assert id >= 0; - if (attrs.size() == OperationContextAttribute.MAX_ATTR_CNT) - throw new IgniteException("Maximum number of ributed attributes is exceeded [" + OperationContextAttribute.MAX_ATTR_CNT + "]."); + if (attrs.size() == OperationContextAttribute.MAX_ATTR_CNT) { + throw new IgniteException("Maximum number of distributed attributes is exceeded [max=" + + OperationContextAttribute.MAX_ATTR_CNT + "]."); + } + + OperationContextAttribute res; - attrs.compute(id, (id0, attr0) -> { - if (attr0 != null) - throw new IgniteException("Duplicated distributed attribute id: " + id); + synchronized (attrs) { + if (attrs.containsKey(id)) + throw new IgniteException("Duplicated distributed attribute id [id=" + id + "]."); - return OperationContextAttribute.newInstance(initVal); - }); + res = OperationContextAttribute.newInstance(initVal); - return (OperationContextAttribute)attrs.get(id); + attrs.put(id, res); + } + + return res; } /** */ @@ -70,19 +75,19 @@ public OperationContextAttribute createDistributedAttriub assert attr.initialValue() == null || curVal == null || curVal.getClass().isAssignableFrom(attr.initialValue().getClass()); - if (!Objects.equals(curVal, attr.initialValue())) { + if (curVal != attr.initialValue()) { if (res == null) { res = new DistributedOperationContextAttributesMessage(); - res.vals = new ArrayList<>(MAX_DISTRIBUTED_ATTR_ID / 2); + res.vals = new ArrayList<>(MAX_DISTRIBUTED_ATTR_CNT / 2); } byte mask = (byte)(1 << e.getKey()); - assert (res.idBitmask & mask) == 0; + assert (res.idBitmap & mask) == 0; res.vals.add(curVal); - res.idBitmask |= mask; + res.idBitmap |= mask; } } @@ -94,17 +99,17 @@ public Scope restoreDistributedAttributes(@Nullable DistributedOperationContextA if (msg == null) return Scope.NOOP_SCOPE; - assert msg.idBitmask != 0; + assert msg.idBitmap != 0; assert !F.isEmpty(msg.vals); - assert msg.vals.size() <= MAX_DISTRIBUTED_ATTR_ID; + assert msg.vals.size() <= MAX_DISTRIBUTED_ATTR_CNT; OperationContext.ContextUpdater updater = OperationContext.ContextUpdater.create(); - for (byte valIdx = 0, maskIdx = -1; valIdx < msg.vals.size(); ++valIdx) { + for (byte valIdx = 0, maskIdx = 0; valIdx < msg.vals.size(); ++valIdx) { Message curVal = msg.vals.get(valIdx); - while (maskIdx < 0 || (msg.idBitmask & (1 << maskIdx)) == 0) { - assert maskIdx <= MAX_DISTRIBUTED_ATTR_ID; + while ((msg.idBitmap & (1 << maskIdx)) == 0) { + assert maskIdx <= MAX_DISTRIBUTED_ATTR_CNT; ++maskIdx; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index da38209518d63..2ebeeedf762f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -70,7 +70,7 @@ import org.apache.ignite.internal.processors.tracing.messages.SpanContainer; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable; -import org.apache.ignite.internal.thread.context.DistributedOperationAttributeManager; +import org.apache.ignite.internal.thread.context.DistributedOperationContextAttributeManager; import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -1312,7 +1312,7 @@ private class SocketWriter extends IgniteSpiThread { * @param msg Message. */ private void sendMessage(TcpDiscoveryAbstractMessage msg) { - msg.opCtxMsg = DistributedOperationAttributeManager.instance().collectDistributedAttributes(); + msg.opCtxMsg = DistributedOperationContextAttributeManager.instance().collectDistributedAttributes(); synchronized (mux) { queue.add(msg); @@ -2007,7 +2007,8 @@ else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage) TcpDiscoveryAbstractMessage msg0 = (TcpDiscoveryAbstractMessage)msg; - try (Scope ignored = DistributedOperationAttributeManager.instance().restoreDistributedAttributes(msg0.opCtxMsg)) { + try (Scope ignored = DistributedOperationContextAttributeManager.instance() + .restoreDistributedAttributes(msg0.opCtxMsg)) { processDiscoveryMessage(msg0); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index c8f45ecc8ccbf..684ef8d7b2048 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -95,7 +95,7 @@ import org.apache.ignite.internal.processors.tracing.messages.SpanContainer; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable; -import org.apache.ignite.internal.thread.context.DistributedOperationAttributeManager; +import org.apache.ignite.internal.thread.context.DistributedOperationContextAttributeManager; import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor; import org.apache.ignite.internal.util.GridBoundedLinkedHashSet; @@ -3049,7 +3049,7 @@ void addMessage(TcpDiscoveryAbstractMessage msg, boolean ignoreHighPriority, boo } if (!fromSocket) - msg.opCtxMsg = DistributedOperationAttributeManager.instance().collectDistributedAttributes(); + msg.opCtxMsg = DistributedOperationContextAttributeManager.instance().collectDistributedAttributes(); if (msg instanceof TraceableMessage tMsg) { @@ -3321,7 +3321,7 @@ else if (msg instanceof TcpDiscoveryAuthFailedMessage) if (msg == WAKEUP) return; - try (Scope ignored = DistributedOperationAttributeManager.instance().restoreDistributedAttributes(msg.opCtxMsg)) { + try (Scope ignored = DistributedOperationContextAttributeManager.instance().restoreDistributedAttributes(msg.opCtxMsg)) { processMessage0(msg); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java index 6f571a984204e..446f47d2a5058 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java @@ -821,11 +821,11 @@ public void testContextAwareDelayQueue() throws Exception { /** */ @Test public void testSendAttributesByDiscovery() throws Exception { - byte attrId = DistributedOperationAttributeManager.MAX_DISTRIBUTED_ATTR_ID; + byte attrId = DistributedOperationContextAttributeManager.MAX_DISTRIBUTED_ATTR_CNT; InetSocketAddressMessage dfltAttrVal = new InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80); - OperationContextAttribute attr = DistributedOperationAttributeManager.instance() + OperationContextAttribute attr = DistributedOperationContextAttributeManager.instance() .createDistributedAttriubte(attrId, dfltAttrVal); startGrids(2); From f0d579fc2d24de981922b36e2dc4fd8e37fc758f Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 19 Jun 2026 15:43:31 +0300 Subject: [PATCH 10/17] review fixes --- ...> DistributedOperationContextManager.java} | 20 +++------ .../ignite/spi/discovery/tcp/ClientImpl.java | 7 ++- .../ignite/spi/discovery/tcp/ServerImpl.java | 6 +-- .../OperationContextAttributesTest.java | 45 +++++++++++++------ 4 files changed, 45 insertions(+), 33 deletions(-) rename modules/core/src/main/java/org/apache/ignite/internal/thread/context/{DistributedOperationContextAttributeManager.java => DistributedOperationContextManager.java} (88%) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java similarity index 88% rename from modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeManager.java rename to modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java index ff76eb99ea4e6..0738398560df2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextAttributeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java @@ -26,18 +26,18 @@ import org.jetbrains.annotations.Nullable; /** */ -public class DistributedOperationContextAttributeManager { +public class DistributedOperationContextManager { /** */ static final byte MAX_DISTRIBUTED_ATTR_CNT = 7; /** */ - private static final DistributedOperationContextAttributeManager INSTANCE = new DistributedOperationContextAttributeManager(); + private static final DistributedOperationContextManager INSTANCE = new DistributedOperationContextManager(); /** Attributes by their id. */ private final Map> attrs = new ConcurrentSkipListMap<>(); /** */ - public static DistributedOperationContextAttributeManager instance() { + public static DistributedOperationContextManager instance() { return INSTANCE; } @@ -50,18 +50,12 @@ public OperationContextAttribute createDistributedAttriub + OperationContextAttribute.MAX_ATTR_CNT + "]."); } - OperationContextAttribute res; - - synchronized (attrs) { - if (attrs.containsKey(id)) + return (OperationContextAttribute)attrs.compute(id, (id0, attr0) -> { + if (attr0 != null) throw new IgniteException("Duplicated distributed attribute id [id=" + id + "]."); - res = OperationContextAttribute.newInstance(initVal); - - attrs.put(id, res); - } - - return res; + return OperationContextAttribute.newInstance(initVal); + }); } /** */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 2ebeeedf762f6..ec153176d64b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -70,7 +70,7 @@ import org.apache.ignite.internal.processors.tracing.messages.SpanContainer; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable; -import org.apache.ignite.internal.thread.context.DistributedOperationContextAttributeManager; +import org.apache.ignite.internal.thread.context.DistributedOperationContextManager; import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -1312,7 +1312,7 @@ private class SocketWriter extends IgniteSpiThread { * @param msg Message. */ private void sendMessage(TcpDiscoveryAbstractMessage msg) { - msg.opCtxMsg = DistributedOperationContextAttributeManager.instance().collectDistributedAttributes(); + msg.opCtxMsg = DistributedOperationContextManager.instance().collectDistributedAttributes(); synchronized (mux) { queue.add(msg); @@ -2007,8 +2007,7 @@ else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage) TcpDiscoveryAbstractMessage msg0 = (TcpDiscoveryAbstractMessage)msg; - try (Scope ignored = DistributedOperationContextAttributeManager.instance() - .restoreDistributedAttributes(msg0.opCtxMsg)) { + try (Scope ignored = DistributedOperationContextManager.instance().restoreDistributedAttributes(msg0.opCtxMsg)) { processDiscoveryMessage(msg0); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 684ef8d7b2048..a8a4f2f0a473e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -95,7 +95,7 @@ import org.apache.ignite.internal.processors.tracing.messages.SpanContainer; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable; -import org.apache.ignite.internal.thread.context.DistributedOperationContextAttributeManager; +import org.apache.ignite.internal.thread.context.DistributedOperationContextManager; import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor; import org.apache.ignite.internal.util.GridBoundedLinkedHashSet; @@ -3049,7 +3049,7 @@ void addMessage(TcpDiscoveryAbstractMessage msg, boolean ignoreHighPriority, boo } if (!fromSocket) - msg.opCtxMsg = DistributedOperationContextAttributeManager.instance().collectDistributedAttributes(); + msg.opCtxMsg = DistributedOperationContextManager.instance().collectDistributedAttributes(); if (msg instanceof TraceableMessage tMsg) { @@ -3321,7 +3321,7 @@ else if (msg instanceof TcpDiscoveryAuthFailedMessage) if (msg == WAKEUP) return; - try (Scope ignored = DistributedOperationContextAttributeManager.instance().restoreDistributedAttributes(msg.opCtxMsg)) { + try (Scope ignored = DistributedOperationContextManager.instance().restoreDistributedAttributes(msg.opCtxMsg)) { processMessage0(msg); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java index 446f47d2a5058..d60071c6dd3e8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.managers.discovery.CustomEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture; @@ -821,12 +822,25 @@ public void testContextAwareDelayQueue() throws Exception { /** */ @Test public void testSendAttributesByDiscovery() throws Exception { - byte attrId = DistributedOperationContextAttributeManager.MAX_DISTRIBUTED_ATTR_CNT; + byte attrId1 = 0; + byte attrId2 = DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT; - InetSocketAddressMessage dfltAttrVal = new InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80); + InetSocketAddressMessage dfltDistAttr1Val = new InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80); + GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1); - OperationContextAttribute attr = DistributedOperationContextAttributeManager.instance() - .createDistributedAttriubte(attrId, dfltAttrVal); + // Local attribute 1. + OperationContextAttribute.newInstance(1000); + + // Distributed attribute 1. + OperationContextAttribute dAttr1 = DistributedOperationContextManager.instance() + .createDistributedAttriubte(attrId1, dfltDistAttr1Val); + + // Local attribute 2. + OperationContextAttribute.newInstance("locaAttr2"); + + // Distributed attribute 2. + OperationContextAttribute dAttr2 = DistributedOperationContextManager.instance() + .createDistributedAttriubte(attrId2, dfltDistrAttr2Val); startGrids(2); startClientGrid(2); @@ -835,7 +849,8 @@ public void testSendAttributesByDiscovery() throws Exception { CountDownLatch srvrLatch = new CountDownLatch(3); CountDownLatch clientLatch = new CountDownLatch(3); - InetSocketAddressMessage valToSend = new InetSocketAddressMessage(dfltAttrVal.address(), 443); + InetSocketAddressMessage valToSend1 = new InetSocketAddressMessage(dfltDistAttr1Val.address(), 443); + GridCacheVersion valToSend2 = new GridCacheVersion(2, 2, 2); for (int i = 0; i < G.allGrids().size(); ++i) { int i0 = i; @@ -845,14 +860,18 @@ public void testSendAttributesByDiscovery() throws Exception { @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, DynamicCacheChangeBatch msg) { - InetSocketAddressMessage receivedVal = OperationContext.get(attr); + InetSocketAddressMessage receivedVal1 = OperationContext.get(dAttr1); + GridCacheVersion receivedVal2 = OperationContext.get(dAttr2); - assertNotNull(receivedVal); + assertNotNull(receivedVal1); + assertNotNull(receivedVal2); - assertFalse(dfltAttrVal.port() == receivedVal.port()); + assertFalse(dfltDistAttr1Val.port() == receivedVal1.port()); + assertEquals(receivedVal1.port(), valToSend1.port()); + assertEquals(receivedVal1.address(), valToSend1.address()); - assertEquals(receivedVal.port(), valToSend.port()); - assertEquals(receivedVal.address(), valToSend.address()); + assertFalse(dfltDistrAttr2Val.equals(receivedVal2)); + assertTrue(valToSend2.equals(receivedVal2)); if (grid(i0).localNode().isClient()) clientLatch.countDown(); @@ -865,7 +884,7 @@ else if (grid(i0).localNode().order() == 1) } // Send from the coordinator. - try (Scope ignored = OperationContext.set(attr, valToSend)) { + try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) { grid(0).createCache(defaultCacheConfiguration()); } @@ -874,7 +893,7 @@ else if (grid(i0).localNode().order() == 1) assertTrue(waitForCondition(() -> clientLatch.getCount() == 2, getTestTimeout())); // Send from a server. - try (Scope ignored = OperationContext.set(attr, valToSend)) { + try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) { grid(1).destroyCache(DEFAULT_CACHE_NAME); } @@ -883,7 +902,7 @@ else if (grid(i0).localNode().order() == 1) assertTrue(waitForCondition(() -> clientLatch.getCount() == 1, getTestTimeout())); // Send from a client. - try (Scope ignored = OperationContext.set(attr, valToSend)) { + try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) { grid(2).createCache(defaultCacheConfiguration()); } From 13d5f1146e19ef054b449526dfea730618a4ab7d Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 19 Jun 2026 16:38:10 +0300 Subject: [PATCH 11/17] renaming --- .../apache/ignite/internal/CoreMessagesProvider.java | 2 +- ...ge.java => DistributedOperationContextMessage.java} | 4 ++-- .../context/DistributedOperationContextManager.java | 10 +++++----- .../tcp/messages/TcpDiscoveryAbstractMessage.java | 4 ++-- 4 files changed, 10 insertions(+), 10 deletions(-) rename modules/core/src/main/java/org/apache/ignite/internal/{DistributedOperationContextAttributesMessage.java => DistributedOperationContextMessage.java} (90%) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index 0b655b98f48c5..cc0ab3e112b7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -668,7 +668,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C // [13400 - 13600]: Operation context messages. msgIdx = 13400; - withNoSchema(DistributedOperationContextAttributesMessage.class); + withNoSchema(DistributedOperationContextMessage.class); assert msgIdx <= MAX_MESSAGE_ID; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextAttributesMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java similarity index 90% rename from modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextAttributesMessage.java rename to modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java index 937536d1b24ad..0277784044272 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextAttributesMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java @@ -22,7 +22,7 @@ import org.apache.ignite.plugin.extensions.communication.Message; /** Transport for {@link OperationContext} distributed attributes. */ -public class DistributedOperationContextAttributesMessage implements Message { +public class DistributedOperationContextMessage implements Message { /** Values of operation context attributes. */ @Order(0) public List vals; @@ -32,7 +32,7 @@ public class DistributedOperationContextAttributesMessage implements Message { public byte idBitmap; /** Empty constructor for serialization purposes. */ - public DistributedOperationContextAttributesMessage() { + public DistributedOperationContextMessage() { // No-op. } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java index 0738398560df2..a95e48880353f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java @@ -20,7 +20,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.DistributedOperationContextAttributesMessage; +import org.apache.ignite.internal.DistributedOperationContextMessage; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; @@ -59,8 +59,8 @@ public OperationContextAttribute createDistributedAttriub } /** */ - public @Nullable DistributedOperationContextAttributesMessage collectDistributedAttributes() { - DistributedOperationContextAttributesMessage res = null; + public @Nullable DistributedOperationContextMessage collectDistributedAttributes() { + DistributedOperationContextMessage res = null; for (Map.Entry> e : attrs.entrySet()) { OperationContextAttribute attr = e.getValue(); @@ -71,7 +71,7 @@ public OperationContextAttribute createDistributedAttriub if (curVal != attr.initialValue()) { if (res == null) { - res = new DistributedOperationContextAttributesMessage(); + res = new DistributedOperationContextMessage(); res.vals = new ArrayList<>(MAX_DISTRIBUTED_ATTR_CNT / 2); } @@ -89,7 +89,7 @@ public OperationContextAttribute createDistributedAttriub } /** */ - public Scope restoreDistributedAttributes(@Nullable DistributedOperationContextAttributesMessage msg) { + public Scope restoreDistributedAttributes(@Nullable DistributedOperationContextMessage msg) { if (msg == null) return Scope.NOOP_SCOPE; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index bbee4c33e8cd9..9dda990c7d020 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -21,7 +21,7 @@ import java.util.HashSet; import java.util.Set; import java.util.UUID; -import org.apache.ignite.internal.DistributedOperationContextAttributesMessage; +import org.apache.ignite.internal.DistributedOperationContextMessage; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -80,7 +80,7 @@ public abstract class TcpDiscoveryAbstractMessage implements Message { /** Operation context attributes message. */ @GridToStringInclude @Order(5) - public @Nullable DistributedOperationContextAttributesMessage opCtxMsg; + public @Nullable DistributedOperationContextMessage opCtxMsg; /** * Default no-arg constructor for {@link Externalizable} interface. From 1bc5fa895521b1f0128eddd9465a2139c73501f7 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 22 Jun 2026 16:36:40 +0300 Subject: [PATCH 12/17] review fixes --- .../DistributedOperationContextMessage.java | 5 +- .../DistributedOperationContextManager.java | 32 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 428 +++++++++--------- .../OperationContextAttributesTest.java | 4 +- 4 files changed, 238 insertions(+), 231 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java index 0277784044272..2cfed19ad250a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal; -import java.util.List; import org.apache.ignite.internal.thread.context.OperationContext; import org.apache.ignite.plugin.extensions.communication.Message; @@ -25,9 +24,9 @@ public class DistributedOperationContextMessage implements Message { /** Values of operation context attributes. */ @Order(0) - public List vals; + public Message[] vals; - /** Bitmask of effective attributes ids. */ + /** Bitmap of effective attributes ids. */ @Order(1) public byte idBitmap; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java index a95e48880353f..d02b708b975db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.thread.context; import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; import org.apache.ignite.IgniteException; @@ -28,7 +29,7 @@ /** */ public class DistributedOperationContextManager { /** */ - static final byte MAX_DISTRIBUTED_ATTR_CNT = 7; + static final byte MAX_DISTRIBUTED_ATTR_CNT = Byte.SIZE; /** */ private static final DistributedOperationContextManager INSTANCE = new DistributedOperationContextManager(); @@ -42,13 +43,11 @@ public static DistributedOperationContextManager instance() { } /** */ - public OperationContextAttribute createDistributedAttriubte(byte id, @Nullable T initVal) { + public OperationContextAttribute createDistributedAttribute(byte id, @Nullable T initVal) { assert id >= 0; - if (attrs.size() == OperationContextAttribute.MAX_ATTR_CNT) { - throw new IgniteException("Maximum number of distributed attributes is exceeded [max=" - + OperationContextAttribute.MAX_ATTR_CNT + "]."); - } + if (attrs.size() == MAX_DISTRIBUTED_ATTR_CNT) + throw new IgniteException("Maximum number of distributed attributes is exceeded [max=" + MAX_DISTRIBUTED_ATTR_CNT + "]."); return (OperationContextAttribute)attrs.compute(id, (id0, attr0) -> { if (attr0 != null) @@ -61,30 +60,32 @@ public OperationContextAttribute createDistributedAttriub /** */ public @Nullable DistributedOperationContextMessage collectDistributedAttributes() { DistributedOperationContextMessage res = null; + List vals = null; for (Map.Entry> e : attrs.entrySet()) { OperationContextAttribute attr = e.getValue(); Message curVal = OperationContext.get(attr); - assert attr.initialValue() == null || curVal == null || curVal.getClass().isAssignableFrom(attr.initialValue().getClass()); - if (curVal != attr.initialValue()) { if (res == null) { res = new DistributedOperationContextMessage(); - res.vals = new ArrayList<>(MAX_DISTRIBUTED_ATTR_CNT / 2); + vals = new ArrayList<>(MAX_DISTRIBUTED_ATTR_CNT / 2); } byte mask = (byte)(1 << e.getKey()); assert (res.idBitmap & mask) == 0; - res.vals.add(curVal); + vals.add(curVal); res.idBitmap |= mask; } } + if (res != null) + res.vals = vals.toArray(vals.toArray(new Message[vals.size()])); + return res; } @@ -95,18 +96,15 @@ public Scope restoreDistributedAttributes(@Nullable DistributedOperationContextM assert msg.idBitmap != 0; assert !F.isEmpty(msg.vals); - assert msg.vals.size() <= MAX_DISTRIBUTED_ATTR_CNT; + assert msg.vals.length <= MAX_DISTRIBUTED_ATTR_CNT; OperationContext.ContextUpdater updater = OperationContext.ContextUpdater.create(); - for (byte valIdx = 0, maskIdx = 0; valIdx < msg.vals.size(); ++valIdx) { - Message curVal = msg.vals.get(valIdx); - - while ((msg.idBitmap & (1 << maskIdx)) == 0) { - assert maskIdx <= MAX_DISTRIBUTED_ATTR_CNT; + for (byte valIdx = 0, maskIdx = 0; valIdx < msg.vals.length; ++valIdx) { + Message curVal = msg.vals[valIdx]; + while ((msg.idBitmap & (1 << maskIdx)) == 0) ++maskIdx; - } updater.set((OperationContextAttribute)attrs.get(maskIdx++), curVal); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index ec153176d64b6..4b4eb3fccacd2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -1761,280 +1761,290 @@ private MessageWorker(IgniteLogger log) { blockingSectionEnd(); } - if (msg instanceof JoinTimeout) { - int joinCnt0 = ((JoinTimeout)msg).joinCnt; - - if (joinCnt == joinCnt0) { - if (state == STARTING) { - joinError(new IgniteSpiException("Join process timed out, did not receive response for " + - "join request (consider increasing 'joinTimeout' configuration property) " + - "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']')); - + if (msg instanceof TcpDiscoveryClientReconnectMessage msg0 && msg0.opCtxMsg != null) { + try (Scope ignored = DistributedOperationContextManager.instance().restoreDistributedAttributes(msg0.opCtxMsg)) { + if (processRawMessage(msg)) break; - } - else if (state == DISCONNECTED) { - if (log.isDebugEnabled()) - log.debug("Failed to reconnect, local node segmented " + - "[joinTimeout=" + spi.joinTimeout + ']'); - - state = SEGMENTED; - - notifyDiscovery( - EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); - } } } - else if (msg == SPI_STOP) { - boolean connected = state == CONNECTED; - - state = STOPPED; + else if (processRawMessage(msg)) + break; + } + } + catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + catch (Throwable t) { + if (spi.ignite() instanceof IgniteEx) + ((IgniteEx)spi.ignite()).context().failure().process(new FailureContext(CRITICAL_ERROR, t)); + } + finally { + SocketStream currSock = this.currSock; - assert spi.getSpiContext().isStopping(); + if (currSock != null) + U.closeQuiet(currSock.socket()); - if (connected && currSock != null) { - TcpDiscoveryNodeLeftMessage leftMsg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId()); + if (joinLatch.getCount() > 0) + joinError(new IgniteSpiException("Some error in join process.")); // This should not occur. - leftMsg.client(true); + if (reconnector != null) { + reconnector.cancel(); - Span rootSpan = tracing.create(TraceableMessagesTable.traceName(leftMsg.getClass())) - .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> locNode.id().toString()) - .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID), - () -> locNode.consistentId().toString()) - .addLog(() -> "Created"); + reconnector.join(); + } + } + } - leftMsg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan)); + /** @return {@code True} if the cycle stop is required. */ + private boolean processRawMessage(Object msg) throws InterruptedException { + if (msg instanceof JoinTimeout) { + int joinCnt0 = ((JoinTimeout)msg).joinCnt; - sockWriter.sendMessage(leftMsg); + if (joinCnt == joinCnt0) { + if (state == STARTING) { + joinError(new IgniteSpiException("Join process timed out, did not receive response for " + + "join request (consider increasing 'joinTimeout' configuration property) " + + "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']')); - rootSpan.addLog(() -> "Sent").end(); - } - else - leaveLatch.countDown(); + return true; } - else if (msg == SPI_RECONNECT) { - if (state == CONNECTED) { - if (reconnector != null) { - reconnector.cancel(); - reconnector.join(); + else if (state == DISCONNECTED) { + if (log.isDebugEnabled()) + log.debug("Failed to reconnect, local node segmented " + + "[joinTimeout=" + spi.joinTimeout + ']'); - reconnector = null; - } + state = SEGMENTED; - sockWriter.forceLeave(); - sockReader.forceStopRead(); + notifyDiscovery( + EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); + } + } + } + else if (msg == SPI_STOP) { + boolean connected = state == CONNECTED; - currSock = null; + state = STOPPED; - queue.clear(); + assert spi.getSpiContext().isStopping(); - onDisconnected(); + if (connected && currSock != null) { + TcpDiscoveryNodeLeftMessage leftMsg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId()); - UUID newId = UUID.randomUUID(); + leftMsg.client(true); - U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due " + - "to network problems [newId=" + newId + - ", prevId=" + locNode.id() + - ", locNode=" + locNode + ']'); + Span rootSpan = tracing.create(TraceableMessagesTable.traceName(leftMsg.getClass())) + .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> locNode.id().toString()) + .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID), + () -> locNode.consistentId().toString()) + .addLog(() -> "Created"); - locNode.onClientDisconnected(newId); + leftMsg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan)); - throttleClientReconnect(); + sockWriter.sendMessage(leftMsg); - tryJoin(); - } + rootSpan.addLog(() -> "Sent").end(); + } + else + leaveLatch.countDown(); + } + else if (msg == SPI_RECONNECT) { + if (state == CONNECTED) { + if (reconnector != null) { + reconnector.cancel(); + reconnector.join(); + + reconnector = null; } - else if (msg instanceof TcpDiscoveryNodeFailedMessage && - ((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) { - TcpDiscoveryNodeFailedMessage msg0 = (TcpDiscoveryNodeFailedMessage)msg; - assert msg0.force() : msg0; + sockWriter.forceLeave(); + sockReader.forceStopRead(); - forceFailMsg = msg0; - } - else if (msg instanceof SocketClosedMessage) { - if (((SocketClosedMessage)msg).sock == currSock) { - Socket sock = currSock.sock; + currSock = null; - InetSocketAddress prevAddr = new InetSocketAddress(sock.getInetAddress(), sock.getPort()); + queue.clear(); - currSock = null; + onDisconnected(); - boolean join = joinLatch.getCount() > 0; + UUID newId = UUID.randomUUID(); - if (spi.getSpiContext().isStopping() || state == SEGMENTED) { - leaveLatch.countDown(); + U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due " + + "to network problems [newId=" + newId + + ", prevId=" + locNode.id() + + ", locNode=" + locNode + ']'); - if (join) { - joinError(new IgniteSpiException("Failed to connect to cluster: socket closed.")); + locNode.onClientDisconnected(newId); - break; - } - } - else { - if (forceFailMsg != null) { - if (log.isDebugEnabled()) { - log.debug("Connection closed, local node received force fail message, " + - "will not try to restore connection"); - } + throttleClientReconnect(); - queue.addFirst(SPI_RECONNECT_FAILED); - } - else { - if (log.isDebugEnabled()) - log.debug("Connection closed, will try to restore connection."); + tryJoin(); + } + } + else if (msg instanceof TcpDiscoveryNodeFailedMessage && + ((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) { + TcpDiscoveryNodeFailedMessage msg0 = (TcpDiscoveryNodeFailedMessage)msg; - assert reconnector == null; + assert msg0.force() : msg0; - reconnector = new Reconnector(join, prevAddr); - reconnector.start(); - } - } + forceFailMsg = msg0; + } + else if (msg instanceof SocketClosedMessage) { + if (((SocketClosedMessage)msg).sock == currSock) { + Socket sock = currSock.sock; + + InetSocketAddress prevAddr = new InetSocketAddress(sock.getInetAddress(), sock.getPort()); + + currSock = null; + + boolean join = joinLatch.getCount() > 0; + + if (spi.getSpiContext().isStopping() || state == SEGMENTED) { + leaveLatch.countDown(); + + if (join) { + joinError(new IgniteSpiException("Failed to connect to cluster: socket closed.")); + + return true; } } - else if (msg == SPI_RECONNECT_FAILED) { - if (reconnector != null) { - reconnector.cancel(); - reconnector.join(); + else { + if (forceFailMsg != null) { + if (log.isDebugEnabled()) { + log.debug("Connection closed, local node received force fail message, " + + "will not try to restore connection"); + } - reconnector = null; + queue.addFirst(SPI_RECONNECT_FAILED); } - else - assert forceFailMsg != null; - - if (spi.isClientReconnectDisabled()) { - if (state != SEGMENTED && state != STOPPED) { - if (forceFailMsg != null) { - U.quietAndWarn(log, "Local node was dropped from cluster due to network problems " + - "[nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + - ", msg=" + forceFailMsg.warning() + ']'); - } + else { + if (log.isDebugEnabled()) + log.debug("Connection closed, will try to restore connection."); - if (log.isDebugEnabled()) { - log.debug("Failed to restore closed connection, reconnect disabled, " + - "local node segmented [networkTimeout=" + spi.netTimeout + ']'); - } + assert reconnector == null; - state = SEGMENTED; + reconnector = new Reconnector(join, prevAddr); + reconnector.start(); + } + } + } + } + else if (msg == SPI_RECONNECT_FAILED) { + if (reconnector != null) { + reconnector.cancel(); + reconnector.join(); - notifyDiscovery( - EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); - } + reconnector = null; + } + else + assert forceFailMsg != null; + + if (spi.isClientReconnectDisabled()) { + if (state != SEGMENTED && state != STOPPED) { + if (forceFailMsg != null) { + U.quietAndWarn(log, "Local node was dropped from cluster due to network problems " + + "[nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + + ", msg=" + forceFailMsg.warning() + ']'); } - else { - if (state == STARTING || state == CONNECTED) { - if (log.isDebugEnabled()) { - log.debug("Failed to restore closed connection, will try to reconnect " + - "[networkTimeout=" + spi.netTimeout + - ", joinTimeout=" + spi.joinTimeout + - ", failMsg=" + forceFailMsg + ']'); - } - onDisconnected(); - } + if (log.isDebugEnabled()) { + log.debug("Failed to restore closed connection, reconnect disabled, " + + "local node segmented [networkTimeout=" + spi.netTimeout + ']'); + } - UUID newId = UUID.randomUUID(); + state = SEGMENTED; - if (forceFailMsg != null) { - long delay = IgniteSystemProperties.getLong(IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY, - DFLT_DISCO_FAILED_CLIENT_RECONNECT_DELAY); + notifyDiscovery( + EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); + } + } + else { + if (state == STARTING || state == CONNECTED) { + if (log.isDebugEnabled()) { + log.debug("Failed to restore closed connection, will try to reconnect " + + "[networkTimeout=" + spi.netTimeout + + ", joinTimeout=" + spi.joinTimeout + + ", failMsg=" + forceFailMsg + ']'); + } - if (delay > 0) { - U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " + - "will try to reconnect with new id after " + delay + "ms (reconnect delay " + - "can be changed using IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY system " + - "property) [" + - "newId=" + newId + - ", prevId=" + locNode.id() + - ", locNode=" + locNode + - ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + - ", msg=" + forceFailMsg.warning() + ']'); + onDisconnected(); + } - Thread.sleep(delay); - } - else { - U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " + - "will try to reconnect with new id [" + - "newId=" + newId + - ", prevId=" + locNode.id() + - ", locNode=" + locNode + - ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + - ", msg=" + forceFailMsg.warning() + ']'); - } + UUID newId = UUID.randomUUID(); - forceFailMsg = null; - } - else if (log.isInfoEnabled()) { - log.info("Client node disconnected from cluster, will try to reconnect with new id " + - "[newId=" + newId + ", prevId=" + locNode.id() + ", locNode=" + locNode + ']'); - } + if (forceFailMsg != null) { + long delay = IgniteSystemProperties.getLong(IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY, + DFLT_DISCO_FAILED_CLIENT_RECONNECT_DELAY); - locNode.onClientDisconnected(newId); + if (delay > 0) { + U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " + + "will try to reconnect with new id after " + delay + "ms (reconnect delay " + + "can be changed using IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY system " + + "property) [" + + "newId=" + newId + + ", prevId=" + locNode.id() + + ", locNode=" + locNode + + ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + + ", msg=" + forceFailMsg.warning() + ']'); - tryJoin(); + Thread.sleep(delay); + } + else { + U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " + + "will try to reconnect with new id [" + + "newId=" + newId + + ", prevId=" + locNode.id() + + ", locNode=" + locNode + + ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + + ", msg=" + forceFailMsg.warning() + ']'); } + + forceFailMsg = null; + } + else if (log.isInfoEnabled()) { + log.info("Client node disconnected from cluster, will try to reconnect with new id " + + "[newId=" + newId + ", prevId=" + locNode.id() + ", locNode=" + locNode + ']'); } - else { - TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg; - if (joining()) { - IgniteSpiException err = null; + locNode.onClientDisconnected(newId); - if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage) - err = spi.duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg); - else if (discoMsg instanceof TcpDiscoveryAuthFailedMessage) - err = spi.authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg); - //TODO: https://issues.apache.org/jira/browse/IGNITE-9829 - else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage) - err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg); + tryJoin(); + } + } + else { + TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg; - if (err != null) { - if (state == DISCONNECTED) { - U.error(log, "Failed to reconnect, segment local node.", err); + if (joining()) { + IgniteSpiException err = null; - state = SEGMENTED; + if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage) + err = spi.duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg); + else if (discoMsg instanceof TcpDiscoveryAuthFailedMessage) + err = spi.authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg); + //TODO: https://issues.apache.org/jira/browse/IGNITE-9829 + else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage) + err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg); - notifyDiscovery( - EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); - } - else - joinError(err); + if (err != null) { + if (state == DISCONNECTED) { + U.error(log, "Failed to reconnect, segment local node.", err); - cancel(); + state = SEGMENTED; - break; - } + notifyDiscovery( + EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); } + else + joinError(err); - TcpDiscoveryAbstractMessage msg0 = (TcpDiscoveryAbstractMessage)msg; + cancel(); - try (Scope ignored = DistributedOperationContextManager.instance().restoreDistributedAttributes(msg0.opCtxMsg)) { - processDiscoveryMessage(msg0); - } + return true; } } - } - catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - } - catch (Throwable t) { - if (spi.ignite() instanceof IgniteEx) - ((IgniteEx)spi.ignite()).context().failure().process(new FailureContext(CRITICAL_ERROR, t)); - } - finally { - SocketStream currSock = this.currSock; - if (currSock != null) - U.closeQuiet(currSock.socket()); - - if (joinLatch.getCount() > 0) - joinError(new IgniteSpiException("Some error in join process.")); // This should not occur. - - if (reconnector != null) { - reconnector.cancel(); - - reconnector.join(); - } + processDiscoveryMessage((TcpDiscoveryAbstractMessage)msg); } + + return false; } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java index d60071c6dd3e8..20ca6845072bf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java @@ -833,14 +833,14 @@ public void testSendAttributesByDiscovery() throws Exception { // Distributed attribute 1. OperationContextAttribute dAttr1 = DistributedOperationContextManager.instance() - .createDistributedAttriubte(attrId1, dfltDistAttr1Val); + .createDistributedAttribute(attrId1, dfltDistAttr1Val); // Local attribute 2. OperationContextAttribute.newInstance("locaAttr2"); // Distributed attribute 2. OperationContextAttribute dAttr2 = DistributedOperationContextManager.instance() - .createDistributedAttriubte(attrId2, dfltDistrAttr2Val); + .createDistributedAttribute(attrId2, dfltDistrAttr2Val); startGrids(2); startClientGrid(2); From cb024d1897db0564f7c16b8958176ad56da93420 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 22 Jun 2026 16:45:49 +0300 Subject: [PATCH 13/17] fix --- .../thread/context/DistributedOperationContextManager.java | 2 +- .../internal/thread/context/OperationContextAttributesTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java index d02b708b975db..a0eba66e7052f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java @@ -44,7 +44,7 @@ public static DistributedOperationContextManager instance() { /** */ public OperationContextAttribute createDistributedAttribute(byte id, @Nullable T initVal) { - assert id >= 0; + assert id >= 0 && id < MAX_DISTRIBUTED_ATTR_CNT : "Invalid distributed attributed id [id=" + id + "]."; if (attrs.size() == MAX_DISTRIBUTED_ATTR_CNT) throw new IgniteException("Maximum number of distributed attributes is exceeded [max=" + MAX_DISTRIBUTED_ATTR_CNT + "]."); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java index 20ca6845072bf..bdf84b743ad2d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java @@ -823,7 +823,7 @@ public void testContextAwareDelayQueue() throws Exception { @Test public void testSendAttributesByDiscovery() throws Exception { byte attrId1 = 0; - byte attrId2 = DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT; + byte attrId2 = DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT - 1; InetSocketAddressMessage dfltDistAttr1Val = new InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80); GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1); From 4f5d18034ab7e20ec543fe85f9e42cdd6b1523ee Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 22 Jun 2026 17:00:17 +0300 Subject: [PATCH 14/17] fix --- .../java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 4b4eb3fccacd2..81c0bf9c26f5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -1761,7 +1761,7 @@ private MessageWorker(IgniteLogger log) { blockingSectionEnd(); } - if (msg instanceof TcpDiscoveryClientReconnectMessage msg0 && msg0.opCtxMsg != null) { + if (msg instanceof TcpDiscoveryAbstractMessage msg0 && msg0.opCtxMsg != null) { try (Scope ignored = DistributedOperationContextManager.instance().restoreDistributedAttributes(msg0.opCtxMsg)) { if (processRawMessage(msg)) break; From a25c2f2e36a305f73823302df0a9aab86440ce11 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 22 Jun 2026 17:19:36 +0300 Subject: [PATCH 15/17] review fixes --- .../DistributedOperationContextMessage.java | 7 +++- .../DistributedOperationContextManager.java | 40 +++++++++++++------ 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java index 2cfed19ad250a..42d5c7eda859a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java @@ -17,10 +17,15 @@ package org.apache.ignite.internal; +import org.apache.ignite.internal.thread.context.DistributedOperationContextManager; import org.apache.ignite.internal.thread.context.OperationContext; import org.apache.ignite.plugin.extensions.communication.Message; -/** Transport for {@link OperationContext} distributed attributes. */ +/** + * Transport for {@link OperationContext} distributed attributes. + * + * @see DistributedOperationContextManager + */ public class DistributedOperationContextMessage implements Message { /** Values of operation context attributes. */ @Order(0) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java index a0eba66e7052f..25824f64eaaf9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java @@ -26,15 +26,21 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; -/** */ +/** + * A manager of {@link OperationContextAttribute} which are required to be propagated through a cluster. + * Has own attributes ids compared to a local node's {@link OperationContext}. + * + * @see OperationContext + * @see DistributedOperationContextMessage + */ public class DistributedOperationContextManager { - /** */ - static final byte MAX_DISTRIBUTED_ATTR_CNT = Byte.SIZE; - /** */ private static final DistributedOperationContextManager INSTANCE = new DistributedOperationContextManager(); - /** Attributes by their id. */ + /** Maximal number of supported distributed attributes. */ + static final byte MAX_DISTRIBUTED_ATTR_CNT = Byte.SIZE; + + /** Registered distributed attributes by their cluster-wide id. */ private final Map> attrs = new ConcurrentSkipListMap<>(); /** */ @@ -42,22 +48,30 @@ public static DistributedOperationContextManager instance() { return INSTANCE; } - /** */ + /** + * Creates and registers a distributable {@link OperationContextAttribute} identified by {@code id}. + * + * @param id Cluster-wide id of an operation context attribute. + * @param initVal The attrbute's unitial value. + */ public OperationContextAttribute createDistributedAttribute(byte id, @Nullable T initVal) { - assert id >= 0 && id < MAX_DISTRIBUTED_ATTR_CNT : "Invalid distributed attributed id [id=" + id + "]."; - - if (attrs.size() == MAX_DISTRIBUTED_ATTR_CNT) - throw new IgniteException("Maximum number of distributed attributes is exceeded [max=" + MAX_DISTRIBUTED_ATTR_CNT + "]."); + assert id >= 0 && id < MAX_DISTRIBUTED_ATTR_CNT : "Invalid distributed attributed id [id=" + id + ']'; return (OperationContextAttribute)attrs.compute(id, (id0, attr0) -> { if (attr0 != null) - throw new IgniteException("Duplicated distributed attribute id [id=" + id + "]."); + throw new IgniteException("Duplicated distributed attribute id [id=" + id + ']'); return OperationContextAttribute.newInstance(initVal); }); } - /** */ + /** + * Requests current {@link OperationContext} for its effective attributes and collects ones which are also registered + * as the distbibued attributes. + * + * @return The dedicated message to send current effective distributed attributes. {@code null}, if there are no + * effective attributes in {@link OperationContext} or none of them is registered as the distribute attribute. + */ public @Nullable DistributedOperationContextMessage collectDistributedAttributes() { DistributedOperationContextMessage res = null; List vals = null; @@ -89,7 +103,7 @@ public OperationContextAttribute createDistributedAttribu return res; } - /** */ + /** Sets the received distributed operation context attributes (if any) into current {@link OperationContext}. */ public Scope restoreDistributedAttributes(@Nullable DistributedOperationContextMessage msg) { if (msg == null) return Scope.NOOP_SCOPE; From 4af4ccdd084645694d808ebed71fdee22c679ba2 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 22 Jun 2026 17:25:30 +0300 Subject: [PATCH 16/17] typo --- .../thread/context/DistributedOperationContextManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java index 25824f64eaaf9..4a47c84a206c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java @@ -51,8 +51,8 @@ public static DistributedOperationContextManager instance() { /** * Creates and registers a distributable {@link OperationContextAttribute} identified by {@code id}. * - * @param id Cluster-wide id of an operation context attribute. - * @param initVal The attrbute's unitial value. + * @param id Cluster-wide id of a distributed operation context attribute. + * @param initVal The attribute's unitial value. */ public OperationContextAttribute createDistributedAttribute(byte id, @Nullable T initVal) { assert id >= 0 && id < MAX_DISTRIBUTED_ATTR_CNT : "Invalid distributed attributed id [id=" + id + ']'; From f863cc482d3f5b002180ae825582851bcf8a4532 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 22 Jun 2026 17:28:17 +0300 Subject: [PATCH 17/17] typo --- .../context/DistributedOperationContextManager.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java index 4a47c84a206c8..cc39aff8e05d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java @@ -49,7 +49,7 @@ public static DistributedOperationContextManager instance() { } /** - * Creates and registers a distributable {@link OperationContextAttribute} identified by {@code id}. + * Creates and registers a distributable {@link OperationContextAttribute}. * * @param id Cluster-wide id of a distributed operation context attribute. * @param initVal The attribute's unitial value. @@ -67,10 +67,10 @@ public OperationContextAttribute createDistributedAttribu /** * Requests current {@link OperationContext} for its effective attributes and collects ones which are also registered - * as the distbibued attributes. + * as distbibued attributes. * - * @return The dedicated message to send current effective distributed attributes. {@code null}, if there are no - * effective attributes in {@link OperationContext} or none of them is registered as the distribute attribute. + * @return A message to send current effective distributed attributes. {@code null}, if there are no + * effective attributes in {@link OperationContext} or none of them is a distributed attribute. */ public @Nullable DistributedOperationContextMessage collectDistributedAttributes() { DistributedOperationContextMessage res = null;