From d9fa9e2e8452883ab747a5afba271019312fdc9b Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni Date: Sun, 19 Jul 2020 12:37:45 +0900 Subject: [PATCH] wip --- libp2p/protocols/pubsub/gossipsub.nim | 158 ++++++++++++++++++++----- libp2p/protocols/pubsub/pubsub.nim | 75 +----------- libp2p/protocols/pubsub/pubsubpeer.nim | 6 +- tests/pubsub/testgossipsub.nim | 4 + 4 files changed, 136 insertions(+), 107 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index ddafd13fa..d79f2ba87 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -61,6 +61,35 @@ type inMesh*: bool meshMessageDeliveriesActive*: bool + TopicParams* = object + topicWeight*: float64 + + # p1 + timeInMeshWeight*: float64 + timeInMeshQuantum*: Duration + timeInMeshCap*: float64 + + # p2 + firstMessageDeliveriesWeight*: float64 + firstMessageDeliveriesDecay*: float64 + firstMessageDeliveriesCap*: float64 + + # p3 + meshMessageDeliveriesWeight*: float64 + meshMessageDeliveriesDecay*: float64 + meshMessageDeliveriesThreshold*: float64 + meshMessageDeliveriesCap*: float64 + meshMessageDeliveriesActivation*: Duration + meshMessageDeliveriesWindow*: Duration + + # p3b + meshFailurePenaltyWeight*: float64 + meshFailurePenaltyDecay*: float64 + + # p4 + invalidMessageDeliveriesWeight*: float64 + invalidMessageDeliveriesDecay*: float64 + PeerStats* = object topicInfos*: Table[string, TopicInfo] expire*: Moment # updated on disconnect, to retain scores until expire @@ -68,27 +97,26 @@ type GossipSubParams* = object pruneBackoff*: Duration floodPublish*: bool - gossipFactor*: float + gossipFactor*: float64 dScore*: int dOut*: int - gossipThreshold*: float - publishThreshold*: float - graylistThreshold*: float - acceptPXThreshold*: float - opportunisticGraftThreshold*: float + gossipThreshold*: float64 + publishThreshold*: float64 + graylistThreshold*: float64 + acceptPXThreshold*: float64 + opportunisticGraftThreshold*: float64 decayInterval*: Duration - decayToZero*: float + decayToZero*: float64 retainScore*: Duration - appSpecificWeight*: float - ipColocationFactorWeight*: float - ipColocationFactorThreshold*: float - behaviourPenaltyWeight*: float - behaviourPenaltyDecay*: float + appSpecificWeight*: float64 + ipColocationFactorWeight*: float64 + ipColocationFactorThreshold*: float64 + behaviourPenaltyWeight*: float64 + behaviourPenaltyDecay*: float64 GossipSub* = ref object of FloodSub - parameters*: GossipSubParams mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic gossipsub*: PeerTable # peers that are subscribed to a topic @@ -101,7 +129,10 @@ type heartbeatFut: Future[void] # cancellation future for heartbeat interval heartbeatRunning: bool heartbeatLock: AsyncLock # heartbeat lock to prevent two consecutive concurrent heartbeats + peerStats: Table[PubSubPeer, PeerStats] + parameters*: GossipSubParams + topicParams*: Table[string, TopicParams] when not defined(release): prunedPeers: HashSet[PubSubPeer] @@ -168,6 +199,49 @@ proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] = else: ok() +proc init*(_: type[TopicParams]): TopicParams = + TopicParams( + topicWeight: 1.0, + timeInMeshWeight: 0.01, + timeInMeshQuantum: 1.seconds, + timeInMeshCap: 10.0, + firstMessageDeliveriesWeight: 1.0, + firstMessageDeliveriesDecay: 0.5, + firstMessageDeliveriesCap: 10.0, + meshMessageDeliveriesWeight: -1.0, + meshMessageDeliveriesDecay: 0.5, + meshMessageDeliveriesCap: 10, + meshMessageDeliveriesThreshold: 5, + meshMessageDeliveriesWindow: 5.milliseconds, + meshMessageDeliveriesActivation: 1.seconds, + meshFailurePenaltyWeight: -1.0, + meshFailurePenaltyDecay: 0.5, + invalidMessageDeliveriesWeight: -1.0, + invalidMessageDeliveriesDecay: 0.5 + ) + +proc validateParameters*(parameters: TopicParams): Result[void, cstring] = + if parameters.timeInMeshWeight <= 0.0 or parameters.timeInMeshWeight > 1.0: + err("gossipsub: timeInMeshWeight parameter error, Must be a small positive value") + elif parameters.timeInMeshCap <= 0.0: + err("gossipsub: timeInMeshCap parameter error, Should be a positive value") + elif parameters.firstMessageDeliveriesWeight <= 0.0: + err("gossipsub: firstMessageDeliveriesWeight parameter error, Should be a positive value") + elif parameters.meshMessageDeliveriesWeight >= 0.0: + err("gossipsub: meshMessageDeliveriesWeight parameter error, Should be a negative value") + elif parameters.meshMessageDeliveriesThreshold <= 0.0: + err("gossipsub: meshMessageDeliveriesThreshold parameter error, Should be a positive value") + elif parameters.meshMessageDeliveriesCap < parameters.meshMessageDeliveriesThreshold: + err("gossipsub: meshMessageDeliveriesCap parameter error, Should be >= meshMessageDeliveriesThreshold") + elif parameters.meshMessageDeliveriesWindow > 100.milliseconds: + err("gossipsub: meshMessageDeliveriesWindow parameter error, Should be small, 1-5ms") + elif parameters.meshFailurePenaltyWeight >= 0.0: + err("gossipsub: meshFailurePenaltyWeight parameter error, Should be a negative value") + elif parameters.invalidMessageDeliveriesWeight >= 0.0: + err("gossipsub: invalidMessageDeliveriesWeight parameter error, Should be a negative value") + else: + ok() + method init*(g: GossipSub) = proc handler(conn: Connection, proto: string) {.async.} = ## main protocol handler that gets triggered on every @@ -198,23 +272,31 @@ proc grafted(g: GossipSub, p: PubSubPeer, topic: string) = var info = stats.topicInfos.mgetOrPut(topic, TopicInfo()) info.graftTime = Moment.now() info.meshTime = 0.seconds - do: - raise newException(CatchableError, "TopicInfo key not found for " & $p) + info.inMesh = true -proc pruned(g: GossipSub, p: PubSubPeer, topic: string) {.gcsafe.} = + assert(g.peerStats[p].topicInfos[topic].inMesh == true) + + debug "grafted", p + do: + doAssert(false, "grafted: TopicInfo key not found for " & $p) + +proc pruned(g: GossipSub, p: PubSubPeer, topic: string) = g.peerStats.withValue(p, stats) do: when not defined(release): g.prunedPeers.incl(p) - var _ = stats.topicInfos[topic] + var info = stats.topicInfos[topic] + info.inMesh = false + + debug "pruned", p do: when not defined(release): if p in g.prunedPeers: - raise newException(CatchableError, "Dupe prune " & $p) + doAssert(false, "pruned: Dupe prune " & $p) else: - raise newException(CatchableError, "TopicInfo key not found for " & $p) + doAssert(false, "pruned: TopicInfo key not found for " & $p) else: - raise newException(CatchableError, "TopicInfo key not found for " & $p) + doAssert(false, "pruned: TopicInfo key not found for " & $p) proc replenishFanout(g: GossipSub, topic: string) = ## get fanout peers for a topic @@ -261,8 +343,8 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = # sort peers by score grafts.sort(proc (x, y: PubSubPeer): int = let - peerx = x.score() - peery = y.score() + peerx = x.score + peery = y.score if peerx < peery: -1 elif peerx == peery: 0 else: 1) @@ -358,12 +440,18 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} = result[peer.id].ihave.add(ihave) +func `/`(a, b: Duration): float64 = + let + fa = float64(a.nanoseconds) / 1000000000 + fb = float64(b.nanoseconds) / 1000000000 + fa / fb + proc updateScores(g: GossipSub) = # avoid async debug "updating scores", peers = g.peers.len let now = Moment.now() - for peer, stats in g.peerStats: + for peer, stats in g.peerStats.mpairs: debug "updating peer score", peer, gossipTopics = peer.topics.len # TODO @@ -375,15 +463,26 @@ proc updateScores(g: GossipSub) = # avoid async for topic in peer.topics: debug "updating peer topic's scores", peer, topic - # Defect on purpose, no magic here please, this should not fail! - let topicParams = g.topics[topic].parameters - var info = stats.topicInfos[topic] + var topicParams = g.topicParams.mgetOrPut(topic, TopicParams.init()) + var info = stats.topicInfos.mgetOrPut(topic, TopicInfo()) + var topicScore = 0'f64 + if info.inMesh: info.meshTime = now - info.graftTime if info.meshTime > topicParams.meshMessageDeliveriesActivation: info.meshMessageDeliveriesActive = true + + var p1 = info.meshTime / topicParams.timeInMeshQuantum + if p1 > topicParams.timeInMeshCap: + p1 = topicParams.timeInMeshCap + topicScore = p1 * topicParams.timeInMeshWeight + + peer.score += topicScore * topicParams.topicWeight + # debug assert to check nim compiler is doing what we are asking... assert(stats.topicInfos[topic].meshTime == info.meshTime) + + debug "updated peer's score", peer, score = peer.score proc heartbeat(g: GossipSub) {.async.} = while g.heartbeatRunning: @@ -412,7 +511,8 @@ proc heartbeat(g: GossipSub) {.async.} = except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception ocurred in gossipsub heartbeat", exc = exc.msg + warn "exception ocurred in gossipsub heartbeat", exc = exc.msg, trace = exc.getStackTrace() + assert(false, "exception ocurred in gossipsub heartbeat") await sleepAsync(GossipSubHeartbeatInterval) @@ -445,7 +545,7 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) = g.explicitPeers.excl(peer.id) # don't retain bad score peers - if peer.score() > 0: + if peer.score > 0: g.peerStats.del(peer) return @@ -684,7 +784,7 @@ method publish*(g: GossipSub, if g.parameters.floodPublish: for id, peer in g.peers: if topic in peer.topics and - peer.score() >= g.parameters.publishThreshold: + peer.score >= g.parameters.publishThreshold: debug "publish: including flood/high score peer", peer = id peers.incl(peer) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index bdf61d899..784b06abb 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -48,40 +48,10 @@ type MsgIdProvider* = proc(m: Message): string {.noSideEffect, raises: [Defect], nimcall, gcsafe.} - TopicParams* = object - topicWeight*: float - - # p1 - timeInMeshWeight*: float - timeinMeshQuantum*: Duration - timeInMeshCap*: float - - # p2 - firstMessageDeliveriesWeight*: float - firstMessageDeliveriesDecay*: float - firstMessageDeliveriesCap*: float - - # p3 - meshMessageDeliveriesWeight*: float - meshMessageDeliveriesDecay*: float - meshMessageDeliveriesThreshold*: float - meshMessageDeliveriesCap*: float - meshMessageDeliveriesActivation*: Duration - meshMessageDeliveriesWindow*: Duration - - # p3b - meshFailurePenaltyWeight*: float - meshFailurePenaltyDecay*: float - - # p4 - invalidMessageDeliveriesWeight*: float - invalidMessageDeliveriesDecay*: float - Topic* = object # make this a variant type if one day we have different Params structs name*: string handler*: seq[TopicHandler] - parameters*: TopicParams PubSub* = ref object of LPProtocol peerInfo*: PeerInfo # this peer's info @@ -97,49 +67,6 @@ type msgIdProvider*: MsgIdProvider # Turn message into message id (not nil) msgSeqno*: uint64 -proc init*(_: type[TopicParams]): TopicParams = - TopicParams( - topicWeight: 1.0, - timeInMeshWeight: 0.01, - timeinMeshQuantum: 1.seconds, - timeInMeshCap: 10.0, - firstMessageDeliveriesWeight: 1.0, - firstMessageDeliveriesDecay: 0.5, - firstMessageDeliveriesCap: 10.0, - meshMessageDeliveriesWeight: -1.0, - meshMessageDeliveriesDecay: 0.5, - meshMessageDeliveriesCap: 10, - meshMessageDeliveriesThreshold: 5, - meshMessageDeliveriesWindow: 5.milliseconds, - meshMessageDeliveriesActivation: 1.seconds, - meshFailurePenaltyWeight: -1.0, - meshFailurePenaltyDecay: 0.5, - invalidMessageDeliveriesWeight: -1.0, - invalidMessageDeliveriesDecay: 0.5 - ) - -proc validateParameters*(parameters: TopicParams): Result[void, cstring] = - if parameters.timeInMeshWeight <= 0.0 or parameters.timeInMeshWeight > 1.0: - err("gossipsub: timeInMeshWeight parameter error, Must be a small positive value") - elif parameters.timeInMeshCap <= 0.0: - err("gossipsub: timeInMeshCap parameter error, Should be a positive value") - elif parameters.firstMessageDeliveriesWeight <= 0.0: - err("gossipsub: firstMessageDeliveriesWeight parameter error, Should be a positive value") - elif parameters.meshMessageDeliveriesWeight >= 0.0: - err("gossipsub: meshMessageDeliveriesWeight parameter error, Should be a negative value") - elif parameters.meshMessageDeliveriesThreshold <= 0.0: - err("gossipsub: meshMessageDeliveriesThreshold parameter error, Should be a positive value") - elif parameters.meshMessageDeliveriesCap < parameters.meshMessageDeliveriesThreshold: - err("gossipsub: meshMessageDeliveriesCap parameter error, Should be >= meshMessageDeliveriesThreshold") - elif parameters.meshMessageDeliveriesWindow > 100.milliseconds: - err("gossipsub: meshMessageDeliveriesWindow parameter error, Should be small, 1-5ms") - elif parameters.meshFailurePenaltyWeight >= 0.0: - err("gossipsub: meshFailurePenaltyWeight parameter error, Should be a negative value") - elif parameters.invalidMessageDeliveriesWeight >= 0.0: - err("gossipsub: invalidMessageDeliveriesWeight parameter error, Should be a negative value") - else: - ok() - method handleConnect*(p: PubSub, peer: PubSubPeer) {.base.} = discard @@ -352,7 +279,7 @@ method subscribe*(p: PubSub, ## if topic notin p.topics: trace "subscribing to topic", name = topic - p.topics[topic] = Topic(name: topic, parameters: TopicParams.init()) + p.topics[topic] = Topic(name: topic) p.topics[topic].handler.add(handler) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index bd9e30d5b..23e2e9e15 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -42,14 +42,12 @@ type onConnect*: AsyncEvent observers*: ref seq[PubSubObserver] # ref as in smart_ptr + score*: float64 + RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} chronicles.formatIt(PubSubPeer): it.peerInfo.id -func score*(p: PubSubPeer): float64 = - # TODO - 0.0 - func hash*(p: PubSubPeer): Hash = # int is either 32/64, so intptr basically, pubsubpeer is a ref cast[pointer](p).hash diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 865933a5e..736e53164 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -308,6 +308,10 @@ suite "GossipSub": passed.complete(true) var nodes = generateNodes(2, true) + var gossipSub1: GossipSub = GossipSub(nodes[0].pubSub.get()) + gossipSub1.parameters.floodPublish = false + var gossipSub2: GossipSub = GossipSub(nodes[1].pubSub.get()) + gossipSub2.parameters.floodPublish = false var wait: seq[Future[void]] wait.add(await nodes[0].start()) wait.add(await nodes[1].start())