Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions framework/src/main/java/org/tron/core/db/Manager.java
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,8 @@ private void switchFork(BlockCapsule newHead)
}
}
}
// only reached when the whole new branch applied cleanly; a failed switch rethrows above
reApplyLogsFilter(first);
}

}
Expand Down Expand Up @@ -2284,6 +2286,20 @@ private void reOrgLogsFilter() {
}
}

// Post the FULL-stream block and logs filters for each block of the new canonical branch
// (oldest-first). Must be kept in sync with the FULL-filter section of blockTrigger.
// Solidity filters are intentionally not posted here: solidification events for these
// blocks arrive later, when postSolidityFilter runs against the then-canonical chain.
private void reApplyLogsFilter(List<KhaosBlock> newBranch) {
Comment thread
0xbigapple marked this conversation as resolved.
if (CommonParameter.getInstance().isJsonRpcHttpFullNodeEnable()) {
for (KhaosBlock khaosBlock : newBranch) {
BlockCapsule blockCapsule = khaosBlock.getBlk();
postBlockFilter(blockCapsule, false);
postLogsFilter(blockCapsule, false, false);
}
}
}

private void postBlockFilter(final BlockCapsule blockCapsule, boolean solidified) {
BlockFilterCapsule blockFilterCapsule =
new BlockFilterCapsule(blockCapsule, solidified);
Expand Down
132 changes: 132 additions & 0 deletions framework/src/test/java/org/tron/core/db/ManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@
import org.tron.common.TestConstants;
import org.tron.common.crypto.ECKey;
import org.tron.common.logsfilter.EventPluginLoader;
import org.tron.common.logsfilter.capsule.BlockFilterCapsule;
import org.tron.common.logsfilter.capsule.FilterTriggerCapsule;
import org.tron.common.logsfilter.capsule.LogsFilterCapsule;
import org.tron.common.logsfilter.trigger.ContractLogTrigger;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.runtime.RuntimeImpl;
import org.tron.common.utils.ByteArray;
import org.tron.common.utils.Commons;
Expand Down Expand Up @@ -1540,4 +1544,132 @@ public void adjustBalance(AccountStore accountStore, byte[] accountAddress, long
Commons.adjustBalance(accountStore, accountAddress, amount,
chainManager.getDynamicPropertiesStore().disableJavaLangMath());
}

/**
* Drives a real reorg and asserts what Manager posts to the jsonrpc filterCapsuleQueue.
*/
@Test
public void switchForkShouldPostFullNodeFilterForNewBranch() throws Exception {
CommonParameter.getInstance().jsonRpcHttpFullNodeEnable = true;
// filterProcessLoop only starts when isJsonRpcFilterEnabled() held at Manager.init() time; it
// was false then, so filterCapsuleQueue is produce-only here and fully observable.

// bootstrap a head with a known witness
String key = PublicMethod.getRandomPrivateKey();
byte[] privateKey = ByteArray.fromHexString(key);
final ECKey ecKey = ECKey.fromPrivate(privateKey);
byte[] address = ecKey.getAddress();
ByteString addressByte = ByteString.copyFrom(address);
chainManager.getAccountStore().put(addressByte.toByteArray(),
new AccountCapsule(Protocol.Account.newBuilder().setAddress(addressByte).build()));
WitnessCapsule witnessCapsule = new WitnessCapsule(addressByte);
chainManager.getWitnessScheduleStore().saveActiveWitnesses(new ArrayList<>());
chainManager.addWitness(addressByte);
chainManager.getWitnessStore().put(address, witnessCapsule);
Block block = blockGenerate.getSignedBlock(
witnessCapsule.getAddress(), 1533529947843L, privateKey);
dbManager.pushBlock(new BlockCapsule(block));

Map<ByteString, String> keys = addTestWitnessAndAccount();
keys.put(addressByte, key);

// fund an owner; transfers go owner -> witness 'address' (an existing account)
ECKey ownerKey = new ECKey(Utils.getRandom());
byte[] owner = ownerKey.getAddress();
AccountCapsule ownerAccount = new AccountCapsule(
Protocol.Account.newBuilder().setAddress(ByteString.copyFrom(owner)).build());
ownerAccount.setBalance(1_000_000_000L);
chainManager.getAccountStore().put(owner, ownerAccount);

long t = 1533529947843L;
long base = chainManager.getDynamicPropertiesStore().getLatestBlockHeaderNumber();

// common ancestor P (empty) — fork point and tapos reference
BlockCapsule p = createTestBlockCapsule(t + 3000, base + 1,
chainManager.getDynamicPropertiesStore().getLatestBlockHeaderHash().getByteString(), keys);
dbManager.pushBlock(p);

long expiration = t + 1_000_000L;
BlockingQueue<FilterTriggerCapsule> queue =
ReflectUtils.getFieldValue(dbManager, "filterCapsuleQueue");
queue.clear();

// old branch: A carries a transfer; applied via the normal extend path
BlockCapsule a = blockWithTransfer(t + 6000, base + 2, p.getBlockId().getByteString(), keys,
transfer(owner, address, 1L, p, expiration));
dbManager.pushBlock(a);
Assert.assertEquals("control: head should be A after normal extend",
a.getBlockId(), chainManager.getDynamicPropertiesStore().getLatestBlockHeaderHash());
Assert.assertTrue("control: normal-path block A's logs must reach FULL stream (added)",
hasLogsFilterCapsule(queue, a, false));
Assert.assertTrue("control: normal-path block A must reach the FULL block-filter stream",
hasBlockFilterCapsule(queue, a));

// heavier competing branch P -> B1 -> B2, each carrying a transfer, to force switchFork
BlockCapsule b1 = blockWithTransfer(t + 6001, base + 2, p.getBlockId().getByteString(), keys,
transfer(owner, address, 2L, p, expiration));
dbManager.pushBlock(b1); // num <= head -> kept in khaosDb, no switch yet
BlockCapsule b2 = blockWithTransfer(t + 9000, base + 3, b1.getBlockId().getByteString(), keys,
transfer(owner, address, 3L, p, expiration));
dbManager.pushBlock(b2); // num > head & parent != head -> triggers switchFork

Assert.assertEquals("reorg must switch the canonical head to the competing branch (B2)",
b2.getBlockId(), chainManager.getDynamicPropertiesStore().getLatestBlockHeaderHash());

// reorg withdraws the orphaned old-branch logs (removed=true)
Assert.assertTrue("reorg: orphaned block A's logs must be withdrawn (removed=true)",
hasLogsFilterCapsule(queue, a, true));
// the fix: new canonical blocks' logs and block filters are delivered
Assert.assertTrue("reorg: new canonical block B1's logs must reach FULL stream (added)",
hasLogsFilterCapsule(queue, b1, false));
Assert.assertTrue("reorg: new canonical block B2's logs must reach FULL stream (added)",
hasLogsFilterCapsule(queue, b2, false));
Assert.assertTrue("reorg: new canonical block B1 must reach the FULL block-filter stream",
hasBlockFilterCapsule(queue, b1));
Assert.assertTrue("reorg: new canonical block B2 must reach the FULL block-filter stream",
hasBlockFilterCapsule(queue, b2));
}

private TransactionCapsule transfer(byte[] owner, byte[] to, long amount,
BlockCapsule refBlock, long expiration) {
TransferContract contract = TransferContract.newBuilder()
.setOwnerAddress(ByteString.copyFrom(owner))
.setToAddress(ByteString.copyFrom(to))
.setAmount(amount).build();
TransactionCapsule tx = new TransactionCapsule(contract, ContractType.TransferContract);
tx.setReference(refBlock.getNum(), refBlock.getBlockId().getBytes());
tx.setExpiration(expiration);
return tx;
}

private BlockCapsule blockWithTransfer(long time, long number, ByteString parentHash,
Map<ByteString, String> keys, TransactionCapsule tx) {
ByteString witnessAddress = dposSlot.getScheduledWitness(dposSlot.getSlot(time));
BlockCapsule blockCapsule = new BlockCapsule(number, Sha256Hash.wrap(parentHash), time,
witnessAddress);
blockCapsule.addTransaction(tx);
blockCapsule.generatedByMyself = true;
blockCapsule.setMerkleRoot();
blockCapsule.sign(ByteArray.fromHexString(keys.get(witnessAddress)));
return blockCapsule;
}

private boolean hasLogsFilterCapsule(BlockingQueue<FilterTriggerCapsule> queue, BlockCapsule b,
boolean removed) {
Comment thread
0xbigapple marked this conversation as resolved.
String blockHash = b.getBlockId().toString();
return queue.stream()
.filter(c -> c instanceof LogsFilterCapsule)
.map(c -> (LogsFilterCapsule) c)
.anyMatch(c -> !c.isSolidified() && c.isRemoved() == removed
&& blockHash.equals(c.getBlockHash()));
}

private boolean hasBlockFilterCapsule(BlockingQueue<FilterTriggerCapsule> queue,
BlockCapsule b) {
String blockHash = b.getBlockId().toString();
return queue.stream()
.filter(c -> c instanceof BlockFilterCapsule)
.map(c -> (BlockFilterCapsule) c)
.anyMatch(c -> !c.isSolidified() && blockHash.equals(c.getBlockHash()));
}
}
Loading