Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ public class BlockLogTrigger extends Trigger {
@Setter
private List<String> transactionList = new ArrayList<>();

// true when this block is being rolled back due to a chain reorg (fork switch);
// mirrors the Ethereum log "removed" semantics already used by ContractTrigger.
@Getter
@Setter
private boolean removed;

public BlockLogTrigger() {
setTriggerName(Trigger.BLOCK_TRIGGER_NAME);
}
Expand All @@ -44,6 +50,8 @@ public String toString() {
.append(transactionSize)
.append(", latestSolidifiedBlockNumber: ")
.append(latestSolidifiedBlockNumber)
.append(", removed: ")
.append(removed)
.append(", transactionList: ")
.append(transactionList).toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ public class TransactionLogTrigger extends Trigger {
@Setter
private Map<String, Long> extMap;

// true when this transaction is being rolled back due to a chain reorg (fork switch);
// mirrors the Ethereum log "removed" semantics already used by ContractTrigger.
@Getter
@Setter
private boolean removed;

public TransactionLogTrigger() {
setTriggerName(Trigger.TRANSACTION_TRIGGER_NAME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ public void setLatestSolidifiedBlockNumber(long latestSolidifiedBlockNumber) {
blockLogTrigger.setLatestSolidifiedBlockNumber(latestSolidifiedBlockNumber);
}

public void setRemoved(boolean removed) {
blockLogTrigger.setRemoved(removed);
}

@Override
public void processTrigger() {
EventPluginLoader.getInstance().postBlockTrigger(blockLogTrigger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,10 @@ public void setLatestSolidifiedBlockNumber(long latestSolidifiedBlockNumber) {
transactionLogTrigger.setLatestSolidifiedBlockNumber(latestSolidifiedBlockNumber);
}

public void setRemoved(boolean removed) {
transactionLogTrigger.setRemoved(removed);
}

private List<InternalTransactionPojo> getInternalTransactionList(
List<InternalTransaction> internalTransactionList) {
List<InternalTransactionPojo> pojoList = new ArrayList<>();
Expand Down
120 changes: 82 additions & 38 deletions framework/src/main/java/org/tron/core/db/Manager.java
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,7 @@ private void switchFork(BlockCapsule newHead)
.equals(binaryTree.getValue().peekLast().getParentHash())) {
if (EventPluginLoader.getInstance().getVersion() == 0) {
reOrgContractTrigger();
reOrgBlockTrigger();
}
reOrgLogsFilter();
eraseBlock();
Expand Down Expand Up @@ -1202,7 +1203,7 @@ private void switchFork(BlockCapsule newHead)
}
}
// only reached when the whole new branch applied cleanly; a failed switch rethrows above
reApplyLogsFilter(first);
reApplyBlockEvents(first);
}

}
Expand Down Expand Up @@ -1435,9 +1436,10 @@ void blockTrigger(final BlockCapsule block, long oldSolid, long newSolid) {
return;
}

// if event subscribe is enabled, post block trigger to queue
postBlockTrigger(block);
// if event subscribe is enabled, post block trigger to queue (real-time, not removed)
postBlockTrigger(block, false);
// if event subscribe is enabled, post solidity trigger to queue
// (also emits solidified-mode block/transaction triggers)
postSolidityTrigger(newSolid);
} catch (Exception e) {
logger.error("Block trigger failed. head: {}, oldSolid: {}, newSolid: {}",
Expand Down Expand Up @@ -2205,6 +2207,27 @@ private void postSolidityFilter(final long oldSolidNum, final long latestSolidif
}

private void postSolidityTrigger(final long latestSolidifiedBlockNumber) {
// solidified-mode block trigger: emit the newly-solidified blocks (never removed,
// since solidified blocks cannot be reorged).
if (eventPluginLoaded && EventPluginLoader.getInstance().isBlockLogTriggerEnable()
&& EventPluginLoader.getInstance().isBlockLogTriggerSolidified()) {
for (BlockCapsule capsule : getContinuousBlockCapsule(latestSolidifiedBlockNumber)) {
BlockLogTriggerCapsule blockLogTriggerCapsule = new BlockLogTriggerCapsule(capsule);
blockLogTriggerCapsule.setLatestSolidifiedBlockNumber(latestSolidifiedBlockNumber);
if (!triggerCapsuleQueue.offer(blockLogTriggerCapsule)) {
logger.info("Too many triggers, block trigger lost: {}.", capsule.getBlockId());
}
}
}

// solidified-mode transaction trigger: emit transactions of the newly-solidified blocks.
if (eventPluginLoaded && EventPluginLoader.getInstance().isTransactionLogTriggerEnable()
&& EventPluginLoader.getInstance().isTransactionLogTriggerSolidified()) {
for (BlockCapsule capsule : getContinuousBlockCapsule(latestSolidifiedBlockNumber)) {
processTransactionTrigger(capsule, false);
}
}

if (eventPluginLoaded && EventPluginLoader.getInstance().isSolidityLogTriggerEnable()) {
for (Long i : Args.getSolidityContractLogTriggerMap().keySet()) {
postSolidityLogContractTrigger(i, latestSolidifiedBlockNumber);
Expand Down Expand Up @@ -2233,7 +2256,7 @@ private void postSolidityTrigger(final long latestSolidifiedBlockNumber) {
lastUsedSolidityNum = latestSolidifiedBlockNumber;
}

private void processTransactionTrigger(BlockCapsule newBlock) {
private void processTransactionTrigger(BlockCapsule newBlock, boolean removed) {
List<TransactionCapsule> transactionCapsuleList = newBlock.getTransactions();

// need to set eth compatible data from transactionInfoList
Expand All @@ -2252,7 +2275,7 @@ private void processTransactionTrigger(BlockCapsule newBlock) {
transactionCapsule.setBlockNum(newBlock.getNum());

cumulativeEnergyUsed += postTransactionTrigger(transactionCapsule, newBlock, i,
cumulativeEnergyUsed, cumulativeLogCount, transactionInfo, energyUnitPrice);
cumulativeEnergyUsed, cumulativeLogCount, transactionInfo, energyUnitPrice, removed);

cumulativeLogCount += transactionInfo.getLogCount();
}
Expand All @@ -2261,12 +2284,12 @@ private void processTransactionTrigger(BlockCapsule newBlock) {
newBlock.getNum(),
"the sizes of transactionInfoList and transactionCapsuleList are not equal");
for (TransactionCapsule e : newBlock.getTransactions()) {
postTransactionTrigger(e, newBlock);
postTransactionTrigger(e, newBlock, removed);
}
}
} else {
for (TransactionCapsule e : newBlock.getTransactions()) {
postTransactionTrigger(e, newBlock);
postTransactionTrigger(e, newBlock, removed);
}
}
}
Expand All @@ -2290,14 +2313,25 @@ private void reOrgLogsFilter() {
// (oldest-first). Must be kept in sync with the FULL-filter section of blockTrigger.
// Solidity filters are intentionally not posted here: solidification events for these
// blocks arrive later, when postSolidityFilter runs against the then-canonical chain.
private void reApplyLogsFilter(List<KhaosBlock> newBranch) {
// Re-emit the per-block subscription events for a newly-applied fork branch after a chain
// reorg: JSON-RPC block/logs filters and event-subscribe block/transaction triggers. The
// fork-switch path returns before blockTrigger() runs, so without this these forward events
// would be lost for the re-applied blocks (contract triggers are already re-emitted during
// applyBlock). All emitted as forward (removed=false): these blocks are now canonical.
private void reApplyBlockEvents(List<KhaosBlock> newBranch) {
if (CommonParameter.getInstance().isJsonRpcHttpFullNodeEnable()) {
for (KhaosBlock khaosBlock : newBranch) {
BlockCapsule blockCapsule = khaosBlock.getBlk();
postBlockFilter(blockCapsule, false);
postLogsFilter(blockCapsule, false, false);
}
}

if (EventPluginLoader.getInstance().getVersion() == 0) {
for (KhaosBlock khaosBlock : newBranch) {
postBlockTrigger(khaosBlock.getBlk(), false);
}
}
}

private void postBlockFilter(final BlockCapsule blockCapsule, boolean solidified) {
Expand All @@ -2324,39 +2358,26 @@ private void postLogsFilter(final BlockCapsule blockCapsule, boolean solidified,
}
}

void postBlockTrigger(final BlockCapsule blockCapsule) {
// process block trigger
// Real-time block/transaction triggers for a single block. The solidified-mode batch is
// handled in postSolidityTrigger (driven by solidification advancement), so here we only
// emit for triggers configured as non-solidified. {@code removed=true} re-emits the same
// trigger when the block is rolled back by a chain reorg (see reOrgBlockTrigger).
void postBlockTrigger(final BlockCapsule blockCapsule, boolean removed) {
long solidityBlkNum = getDynamicPropertiesStore().getLatestSolidifiedBlockNum();
if (eventPluginLoaded && EventPluginLoader.getInstance().isBlockLogTriggerEnable()) {
List<BlockCapsule> capsuleList = new ArrayList<>();
if (EventPluginLoader.getInstance().isBlockLogTriggerSolidified()) {
capsuleList = getContinuousBlockCapsule(solidityBlkNum);
} else {
capsuleList.add(blockCapsule);
}

for (BlockCapsule capsule : capsuleList) {
BlockLogTriggerCapsule blockLogTriggerCapsule = new BlockLogTriggerCapsule(capsule);
blockLogTriggerCapsule.setLatestSolidifiedBlockNumber(solidityBlkNum);
if (!triggerCapsuleQueue.offer(blockLogTriggerCapsule)) {
logger.info("Too many triggers, block trigger lost: {}.", capsule.getBlockId());
}
if (eventPluginLoaded && EventPluginLoader.getInstance().isBlockLogTriggerEnable()
&& !EventPluginLoader.getInstance().isBlockLogTriggerSolidified()) {
BlockLogTriggerCapsule blockLogTriggerCapsule = new BlockLogTriggerCapsule(blockCapsule);
blockLogTriggerCapsule.setLatestSolidifiedBlockNumber(solidityBlkNum);
blockLogTriggerCapsule.setRemoved(removed);
if (!triggerCapsuleQueue.offer(blockLogTriggerCapsule)) {
logger.info("Too many triggers, block trigger lost: {}.", blockCapsule.getBlockId());
}
}

// process transaction trigger
if (eventPluginLoaded && EventPluginLoader.getInstance().isTransactionLogTriggerEnable()) {
List<BlockCapsule> capsuleList = new ArrayList<>();
if (EventPluginLoader.getInstance().isTransactionLogTriggerSolidified()) {
capsuleList = getContinuousBlockCapsule(solidityBlkNum);
} else {
// need to reset block
capsuleList.add(blockCapsule);
}

for (BlockCapsule capsule : capsuleList) {
processTransactionTrigger(capsule);
}
if (eventPluginLoaded && EventPluginLoader.getInstance().isTransactionLogTriggerEnable()
&& !EventPluginLoader.getInstance().isTransactionLogTriggerSolidified()) {
processTransactionTrigger(blockCapsule, removed);
}
}

Expand All @@ -2382,11 +2403,13 @@ private List<BlockCapsule> getContinuousBlockCapsule(long solidityBlkNum) {
// cumulativeEnergyUsed is the total of energy used before the current transaction
private long postTransactionTrigger(final TransactionCapsule trxCap,
final BlockCapsule blockCap, int index, long preCumulativeEnergyUsed,
long cumulativeLogCount, final TransactionInfo transactionInfo, long energyUnitPrice) {
long cumulativeLogCount, final TransactionInfo transactionInfo, long energyUnitPrice,
boolean removed) {
TransactionLogTriggerCapsule trx = new TransactionLogTriggerCapsule(trxCap, blockCap,
index, preCumulativeEnergyUsed, cumulativeLogCount, transactionInfo, energyUnitPrice);
trx.setLatestSolidifiedBlockNumber(getDynamicPropertiesStore()
.getLatestSolidifiedBlockNum());
trx.setRemoved(removed);
if (!triggerCapsuleQueue.offer(trx)) {
logger.info("Too many triggers, transaction trigger lost: {}.", trxCap.getTransactionId());
}
Expand All @@ -2396,10 +2419,11 @@ private long postTransactionTrigger(final TransactionCapsule trxCap,


private void postTransactionTrigger(final TransactionCapsule trxCap,
final BlockCapsule blockCap) {
final BlockCapsule blockCap, boolean removed) {
TransactionLogTriggerCapsule trx = new TransactionLogTriggerCapsule(trxCap, blockCap);
trx.setLatestSolidifiedBlockNumber(getDynamicPropertiesStore()
.getLatestSolidifiedBlockNum());
trx.setRemoved(removed);
if (!triggerCapsuleQueue.offer(trx)) {
logger.info("Too many triggers, transaction trigger lost: {}.", trxCap.getTransactionId());
}
Expand All @@ -2424,6 +2448,26 @@ private void reOrgContractTrigger() {
clearSolidityContractTriggerCache(getHeadBlockNum());
}

// On a chain reorg, re-emit the block/transaction triggers of the block being erased with
// removed=true, so subscribers can roll back. Only real-time (non-solidified) triggers were
// ever emitted for this block, so postBlockTrigger(.., true) naturally no-ops in solidified
// mode. Called in the erase loop before eraseBlock(), so the old head is still current head.
private void reOrgBlockTrigger() {
if (eventPluginLoaded
&& (EventPluginLoader.getInstance().isBlockLogTriggerEnable()
|| EventPluginLoader.getInstance().isTransactionLogTriggerEnable())) {
logger.info("Switch fork occurred, post reOrgBlockTrigger.");
try {
BlockCapsule oldHeadBlock = chainBaseManager.getBlockById(
getDynamicPropertiesStore().getLatestBlockHeaderHash());
postBlockTrigger(oldHeadBlock, true);
} catch (BadItemException | ItemNotFoundException e) {
logger.error("Block header hash does not exist or is bad: {}.",
getDynamicPropertiesStore().getLatestBlockHeaderHash());
}
}
}

private void clearSolidityContractTriggerCache(long blockNum) {
if (eventPluginLoaded
&& (EventPluginLoader.getInstance().isSolidityEventTriggerEnable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.logsfilter.EventPluginLoader;
import org.tron.common.logsfilter.trigger.Trigger;
import org.tron.core.db.Manager;
import org.tron.core.services.event.bo.BlockEvent;
import org.tron.core.services.event.bo.Event;

Expand All @@ -25,9 +24,6 @@ public class RealtimeEventService {
@Getter
private static Object contractLock = new Object();

@Autowired
private Manager manager;

@Autowired
private SolidEventService solidEventService;

Expand Down Expand Up @@ -77,25 +73,31 @@ public synchronized void work() {
public void flush(BlockEvent blockEvent, boolean isRemove) {
logger.info("Flush realtime event {}", blockEvent.getBlockId().getString());

// Post block/transaction triggers synchronously to the plugin (processTrigger ->
// EventPluginLoader serializes immediately) instead of the async triggerCapsuleQueue: the
// capsule is a shared cached object whose removed flag is set per-flush, so an async consumer
// could read it after a later flush overwrote it. This mirrors how contract triggers below
// are posted directly. isRemove=true re-emits the block/transaction as rolled back on a reorg.
if (instance.isBlockLogTriggerEnable()
&& !instance.isBlockLogTriggerSolidified()
&& !isRemove) {
&& !instance.isBlockLogTriggerSolidified()) {
if (blockEvent.getBlockLogTriggerCapsule() == null) {
logger.warn("BlockLogTriggerCapsule is null. {}", blockEvent.getBlockId().getString());
} else {
manager.getTriggerCapsuleQueue().offer(blockEvent.getBlockLogTriggerCapsule());
blockEvent.getBlockLogTriggerCapsule().setRemoved(isRemove);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[SHOULD] This synchronous processTrigger() path removes the per-trigger exception isolation that block/tx triggers previously had in triggerCapsuleQueue (the queue consumer wraps each processTrigger() in catch(Throwable)). If any plugin listener throws from handleBlockEvent/handleTransactionTrigger, flush() aborts and the remaining tx/contract events for the same block are skipped — and the event was already polled off the queue, so they are lost, not retried. Please add a local try/catch around each trigger post here.

Note the contract-trigger posts below (lines 104-131) were already synchronous without per-trigger isolation, so to stay consistent please apply the same guard to all three (block/tx/contract) here rather than block/tx only.

blockEvent.getBlockLogTriggerCapsule().processTrigger();
}
}

if (instance.isTransactionLogTriggerEnable()
&& !instance.isTransactionLogTriggerSolidified()
&& !isRemove) {
&& !instance.isTransactionLogTriggerSolidified()) {
if (blockEvent.getTransactionLogTriggerCapsules() == null) {
logger.warn("TransactionLogTriggerCapsules is null. {}",
blockEvent.getBlockId().getString());
} else {
blockEvent.getTransactionLogTriggerCapsules().forEach(v ->
manager.getTriggerCapsuleQueue().offer(v));
blockEvent.getTransactionLogTriggerCapsules().forEach(v -> {
v.setRemoved(isRemove);
v.processTrigger();
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,24 @@ public void setup() {
System.currentTimeMillis(), Sha256Hash.ZERO_HASH.getByteString());
}

@Test
public void testSetRemoved() {
BalanceContract.TransferContract.Builder builder =
BalanceContract.TransferContract.newBuilder()
.setOwnerAddress(ByteString.copyFrom(ByteArray.fromHexString(OWNER_ADDRESS)))
.setToAddress(ByteString.copyFrom(ByteArray.fromHexString(RECEIVER_ADDRESS)))
.setAmount(1000L);
transactionCapsule = new TransactionCapsule(builder.build(),
Protocol.Transaction.Contract.ContractType.TransferContract);
TransactionLogTriggerCapsule triggerCapsule =
new TransactionLogTriggerCapsule(transactionCapsule, blockCapsule);

// default is false (forward emit); reorg rollback sets it to true
Assert.assertFalse(triggerCapsule.getTransactionLogTrigger().isRemoved());
triggerCapsule.setRemoved(true);
Assert.assertTrue(triggerCapsule.getTransactionLogTrigger().isRemoved());
}

@Test
public void testConstructorWithUnfreezeBalanceTrxCapsule() {
BalanceContract.UnfreezeBalanceContract.Builder builder2 =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,12 @@ public void testSetLatestSolidifiedBlockNumber() {
Assert.assertEquals(100,
blockLogTriggerCapsule.getBlockLogTrigger().getLatestSolidifiedBlockNumber());
}

@Test
public void testSetRemoved() {
// default is false (forward emit); reorg rollback sets it to true
Assert.assertFalse(blockLogTriggerCapsule.getBlockLogTrigger().isRemoved());
blockLogTriggerCapsule.setRemoved(true);
Assert.assertTrue(blockLogTriggerCapsule.getBlockLogTrigger().isRemoved());
}
}
Loading
Loading