diff --git a/cabal.project b/cabal.project index caee395..5e1d695 100644 --- a/cabal.project +++ b/cabal.project @@ -58,8 +58,9 @@ source-repository-package source-repository-package type: git location: https://github.com/IntersectMBO/ouroboros-network - tag: 3c4433d05ec012af6d1a26e6b5e86665627c08c4 - --sha256: sha256-Jemp6PlzISA+l1wdXV6MrIxaBpAxdrLLAlbkB7ZqF2Y= + -- from coot/dmq-related-changes + tag: 625296c92363b8c5e77cddee40de4525421d2660 + --sha256: sha256-WRbKqNimAsYtgj/r3SJ0IT6z7+Q3XZf3p89BM9w6bF8= subdir: acts-generic cardano-diffusion diff --git a/dmq-node/app/Main.hs b/dmq-node/app/Main.hs index c3d322f..714d7b7 100644 --- a/dmq-node/app/Main.hs +++ b/dmq-node/app/Main.hs @@ -11,6 +11,7 @@ module Main where import Control.Concurrent.Class.MonadSTM.Strict +import Control.Concurrent.Class.MonadMVar import Control.Monad (void, when) import Control.Monad.Class.MonadThrow import Control.Tracer (Tracer (..), nullTracer, traceWith) @@ -33,8 +34,6 @@ import System.IOManager (withIOManager) import Cardano.Git.Rev (gitRev) import Cardano.KESAgent.Protocols.StandardCrypto (StandardCrypto) -import Cardano.Ledger.Keys (VKey (..)) -import Cardano.Ledger.Hashes (hashKey) import DMQ.Configuration import DMQ.Configuration.CLIOptions (parseCLIOptions) @@ -93,8 +92,13 @@ runDMQ commandLineConfig = do } = config' <> commandLineConfig `act` defaultConfiguration - let tracer :: ToJSON ev => Tracer IO (WithEventType ev) - tracer = dmqTracer prettyLog + + lock <- newMVar () + let tracer', tracer :: ToJSON ev => Tracer IO (WithEventType ev) + tracer' = dmqTracer prettyLog + -- use a lock to prevent writing two lines at the same time + -- TODO: this won't be needed with `cardano-tracer` integration + tracer = Tracer $ \a -> withMVar lock $ \_ -> traceWith tracer' a when version $ do let gitrev = $(gitRev) @@ -119,6 +123,7 @@ runDMQ commandLineConfig = do stdGen <- newStdGen let (psRng, policyRng) = split stdGen + policyRngVar <- newTVarIO policyRng -- TODO: this might not work, 
since `ouroboros-network` creates its own IO Completion Port. withIOManager \iocp -> do @@ -149,7 +154,7 @@ runDMQ commandLineConfig = do Mempool.getWriter SigDuplicate sigId (\now sigs -> - withPoolValidationCtx (stakePools nodeKernel) (validateSig (hashKey . VKey) now sigs) + withPoolValidationCtx (stakePools nodeKernel) (validateSig now sigs) ) (traverse_ $ \(sigid, reason) -> do traceWith ntnValidationTracer $ InvalidSignature sigid reason @@ -183,7 +188,7 @@ runDMQ commandLineConfig = do Mempool.getWriter SigDuplicate sigId (\now sigs -> - withPoolValidationCtx (stakePools nodeKernel) (validateSig (hashKey . VKey) now sigs) + withPoolValidationCtx (stakePools nodeKernel) (validateSig now sigs) ) (traverse_ $ \(sigid, reason) -> traceWith ntcValidationTracer $ InvalidSignature sigid reason @@ -212,7 +217,7 @@ runDMQ commandLineConfig = do dmqLimitsAndTimeouts dmqNtNApps dmqNtCApps - (policy policyRng) + (policy policyRngVar) Diffusion.run dmqDiffusionArguments (dmqDiffusionTracers dmqConfig tracer) diff --git a/dmq-node/changelog.d/20260122_162026_coot_peer_selection_policy.md b/dmq-node/changelog.d/20260122_162026_coot_peer_selection_policy.md new file mode 100644 index 0000000..e88ba9a --- /dev/null +++ b/dmq-node/changelog.d/20260122_162026_coot_peer_selection_policy.md @@ -0,0 +1,9 @@ +### Breaking + +- `validateSig`: removed the cold-key hashing function from the arguments; the ledger's `hashKey . VKey` is used instead, and the constraints this requires were added. + +### Non-Breaking + +- Added a lock to avoid race conditions between trace events. +- Improved peer selection policy. 
+ diff --git a/dmq-node/dmq-node.cabal b/dmq-node/dmq-node.cabal index bf79b45..0d1e735 100644 --- a/dmq-node/dmq-node.cabal +++ b/dmq-node/dmq-node.cabal @@ -148,7 +148,6 @@ executable dmq-node base, bytestring, cardano-git-rev, - cardano-ledger-core, contra-tracer >=0.1 && <0.3, dmq-node, io-classes:{io-classes, strict-stm}, diff --git a/dmq-node/src/DMQ/Configuration.hs b/dmq-node/src/DMQ/Configuration.hs index fcc6706..ae0106f 100644 --- a/dmq-node/src/DMQ/Configuration.hs +++ b/dmq-node/src/DMQ/Configuration.hs @@ -77,20 +77,49 @@ import Ouroboros.Network.TxSubmission.Inbound.V2 (TxDecisionPolicy (..)) import DMQ.Configuration.Topology (NoExtraConfig (..), NoExtraFlags (..)) --- | Configuration comes in two flavours paramemtrised by `f` functor: +-- | Configuration comes in two flavours depending on the `f` functor: -- `PartialConfig` is using `Last` and `Configuration` is using an identity -- functor `I`. -- +-- See `defaultConfiguration` for default values. +-- data Configuration' f = Configuration { + -- | Path from which the `Configuration` is read. + dmqcConfigFile :: f FilePath, + + -- | Network magic for the DMQ network + dmqcNetworkMagic :: f NetworkMagic, + -- | Network magic for local connections to a cardano-node + dmqcCardanoNetworkMagic :: f NetworkMagic, + + -- | IPv4 address to bind to for `node-to-node` communication. dmqcIPv4 :: f (Maybe IPv4), + -- | IPv6 address to bind to for `node-to-node` communication. dmqcIPv6 :: f (Maybe IPv6), - dmqcLocalAddress :: f LocalAddress, + -- | Port number for `node-to-node` DMQ communication. dmqcPortNumber :: f PortNumber, - dmqcConfigFile :: f FilePath, + -- | Local socket address for `node-to-client` DMQ communication. + dmqcLocalAddress :: f LocalAddress, + -- | Topology file path. dmqcTopologyFile :: f FilePath, + -- | Path to the `cardano-node` socket. 
+ dmqcCardanoNodeSocket :: f FilePath, + dmqcAcceptedConnectionsLimit :: f AcceptedConnectionsLimit, + -- | Diffusion mode for `node-to-node` communication. dmqcDiffusionMode :: f DiffusionMode, + -- | Node-to-node inbound connection idle timeout. + dmqcProtocolIdleTimeout :: f DiffTime, + -- | Churn interval for peer selection. + dmqcChurnInterval :: f DiffTime, + -- | Peer sharing setting. + dmqcPeerSharing :: f PeerSharing, + + -- + -- Peer Selection Targets + -- + dmqcTargetOfRootPeers :: f Int, dmqcTargetOfKnownPeers :: f Int, dmqcTargetOfEstablishedPeers :: f Int, @@ -98,14 +127,10 @@ data Configuration' f = dmqcTargetOfKnownBigLedgerPeers :: f Int, dmqcTargetOfEstablishedBigLedgerPeers :: f Int, dmqcTargetOfActiveBigLedgerPeers :: f Int, - dmqcProtocolIdleTimeout :: f DiffTime, - dmqcChurnInterval :: f DiffTime, - dmqcPeerSharing :: f PeerSharing, - -- network magic for the DMQ network itself - dmqcNetworkMagic :: f NetworkMagic, - -- network magic for local connections to a cardano-node - dmqcCardanoNetworkMagic :: f NetworkMagic, - dmqcCardanoNodeSocket :: f FilePath, + + -- + -- Tracers & logging + -- dmqcPrettyLog :: f Bool, dmqcMuxTracer :: f Bool, @@ -148,6 +173,10 @@ data Configuration' f = dmqcLocalMsgSubmissionServerProtocolTracer :: f Bool, dmqcLocalMsgNotificationServerProtocolTracer :: f Bool, + -- + -- Application tracers + -- + dmqcSigSubmissionLogicTracer :: f Bool, dmqcSigSubmissionOutboundTracer :: f Bool, dmqcSigSubmissionInboundTracer :: f Bool, @@ -155,6 +184,7 @@ data Configuration' f = dmqcLocalMsgNotificationServerTracer :: f Bool, dmqcLocalStateQueryTracer :: f Bool, + -- | CLI only option to show version and exit. 
dmqcVersion :: f Bool } deriving Generic diff --git a/dmq-node/src/DMQ/Diffusion/NodeKernel.hs b/dmq-node/src/DMQ/Diffusion/NodeKernel.hs index fc71b48..2c7a484 100644 --- a/dmq-node/src/DMQ/Diffusion/NodeKernel.hs +++ b/dmq-node/src/DMQ/Diffusion/NodeKernel.hs @@ -6,6 +6,7 @@ module DMQ.Diffusion.NodeKernel , withNodeKernel , PoolValidationCtx (..) , StakePools (..) + , PoolId ) where import Control.Concurrent.Class.MonadMVar @@ -33,8 +34,8 @@ import Data.Word import System.Random (StdGen) import System.Random qualified as Random -import Cardano.Ledger.Shelley.API hiding (I) -import Ouroboros.Consensus.Shelley.Ledger.Query +import Cardano.Ledger.Shelley.API qualified as Ledger +import Ouroboros.Consensus.Shelley.Ledger.Query qualified as LedgerQuery import Ouroboros.Network.BlockFetch (FetchClientRegistry, newFetchClientRegistry) @@ -76,13 +77,13 @@ data NodeKernel crypto ntnAddr m = -- | Cardano pool id's are hashes of the cold verification key -- -type PoolId = KeyHash StakePool +type PoolId = Ledger.KeyHash Ledger.StakePool data StakePools m = StakePools { -- | contains map of cardano pool stake snapshot obtained -- via local state query client stakePoolsVar - :: !(StrictTVar m (Map PoolId StakeSnapshot)) + :: !(StrictTVar m (Map PoolId LedgerQuery.StakeSnapshot)) -- | Acquire and update validation context for signature validation , withPoolValidationCtx :: forall a. 
(PoolValidationCtx -> (a, PoolValidationCtx)) -> STM m a @@ -99,7 +100,7 @@ data PoolValidationCtx = PoolValidationCtx { vctxEpoch :: !(Maybe UTCTime) -- ^ UTC time of next epoch boundary for handling clock skew - , vctxStakeMap :: !(Map PoolId StakeSnapshot) + , vctxStakeMap :: !(Map PoolId LedgerQuery.StakeSnapshot) -- ^ for signature validation , vctxOcertMap :: !(Map PoolId Word64) -- ^ ocert counters to check monotonicity diff --git a/dmq-node/src/DMQ/Diffusion/PeerSelection.hs b/dmq-node/src/DMQ/Diffusion/PeerSelection.hs index 13add1d..f916198 100644 --- a/dmq-node/src/DMQ/Diffusion/PeerSelection.hs +++ b/dmq-node/src/DMQ/Diffusion/PeerSelection.hs @@ -1,40 +1,116 @@ module DMQ.Diffusion.PeerSelection where -import Data.Set (Set) +import Control.Concurrent.Class.MonadSTM.Strict +import Data.List (sortOn, unfoldr) +import Data.Map.Strict qualified as Map import Data.Set qualified as Set -import Network.Socket (SockAddr) -import Ouroboros.Network.PeerSelection.Governor.Types -import System.Random (Random (..), StdGen) +import Data.Word (Word32) +import Ouroboros.Network.PeerSelection +import System.Random (Random (..), StdGen, split) -- | Trivial peer selection policy used as dummy value -- -policy :: StdGen -> PeerSelectionPolicy SockAddr IO -policy gen = +policy :: forall peerAddr m. 
+ ( MonadSTM m + , Ord peerAddr + ) + => StrictTVar m StdGen + -> PeerSelectionPolicy peerAddr m +policy rngVar = PeerSelectionPolicy { - policyPickKnownPeersForPeerShare = \_ _ _ -> pickTrivially - , policyPickColdPeersToForget = \_ _ _ -> pickTrivially - , policyPickColdPeersToPromote = \_ _ _ -> pickTrivially - , policyPickWarmPeersToPromote = \_ _ _ -> pickTrivially - , policyPickHotPeersToDemote = \_ _ _ -> pickTrivially - , policyPickWarmPeersToDemote = \_ _ _ -> pickTrivially - , policyPickInboundPeers = \_ _ _ -> pickTrivially - , policyFindPublicRootTimeout = 5 - , policyMaxInProgressPeerShareReqs = 0 - , policyPeerShareRetryTime = 0 -- seconds - , policyPeerShareBatchWaitTime = 0 -- seconds - , policyPeerShareOverallTimeout = 0 -- seconds - , policyPeerShareActivationDelay = 2 -- seconds + policyPickKnownPeersForPeerShare = simplePromotionPolicy, + policyPickColdPeersToPromote = simplePromotionPolicy, + policyPickWarmPeersToPromote = simplePromotionPolicy, + policyPickInboundPeers = simplePromotionPolicy, + + policyPickHotPeersToDemote = hotDemotionPolicy, + policyPickWarmPeersToDemote = warmDemotionPolicy, + policyPickColdPeersToForget = coldForgetPolicy, + + policyFindPublicRootTimeout = 5, + policyMaxInProgressPeerShareReqs = 0, + policyPeerShareRetryTime = 0, -- seconds + policyPeerShareBatchWaitTime = 0, -- seconds + policyPeerShareOverallTimeout = 0, -- seconds + policyPeerShareActivationDelay = 2 -- seconds } where - pickTrivially :: Applicative m => Set SockAddr -> Int -> m (Set SockAddr) - pickTrivially set n = pure - . fst - $ go gen (Set.toList set) n [] - where - go g _ 0 acc = (Set.fromList acc, g) - go g [] _ acc = (Set.fromList acc, g) - go g xs k acc = - let (idx, g') = randomR (0, length xs - 1) g - picked = xs !! 
idx - xs' = take idx xs ++ drop (idx + 1) xs - in go g' xs' (k - 1) (picked : acc) + hotDemotionPolicy :: PickPolicy peerAddr (STM m) + hotDemotionPolicy _ _ _ available pickNum = do + available' <- addRand rngVar available (,) + return $ Set.fromList + . map fst + . take pickNum + . sortOn snd + . Map.assocs + $ available' + + -- Randomly pick peers to demote, peers with knownPeerTepid set are twice + -- as likely to be demoted. + warmDemotionPolicy :: PickPolicy peerAddr (STM m) + warmDemotionPolicy _ _ isTepid available pickNum = do + available' <- addRand rngVar available (tepidWeight isTepid) + return $ Set.fromList + . map fst + . take pickNum + . sortOn snd + . Map.assocs + $ available' + + simplePromotionPolicy :: PickPolicy peerAddr (STM m) + simplePromotionPolicy _ _ _ available pickNum = do + available' <- addRand rngVar available (,) + return $ Set.fromList + . map fst + . take pickNum + . sortOn snd + . Map.assocs + $ available' + + -- Randomly pick peers to forget, peers with failures are more likely to + -- be forgotten. + coldForgetPolicy :: PickPolicy peerAddr (STM m) + coldForgetPolicy _ failCnt _ available pickNum = do + available' <- addRand rngVar available (failWeight failCnt) + return $ Set.fromList + . map fst + . take pickNum + . sortOn snd + . 
Map.assocs + $ available' + + -- Failures lower r + failWeight :: (peerAddr -> Int) + -> peerAddr + -> Word32 + -> (peerAddr, Word32) + failWeight failCnt peer r = + (peer, r `div` fromIntegral (failCnt peer + 1)) + + -- Tepid flag cuts r in half + tepidWeight :: (peerAddr -> Bool) + -> peerAddr + -> Word32 + -> (peerAddr, Word32) + tepidWeight isTepid peer r = + if isTepid peer then (peer, r `div` 2) + else (peer, r) + + + -- Attach a scaled random number to each peer in order to prevent ordering based on the peer address +addRand :: ( MonadSTM m + , Ord peerAddr + ) + => StrictTVar m StdGen + -> Set.Set peerAddr + -> (peerAddr -> Word32 -> (peerAddr, Word32)) + -> STM m (Map.Map peerAddr Word32) +addRand rngVar available scaleFn = do + inRng <- readTVar rngVar + + let (rng, rng') = split inRng + rns = take (Set.size available) $ unfoldr (Just . random) rng :: [Word32] + available' = Map.fromList $ zipWith scaleFn (Set.toList available) rns + writeTVar rngVar rng' + return available' + diff --git a/dmq-node/src/DMQ/Protocol/SigSubmission/Validate.hs b/dmq-node/src/DMQ/Protocol/SigSubmission/Validate.hs index 02e239e..6fc1143 100644 --- a/dmq-node/src/DMQ/Protocol/SigSubmission/Validate.hs +++ b/dmq-node/src/DMQ/Protocol/SigSubmission/Validate.hs @@ -30,13 +30,12 @@ import Data.Text (Text) import Data.Typeable import Data.Word -import Cardano.Crypto.DSIGN.Class (ContextDSIGN) import Cardano.Crypto.DSIGN.Class qualified as DSIGN import Cardano.Crypto.KES.Class (KESAlgorithm (..)) import Cardano.KESAgent.KES.Crypto as KES import Cardano.KESAgent.KES.OCert (OCert (..), OCertSignable, validateOCert) -import Cardano.Ledger.BaseTypes.NonZero -import Cardano.Ledger.Hashes +import Cardano.Ledger.BaseTypes.NonZero qualified as Ledger +import Cardano.Ledger.Keys qualified as Ledger import DMQ.Diffusion.NodeKernel (PoolValidationCtx (..)) import DMQ.Protocol.SigSubmission.Type @@ -100,30 +99,29 @@ c_MAX_CLOCK_SKEW_SEC :: NominalDiffTime c_MAX_CLOCK_SKEW_SEC = 5 pattern NotZeroSetSnapshot :: 
StakeSnapshot -pattern NotZeroSetSnapshot <- (isZero . ssSetPool -> False) +pattern NotZeroSetSnapshot <- (Ledger.isZero . ssSetPool -> False) pattern NotZeroMarkSnapshot :: StakeSnapshot -pattern NotZeroMarkSnapshot <- (isZero . ssMarkPool -> False) +pattern NotZeroMarkSnapshot <- (Ledger.isZero . ssMarkPool -> False) pattern ZeroSetSnapshot :: StakeSnapshot -pattern ZeroSetSnapshot <- (isZero . ssSetPool -> True) +pattern ZeroSetSnapshot <- (Ledger.isZero . ssSetPool -> True) {-# COMPLETE NotZeroSetSnapshot, NotZeroMarkSnapshot, ZeroSetSnapshot #-} validateSig :: forall crypto. ( Crypto crypto - , ContextDSIGN (KES.DSIGN crypto) ~ () + , DSIGN crypto ~ Ledger.DSIGN , DSIGN.Signable (DSIGN crypto) (OCertSignable crypto) , ContextKES (KES crypto) ~ () , Signable (KES crypto) ByteString ) - => (DSIGN.VerKeyDSIGN (DSIGN crypto) -> KeyHash StakePool) - -> UTCTime + => UTCTime -> [Sig crypto] -> PoolValidationCtx -> ([Either (SigId, SigValidationError) (Sig crypto)], PoolValidationCtx) -validateSig verKeyHashingFn now sigs ctx0 = +validateSig now sigs ctx0 = State.runState (traverse (exceptions . validate) sigs) ctx0 where exceptions :: StateT s (Except e) a @@ -150,23 +148,32 @@ validateSig verKeyHashingFn now sigs ctx0 = } = do ctx@PoolValidationCtx { vctxEpoch, vctxStakeMap, vctxOcertMap } <- State.get + -- + -- verify KES period + -- + sigKESPeriod < endKESPeriod ?! KESAfterEndOCERT endKESPeriod sigKESPeriod sigKESPeriod >= startKESPeriod ?! 
KESBeforeStartOCERT startKESPeriod sigKESPeriod + -- + -- verify that the pool is registered and eligible to mint blocks + -- + let -- `vctxEpoch` and `vctxStakeMap` are initialized in one STM -- transaction, which guarantees that fromJust will not fail nextEpoch = fromJust vctxEpoch - case Map.lookup (verKeyHashingFn coldKey) vctxStakeMap of + case Map.lookup (Ledger.hashKey (Ledger.VKey coldKey)) vctxStakeMap of Nothing | isNothing vctxEpoch -> left NotInitialized | otherwise -> left UnrecognizedPool + Just ss@NotZeroSetSnapshot -> if | now <= addUTCTime c_MAX_CLOCK_SKEW_SEC nextEpoch -> return () -- local-state-query is late, but the pool is about to expire - | isZero (ssMarkPool ss) + | Ledger.isZero (ssMarkPool ss) -> left SigExpired | otherwise @@ -189,19 +196,12 @@ validateSig verKeyHashingFn now sigs ctx0 = -- pool unregistered and is ineligible to mint blocks Just ZeroSetSnapshot -> left SigExpired - -- validate OCert, which includes verifying its signature - validateOCert coldKey ocertVkHot ocert - ?!: InvalidSignatureOCERT ocertN sigKESPeriod - - -- validate KES signature of the payload - verifyKES () ocertVkHot - (unKESPeriod sigKESPeriod - unKESPeriod startKESPeriod) - (LBS.toStrict signedBytes) - kesSig - ?!: InvalidKESSignature ocertKESPeriod sigKESPeriod + -- + -- verify that our observations of ocertN are strictly monotonic + -- case Map.alterF (\a -> (a, Just ocertN)) - (verKeyHashingFn coldKey) + (Ledger.hashKey (Ledger.VKey coldKey)) vctxOcertMap of (Nothing, ocertCounters') -- there is no ocert in the map, e.g. 
we're validating a signature @@ -215,6 +215,21 @@ validateSig verKeyHashingFn now sigs ctx0 = | otherwise -> left (InvalidOCertCounter prevOcertN ocertN) + -- + -- Cryptographic checks + -- + + -- validate OCert, which includes verifying its signature + validateOCert coldKey ocertVkHot ocert + ?!: InvalidSignatureOCERT ocertN sigKESPeriod + + -- validate KES signature of the payload + verifyKES () ocertVkHot + (unKESPeriod sigKESPeriod - unKESPeriod startKESPeriod) + (LBS.toStrict signedBytes) + kesSig + ?!: InvalidKESSignature ocertKESPeriod sigKESPeriod + return sig where startKESPeriod, endKESPeriod :: KESPeriod diff --git a/dmq-node/test/DMQ/Protocol/SigSubmission/Test.hs b/dmq-node/test/DMQ/Protocol/SigSubmission/Test.hs index 260fefd..6d0720f 100644 --- a/dmq-node/test/DMQ/Protocol/SigSubmission/Test.hs +++ b/dmq-node/test/DMQ/Protocol/SigSubmission/Test.hs @@ -989,7 +989,7 @@ prop_validateSig constr validity = ioProperty do . counterexample ("KES seed: " ++ show (ctx constr)) . counterexample ("KES vk key: " ++ show (ocertVkHot . getSigOpCertificate . sigOpCertificate $ sig)) . counterexample (show sig) - $ case (validity, fst $ validateSig (hashKey . VKey) now [sig] validationCtx) of + $ case (validity, fst $ validateSig now [sig] validationCtx) of (Valid {}, Left (_, err) : _) -> counterexample (show err) False (Valid {}, Right _ : _) -> property True