diff --git a/framework/src/main/java/org/tron/core/db/Manager.java b/framework/src/main/java/org/tron/core/db/Manager.java index d2aa42dfce..01da300261 100644 --- a/framework/src/main/java/org/tron/core/db/Manager.java +++ b/framework/src/main/java/org/tron/core/db/Manager.java @@ -1201,6 +1201,8 @@ private void switchFork(BlockCapsule newHead) } } } + // only reached when the whole new branch applied cleanly; a failed switch rethrows above + reApplyLogsFilter(first); } } @@ -2284,6 +2286,20 @@ private void reOrgLogsFilter() { } } + // Post the FULL-stream block and logs filters for each block of the new canonical branch + // (oldest-first). Must be kept in sync with the FULL-filter section of blockTrigger. + // Solidity filters are intentionally not posted here: solidification events for these + // blocks arrive later, when postSolidityFilter runs against the then-canonical chain. + private void reApplyLogsFilter(List newBranch) { + if (CommonParameter.getInstance().isJsonRpcHttpFullNodeEnable()) { + for (KhaosBlock khaosBlock : newBranch) { + BlockCapsule blockCapsule = khaosBlock.getBlk(); + postBlockFilter(blockCapsule, false); + postLogsFilter(blockCapsule, false, false); + } + } + } + private void postBlockFilter(final BlockCapsule blockCapsule, boolean solidified) { BlockFilterCapsule blockFilterCapsule = new BlockFilterCapsule(blockCapsule, solidified); diff --git a/framework/src/test/java/org/tron/core/db/ManagerTest.java b/framework/src/test/java/org/tron/core/db/ManagerTest.java index 717d6c4cf6..e342fa4322 100755 --- a/framework/src/test/java/org/tron/core/db/ManagerTest.java +++ b/framework/src/test/java/org/tron/core/db/ManagerTest.java @@ -40,7 +40,11 @@ import org.tron.common.TestConstants; import org.tron.common.crypto.ECKey; import org.tron.common.logsfilter.EventPluginLoader; +import org.tron.common.logsfilter.capsule.BlockFilterCapsule; +import org.tron.common.logsfilter.capsule.FilterTriggerCapsule; +import org.tron.common.logsfilter.capsule.LogsFilterCapsule; import org.tron.common.logsfilter.trigger.ContractLogTrigger; +import org.tron.common.parameter.CommonParameter; import org.tron.common.runtime.RuntimeImpl; import org.tron.common.utils.ByteArray; import org.tron.common.utils.Commons; @@ -1540,4 +1544,132 @@ public void adjustBalance(AccountStore accountStore, byte[] accountAddress, long Commons.adjustBalance(accountStore, accountAddress, amount, chainManager.getDynamicPropertiesStore().disableJavaLangMath()); } + + /** + * Drives a real reorg and asserts what Manager posts to the jsonrpc filterCapsuleQueue. + */ + @Test + public void switchForkShouldPostFullNodeFilterForNewBranch() throws Exception { + CommonParameter.getInstance().jsonRpcHttpFullNodeEnable = true; + // filterProcessLoop only starts when isJsonRpcFilterEnabled() held at Manager.init() time; it + // was false then, so filterCapsuleQueue is produce-only here and fully observable. + + // bootstrap a head with a known witness + String key = PublicMethod.getRandomPrivateKey(); + byte[] privateKey = ByteArray.fromHexString(key); + final ECKey ecKey = ECKey.fromPrivate(privateKey); + byte[] address = ecKey.getAddress(); + ByteString addressByte = ByteString.copyFrom(address); + chainManager.getAccountStore().put(addressByte.toByteArray(), + new AccountCapsule(Protocol.Account.newBuilder().setAddress(addressByte).build())); + WitnessCapsule witnessCapsule = new WitnessCapsule(addressByte); + chainManager.getWitnessScheduleStore().saveActiveWitnesses(new ArrayList<>()); + chainManager.addWitness(addressByte); + chainManager.getWitnessStore().put(address, witnessCapsule); + Block block = blockGenerate.getSignedBlock( + witnessCapsule.getAddress(), 1533529947843L, privateKey); + dbManager.pushBlock(new BlockCapsule(block)); + + Map keys = addTestWitnessAndAccount(); + keys.put(addressByte, key); + + // fund an owner; transfers go owner -> witness 'address' (an existing account) + ECKey ownerKey = new ECKey(Utils.getRandom()); + byte[] owner = ownerKey.getAddress(); + AccountCapsule ownerAccount = new AccountCapsule( + Protocol.Account.newBuilder().setAddress(ByteString.copyFrom(owner)).build()); + ownerAccount.setBalance(1_000_000_000L); + chainManager.getAccountStore().put(owner, ownerAccount); + + long t = 1533529947843L; + long base = chainManager.getDynamicPropertiesStore().getLatestBlockHeaderNumber(); + + // common ancestor P (empty) — fork point and tapos reference + BlockCapsule p = createTestBlockCapsule(t + 3000, base + 1, + chainManager.getDynamicPropertiesStore().getLatestBlockHeaderHash().getByteString(), keys); + dbManager.pushBlock(p); + + long expiration = t + 1_000_000L; + BlockingQueue queue = + ReflectUtils.getFieldValue(dbManager, "filterCapsuleQueue"); + queue.clear(); + + // old branch: A carries a transfer; applied via the normal extend path + BlockCapsule a = blockWithTransfer(t + 6000, base + 2, p.getBlockId().getByteString(), keys, + transfer(owner, address, 1L, p, expiration)); + dbManager.pushBlock(a); + Assert.assertEquals("control: head should be A after normal extend", + a.getBlockId(), chainManager.getDynamicPropertiesStore().getLatestBlockHeaderHash()); + Assert.assertTrue("control: normal-path block A's logs must reach FULL stream (added)", + hasLogsFilterCapsule(queue, a, false)); + Assert.assertTrue("control: normal-path block A must reach the FULL block-filter stream", + hasBlockFilterCapsule(queue, a)); + + // heavier competing branch P -> B1 -> B2, each carrying a transfer, to force switchFork + BlockCapsule b1 = blockWithTransfer(t + 6001, base + 2, p.getBlockId().getByteString(), keys, + transfer(owner, address, 2L, p, expiration)); + dbManager.pushBlock(b1); // num <= head -> kept in khaosDb, no switch yet + BlockCapsule b2 = blockWithTransfer(t + 9000, base + 3, b1.getBlockId().getByteString(), keys, + transfer(owner, address, 3L, p, expiration)); + dbManager.pushBlock(b2); // num > head & parent != head -> triggers switchFork + + Assert.assertEquals("reorg must switch the canonical head to the competing branch (B2)", + b2.getBlockId(), chainManager.getDynamicPropertiesStore().getLatestBlockHeaderHash()); + + // reorg withdraws the orphaned old-branch logs (removed=true) + Assert.assertTrue("reorg: orphaned block A's logs must be withdrawn (removed=true)", + hasLogsFilterCapsule(queue, a, true)); + // the fix: new canonical blocks' logs and block filters are delivered + Assert.assertTrue("reorg: new canonical block B1's logs must reach FULL stream (added)", + hasLogsFilterCapsule(queue, b1, false)); + Assert.assertTrue("reorg: new canonical block B2's logs must reach FULL stream (added)", + hasLogsFilterCapsule(queue, b2, false)); + Assert.assertTrue("reorg: new canonical block B1 must reach the FULL block-filter stream", + hasBlockFilterCapsule(queue, b1)); + Assert.assertTrue("reorg: new canonical block B2 must reach the FULL block-filter stream", + hasBlockFilterCapsule(queue, b2)); + } + + private TransactionCapsule transfer(byte[] owner, byte[] to, long amount, + BlockCapsule refBlock, long expiration) { + TransferContract contract = TransferContract.newBuilder() + .setOwnerAddress(ByteString.copyFrom(owner)) + .setToAddress(ByteString.copyFrom(to)) + .setAmount(amount).build(); + TransactionCapsule tx = new TransactionCapsule(contract, ContractType.TransferContract); + tx.setReference(refBlock.getNum(), refBlock.getBlockId().getBytes()); + tx.setExpiration(expiration); + return tx; + } + + private BlockCapsule blockWithTransfer(long time, long number, ByteString parentHash, + Map keys, TransactionCapsule tx) { + ByteString witnessAddress = dposSlot.getScheduledWitness(dposSlot.getSlot(time)); + BlockCapsule blockCapsule = new BlockCapsule(number, Sha256Hash.wrap(parentHash), time, + witnessAddress); + blockCapsule.addTransaction(tx); + blockCapsule.generatedByMyself = true; + blockCapsule.setMerkleRoot(); + blockCapsule.sign(ByteArray.fromHexString(keys.get(witnessAddress))); + return blockCapsule; + } + + private boolean hasLogsFilterCapsule(BlockingQueue queue, BlockCapsule b, + boolean removed) { + String blockHash = b.getBlockId().toString(); + return queue.stream() + .filter(c -> c instanceof LogsFilterCapsule) + .map(c -> (LogsFilterCapsule) c) + .anyMatch(c -> !c.isSolidified() && c.isRemoved() == removed + && blockHash.equals(c.getBlockHash())); + } + + private boolean hasBlockFilterCapsule(BlockingQueue queue, + BlockCapsule b) { + String blockHash = b.getBlockId().toString(); + return queue.stream() + .filter(c -> c instanceof BlockFilterCapsule) + .map(c -> (BlockFilterCapsule) c) + .anyMatch(c -> !c.isSolidified() && blockHash.equals(c.getBlockHash())); + } }