From afaa7f2a845006aace5be66c6fa5016fe5653d34 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni Date: Mon, 17 Aug 2020 01:20:50 +0900 Subject: [PATCH] finishup with score and ihave budget --- libp2p/multiaddress.nim | 9 ++- libp2p/protocols/pubsub/gossipsub.nim | 86 ++++++++++++++++++++------ libp2p/protocols/pubsub/pubsubpeer.nim | 7 ++- libp2p/switch.nim | 5 ++ 4 files changed, 85 insertions(+), 22 deletions(-) diff --git a/libp2p/multiaddress.nim b/libp2p/multiaddress.nim index f07c2a24d..859c119bf 100644 --- a/libp2p/multiaddress.nim +++ b/libp2p/multiaddress.nim @@ -11,7 +11,7 @@ {.push raises: [Defect].} -import nativesockets +import nativesockets, hashes import tables, strutils, stew/shims/net import chronos import multicodec, multihash, multibase, transcoder, vbuffer, peerid, @@ -56,6 +56,13 @@ const IPPROTO_TCP = Protocol.IPPROTO_TCP IPPROTO_UDP = Protocol.IPPROTO_UDP +proc hash*(a: MultiAddress): Hash = + var h: Hash = 0 + h = h !& hash(a.data.buffer) + h = h !& hash(a.data.offset) + h = h !& hash(a.data.length) + !$h + proc ip4StB(s: string, vb: var VBuffer): bool = ## IPv4 stringToBuffer() implementation. try: diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index e505e8fb5..4100b2c36 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -56,6 +56,7 @@ const const BackoffSlackTime = 2 # seconds IWantPeerBudget = 25 # 25 messages per second ( reset every heartbeat ) + IHavePeerBudget = 10 type TopicInfo* = object @@ -146,6 +147,7 @@ type parameters*: GossipSubParams topicParams*: Table[string, TopicParams] directPeersLoop: Future[void] + peersInIP: Table[MultiAddress, HashSet[PubSubPeer]] heartbeatEvents*: seq[AsyncEvent] @@ -282,6 +284,7 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) = # new peer g.peerStats[peer] = PeerStats() peer.iWantBudget = IWantPeerBudget + peer.iHaveBudget = IHavePeerBudget return else: # we knew this peer @@ -618,6 +621,24 @@ func `/`(a, b: Duration): float64 = fb = float64(b.nanoseconds) / 1000000000 fa / fb +proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 = + if peer.sendConn == nil: + 0.0 + else: + let + address = peer.sendConn.observedAddr + ipPeers = g.peersInIP.getOrDefault(address) + len = ipPeers.len.float64 + if len > g.parameters.ipColocationFactorThreshold: + let over = len - g.parameters.ipColocationFactorThreshold + over * over + else: + # lazy update peersInIP + if address notin g.peersInIP: + g.peersInIP[address] = initHashSet[PubSubPeer]() + g.peersInIP[address].incl(peer) + 0.0 + proc updateScores(g: GossipSub) = # avoid async trace "updating scores", peers = g.peers.len @@ -694,7 +715,18 @@ proc updateScores(g: GossipSub) = # avoid async # Wrap up # commit our changes, mgetOrPut does NOT work as wanted with value types (lent?) stats.topicInfos[topic] = info - + + peer.score += peer.appScore * g.parameters.appSpecificWeight + + peer.score += peer.behaviourPenalty * peer.behaviourPenalty * g.parameters.behaviourPenaltyWeight + + peer.score += g.colocationFactor(peer) * g.parameters.ipColocationFactorWeight + + # decay behaviourPenalty + peer.behaviourPenalty *= g.parameters.behaviourPenaltyDecay + if peer.behaviourPenalty < g.parameters.decayToZero: + peer.behaviourPenalty = 0 + trace "updated peer's score", peer, score = peer.score for peer in evicting: @@ -715,9 +747,11 @@ proc heartbeat(g: GossipSub) {.async.} = g.backingOff.del(peer) # reset IWANT budget + # reset IHAVE cap block: for peer in g.peers.values: peer.iWantBudget = IWantPeerBudget + peer.iHaveBudget = IHavePeerBudget g.updateScores() @@ -779,6 +813,11 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) = if pubSubPeer.isNil: return + # remove from peer IPs collection too + if pubSubPeer.sendConn != nil: + g.peersInIP.withValue(pubSubPeer.sendConn.observedAddr, s) do: + s[].excl(pubSubPeer) + for t in toSeq(g.gossipsub.keys): g.gossipsub.removePeer(t, pubSubPeer) # also try to remove from explicit table here @@ -947,29 +986,38 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = proc handleIHave(g: GossipSub, peer: PubSubPeer, ihaves: seq[ControlIHave]): ControlIWant = - for ihave in ihaves: - trace "peer sent ihave", - peer = peer.id, topic = ihave.topicID, msgs = ihave.messageIDs + if peer.score < g.parameters.gossipThreshold: + trace "ihave: ignoring low score peer", peer = $peer, score = peer.score + elif peer.iHaveBudget == 0: + trace "ihave: ignoring out of budget peer", peer = $peer, score = peer.score + else: + dec peer.iHaveBudget + for ihave in ihaves: + trace "peer sent ihave", + peer = peer.id, topic = ihave.topicID, msgs = ihave.messageIDs - if ihave.topicID in g.mesh: - for m in ihave.messageIDs: - if m notin g.seen: - result.messageIDs.add(m) + if ihave.topicID in g.mesh: + for m in ihave.messageIDs: + if m notin g.seen: + result.messageIDs.add(m) proc handleIWant(g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant]): seq[Message] = - for iwant in iwants: - for mid in iwant.messageIDs: - trace "peer sent iwant", peer = peer.id, messageID = mid - let msg = g.mcache.get(mid) - if msg.isSome: - # avoid spam - if peer.iWantBudget > 0: - result.add(msg.get()) - dec peer.iWantBudget - else: - return + if peer.score < g.parameters.gossipThreshold: + trace "iwant: ignoring low score peer", peer = $peer, score = peer.score + else: + for iwant in iwants: + for mid in iwant.messageIDs: + trace "peer sent iwant", peer = peer.id, messageID = mid + let msg = g.mcache.get(mid) + if msg.isSome: + # avoid spam + if peer.iWantBudget > 0: + result.add(msg.get()) + dec peer.iWantBudget + else: + return proc punishPeer(g: GossipSub, peer: PubSubPeer, msg: Message) = for t in msg.topicIDs: diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index c55f09d24..8bdda1ff7 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -39,7 +39,7 @@ type PubSubPeer* = ref object of RootObj switch*: Switch # switch instance to dial peers codec*: string # the protocol that this peer joined from - sendConn: Connection + sendConn*: Connection peerId*: PeerID handler*: RPCHandler sentRpcCache: TimedCache[string] # cache for already sent messages @@ -50,7 +50,10 @@ type score*: float64 iWantBudget*: int - outbound*: bool + iHaveBudget*: int + outbound*: bool # if this is an outbound connection + appScore*: float64 # application specific score + behaviourPenalty*: float64 # the eventual penalty score RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 2aa17b5ae..9f5b1eec4 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -251,6 +251,11 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = for muxer in s.muxers.values: ms.addHandler(muxer.codecs, muxer) + # add the mounted protocols + # notice this should be kept in sync ... + for handler in s.ms.handlers: + ms.handlers &= handler + # handle subsequent secure requests await ms.handle(sconn)