Gossip subscription improvements (#497)

* salt ids in seen table

* add subscription validation callback and avoid processing topics we don't care of

* apply penalty on bad subscription

* fix IHave handling IDs

* reduce indenting, add some comments

* fix gossip randombytes generation

* do not descore unwanted topics (might happen, due to timing, needs improvements)

* cleaning up and added tests

* validate subscriptions only when subscribing

* set notice level for failed publish

* fix floodsub behavior
This commit is contained in:
Giovanni Petrantoni 2021-01-13 23:49:44 +09:00 committed by GitHub
parent 87be2c7f1f
commit dc48170b0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 167 additions and 21 deletions

View File

@ -45,17 +45,25 @@ method subscribeTopic*(f: FloodSub,
trace "ignoring unknown peer"
return
procCall PubSub(f).subscribeTopic(topic, subscribe, peer)
if topic notin f.floodsub:
f.floodsub[topic] = initHashSet[PubSubPeer]()
if subscribe and not(isNil(f.subscriptionValidator)) and not(f.subscriptionValidator(topic)):
# this is a violation, so warn should be in order
warn "ignoring invalid topic subscription", topic, peer
return
if subscribe:
if topic notin f.floodsub:
f.floodsub[topic] = initHashSet[PubSubPeer]()
trace "adding subscription for topic", peer, topic
# subscribe the peer to the topic
f.floodsub[topic].incl(peer)
else:
if topic notin f.floodsub:
return
trace "removing subscription for topic", peer, topic
# unsubscribe the peer from the topic
f.floodsub[topic].excl(peer)

View File

@ -8,7 +8,7 @@
## those terms.
import std/[tables, sets, options, sequtils, strutils, random, algorithm]
import chronos, chronicles, metrics
import chronos, chronicles, metrics, bearssl
import ./pubsub,
./floodsub,
./pubsubpeer,
@ -20,7 +20,8 @@ import ./pubsub,
../../stream/connection,
../../peerinfo,
../../peerid,
../../utility
../../utility,
../../crypto/curve25519
import stew/results
export results
@ -106,7 +107,10 @@ type
PeerStats* = object
topicInfos*: Table[string, TopicInfo]
expire*: Moment # updated on disconnect, to retain scores until expire
# the following are copies from PubSubPeer, in order to restore them on re-connection
score*: float64 # a copy of the score to keep in case the peer is disconnected
appScore*: float64 # application specific score
behaviourPenalty*: float64 # the eventual penalty score
GossipSubParams* = object
explicit: bool
@ -166,6 +170,8 @@ type
heartbeatEvents*: seq[AsyncEvent]
randomBytes: seq[byte]
MeshMetrics = object
# scratch buffers for metrics
otherPeersPerTopicMesh: int64
@ -334,7 +340,10 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) =
else:
# we knew this peer
# restore previously stored score
peer.score = g.peerStats[peer.peerId].score
let stats = g.peerStats[peer.peerId]
peer.score = stats.score
peer.appScore = stats.appScore
peer.behaviourPenalty = stats.behaviourPenalty
proc grafted(g: GossipSub, p: PubSubPeer, topic: string) =
g.peerStats.withValue(p.peerId, stats):
@ -823,6 +832,8 @@ proc updateScores(g: GossipSub) = # avoid async
# copy into stats the score to keep until expired
stats.score = peer.score
stats.appScore = peer.appScore
stats.behaviourPenalty = peer.behaviourPenalty
assert(g.peerStats[peer.peerId].score == peer.score) # nim sanity check
trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted
@ -882,7 +893,6 @@ proc heartbeat(g: GossipSub) {.async.} =
# do this before relance
# in order to avoid grafted -> pruned in the same cycle
let meshPeers = g.mesh.getOrDefault(t)
let gossipPeers = g.gossipsub.getOrDefault(t)
var prunes: seq[PubSubPeer]
for peer in meshPeers:
if peer.score < 0.0:
@ -984,17 +994,23 @@ method subscribeTopic*(g: GossipSub,
trace "ignoring unknown peer"
return
# Skip floodsub - we don't want it to add the peer to `g.floodsub`
procCall PubSub(g).subscribeTopic(topic, subscribe, peer)
if subscribe and not(isNil(g.subscriptionValidator)) and not(g.subscriptionValidator(topic)):
# this is a violation, so warn should be in order
warn "ignoring invalid topic subscription", topic, peer
# also punish
peer.behaviourPenalty += 1
return
if subscribe:
trace "peer subscribed to topic"
# subscribe remote peer to the topic
discard g.gossipsub.addPeer(topic, peer)
if peer.peerId in g.parameters.directPeers:
discard g.explicit.addPeer(topic, peer)
else:
trace "peer unsubscribed from topic"
# unsubscribe remote peer from the topic
g.gossipsub.removePeer(topic, peer)
g.mesh.removePeer(topic, peer)
@ -1006,6 +1022,9 @@ method subscribeTopic*(g: GossipSub,
proc punishPeer(g: GossipSub, peer: PubSubPeer, topics: seq[string]) =
for t in topics:
if t notin g.topics:
continue
# ensure we init a new topic if unknown
let _ = g.topicParams.mgetOrPut(t, TopicParams.init())
# update stats
@ -1125,7 +1144,8 @@ proc handleIHave(g: GossipSub,
peer, topic = ihave.topicID, msgs = ihave.messageIDs
if ihave.topicID in g.mesh:
for m in ihave.messageIDs:
if m notin g.seen:
let msgId = m & g.randomBytes
if msgId notin g.seen:
if peer.iHaveBudget > 0:
result.messageIDs.add(m)
dec peer.iHaveBudget
@ -1165,11 +1185,17 @@ method rpcHandler*(g: GossipSub,
for msg in rpcMsg.messages: # for every message
let msgId = g.msgIdProvider(msg)
if g.seen.put(msgId):
# avoid the remote peer from controlling the seen table hashing
# by adding random bytes to the ID we ensure we randomize the IDs
# we do only for seen as this is the great filter from the external world
if g.seen.put(msgId & g.randomBytes):
trace "Dropping already-seen message", msgId = shortLog(msgId), peer
# make sure to update score tho before continuing
for t in msg.topicIDs: # for every topic in the message
for t in msg.topicIDs:
if t notin g.topics:
continue
# for every topic in the message
let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())
# if in mesh add more delivery score
g.peerStats.withValue(peer.peerId, pstats):
@ -1192,6 +1218,11 @@ method rpcHandler*(g: GossipSub,
# onto the next message
continue
# avoid processing messages we are not interested in
if msg.topicIDs.allIt(it notin g.topics):
debug "Dropping message of topic without subscription", msgId = shortLog(msgId), peer
continue
if (msg.signature.len > 0 or g.verifySignature) and not msg.verify():
# always validate if signature is present or required
debug "Dropping message due to failed signature verification",
@ -1228,6 +1259,9 @@ method rpcHandler*(g: GossipSub,
var toSendPeers = initHashSet[PubSubPeer]()
for t in msg.topicIDs: # for every topic in the message
if t notin g.topics:
continue
let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())
g.peerStats.withValue(peer.peerId, pstats):
@ -1360,7 +1394,9 @@ method publish*(g: GossipSub,
# base returns always 0
discard await procCall PubSub(g).publish(topic, data)
logScope: topic
logScope:
topic
trace "Publishing message on topic", data = data.shortLog
if topic.len <= 0: # data could be 0/empty
@ -1398,7 +1434,7 @@ method publish*(g: GossipSub,
g.lastFanoutPubSub[topic] = Moment.fromNow(g.parameters.fanoutTTL)
if peers.len == 0:
debug "No peers for topic, skipping publish"
notice "No peers for topic, skipping publish"
# skipping topic as our metrics finds that heavy
libp2p_gossipsub_failed_publish.inc()
return 0
@ -1416,7 +1452,7 @@ method publish*(g: GossipSub,
trace "Created new message", msg = shortLog(msg), peers = peers.len
if g.seen.put(msgId):
if g.seen.put(msgId & g.randomBytes):
# custom msgid providers might cause this
trace "Dropping already-seen message"
return 0
@ -1494,3 +1530,6 @@ method initPubSub*(g: GossipSub) =
g.lastFanoutPubSub = initTable[string, Moment]() # last publish time for fanout topics
g.gossip = initTable[string, seq[ControlIHave]]() # pending gossip
g.control = initTable[string, ControlMessage]() # pending control messages
var rng = newRng()
g.randomBytes = newSeqUninitialized[byte](32)
brHmacDrbgGenerate(rng[], g.randomBytes)

View File

@ -75,12 +75,15 @@ type
Accept, Reject, Ignore
ValidatorHandler* = proc(topic: string,
message: Message): Future[ValidationResult] {.gcsafe, closure.}
message: Message): Future[ValidationResult] {.gcsafe.}
TopicPair* = tuple[topic: string, handler: TopicHandler]
MsgIdProvider* =
proc(m: Message): MessageID {.noSideEffect, raises: [Defect], nimcall, gcsafe.}
proc(m: Message): MessageID {.noSideEffect, raises: [Defect], gcsafe.}
SubscriptionValidator* =
proc(topic: string): bool {.raises: [Defect], gcsafe.}
Topic* = object
# make this a variant type if one day we have different Params structs
@ -102,6 +105,7 @@ type
msgIdProvider*: MsgIdProvider # Turn message into message id (not nil)
msgSeqno*: uint64
anonymize*: bool # if we omit fromPeer and seqno from RPC messages we send
subscriptionValidator*: SubscriptionValidator # callback used to validate subscriptions
knownTopics*: HashSet[string]
@ -194,8 +198,9 @@ method subscribeTopic*(p: PubSub,
topic: string,
subscribe: bool,
peer: PubSubPeer) {.base.} =
# called when remote peer subscribes to a topic
discard
# both gossipsub and floodsub diverge, and this super call is not necessary right now
# if necessary remove the assertion
doAssert(false, "unexpected call to pubsub.subscribeTopic")
method rpcHandler*(p: PubSub,
peer: PubSubPeer,
@ -503,6 +508,7 @@ proc init*[PubParams: object | bool](
verifySignature: bool = true,
sign: bool = true,
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
subscriptionValidator: SubscriptionValidator = nil,
parameters: PubParams = false): P =
let pubsub =
when PubParams is bool:
@ -514,7 +520,8 @@ proc init*[PubParams: object | bool](
sign: sign,
peers: initTable[PeerID, PubSubPeer](),
topics: initTable[string, Topic](),
msgIdProvider: msgIdProvider)
msgIdProvider: msgIdProvider,
subscriptionValidator: subscriptionValidator)
else:
P(switch: switch,
peerInfo: switch.peerInfo,
@ -525,6 +532,7 @@ proc init*[PubParams: object | bool](
peers: initTable[PeerID, PubSubPeer](),
topics: initTable[string, Topic](),
msgIdProvider: msgIdProvider,
subscriptionValidator: subscriptionValidator,
parameters: parameters)
proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} =

View File

@ -414,3 +414,40 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "Drop messages of topics without subscription":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
check false
let topic = "foobar"
# gossipSub.topicParams[topic] = TopicParams.init()
# gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
# gossipSub.fanout[topic] = initHashSet[PubSubPeer]()
var conns = newSeq[Connection]()
for i in 0..<30:
let conn = newBufferStream(noop)
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
peer.handler = handler
# generate messages
var seqno = 0'u64
for i in 0..5:
let conn = newBufferStream(noop)
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
inc seqno
let msg = Message.init(some(peerInfo), ("bar" & $i).toBytes(), topic, some(seqno), false)
await gossipSub.rpcHandler(peer, RPCMsg(messages: @[msg]))
check gossipSub.mcache.msgs.len == 0
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()

View File

@ -662,3 +662,57 @@ suite "GossipSub":
it.switch.stop())))
await allFuturesThrowing(nodesFut)
asyncTest "GossipSub invalid topic subscription":
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
handlerFut.complete(true)
let
nodes = generateNodes(2, gossip = true)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
var gossip = GossipSub(nodes[0])
let invalidDetected = newFuture[void]()
gossip.subscriptionValidator =
proc(topic: string): bool =
if topic == "foobar":
try:
invalidDetected.complete()
except:
raise newException(Defect, "Exception during subscriptionValidator")
false
else:
true
await subscribeNodes(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
await invalidDetected.wait(10.seconds)
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)
await allFuturesThrowing(nodesFut.concat())