Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ source-repository-package
source-repository-package
type: git
location: https://github.com/IntersectMBO/ouroboros-network
tag: 3c4433d05ec012af6d1a26e6b5e86665627c08c4
--sha256: sha256-Jemp6PlzISA+l1wdXV6MrIxaBpAxdrLLAlbkB7ZqF2Y=
-- from coot/dmq-related-changes
tag: 625296c92363b8c5e77cddee40de4525421d2660
Comment on lines +61 to +62
Copy link

Copilot AI Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR depends on an unreleased branch (coot/dmq-related-changes) of ouroboros-network. Consider documenting what changes in that branch are required for this PR to work, or ensure that branch is merged and released before merging this PR. Using unreleased dependencies can make builds unstable and difficult to reproduce.

Copilot uses AI. Check for mistakes.
--sha256: sha256-WRbKqNimAsYtgj/r3SJ0IT6z7+Q3XZf3p89BM9w6bF8=
subdir:
acts-generic
cardano-diffusion
Expand Down
19 changes: 12 additions & 7 deletions dmq-node/app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
module Main where

import Control.Concurrent.Class.MonadSTM.Strict
import Control.Concurrent.Class.MonadMVar
import Control.Monad (void, when)
import Control.Monad.Class.MonadThrow
import Control.Tracer (Tracer (..), nullTracer, traceWith)
Expand All @@ -33,8 +34,6 @@ import System.IOManager (withIOManager)

import Cardano.Git.Rev (gitRev)
import Cardano.KESAgent.Protocols.StandardCrypto (StandardCrypto)
import Cardano.Ledger.Keys (VKey (..))
import Cardano.Ledger.Hashes (hashKey)

import DMQ.Configuration
import DMQ.Configuration.CLIOptions (parseCLIOptions)
Expand Down Expand Up @@ -93,8 +92,13 @@ runDMQ commandLineConfig = do
} = config' <> commandLineConfig
`act`
defaultConfiguration
let tracer :: ToJSON ev => Tracer IO (WithEventType ev)
tracer = dmqTracer prettyLog

lock <- newMVar ()
let tracer', tracer :: ToJSON ev => Tracer IO (WithEventType ev)
tracer' = dmqTracer prettyLog
-- use a lock to prevent writing two lines at the same time
-- TODO: this won't be needed with `cardano-tracer` integration
tracer = Tracer $ \a -> withMVar lock $ \_ -> traceWith tracer' a

when version $ do
let gitrev = $(gitRev)
Expand All @@ -119,6 +123,7 @@ runDMQ commandLineConfig = do

stdGen <- newStdGen
let (psRng, policyRng) = split stdGen
policyRngVar <- newTVarIO policyRng

-- TODO: this might not work, since `ouroboros-network` creates its own IO Completion Port.
withIOManager \iocp -> do
Expand Down Expand Up @@ -149,7 +154,7 @@ runDMQ commandLineConfig = do
Mempool.getWriter SigDuplicate
sigId
(\now sigs ->
withPoolValidationCtx (stakePools nodeKernel) (validateSig (hashKey . VKey) now sigs)
withPoolValidationCtx (stakePools nodeKernel) (validateSig now sigs)
)
(traverse_ $ \(sigid, reason) -> do
traceWith ntnValidationTracer $ InvalidSignature sigid reason
Expand Down Expand Up @@ -183,7 +188,7 @@ runDMQ commandLineConfig = do
Mempool.getWriter SigDuplicate
sigId
(\now sigs ->
withPoolValidationCtx (stakePools nodeKernel) (validateSig (hashKey . VKey) now sigs)
withPoolValidationCtx (stakePools nodeKernel) (validateSig now sigs)
)
(traverse_ $ \(sigid, reason) ->
traceWith ntcValidationTracer $ InvalidSignature sigid reason
Expand Down Expand Up @@ -212,7 +217,7 @@ runDMQ commandLineConfig = do
dmqLimitsAndTimeouts
dmqNtNApps
dmqNtCApps
(policy policyRng)
(policy policyRngVar)

Diffusion.run dmqDiffusionArguments
(dmqDiffusionTracers dmqConfig tracer)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
### Breaking

- `validateSig`: removed the hashing function for cold key from arguments, added required constraints ledger's `hashKey . VKey` usage instead
Copy link

Copilot AI Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changelog mentions using "ledger's hashKey . VKey usage" but the actual implementation uses Ledger.hashKey (Ledger.VKey coldKey). While the meaning is the same, consider updating the changelog to match the actual code pattern for clarity.

Suggested change
- `validateSig`: removed the hashing function for cold key from arguments, added required constraints ledger's `hashKey . VKey` usage instead
- `validateSig`: removed the hashing function for cold key from arguments, added required constraints for using `Ledger.hashKey (Ledger.VKey coldKey)` instead

Copilot uses AI. Check for mistakes.

### Non-Breaking

- Added a lock to avoid race conditions between trace events.
Copy link

Copilot AI Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changelog entry mentions "Added a lock to avoid race conditions between trace events" but this appears to be unrelated to the PR title "PeerSelectionPolicy for dmq-node". Consider either updating the PR title to reflect all changes, or split this into a separate PR if it's an independent change.

Copilot uses AI. Check for mistakes.
- Improved peer selection policy.

1 change: 0 additions & 1 deletion dmq-node/dmq-node.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ executable dmq-node
base,
bytestring,
cardano-git-rev,
cardano-ledger-core,
contra-tracer >=0.1 && <0.3,
dmq-node,
io-classes:{io-classes, strict-stm},
Expand Down
52 changes: 41 additions & 11 deletions dmq-node/src/DMQ/Configuration.hs
Original file line number Diff line number Diff line change
Expand Up @@ -77,35 +77,60 @@ import Ouroboros.Network.TxSubmission.Inbound.V2 (TxDecisionPolicy (..))

import DMQ.Configuration.Topology (NoExtraConfig (..), NoExtraFlags (..))

-- | Configuration comes in two flavours paramemtrised by `f` functor:
-- | Configuration comes in two flavours depending on the `f` functor:
-- `PartialConfig` is using `Last` and `Configuration` is using an identity
-- functor `I`.
--
-- See `defaultConfiguration` for default values.
--
data Configuration' f =
Configuration {
-- | Path from which the `Configuration` is read.
dmqcConfigFile :: f FilePath,

-- | Network magic for the DMQ network
dmqcNetworkMagic :: f NetworkMagic,
-- | Network magic for local connections to a cardano-node
dmqcCardanoNetworkMagic :: f NetworkMagic,

-- | IPv4 address to bind to for `node-to-node` communication.
dmqcIPv4 :: f (Maybe IPv4),
-- | IPv6 address to bind to for `node-to-node` communication.
dmqcIPv6 :: f (Maybe IPv6),
dmqcLocalAddress :: f LocalAddress,
-- | Port number for `node-to-node` DMQ communication.
dmqcPortNumber :: f PortNumber,
dmqcConfigFile :: f FilePath,
-- | Local socket address for `node-to-client` DMQ communication.
dmqcLocalAddress :: f LocalAddress,
-- | Topology file path.
dmqcTopologyFile :: f FilePath,
-- | Path to the `cardano-node` socket.
dmqcCardanoNodeSocket :: f FilePath,

dmqcAcceptedConnectionsLimit :: f AcceptedConnectionsLimit,
-- | Diffusion mode for `node-to-node` communication.
dmqcDiffusionMode :: f DiffusionMode,
-- | Node-to-node inbound connection idle timeout.
dmqcProtocolIdleTimeout :: f DiffTime,
-- | Churn interval for peer selection.
dmqcChurnInterval :: f DiffTime,
-- | Peer sharing setting.
dmqcPeerSharing :: f PeerSharing,

--
-- Peer Selection Targets
--

dmqcTargetOfRootPeers :: f Int,
dmqcTargetOfKnownPeers :: f Int,
dmqcTargetOfEstablishedPeers :: f Int,
dmqcTargetOfActivePeers :: f Int,
dmqcTargetOfKnownBigLedgerPeers :: f Int,
dmqcTargetOfEstablishedBigLedgerPeers :: f Int,
dmqcTargetOfActiveBigLedgerPeers :: f Int,
dmqcProtocolIdleTimeout :: f DiffTime,
dmqcChurnInterval :: f DiffTime,
dmqcPeerSharing :: f PeerSharing,
-- network magic for the DMQ network itself
dmqcNetworkMagic :: f NetworkMagic,
-- network magic for local connections to a cardano-node
dmqcCardanoNetworkMagic :: f NetworkMagic,
dmqcCardanoNodeSocket :: f FilePath,

--
-- Tracers & logging
--
dmqcPrettyLog :: f Bool,

dmqcMuxTracer :: f Bool,
Expand Down Expand Up @@ -148,13 +173,18 @@ data Configuration' f =
dmqcLocalMsgSubmissionServerProtocolTracer :: f Bool,
dmqcLocalMsgNotificationServerProtocolTracer :: f Bool,

--
-- Application tracers
--

dmqcSigSubmissionLogicTracer :: f Bool,
dmqcSigSubmissionOutboundTracer :: f Bool,
dmqcSigSubmissionInboundTracer :: f Bool,
dmqcLocalMsgSubmissionServerTracer :: f Bool,
dmqcLocalMsgNotificationServerTracer :: f Bool,
dmqcLocalStateQueryTracer :: f Bool,

-- | CLI only option to show version and exit.
dmqcVersion :: f Bool
}
deriving Generic
Expand Down
11 changes: 6 additions & 5 deletions dmq-node/src/DMQ/Diffusion/NodeKernel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module DMQ.Diffusion.NodeKernel
, withNodeKernel
, PoolValidationCtx (..)
, StakePools (..)
, PoolId
) where

import Control.Concurrent.Class.MonadMVar
Expand Down Expand Up @@ -33,8 +34,8 @@ import Data.Word
import System.Random (StdGen)
import System.Random qualified as Random

import Cardano.Ledger.Shelley.API hiding (I)
import Ouroboros.Consensus.Shelley.Ledger.Query
import Cardano.Ledger.Shelley.API qualified as Ledger
import Ouroboros.Consensus.Shelley.Ledger.Query qualified as LedgerQuery

import Ouroboros.Network.BlockFetch (FetchClientRegistry,
newFetchClientRegistry)
Expand Down Expand Up @@ -76,13 +77,13 @@ data NodeKernel crypto ntnAddr m =

-- | Cardano pool id's are hashes of the cold verification key
--
type PoolId = KeyHash StakePool
type PoolId = Ledger.KeyHash Ledger.StakePool

data StakePools m = StakePools {
-- | contains map of cardano pool stake snapshot obtained
-- via local state query client
stakePoolsVar
:: !(StrictTVar m (Map PoolId StakeSnapshot))
:: !(StrictTVar m (Map PoolId LedgerQuery.StakeSnapshot))
-- | Acquire and update validation context for signature validation
, withPoolValidationCtx
:: forall a. (PoolValidationCtx -> (a, PoolValidationCtx)) -> STM m a
Expand All @@ -99,7 +100,7 @@ data PoolValidationCtx =
PoolValidationCtx {
vctxEpoch :: !(Maybe UTCTime)
-- ^ UTC time of next epoch boundary for handling clock skew
, vctxStakeMap :: !(Map PoolId StakeSnapshot)
, vctxStakeMap :: !(Map PoolId LedgerQuery.StakeSnapshot)
-- ^ for signature validation
, vctxOcertMap :: !(Map PoolId Word64)
-- ^ ocert counters to check monotonicity
Expand Down
138 changes: 107 additions & 31 deletions dmq-node/src/DMQ/Diffusion/PeerSelection.hs
Original file line number Diff line number Diff line change
@@ -1,40 +1,116 @@
module DMQ.Diffusion.PeerSelection where

import Data.Set (Set)
import Control.Concurrent.Class.MonadSTM.Strict
import Data.List (sortOn, unfoldr)
import Data.Map.Strict qualified as Map
import Data.Set qualified as Set
import Network.Socket (SockAddr)
import Ouroboros.Network.PeerSelection.Governor.Types
import System.Random (Random (..), StdGen)
import Data.Word (Word32)
import Ouroboros.Network.PeerSelection
import System.Random (Random (..), StdGen, split)

-- | Trivial peer selection policy used as dummy value
--
Comment on lines 11 to 12
Copy link

Copilot AI Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment describes this as a "Trivial peer selection policy used as dummy value" but the implementation is now quite sophisticated with weighted selection based on tepid flags and failure counts. Consider updating the comment to reflect the actual implementation, e.g., "Weighted peer selection policy that prioritizes peers based on their state and failure history".

Suggested change
-- | Trivial peer selection policy used as dummy value
--
-- | Weighted peer selection policy that prioritizes peers based on
-- their state and failure history.

Copilot uses AI. Check for mistakes.
policy :: StdGen -> PeerSelectionPolicy SockAddr IO
policy gen =
policy :: forall peerAddr m.
( MonadSTM m
, Ord peerAddr
)
=> StrictTVar m StdGen
-> PeerSelectionPolicy peerAddr m
policy rngVar =
PeerSelectionPolicy {
policyPickKnownPeersForPeerShare = \_ _ _ -> pickTrivially
, policyPickColdPeersToForget = \_ _ _ -> pickTrivially
, policyPickColdPeersToPromote = \_ _ _ -> pickTrivially
, policyPickWarmPeersToPromote = \_ _ _ -> pickTrivially
, policyPickHotPeersToDemote = \_ _ _ -> pickTrivially
, policyPickWarmPeersToDemote = \_ _ _ -> pickTrivially
, policyPickInboundPeers = \_ _ _ -> pickTrivially
, policyFindPublicRootTimeout = 5
, policyMaxInProgressPeerShareReqs = 0
, policyPeerShareRetryTime = 0 -- seconds
, policyPeerShareBatchWaitTime = 0 -- seconds
, policyPeerShareOverallTimeout = 0 -- seconds
, policyPeerShareActivationDelay = 2 -- seconds
policyPickKnownPeersForPeerShare = simplePromotionPolicy,
policyPickColdPeersToPromote = simplePromotionPolicy,
policyPickWarmPeersToPromote = simplePromotionPolicy,
policyPickInboundPeers = simplePromotionPolicy,

policyPickHotPeersToDemote = hotDemotionPolicy,
policyPickWarmPeersToDemote = warmDemotionPolicy,
policyPickColdPeersToForget = coldForgetPolicy,

policyFindPublicRootTimeout = 5,
policyMaxInProgressPeerShareReqs = 0,
policyPeerShareRetryTime = 0, -- seconds
policyPeerShareBatchWaitTime = 0, -- seconds
policyPeerShareOverallTimeout = 0, -- seconds
policyPeerShareActivationDelay = 2 -- seconds
}
where
pickTrivially :: Applicative m => Set SockAddr -> Int -> m (Set SockAddr)
pickTrivially set n = pure
. fst
$ go gen (Set.toList set) n []
where
go g _ 0 acc = (Set.fromList acc, g)
go g [] _ acc = (Set.fromList acc, g)
go g xs k acc =
let (idx, g') = randomR (0, length xs - 1) g
picked = xs !! idx
xs' = take idx xs ++ drop (idx + 1) xs
in go g' xs' (k - 1) (picked : acc)
hotDemotionPolicy :: PickPolicy peerAddr (STM m)
hotDemotionPolicy _ _ _ available pickNum = do
available' <- addRand rngVar available (,)
return $ Set.fromList
. map fst
. take pickNum
. sortOn snd
. Map.assocs
$ available'

-- Randomly pick peers to demote, peers with knownPeerTepid set are twice
-- as likely to be demoted.
warmDemotionPolicy :: PickPolicy peerAddr (STM m)
warmDemotionPolicy _ _ isTepid available pickNum = do
available' <- addRand rngVar available (tepidWeight isTepid)
return $ Set.fromList
. map fst
. take pickNum
. sortOn snd
. Map.assocs
$ available'

simplePromotionPolicy :: PickPolicy peerAddr (STM m)
simplePromotionPolicy _ _ _ available pickNum = do
available' <- addRand rngVar available (,)
return $ Set.fromList
. map fst
. take pickNum
. sortOn snd
. Map.assocs
$ available'

-- Randomly pick peers to forget, peers with failures are more likely to
-- be forgotten.
coldForgetPolicy :: PickPolicy peerAddr (STM m)
coldForgetPolicy _ failCnt _ available pickNum = do
available' <- addRand rngVar available (failWeight failCnt)
return $ Set.fromList
. map fst
. take pickNum
. sortOn snd
. Map.assocs
$ available'

-- Failures lowers r
failWeight :: (peerAddr -> Int)
-> peerAddr
-> Word32
-> (peerAddr, Word32)
failWeight failCnt peer r =
(peer, r `div` fromIntegral (failCnt peer + 1))
Copy link

Copilot AI Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The failWeight function uses integer division which could result in a weight of 0 when failCnt peer is large. When multiple peers have a weight of 0, their ordering becomes non-deterministic (dependent only on the random number). Consider using a minimum weight (e.g., max 1 (r \div` ...)`) to ensure some randomness is preserved, or document that this behavior is intentional.

Suggested change
(peer, r `div` fromIntegral (failCnt peer + 1))
(peer, max 1 (r `div` fromIntegral (failCnt peer + 1)))

Copilot uses AI. Check for mistakes.

-- Tepid flag cuts r in half
tepidWeight :: (peerAddr -> Bool)
-> peerAddr
-> Word32
-> (peerAddr, Word32)
tepidWeight isTepid peer r =
if isTepid peer then (peer, r `div` 2)
else (peer, r)


-- Add scaled random number in order to prevent ordering based on SockAddr
Copy link

Copilot AI Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment has incorrect indentation. It should start at column 1 like the function definition below it, not with a leading space.

Suggested change
-- Add scaled random number in order to prevent ordering based on SockAddr
-- Add scaled random number in order to prevent ordering based on SockAddr

Copilot uses AI. Check for mistakes.
addRand :: ( MonadSTM m
, Ord peerAddr
)
=> StrictTVar m StdGen
-> Set.Set peerAddr
-> (peerAddr -> Word32 -> (peerAddr, Word32))
-> STM m (Map.Map peerAddr Word32)
addRand rngVar available scaleFn = do
inRng <- readTVar rngVar

let (rng, rng') = split inRng
rns = take (Set.size available) $ unfoldr (Just . random) rng :: [Word32]
Copy link

Copilot AI Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two spaces between random) and rng on this line. Consider removing the extra space for consistency.

Suggested change
rns = take (Set.size available) $ unfoldr (Just . random) rng :: [Word32]
rns = take (Set.size available) $ unfoldr (Just . random) rng :: [Word32]

Copilot uses AI. Check for mistakes.
available' = Map.fromList $ zipWith scaleFn (Set.toList available) rns
writeTVar rngVar rng'
return available'

Loading