From 75b023c9e57b090dc67f928285c3d08105231ba9 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni <7008900+sinkingsugar@users.noreply.github.com> Date: Fri, 30 Oct 2020 21:49:54 +0900 Subject: [PATCH] gossipsub audit fixes (#412) * [SEC] gossipsub - rebalanceMesh grafts peers giving preference to low scores #405 * comment score choices * compiler warning fixes/bug fixes (unsubscribe) * rebalanceMesh does not enforce D_out quota * fix outbound grafting * fight the nim compiler * fix closure capture bs... * another closure fix * #403 rebalance prune fixes * more test fixing * #403 fixes * #402 avoid removing scores on unsub * #401 handleGraft improvements * [SEC] handleIHAVE/handleIWANT recommendations * add a note about peer exchange handling --- libp2p/crypto/ecnist.nim | 2 +- libp2p/multiaddress.nim | 4 +- libp2p/protocols/pubsub/gossipsub.nim | 179 +++++++++++++++----------- libp2p/protocols/pubsub/pubsub.nim | 18 ++- tests/pubsub/testgossipsub.nim | 21 ++- 5 files changed, 130 insertions(+), 94 deletions(-) diff --git a/libp2p/crypto/ecnist.nim b/libp2p/crypto/ecnist.nim index 8c3ebdc..c3a4338 100644 --- a/libp2p/crypto/ecnist.nim +++ b/libp2p/crypto/ecnist.nim @@ -510,7 +510,7 @@ proc getRawBytes*(pubkey: EcPublicKey): EcResult[seq[byte]] = let length = ? pubkey.toRawBytes(res) res.setLen(length) discard ? pubkey.toRawBytes(res) - ok(res) + return ok(res) else: return err(EcKeyIncorrectError) diff --git a/libp2p/multiaddress.nim b/libp2p/multiaddress.nim index 2b2c712..c6e3d3c 100644 --- a/libp2p/multiaddress.nim +++ b/libp2p/multiaddress.nim @@ -873,7 +873,9 @@ proc init*(mtype: typedesc[MultiAddress], address: TransportAddress, let protoProto = case protocol of IPPROTO_TCP: getProtocol("tcp") of IPPROTO_UDP: getProtocol("udp") - else: return err("multiaddress: protocol should be either TCP or UDP") + else: default(MAProtocol) + if protoProto.size == 0: + return err("multiaddress: protocol should be either TCP or UDP") if address.family == AddressFamily.IPv4: res.data.write(getProtocol("ip4").mcodec) res.data.writeArray(address.address_v4) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 10cdadc..b5b0366 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -260,7 +260,7 @@ proc validateParameters*(parameters: TopicParams): Result[void, cstring] = else: ok() -func byScore(x,y: PubSubPeer): int = (x.score - y.score).int +func byScore(x,y: PubSubPeer): int = system.cmp(x.score, y.score) method init*(g: GossipSub) = proc handler(conn: Connection, proto: string) {.async.} = @@ -408,8 +408,8 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = # shuffle anyway, score might be not used shuffle(grafts) - # sort peers by score - grafts.sort(byScore) + # sort peers by score, high score first since we graft + grafts.sort(byScore, SortOrder.Descending) # Graft peers so we reach a count of D grafts.setLen(min(grafts.len, GossipSubD - g.mesh.peers(topic))) @@ -421,59 +421,63 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = g.fanout.removePeer(topic, peer) grafting &= peer - elif npeers < g.parameters.dOut: - trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic) - # replenish the mesh if we're below Dlo - grafts = toSeq( - g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) - - g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()) - ) + else: + var meshPeers = toSeq(g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())) + meshPeers.keepIf do (x: PubSubPeer) -> 
bool: x.outbound + if meshPeers.len < g.parameters.dOut: + trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic) + + grafts = toSeq( + g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) - + g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()) + ) - grafts.keepIf do (x: PubSubPeer) -> bool: - # get only outbound ones - x.outbound and - # avoid negative score peers - x.score >= 0.0 and - # don't pick explicit peers - x.peerId notin g.parameters.directPeers and - # and avoid peers we are backing off - x.peerId notin g.backingOff + grafts.keepIf do (x: PubSubPeer) -> bool: + # get only outbound ones + x.outbound and + # avoid negative score peers + x.score >= 0.0 and + # don't pick explicit peers + x.peerId notin g.parameters.directPeers and + # and avoid peers we are backing off + x.peerId notin g.backingOff - # shuffle anyway, score might be not used - shuffle(grafts) + # shuffle anyway, score might be not used + shuffle(grafts) - # sort peers by score - grafts.sort(byScore) + # sort peers by score, high score first, we are grafting + grafts.sort(byScore, SortOrder.Descending) - # Graft peers so we reach a count of D - grafts.setLen(min(grafts.len, g.parameters.dOut - g.mesh.peers(topic))) + # Graft peers so we reach a count of D + grafts.setLen(min(grafts.len, g.parameters.dOut)) - trace "grafting outbound peers", topic, peers = grafts.len + trace "grafting outbound peers", topic, peers = grafts.len - for peer in grafts: - if g.mesh.addPeer(topic, peer): - g.grafted(peer, topic) - g.fanout.removePeer(topic, peer) - grafting &= peer + for peer in grafts: + if g.mesh.addPeer(topic, peer): + g.grafted(peer, topic) + g.fanout.removePeer(topic, peer) + grafting &= peer if g.mesh.peers(topic) > GossipSubDhi: # prune peers if we've gone over Dhi prunes = toSeq(g.mesh[topic]) + # avoid pruning peers we are currently grafting in this heartbeat + prunes.keepIf do (x: PubSubPeer) -> bool: x notin grafting + let mesh = prunes # shuffle anyway, score might be not used shuffle(prunes) - # sort peers by score (inverted) - prunes.sort(byScore) + # sort peers by score (inverted), pruning, so low score peers are on top + prunes.sort(byScore, SortOrder.Ascending) # keep high score peers if prunes.len > g.parameters.dScore: prunes.setLen(prunes.len - g.parameters.dScore) - # we must try to keep outbound peers - # to keep an outbound mesh quota - # so we try to first prune inbound peers - # if none we add up some outbound + + # collect inbound/outbound info var outbound: seq[PubSubPeer] var inbound: seq[PubSubPeer] for peer in prunes: @@ -482,20 +486,25 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = else: inbound &= peer + # ensure that there are at least D_out peers first and rebalance to GossipSubD after that + let maxOutboundPrunes = + block: + var count = 0 + for peer in mesh: + if peer.outbound: + inc count + count - g.parameters.dOut + outbound.setLen(min(outbound.len, max(0, maxOutboundPrunes))) + + # concat remaining outbound peers + inbound &= outbound + let pruneLen = inbound.len - GossipSubD if pruneLen > 0: # Ok we got some peers to prune, # for this heartbeat let's prune those shuffle(inbound) inbound.setLen(pruneLen) - else: - # We could not find any inbound to prune - # Yet we are on Hi, so we need to cull outbound peers - let keepDOutLen = outbound.len - g.parameters.dOut - if keepDOutLen > 0: - shuffle(outbound) - outbound.setLen(keepDOutLen) - inbound &= outbound trace "pruning", prunes = inbound.len for peer in inbound: @@ -505,7 +514,8 @@ proc 
rebalanceMesh(g: GossipSub, topic: string) {.async.} = # opportunistic grafting, by spec mesh should not be empty... if g.mesh.peers(topic) > 1: var peers = toSeq(g.mesh[topic]) - peers.sort(byScore) + # grafting so high score has priority + peers.sort(byScore, SortOrder.Descending) let medianIdx = peers.len div 2 let median = peers[medianIdx] if median.score < g.parameters.opportunisticGraftThreshold: @@ -728,7 +738,7 @@ proc updateScores(g: GossipSub) = # avoid async if peer.behaviourPenalty < g.parameters.decayToZero: peer.behaviourPenalty = 0 - debug "updated peer's score", peer, score = peer.score, n_topics, is_grafted + trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted for peer in evicting: g.peerStats.del(peer) @@ -842,11 +852,6 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) = libp2p_gossipsub_peers_per_topic_fanout .set(g.fanout.peers(t).int64, labelValues = [t]) - # don't retain bad score peers - if pubSubPeer.score < 0.0: - g.peerStats.del(pubSubPeer) - return - g.peerStats[pubSubPeer].expire = Moment.now() + g.parameters.retainScore for topic, info in g.peerStats[pubSubPeer].topicInfos.mpairs: info.firstMessageDeliveries = 0 @@ -893,6 +898,15 @@ method subscribeTopic*(g: GossipSub, trace "gossip peers", peers = g.gossipsub.peers(topic), topic +proc punishPeer(g: GossipSub, peer: PubSubPeer, topics: seq[string]) = + for t in topics: + # ensure we init a new topic if unknown + let _ = g.topicParams.mgetOrPut(t, TopicParams.init()) + # update stats + var tstats = g.peerStats[peer].topicInfos.getOrDefault(t) + tstats.invalidMessageDeliveries += 1 + g.peerStats[peer].topicInfos[t] = tstats + proc handleGraft(g: GossipSub, peer: PubSubPeer, grafts: seq[ControlGraft]): seq[ControlPrune] = @@ -906,17 +920,21 @@ proc handleGraft(g: GossipSub, # It is an error to GRAFT on a explicit peer if peer.peerId in g.parameters.directPeers: - trace "attempt to graft an explicit peer", peer=peer.id, + # receiving a graft from a direct peer should yield a more prominent warning (protocol violation) + warn "attempt to graft an explicit peer", peer=peer.id, topicID=graft.topicID # and such an attempt should be logged and rejected with a PRUNE result.add(ControlPrune( topicID: graft.topicID, peers: @[], # omitting heavy computation here as the remote did something illegal backoff: g.parameters.pruneBackoff.seconds.uint64)) + + g.punishPeer(peer, @[topic]) + continue - if peer.peerId in g.backingOff: - trace "attempt to graft an backingOff peer", peer=peer.id, + if peer.peerId in g.backingOff and g.backingOff[peer.peerId] > Moment.now(): + trace "attempt to graft a backingOff peer", peer=peer.id, topicID=graft.topicID, expire=g.backingOff[peer.peerId] # and such an attempt should be logged and rejected with a PRUNE @@ -924,10 +942,18 @@ proc handleGraft(g: GossipSub, topicID: graft.topicID, peers: @[], # omitting heavy computation here as the remote did something illegal backoff: g.parameters.pruneBackoff.seconds.uint64)) + + g.punishPeer(peer, @[topic]) + continue if peer notin g.peerStats: - g.peerStats[peer] = PeerStats() + g.onNewPeer(peer) + + # not in the spec exactly, but let's avoid way too low score peers + # other clients do it too also was an audit recommendation + if peer.score < g.parameters.publishThreshold: + continue # If they send us a graft before they send us a subscribe, what should # we do? For now, we add them to mesh but don't add them to gossipsub. 
@@ -970,6 +996,10 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = g.pruned(peer, prune.topicID) g.mesh.removePeer(prune.topicID, peer) + # TODO peer exchange, we miss ambient peer discovery in libp2p, so we are blocked by that + # another option could be to implement signed peer records + ## if peer.score > g.parameters.gossipThreshold and prunes.peers.len > 0: + when defined(libp2p_expensive_metrics): libp2p_gossipsub_peers_per_topic_mesh .set(g.mesh.peers(prune.topicID).int64, labelValues = [prune.topicID]) @@ -979,26 +1009,36 @@ proc handleIHave(g: GossipSub, ihaves: seq[ControlIHave]): ControlIWant = if peer.score < g.parameters.gossipThreshold: trace "ihave: ignoring low score peer", peer, score = peer.score - elif peer.iHaveBudget == 0: + elif peer.iHaveBudget <= 0: trace "ihave: ignoring out of budget peer", peer, score = peer.score else: - dec peer.iHaveBudget - for ihave in ihaves: + var deIhaves = ihaves.deduplicate() + for ihave in deIhaves.mitems: trace "peer sent ihave", peer, topic = ihave.topicID, msgs = ihave.messageIDs - if ihave.topicID in g.mesh: for m in ihave.messageIDs: if m notin g.seen: - result.messageIDs.add(m) + if peer.iHaveBudget > 0: + result.messageIDs.add(m) + dec peer.iHaveBudget + else: + return + + # shuffling result.messageIDs before sending it out to increase the likelihood + # of getting an answer if the peer truncates the list due to internal size restrictions. + shuffle(result.messageIDs) proc handleIWant(g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant]): seq[Message] = if peer.score < g.parameters.gossipThreshold: trace "iwant: ignoring low score peer", peer, score = peer.score + elif peer.iWantBudget <= 0: + trace "iwant: ignoring out of budget peer", peer, score = peer.score else: - for iwant in iwants: + var deIwants = iwants.deduplicate() + for iwant in deIwants: for mid in iwant.messageIDs: trace "peer sent iwant", peer, messageID = mid let msg = g.mcache.get(mid) @@ -1010,15 +1050,6 @@ proc handleIWant(g: GossipSub, else: return -proc punishPeer(g: GossipSub, peer: PubSubPeer, msg: Message) = - for t in msg.topicIDs: - # ensure we init a new topic if unknown - let _ = g.topicParams.mgetOrPut(t, TopicParams.init()) - # update stats - var tstats = g.peerStats[peer].topicInfos.getOrDefault(t) - tstats.invalidMessageDeliveries += 1 - g.peerStats[peer].topicInfos[t] = tstats - method rpcHandler*(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) {.async.} = @@ -1050,13 +1081,13 @@ method rpcHandler*(g: GossipSub, if (msg.signature.len > 0 or g.verifySignature) and not msg.verify(): # always validate if signature is present or required debug "Dropping message due to failed signature verification", msgId, peer - g.punishPeer(peer, msg) + g.punishPeer(peer, msg.topicIDs) continue if msg.seqno.len > 0 and msg.seqno.len != 8: # if we have seqno should be 8 bytes long debug "Dropping message due to invalid seqno length", msgId, peer - g.punishPeer(peer, msg) + g.punishPeer(peer, msg.topicIDs) continue # g.anonymize needs no evaluation when receiving messages @@ -1066,7 +1097,7 @@ method rpcHandler*(g: GossipSub, case validation of ValidationResult.Reject: debug "Dropping message due to failed validation", msgId, peer - g.punishPeer(peer, msg) + g.punishPeer(peer, msg.topicIDs) continue of ValidationResult.Ignore: debug "Dropping message due to ignored validation", msgId, peer diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 228bd59..f9ef13a 100644 --- 
a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -225,15 +225,19 @@ method unsubscribe*(p: PubSub, topics: seq[TopicPair]) {.base, async.} = ## unsubscribe from a list of ``topic`` strings for t in topics: - p.topics.withValue(t.topic, topic): - topic[].handler.keepIf(proc (x: auto): bool = x != t.handler) + let + handler = t.handler + ttopic = t.topic + closureScope: + p.topics.withValue(ttopic, topic): + topic[].handler.keepIf(proc (x: auto): bool = x != handler) - if topic[].handler.len == 0: - # make sure we delete the topic if - # no more handlers are left - p.topics.del(t.topic) + if topic[].handler.len == 0: + # make sure we delete the topic if + # no more handlers are left + p.topics.del(ttopic) - libp2p_pubsub_topics.set(p.topics.len.int64) + libp2p_pubsub_topics.set(p.topics.len.int64) proc unsubscribe*(p: PubSub, topic: string, diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 77046be..06d9b5f 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -539,11 +539,11 @@ suite "GossipSub": for dialer in nodes: var handler: TopicHandler closureScope: - var dialerNode = dialer + var peerName = $dialer.peerInfo.peerId handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} = - if $dialerNode.peerInfo.peerId notin seen: - seen[$dialerNode.peerInfo.peerId] = 0 - seen[$dialerNode.peerInfo.peerId].inc + if peerName notin seen: + seen[peerName] = 0 + seen[peerName].inc check topic == "foobar" if not seenFut.finished() and seen.len >= runs: seenFut.complete() @@ -588,17 +588,16 @@ suite "GossipSub": await allFuturesThrowing(nodes.mapIt(it.start())) await subscribeNodes(nodes) - var seen: Table[PeerID, int] + var seen: Table[string, int] var seenFut = newFuture[void]() for dialer in nodes: var handler: TopicHandler closureScope: - var dialerNode = dialer - handler = proc(topic: string, data: seq[byte]) - {.async, gcsafe, closure.} = - if dialerNode.peerInfo.peerId notin seen: - seen[dialerNode.peerInfo.peerId] = 0 - seen[dialerNode.peerInfo.peerId].inc + var peerName = $dialer.peerInfo.peerId + handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} = + if peerName notin seen: + seen[peerName] = 0 + seen[peerName].inc check topic == "foobar" if not seenFut.finished() and seen.len >= runs: seenFut.complete()
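
A note on the byScore change at the top of the gossipsub.nim diff: the old
comparator returned (x.score - y.score).int, which truncates the float
difference toward zero, so any two peers whose scores differ by less than
1.0 compare as equal and the stable sort leaves them in arrival order.
system.cmp restores a total order. A standalone sketch of the difference,
using a stand-in Peer type rather than the real PubSubPeer:

  import std/algorithm

  type Peer = object
    score: float64

  func buggyByScore(x, y: Peer): int = (x.score - y.score).int  # (0.8).int == 0
  func fixedByScore(x, y: Peer): int = system.cmp(x.score, y.score)

  var a = @[Peer(score: 0.1), Peer(score: 0.9), Peer(score: 0.5)]
  a.sort(buggyByScore)
  # every pairwise difference is below 1.0, so the old cmp returned 0
  # throughout and the (stable) merge sort changed nothing:
  assert a[0].score == 0.1 and a[2].score == 0.5

  a.sort(fixedByScore, SortOrder.Descending)  # high score first, as grafting wants
  assert a[0].score == 0.9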
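
The reworked prune path in rebalanceMesh deserves a worked example. Instead
of pruning inbound peers first and only culling outbound peers as a
fallback, the patch computes an explicit cap on outbound prune candidates,
outboundInMesh - dOut, so at least dOut outbound peers survive a Dhi
overshoot. A sketch of that arithmetic under assumed values (illustrative,
not the configured defaults):

  let
    dOut = 2             # parameters.dOut: outbound quota to preserve
    outboundInMesh = 3   # outbound peers currently grafted in the mesh
    maxOutboundPrunes = outboundInMesh - dOut  # = 1

  # at most one outbound peer may enter the prune candidate set, so after
  # pruning back toward GossipSubD at least dOut outbound peers remain
  assert max(0, maxOutboundPrunes) == 1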
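
handleIHave now deduplicates the incoming IHAVE list, charges the
per-heartbeat iHaveBudget once per message ID actually requested rather than
once per control message, and shuffles the IWANT reply. A compressed sketch
of that accounting; budget stands in for peer.iHaveBudget, and the g.seen
and mesh-membership checks are elided:

  import std/[random, sequtils]

  proc buildIWant(msgIds: seq[string], budget: var int): seq[string] =
    for m in msgIds.deduplicate():
      if budget <= 0:
        break             # out of budget: request nothing further this heartbeat
      result.add m
      dec budget          # one unit per message ID we actually ask for
    # randomize the reply so a peer that truncates the IWANT list due to
    # internal size limits still answers a random sample
    shuffle(result)

  var budget = 2
  assert buildIWant(@["a", "b", "a", "c"], budget).len == 2
  assert budget == 0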
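
Finally, the pubsub.nim and testgossipsub.nim hunks fix the same Nim pitfall
flagged in the commit message ("fix closure capture bs..."): locals declared
in a loop body live in a single environment cell, so every closure created
in the loop observes the final value. closureScope (a template in system)
forces a fresh environment per iteration. A minimal sketch of the pitfall,
not the library code:

  proc demo(): seq[int] =
    var thunks: seq[proc (): int {.closure.}]
    for i in 0 ..< 3:
      closureScope:            # fresh environment for each iteration
        let captured = i       # this copy is private to the closure below
        thunks.add(proc (): int = captured)
    for t in thunks:
      result.add t()

  # without closureScope, all three thunks would observe the loop's final value
  assert demo() == @[0, 1, 2]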