Gossipsub: don't send to peers seen during validation (#648)

* Gossipsub: don't send to peers seen during validation

* Less error prone code

* add metric

* Fix metric

* remove dangling code test

* address comments

* don't allocate memory
This commit is contained in:
Tanguy 2021-11-14 09:08:05 +01:00 committed by GitHub
parent 7d677f848f
commit 6f779c47c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 107 additions and 2 deletions

View File

@ -37,6 +37,7 @@ logScope:
declareCounter(libp2p_gossipsub_failed_publish, "number of failed publish")
declareCounter(libp2p_gossipsub_invalid_topic_subscription, "number of invalid topic subscriptions that happened")
declareCounter(libp2p_gossipsub_duplicate_during_validation, "number of duplicates received during message validation")
proc init*(_: type[GossipSubParams]): GossipSubParams =
GossipSubParams(
@ -295,7 +296,9 @@ method rpcHandler*(g: GossipSub,
for i in 0..<rpcMsg.messages.len(): # for every message
template msg: untyped = rpcMsg.messages[i]
let msgId = g.msgIdProvider(msg)
let
msgId = g.msgIdProvider(msg)
msgIdSalted = msgId & g.seenSalt
# addSeen adds salt to msgId to avoid
# remote attacking the hash function
@ -306,6 +309,8 @@ method rpcHandler*(g: GossipSub,
# score only if messages are not too old.
g.rewardDelivered(peer, msg.topicIDs, false)
g.validationSeen.withValue(msgIdSalted, seen): seen[].incl(peer)
# onto the next message
continue
@ -331,7 +336,16 @@ method rpcHandler*(g: GossipSub,
# g.anonymize needs no evaluation when receiving messages
# as we have a "lax" policy and allow signed messages
# Be careful not to fill the validationSeen table
# (eg, pop everything you put in it)
g.validationSeen[msgIdSalted] = initHashSet[PubSubPeer]()
let validation = await g.validate(msg)
var seenPeers: HashSet[PubSubPeer]
discard g.validationSeen.pop(msgIdSalted, seenPeers)
libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64)
case validation
of ValidationResult.Reject:
debug "Dropping message after validation, reason: reject",
@ -350,7 +364,7 @@ method rpcHandler*(g: GossipSub,
g.rewardDelivered(peer, msg.topicIDs, true)
var toSendPeers = initHashSet[PubSubPeer]()
var toSendPeers = HashSet[PubSubPeer]()
for t in msg.topicIDs: # for every topic in the message
if t notin g.topics:
continue
@ -360,6 +374,11 @@ method rpcHandler*(g: GossipSub,
await handleData(g, t, msg.data)
# Don't send it to source peer, or peers that
# sent it during validation
toSendPeers.excl(peer)
toSendPeers.excl(seenPeers)
# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))

View File

@ -139,6 +139,7 @@ type
disconnectBadPeers*: bool
BackoffTable* = Table[string, Table[PeerID, Moment]]
ValidationSeenTable* = Table[MessageID, HashSet[PubSubPeer]]
GossipSub* = ref object of FloodSub
mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic
@ -150,6 +151,7 @@ type
gossip*: Table[string, seq[ControlIHave]] # pending gossip
control*: Table[string, ControlMessage] # pending control messages
mcache*: MCache # messages cache
validationSeen*: ValidationSeenTable # peers who sent us message in validation
heartbeatFut*: Future[void] # cancellation future for heartbeat interval
heartbeatRunning*: bool

View File

@ -22,6 +22,7 @@ import utils, ../../libp2p/[errors,
protocols/pubsub/gossipsub,
protocols/pubsub/pubsubpeer,
protocols/pubsub/peertable,
protocols/pubsub/timedcache,
protocols/pubsub/rpc/messages]
import ../helpers
@ -556,6 +557,89 @@ suite "GossipSub":
await allFuturesThrowing(nodesFut.concat())
asyncTest "e2e - GossipSub should not send to source & peers who already seen":
# 3 nodes: A, B, C
# A publishes, B relays, C is having a long validation
# so C should not send to anyone
let
nodes = generateNodes(
3,
gossip = true)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
nodes[2].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
nodes[2].start(),
))
await subscribeNodes(nodes)
var cRelayed: Future[void] = newFuture[void]()
var bFinished: Future[void] = newFuture[void]()
var
aReceived = 0
cReceived = 0
proc handlerA(topic: string, data: seq[byte]) {.async, gcsafe.} =
inc aReceived
check aReceived < 2
proc handlerB(topic: string, data: seq[byte]) {.async, gcsafe.} = discard
proc handlerC(topic: string, data: seq[byte]) {.async, gcsafe.} =
inc cReceived
check cReceived < 2
cRelayed.complete()
nodes[0].subscribe("foobar", handlerA)
nodes[1].subscribe("foobar", handlerB)
nodes[2].subscribe("foobar", handlerC)
await waitSub(nodes[0], nodes[1], "foobar")
await waitSub(nodes[0], nodes[2], "foobar")
await waitSub(nodes[2], nodes[1], "foobar")
await waitSub(nodes[1], nodes[2], "foobar")
var gossip1: GossipSub = GossipSub(nodes[0])
var gossip2: GossipSub = GossipSub(nodes[1])
var gossip3: GossipSub = GossipSub(nodes[2])
proc slowValidator(topic: string, message: Message): Future[ValidationResult] {.async.} =
await cRelayed
# Empty A & C caches to detect duplicates
gossip1.seen = TimedCache[MessageId].init()
gossip3.seen = TimedCache[MessageId].init()
let msgId = toSeq(gossip2.validationSeen.keys)[0]
check await checkExpiring(try: gossip2.validationSeen[msgId].len > 0 except: false)
result = ValidationResult.Accept
bFinished.complete()
nodes[1].addValidator("foobar", slowValidator)
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
await bFinished
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop(),
nodes[2].switch.stop()
)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop(),
nodes[2].stop()
)
await allFuturesThrowing(nodesFut.concat())
asyncTest "e2e - GossipSub send over floodPublish A -> B":
var passed: Future[bool] = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =