From db801cc4913aaae273a2286b70eb89c301265518 Mon Sep 17 00:00:00 2001 From: wb Date: Fri, 12 Jun 2026 14:26:48 +0800 Subject: [PATCH] feat(event): support rollback (removed) for block and transaction log triggers On a chain reorg, BlockLogTrigger/TransactionLogTrigger subscribers previously never learned that an orphaned block's events were rolled back. Add the same "removed" semantics already used by ContractTrigger. - add a `removed` field to BlockLogTrigger/TransactionLogTrigger and the matching setRemoved() on their capsules - Manager (event version 0): refactor postBlockTrigger to emit a single real-time block (with removed), move the solidified-mode batch into postSolidityTrigger, add reOrgBlockTrigger to re-emit erased blocks with removed=true, rename reApplyLogsFilter to reApplyBlockEvents and re-emit the re-applied fork branch's block/tx triggers (forward), thread removed through processTransactionTrigger/postTransactionTrigger - RealtimeEventService (event version 1): post block/tx triggers synchronously to the plugin with the removed flag instead of the async triggerCapsuleQueue, avoiding overwrite of the shared cached capsule; drop the unused manager field - tests: capsule removed passthrough, version-0 emit (testReOrgBlockTriggerRemoved), rewritten RealtimeEventServiceTest, updated ManagerTest stub --- .../logsfilter/trigger/BlockLogTrigger.java | 8 + .../trigger/TransactionLogTrigger.java | 6 + .../capsule/BlockLogTriggerCapsule.java | 4 + .../capsule/TransactionLogTriggerCapsule.java | 4 + .../main/java/org/tron/core/db/Manager.java | 120 ++++++++++----- .../services/event/RealtimeEventService.java | 24 +-- .../TransactionLogTriggerCapsuleTest.java | 18 +++ .../capsule/BlockLogTriggerCapsuleTest.java | 8 + .../java/org/tron/core/db/ManagerTest.java | 143 +++++++++++++++++- .../core/event/RealtimeEventServiceTest.java | 44 +++--- 10 files changed, 305 insertions(+), 74 deletions(-) diff --git a/common/src/main/java/org/tron/common/logsfilter/trigger/BlockLogTrigger.java b/common/src/main/java/org/tron/common/logsfilter/trigger/BlockLogTrigger.java index b878597045d..8b27bb6ff4c 100644 --- a/common/src/main/java/org/tron/common/logsfilter/trigger/BlockLogTrigger.java +++ b/common/src/main/java/org/tron/common/logsfilter/trigger/BlockLogTrigger.java @@ -27,6 +27,12 @@ public class BlockLogTrigger extends Trigger { @Setter private List transactionList = new ArrayList<>(); + // true when this block is being rolled back due to a chain reorg (fork switch); + // mirrors the Ethereum log "removed" semantics already used by ContractTrigger. + @Getter + @Setter + private boolean removed; + public BlockLogTrigger() { setTriggerName(Trigger.BLOCK_TRIGGER_NAME); } @@ -44,6 +50,8 @@ public String toString() { .append(transactionSize) .append(", latestSolidifiedBlockNumber: ") .append(latestSolidifiedBlockNumber) + .append(", removed: ") + .append(removed) .append(", transactionList: ") .append(transactionList).toString(); } diff --git a/common/src/main/java/org/tron/common/logsfilter/trigger/TransactionLogTrigger.java b/common/src/main/java/org/tron/common/logsfilter/trigger/TransactionLogTrigger.java index a4fb1fddb79..7ccc17f9a1d 100644 --- a/common/src/main/java/org/tron/common/logsfilter/trigger/TransactionLogTrigger.java +++ b/common/src/main/java/org/tron/common/logsfilter/trigger/TransactionLogTrigger.java @@ -97,6 +97,12 @@ public class TransactionLogTrigger extends Trigger { @Setter private Map extMap; + // true when this transaction is being rolled back due to a chain reorg (fork switch); + // mirrors the Ethereum log "removed" semantics already used by ContractTrigger. + @Getter + @Setter + private boolean removed; + public TransactionLogTrigger() { setTriggerName(Trigger.TRANSACTION_TRIGGER_NAME); } diff --git a/framework/src/main/java/org/tron/common/logsfilter/capsule/BlockLogTriggerCapsule.java b/framework/src/main/java/org/tron/common/logsfilter/capsule/BlockLogTriggerCapsule.java index b714134ff60..de040d33d44 100644 --- a/framework/src/main/java/org/tron/common/logsfilter/capsule/BlockLogTriggerCapsule.java +++ b/framework/src/main/java/org/tron/common/logsfilter/capsule/BlockLogTriggerCapsule.java @@ -27,6 +27,10 @@ public void setLatestSolidifiedBlockNumber(long latestSolidifiedBlockNumber) { blockLogTrigger.setLatestSolidifiedBlockNumber(latestSolidifiedBlockNumber); } + public void setRemoved(boolean removed) { + blockLogTrigger.setRemoved(removed); + } + @Override public void processTrigger() { EventPluginLoader.getInstance().postBlockTrigger(blockLogTrigger); diff --git a/framework/src/main/java/org/tron/common/logsfilter/capsule/TransactionLogTriggerCapsule.java b/framework/src/main/java/org/tron/common/logsfilter/capsule/TransactionLogTriggerCapsule.java index 958b3c0fd78..cf1737d9d22 100644 --- a/framework/src/main/java/org/tron/common/logsfilter/capsule/TransactionLogTriggerCapsule.java +++ b/framework/src/main/java/org/tron/common/logsfilter/capsule/TransactionLogTriggerCapsule.java @@ -377,6 +377,10 @@ public void setLatestSolidifiedBlockNumber(long latestSolidifiedBlockNumber) { transactionLogTrigger.setLatestSolidifiedBlockNumber(latestSolidifiedBlockNumber); } + public void setRemoved(boolean removed) { + transactionLogTrigger.setRemoved(removed); + } + private List getInternalTransactionList( List internalTransactionList) { List pojoList = new ArrayList<>(); diff --git a/framework/src/main/java/org/tron/core/db/Manager.java b/framework/src/main/java/org/tron/core/db/Manager.java index 01da3002611..eab76db9e3f 100644 --- a/framework/src/main/java/org/tron/core/db/Manager.java +++ b/framework/src/main/java/org/tron/core/db/Manager.java @@ -1127,6 +1127,7 @@ private void switchFork(BlockCapsule newHead) .equals(binaryTree.getValue().peekLast().getParentHash())) { if (EventPluginLoader.getInstance().getVersion() == 0) { reOrgContractTrigger(); + reOrgBlockTrigger(); } reOrgLogsFilter(); eraseBlock(); @@ -1202,7 +1203,7 @@ private void switchFork(BlockCapsule newHead) } } // only reached when the whole new branch applied cleanly; a failed switch rethrows above - reApplyLogsFilter(first); + reApplyBlockEvents(first); } } @@ -1435,9 +1436,10 @@ void blockTrigger(final BlockCapsule block, long oldSolid, long newSolid) { return; } - // if event subscribe is enabled, post block trigger to queue - postBlockTrigger(block); + // if event subscribe is enabled, post block trigger to queue (real-time, not removed) + postBlockTrigger(block, false); // if event subscribe is enabled, post solidity trigger to queue + // (also emits solidified-mode block/transaction triggers) postSolidityTrigger(newSolid); } catch (Exception e) { logger.error("Block trigger failed. head: {}, oldSolid: {}, newSolid: {}", @@ -2205,6 +2207,27 @@ private void postSolidityFilter(final long oldSolidNum, final long latestSolidif } private void postSolidityTrigger(final long latestSolidifiedBlockNumber) { + // solidified-mode block trigger: emit the newly-solidified blocks (never removed, + // since solidified blocks cannot be reorged). + if (eventPluginLoaded && EventPluginLoader.getInstance().isBlockLogTriggerEnable() + && EventPluginLoader.getInstance().isBlockLogTriggerSolidified()) { + for (BlockCapsule capsule : getContinuousBlockCapsule(latestSolidifiedBlockNumber)) { + BlockLogTriggerCapsule blockLogTriggerCapsule = new BlockLogTriggerCapsule(capsule); + blockLogTriggerCapsule.setLatestSolidifiedBlockNumber(latestSolidifiedBlockNumber); + if (!triggerCapsuleQueue.offer(blockLogTriggerCapsule)) { + logger.info("Too many triggers, block trigger lost: {}.", capsule.getBlockId()); + } + } + } + + // solidified-mode transaction trigger: emit transactions of the newly-solidified blocks. + if (eventPluginLoaded && EventPluginLoader.getInstance().isTransactionLogTriggerEnable() + && EventPluginLoader.getInstance().isTransactionLogTriggerSolidified()) { + for (BlockCapsule capsule : getContinuousBlockCapsule(latestSolidifiedBlockNumber)) { + processTransactionTrigger(capsule, false); + } + } + if (eventPluginLoaded && EventPluginLoader.getInstance().isSolidityLogTriggerEnable()) { for (Long i : Args.getSolidityContractLogTriggerMap().keySet()) { postSolidityLogContractTrigger(i, latestSolidifiedBlockNumber); @@ -2233,7 +2256,7 @@ private void postSolidityTrigger(final long latestSolidifiedBlockNumber) { lastUsedSolidityNum = latestSolidifiedBlockNumber; } - private void processTransactionTrigger(BlockCapsule newBlock) { + private void processTransactionTrigger(BlockCapsule newBlock, boolean removed) { List transactionCapsuleList = newBlock.getTransactions(); // need to set eth compatible data from transactionInfoList @@ -2252,7 +2275,7 @@ private void processTransactionTrigger(BlockCapsule newBlock) { transactionCapsule.setBlockNum(newBlock.getNum()); cumulativeEnergyUsed += postTransactionTrigger(transactionCapsule, newBlock, i, - cumulativeEnergyUsed, cumulativeLogCount, transactionInfo, energyUnitPrice); + cumulativeEnergyUsed, cumulativeLogCount, transactionInfo, energyUnitPrice, removed); cumulativeLogCount += transactionInfo.getLogCount(); } @@ -2261,12 +2284,12 @@ private void processTransactionTrigger(BlockCapsule newBlock) { newBlock.getNum(), "the sizes of transactionInfoList and transactionCapsuleList are not equal"); for (TransactionCapsule e : newBlock.getTransactions()) { - postTransactionTrigger(e, newBlock); + postTransactionTrigger(e, newBlock, removed); } } } else { for (TransactionCapsule e : newBlock.getTransactions()) { - postTransactionTrigger(e, newBlock); + postTransactionTrigger(e, newBlock, removed); } } } @@ -2290,7 +2313,12 @@ private void reOrgLogsFilter() { // (oldest-first). Must be kept in sync with the FULL-filter section of blockTrigger. // Solidity filters are intentionally not posted here: solidification events for these // blocks arrive later, when postSolidityFilter runs against the then-canonical chain. - private void reApplyLogsFilter(List newBranch) { + // Re-emit the per-block subscription events for a newly-applied fork branch after a chain + // reorg: JSON-RPC block/logs filters and event-subscribe block/transaction triggers. The + // fork-switch path returns before blockTrigger() runs, so without this these forward events + // would be lost for the re-applied blocks (contract triggers are already re-emitted during + // applyBlock). All emitted as forward (removed=false): these blocks are now canonical. + private void reApplyBlockEvents(List newBranch) { if (CommonParameter.getInstance().isJsonRpcHttpFullNodeEnable()) { for (KhaosBlock khaosBlock : newBranch) { BlockCapsule blockCapsule = khaosBlock.getBlk(); @@ -2298,6 +2326,12 @@ private void reApplyLogsFilter(List newBranch) { postLogsFilter(blockCapsule, false, false); } } + + if (EventPluginLoader.getInstance().getVersion() == 0) { + for (KhaosBlock khaosBlock : newBranch) { + postBlockTrigger(khaosBlock.getBlk(), false); + } + } } private void postBlockFilter(final BlockCapsule blockCapsule, boolean solidified) { @@ -2324,39 +2358,26 @@ private void postLogsFilter(final BlockCapsule blockCapsule, boolean solidified, } } - void postBlockTrigger(final BlockCapsule blockCapsule) { - // process block trigger + // Real-time block/transaction triggers for a single block. The solidified-mode batch is + // handled in postSolidityTrigger (driven by solidification advancement), so here we only + // emit for triggers configured as non-solidified. {@code removed=true} re-emits the same + // trigger when the block is rolled back by a chain reorg (see reOrgBlockTrigger). + void postBlockTrigger(final BlockCapsule blockCapsule, boolean removed) { long solidityBlkNum = getDynamicPropertiesStore().getLatestSolidifiedBlockNum(); - if (eventPluginLoaded && EventPluginLoader.getInstance().isBlockLogTriggerEnable()) { - List capsuleList = new ArrayList<>(); - if (EventPluginLoader.getInstance().isBlockLogTriggerSolidified()) { - capsuleList = getContinuousBlockCapsule(solidityBlkNum); - } else { - capsuleList.add(blockCapsule); - } - for (BlockCapsule capsule : capsuleList) { - BlockLogTriggerCapsule blockLogTriggerCapsule = new BlockLogTriggerCapsule(capsule); - blockLogTriggerCapsule.setLatestSolidifiedBlockNumber(solidityBlkNum); - if (!triggerCapsuleQueue.offer(blockLogTriggerCapsule)) { - logger.info("Too many triggers, block trigger lost: {}.", capsule.getBlockId()); - } + if (eventPluginLoaded && EventPluginLoader.getInstance().isBlockLogTriggerEnable() + && !EventPluginLoader.getInstance().isBlockLogTriggerSolidified()) { + BlockLogTriggerCapsule blockLogTriggerCapsule = new BlockLogTriggerCapsule(blockCapsule); + blockLogTriggerCapsule.setLatestSolidifiedBlockNumber(solidityBlkNum); + blockLogTriggerCapsule.setRemoved(removed); + if (!triggerCapsuleQueue.offer(blockLogTriggerCapsule)) { + logger.info("Too many triggers, block trigger lost: {}.", blockCapsule.getBlockId()); } } - // process transaction trigger - if (eventPluginLoaded && EventPluginLoader.getInstance().isTransactionLogTriggerEnable()) { - List capsuleList = new ArrayList<>(); - if (EventPluginLoader.getInstance().isTransactionLogTriggerSolidified()) { - capsuleList = getContinuousBlockCapsule(solidityBlkNum); - } else { - // need to reset block - capsuleList.add(blockCapsule); - } - - for (BlockCapsule capsule : capsuleList) { - processTransactionTrigger(capsule); - } + if (eventPluginLoaded && EventPluginLoader.getInstance().isTransactionLogTriggerEnable() + && !EventPluginLoader.getInstance().isTransactionLogTriggerSolidified()) { + processTransactionTrigger(blockCapsule, removed); } } @@ -2382,11 +2403,13 @@ private List getContinuousBlockCapsule(long solidityBlkNum) { // cumulativeEnergyUsed is the total of energy used before the current transaction private long postTransactionTrigger(final TransactionCapsule trxCap, final BlockCapsule blockCap, int index, long preCumulativeEnergyUsed, - long cumulativeLogCount, final TransactionInfo transactionInfo, long energyUnitPrice) { + long cumulativeLogCount, final TransactionInfo transactionInfo, long energyUnitPrice, + boolean removed) { TransactionLogTriggerCapsule trx = new TransactionLogTriggerCapsule(trxCap, blockCap, index, preCumulativeEnergyUsed, cumulativeLogCount, transactionInfo, energyUnitPrice); trx.setLatestSolidifiedBlockNumber(getDynamicPropertiesStore() .getLatestSolidifiedBlockNum()); + trx.setRemoved(removed); if (!triggerCapsuleQueue.offer(trx)) { logger.info("Too many triggers, transaction trigger lost: {}.", trxCap.getTransactionId()); } @@ -2396,10 +2419,11 @@ private long postTransactionTrigger(final TransactionCapsule trxCap, private void postTransactionTrigger(final TransactionCapsule trxCap, - final BlockCapsule blockCap) { + final BlockCapsule blockCap, boolean removed) { TransactionLogTriggerCapsule trx = new TransactionLogTriggerCapsule(trxCap, blockCap); trx.setLatestSolidifiedBlockNumber(getDynamicPropertiesStore() .getLatestSolidifiedBlockNum()); + trx.setRemoved(removed); if (!triggerCapsuleQueue.offer(trx)) { logger.info("Too many triggers, transaction trigger lost: {}.", trxCap.getTransactionId()); } @@ -2424,6 +2448,26 @@ private void reOrgContractTrigger() { clearSolidityContractTriggerCache(getHeadBlockNum()); } + // On a chain reorg, re-emit the block/transaction triggers of the block being erased with + // removed=true, so subscribers can roll back. Only real-time (non-solidified) triggers were + // ever emitted for this block, so postBlockTrigger(.., true) naturally no-ops in solidified + // mode. Called in the erase loop before eraseBlock(), so the old head is still current head. + private void reOrgBlockTrigger() { + if (eventPluginLoaded + && (EventPluginLoader.getInstance().isBlockLogTriggerEnable() + || EventPluginLoader.getInstance().isTransactionLogTriggerEnable())) { + logger.info("Switch fork occurred, post reOrgBlockTrigger."); + try { + BlockCapsule oldHeadBlock = chainBaseManager.getBlockById( + getDynamicPropertiesStore().getLatestBlockHeaderHash()); + postBlockTrigger(oldHeadBlock, true); + } catch (BadItemException | ItemNotFoundException e) { + logger.error("Block header hash does not exist or is bad: {}.", + getDynamicPropertiesStore().getLatestBlockHeaderHash()); + } + } + } + private void clearSolidityContractTriggerCache(long blockNum) { if (eventPluginLoaded && (EventPluginLoader.getInstance().isSolidityEventTriggerEnable() diff --git a/framework/src/main/java/org/tron/core/services/event/RealtimeEventService.java b/framework/src/main/java/org/tron/core/services/event/RealtimeEventService.java index 5aee55b1c13..cef16cd81c1 100644 --- a/framework/src/main/java/org/tron/core/services/event/RealtimeEventService.java +++ b/framework/src/main/java/org/tron/core/services/event/RealtimeEventService.java @@ -12,7 +12,6 @@ import org.tron.common.es.ExecutorServiceManager; import org.tron.common.logsfilter.EventPluginLoader; import org.tron.common.logsfilter.trigger.Trigger; -import org.tron.core.db.Manager; import org.tron.core.services.event.bo.BlockEvent; import org.tron.core.services.event.bo.Event; @@ -25,9 +24,6 @@ public class RealtimeEventService { @Getter private static Object contractLock = new Object(); - @Autowired - private Manager manager; - @Autowired private SolidEventService solidEventService; @@ -77,25 +73,31 @@ public synchronized void work() { public void flush(BlockEvent blockEvent, boolean isRemove) { logger.info("Flush realtime event {}", blockEvent.getBlockId().getString()); + // Post block/transaction triggers synchronously to the plugin (processTrigger -> + // EventPluginLoader serializes immediately) instead of the async triggerCapsuleQueue: the + // capsule is a shared cached object whose removed flag is set per-flush, so an async consumer + // could read it after a later flush overwrote it. This mirrors how contract triggers below + // are posted directly. isRemove=true re-emits the block/transaction as rolled back on a reorg. if (instance.isBlockLogTriggerEnable() - && !instance.isBlockLogTriggerSolidified() - && !isRemove) { + && !instance.isBlockLogTriggerSolidified()) { if (blockEvent.getBlockLogTriggerCapsule() == null) { logger.warn("BlockLogTriggerCapsule is null. {}", blockEvent.getBlockId().getString()); } else { - manager.getTriggerCapsuleQueue().offer(blockEvent.getBlockLogTriggerCapsule()); + blockEvent.getBlockLogTriggerCapsule().setRemoved(isRemove); + blockEvent.getBlockLogTriggerCapsule().processTrigger(); } } if (instance.isTransactionLogTriggerEnable() - && !instance.isTransactionLogTriggerSolidified() - && !isRemove) { + && !instance.isTransactionLogTriggerSolidified()) { if (blockEvent.getTransactionLogTriggerCapsules() == null) { logger.warn("TransactionLogTriggerCapsules is null. {}", blockEvent.getBlockId().getString()); } else { - blockEvent.getTransactionLogTriggerCapsules().forEach(v -> - manager.getTriggerCapsuleQueue().offer(v)); + blockEvent.getTransactionLogTriggerCapsules().forEach(v -> { + v.setRemoved(isRemove); + v.processTrigger(); + }); } } diff --git a/framework/src/test/java/org/tron/common/logsfilter/TransactionLogTriggerCapsuleTest.java b/framework/src/test/java/org/tron/common/logsfilter/TransactionLogTriggerCapsuleTest.java index ce0f63ef7a4..1d559adb6d0 100644 --- a/framework/src/test/java/org/tron/common/logsfilter/TransactionLogTriggerCapsuleTest.java +++ b/framework/src/test/java/org/tron/common/logsfilter/TransactionLogTriggerCapsuleTest.java @@ -35,6 +35,24 @@ public void setup() { System.currentTimeMillis(), Sha256Hash.ZERO_HASH.getByteString()); } + @Test + public void testSetRemoved() { + BalanceContract.TransferContract.Builder builder = + BalanceContract.TransferContract.newBuilder() + .setOwnerAddress(ByteString.copyFrom(ByteArray.fromHexString(OWNER_ADDRESS))) + .setToAddress(ByteString.copyFrom(ByteArray.fromHexString(RECEIVER_ADDRESS))) + .setAmount(1000L); + transactionCapsule = new TransactionCapsule(builder.build(), + Protocol.Transaction.Contract.ContractType.TransferContract); + TransactionLogTriggerCapsule triggerCapsule = + new TransactionLogTriggerCapsule(transactionCapsule, blockCapsule); + + // default is false (forward emit); reorg rollback sets it to true + Assert.assertFalse(triggerCapsule.getTransactionLogTrigger().isRemoved()); + triggerCapsule.setRemoved(true); + Assert.assertTrue(triggerCapsule.getTransactionLogTrigger().isRemoved()); + } + @Test public void testConstructorWithUnfreezeBalanceTrxCapsule() { BalanceContract.UnfreezeBalanceContract.Builder builder2 = diff --git a/framework/src/test/java/org/tron/common/logsfilter/capsule/BlockLogTriggerCapsuleTest.java b/framework/src/test/java/org/tron/common/logsfilter/capsule/BlockLogTriggerCapsuleTest.java index f77869b8650..60e51144a40 100644 --- a/framework/src/test/java/org/tron/common/logsfilter/capsule/BlockLogTriggerCapsuleTest.java +++ b/framework/src/test/java/org/tron/common/logsfilter/capsule/BlockLogTriggerCapsuleTest.java @@ -32,4 +32,12 @@ public void testSetLatestSolidifiedBlockNumber() { Assert.assertEquals(100, blockLogTriggerCapsule.getBlockLogTrigger().getLatestSolidifiedBlockNumber()); } + + @Test + public void testSetRemoved() { + // default is false (forward emit); reorg rollback sets it to true + Assert.assertFalse(blockLogTriggerCapsule.getBlockLogTrigger().isRemoved()); + blockLogTriggerCapsule.setRemoved(true); + Assert.assertTrue(blockLogTriggerCapsule.getBlockLogTrigger().isRemoved()); + } } diff --git a/framework/src/test/java/org/tron/core/db/ManagerTest.java b/framework/src/test/java/org/tron/core/db/ManagerTest.java index e342fa43222..b31af7557fe 100755 --- a/framework/src/test/java/org/tron/core/db/ManagerTest.java +++ b/framework/src/test/java/org/tron/core/db/ManagerTest.java @@ -2,6 +2,7 @@ import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -41,8 +42,11 @@ import org.tron.common.crypto.ECKey; import org.tron.common.logsfilter.EventPluginLoader; import org.tron.common.logsfilter.capsule.BlockFilterCapsule; +import org.tron.common.logsfilter.capsule.BlockLogTriggerCapsule; import org.tron.common.logsfilter.capsule.FilterTriggerCapsule; import org.tron.common.logsfilter.capsule.LogsFilterCapsule; +import org.tron.common.logsfilter.capsule.TransactionLogTriggerCapsule; +import org.tron.common.logsfilter.capsule.TriggerCapsule; import org.tron.common.logsfilter.trigger.ContractLogTrigger; import org.tron.common.parameter.CommonParameter; import org.tron.common.runtime.RuntimeImpl; @@ -1381,7 +1385,8 @@ public void isExchangeTransactionNonExchangeContractReturnsFalse() throws Except @Test public void blockTrigger() { Manager manager = spy(new Manager()); - doThrow(new RuntimeException("postBlockTrigger mock")).when(manager).postBlockTrigger(any()); + doThrow(new RuntimeException("postBlockTrigger mock")).when(manager) + .postBlockTrigger(any(), anyBoolean()); TronError thrown = Assert.assertThrows(TronError.class, () -> manager.blockTrigger(new BlockCapsule(Block.newBuilder().build()), 1, 1)); Assert.assertEquals(TronError.ErrCode.EVENT_SUBSCRIBE_ERROR, thrown.getErrCode()); @@ -1425,6 +1430,142 @@ public void testReOrgContractTriggerClearsMap() throws Exception { } } + private EventPluginLoader installMockLoader() throws Exception { + ReflectUtils.setFieldValue(dbManager, "eventPluginLoaded", true); + EventPluginLoader mockLoader = mock(EventPluginLoader.class); + Field instanceField = EventPluginLoader.class.getDeclaredField("instance"); + instanceField.setAccessible(true); + instanceField.set(null, mockLoader); + return mockLoader; + } + + private void restoreLoader(EventPluginLoader original) throws Exception { + Field instanceField = EventPluginLoader.class.getDeclaredField("instance"); + instanceField.setAccessible(true); + instanceField.set(null, original); + ReflectUtils.setFieldValue(dbManager, "eventPluginLoaded", false); + dbManager.getTriggerCapsuleQueue().clear(); + } + + private BlockCapsule blockWithOneTransfer() { + BlockCapsule block = new BlockCapsule(1, chainManager.getGenesisBlockId(), + System.currentTimeMillis(), ByteString.EMPTY); + TransferContract tc = TransferContract.newBuilder() + .setOwnerAddress(ByteString.copyFrom(new byte[21])) + .setToAddress(ByteString.copyFrom(new byte[21])) + .setAmount(1L).build(); + block.addTransaction(new TransactionCapsule(tc, ContractType.TransferContract)); + return block; + } + + @Test + public void testReOrgBlockTriggerRemoved() throws Exception { + // version-0 reorg emit core: postBlockTrigger threads the removed flag onto both the block + // and transaction triggers. reOrgBlockTrigger calls postBlockTrigger(block, true) (rollback), + // reApplyBlockEvents calls postBlockTrigger(block, false) (forward); both delegate here. + Field instanceField = EventPluginLoader.class.getDeclaredField("instance"); + instanceField.setAccessible(true); + EventPluginLoader originalLoader = (EventPluginLoader) instanceField.get(null); + EventPluginLoader mockLoader = installMockLoader(); + when(mockLoader.isBlockLogTriggerEnable()).thenReturn(true); + when(mockLoader.isBlockLogTriggerSolidified()).thenReturn(false); + when(mockLoader.isTransactionLogTriggerEnable()).thenReturn(true); + when(mockLoader.isTransactionLogTriggerSolidified()).thenReturn(false); + when(mockLoader.isTransactionLogTriggerEthCompatible()).thenReturn(false); + + BlockingQueue queue = dbManager.getTriggerCapsuleQueue(); + queue.clear(); + BlockCapsule block = blockWithOneTransfer(); + try { + // rollback: block + transaction triggers re-emitted with removed=true + dbManager.postBlockTrigger(block, true); + Assert.assertEquals(2, queue.size()); + Assert.assertTrue(((BlockLogTriggerCapsule) queue.poll()).getBlockLogTrigger().isRemoved()); + Assert.assertTrue(((TransactionLogTriggerCapsule) queue.poll()) + .getTransactionLogTrigger().isRemoved()); + + // forward: removed=false + dbManager.postBlockTrigger(block, false); + Assert.assertEquals(2, queue.size()); + Assert.assertFalse(((BlockLogTriggerCapsule) queue.poll()).getBlockLogTrigger().isRemoved()); + Assert.assertFalse(((TransactionLogTriggerCapsule) queue.poll()) + .getTransactionLogTrigger().isRemoved()); + } finally { + restoreLoader(originalLoader); + } + } + + @Test + public void testReApplyBlockEvents() throws Exception { + Field instanceField = EventPluginLoader.class.getDeclaredField("instance"); + instanceField.setAccessible(true); + EventPluginLoader originalLoader = (EventPluginLoader) instanceField.get(null); + EventPluginLoader mockLoader = installMockLoader(); + when(mockLoader.getVersion()).thenReturn(0); + when(mockLoader.isBlockLogTriggerEnable()).thenReturn(true); + when(mockLoader.isBlockLogTriggerSolidified()).thenReturn(false); + when(mockLoader.isTransactionLogTriggerEnable()).thenReturn(false); + + BlockingQueue queue = dbManager.getTriggerCapsuleQueue(); + queue.clear(); + BlockCapsule block = new BlockCapsule(1, chainManager.getGenesisBlockId(), + System.currentTimeMillis(), ByteString.EMPTY); + List branch = new ArrayList<>(); + branch.add(new KhaosDatabase.KhaosBlock(block)); + try { + Method m = Manager.class.getDeclaredMethod("reApplyBlockEvents", List.class); + m.setAccessible(true); + m.invoke(dbManager, branch); + // forward block trigger emitted for the re-applied fork branch (removed=false) + Assert.assertEquals(1, queue.size()); + Assert.assertFalse(((BlockLogTriggerCapsule) queue.poll()) + .getBlockLogTrigger().isRemoved()); + } finally { + restoreLoader(originalLoader); + } + } + + @Test + public void testReOrgBlockTrigger() throws Exception { + Field instanceField = EventPluginLoader.class.getDeclaredField("instance"); + instanceField.setAccessible(true); + EventPluginLoader originalLoader = (EventPluginLoader) instanceField.get(null); + EventPluginLoader mockLoader = installMockLoader(); + when(mockLoader.isBlockLogTriggerEnable()).thenReturn(true); + when(mockLoader.isTransactionLogTriggerEnable()).thenReturn(false); + try { + Method m = Manager.class.getDeclaredMethod("reOrgBlockTrigger"); + m.setAccessible(true); + // exercises the fetch of the old head block + try/catch; must not throw + m.invoke(dbManager); + } finally { + restoreLoader(originalLoader); + } + } + + @Test + public void testPostSolidityTriggerSolidified() throws Exception { + Field instanceField = EventPluginLoader.class.getDeclaredField("instance"); + instanceField.setAccessible(true); + EventPluginLoader originalLoader = (EventPluginLoader) instanceField.get(null); + EventPluginLoader mockLoader = installMockLoader(); + when(mockLoader.isBlockLogTriggerEnable()).thenReturn(true); + when(mockLoader.isBlockLogTriggerSolidified()).thenReturn(true); + when(mockLoader.isTransactionLogTriggerEnable()).thenReturn(true); + when(mockLoader.isTransactionLogTriggerSolidified()).thenReturn(true); + when(mockLoader.isTransactionLogTriggerEthCompatible()).thenReturn(false); + // make getContinuousBlockCapsule cover the current head block + ReflectUtils.setFieldValue(dbManager, "lastUsedSolidityNum", -1L); + try { + Method m = Manager.class.getDeclaredMethod("postSolidityTrigger", long.class); + m.setAccessible(true); + // exercises the solidified-mode block/transaction batch emission + m.invoke(dbManager, dbManager.getHeadBlockNum()); + } finally { + restoreLoader(originalLoader); + } + } + @Test public void testClearSolidityContractTriggerCache() throws Exception { long blockNum = 999L; diff --git a/framework/src/test/java/org/tron/core/event/RealtimeEventServiceTest.java b/framework/src/test/java/org/tron/core/event/RealtimeEventServiceTest.java index 91dcea71322..f58f725195c 100644 --- a/framework/src/test/java/org/tron/core/event/RealtimeEventServiceTest.java +++ b/framework/src/test/java/org/tron/core/event/RealtimeEventServiceTest.java @@ -5,21 +5,17 @@ import com.google.protobuf.ByteString; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.BlockingQueue; -import org.eclipse.jetty.util.BlockingArrayQueue; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; import org.tron.common.logsfilter.EventPluginLoader; import org.tron.common.logsfilter.capsule.BlockLogTriggerCapsule; import org.tron.common.logsfilter.capsule.TransactionLogTriggerCapsule; -import org.tron.common.logsfilter.capsule.TriggerCapsule; import org.tron.common.logsfilter.trigger.ContractEventTrigger; import org.tron.common.logsfilter.trigger.ContractLogTrigger; import org.tron.common.utils.ReflectUtils; import org.tron.common.utils.Sha256Hash; import org.tron.core.capsule.BlockCapsule; -import org.tron.core.db.Manager; import org.tron.core.services.event.BlockEventCache; import org.tron.core.services.event.RealtimeEventService; import org.tron.core.services.event.bo.BlockEvent; @@ -56,39 +52,39 @@ public void test() throws Exception { EventPluginLoader instance = mock(EventPluginLoader.class); ReflectUtils.setFieldValue(realtimeEventService, "instance", instance); - BlockingQueue queue = new BlockingArrayQueue<>(); - Manager manager = mock(Manager.class); - Mockito.when(manager.getTriggerCapsuleQueue()).thenReturn(queue); - ReflectUtils.setFieldValue(realtimeEventService, "manager", manager); - BlockCapsule blockCapsule = new BlockCapsule(0L, Sha256Hash.ZERO_HASH, 0L, ByteString.copyFrom(BlockEventCacheTest.getBlockId())); - be2.setBlockLogTriggerCapsule(new BlockLogTriggerCapsule(blockCapsule)); + // spy so processTrigger() is a no-op (does not reach the real EventPluginLoader), + // while setRemoved() still mutates the real trigger so the removed flag can be asserted. + BlockLogTriggerCapsule blockCap = Mockito.spy(new BlockLogTriggerCapsule(blockCapsule)); + Mockito.doNothing().when(blockCap).processTrigger(); + be2.setBlockLogTriggerCapsule(blockCap); Mockito.when(instance.isBlockLogTriggerEnable()).thenReturn(true); Mockito.when(instance.isBlockLogTriggerSolidified()).thenReturn(false); - realtimeEventService.add(event); - realtimeEventService.work(); - - Assert.assertEquals(0, queue.size()); - - event = new Event(be2, false); - realtimeEventService.add(event); - realtimeEventService.work(); + // reorg rollback: block trigger re-emitted (posted synchronously) with removed=true + realtimeEventService.flush(be2, true); + Assert.assertTrue(blockCap.getBlockLogTrigger().isRemoved()); - Assert.assertEquals(1, queue.size()); + // forward: block trigger posted with removed=false + realtimeEventService.flush(be2, false); + Assert.assertFalse(blockCap.getBlockLogTrigger().isRemoved()); + // posted directly to the plugin both times, never via the async queue + Mockito.verify(blockCap, Mockito.times(2)).processTrigger(); be2.setBlockLogTriggerCapsule(null); - queue.poll(); + TransactionLogTriggerCapsule txCap = mock(TransactionLogTriggerCapsule.class); List list = new ArrayList<>(); - list.add(mock(TransactionLogTriggerCapsule.class)); + list.add(txCap); be2.setTransactionLogTriggerCapsules(list); - Mockito.when(instance.isTransactionLogTriggerEnable()).thenReturn(true); Mockito.when(instance.isTransactionLogTriggerSolidified()).thenReturn(false); - realtimeEventService.flush(be2, event.isRemove()); - Assert.assertEquals(1, queue.size()); + + // rollback: tx trigger posted synchronously with removed=true + realtimeEventService.flush(be2, true); + Mockito.verify(txCap).setRemoved(true); + Mockito.verify(txCap).processTrigger(); be2.setTransactionLogTriggerCapsules(null);