This commit is contained in:
Giovanni Petrantoni 2020-07-21 11:47:46 +09:00
parent 884d05cbc2
commit b61f1da9a8
2 changed files with 20 additions and 10 deletions

View File

@ -479,8 +479,7 @@ proc updateScores(g: GossipSub) = # avoid async
continue continue
# Per topic # Per topic
for topic in peer.topics: for topic, topicParams in g.topicParams:
var topicParams = g.topicParams.mgetOrPut(topic, TopicParams.init())
var info = stats.topicInfos.getOrDefault(topic) var info = stats.topicInfos.getOrDefault(topic)
# Scoring # Scoring
@ -734,6 +733,15 @@ proc handleIWant(g: GossipSub,
if msg.isSome: if msg.isSome:
result.add(msg.get()) result.add(msg.get())
proc punishPeer(g: GossipSub, peer: PubSubPeer, msg: Message) =
for t in msg.topicIDs:
# ensure we init a new topic if unknown
let _ = g.topicParams.mgetOrPut(t, TopicParams.init())
# update stats
var tstats = g.peerStats[peer].topicInfos.getOrDefault(t)
tstats.invalidMessageDeliveries += 1
g.peerStats[peer].topicInfos[t] = tstats
method rpcHandler*(g: GossipSub, method rpcHandler*(g: GossipSub,
peer: PubSubPeer, peer: PubSubPeer,
rpcMsgs: seq[RPCMsg]) {.async.} = rpcMsgs: seq[RPCMsg]) {.async.} =
@ -755,15 +763,13 @@ method rpcHandler*(g: GossipSub,
g.seen.put(msgId) # add the message to the seen cache g.seen.put(msgId) # add the message to the seen cache
if g.verifySignature and not msg.verify(peer.peerInfo): if g.verifySignature and not msg.verify(peer.peerInfo):
trace "dropping message due to failed signature verification" trace "dropping message due to failed signature verification", peer
g.punishPeer(peer, msg)
continue continue
if not (await g.validate(msg)): if not (await g.validate(msg)):
trace "dropping message due to failed validation" trace "dropping message due to failed validation", peer
for t in msg.topicIDs: g.punishPeer(peer, msg)
var tstats = g.peerStats[peer].topicInfos.getOrDefault(t)
tstats.invalidMessageDeliveries += 1
g.peerStats[peer].topicInfos[t] = tstats
continue continue
# this shouldn't happen # this shouldn't happen

View File

@ -1,13 +1,17 @@
import random import random, options
import chronos import chronos
import ../../libp2p/standard_setup import ../../libp2p/standard_setup
import ../../libp2p/protocols/pubsub/gossipsub
export standard_setup export standard_setup
randomize() randomize()
proc generateNodes*(num: Natural, gossip: bool = false): seq[Switch] = proc generateNodes*(num: Natural, gossip: bool = false): seq[Switch] =
for i in 0..<num: for i in 0..<num:
result.add(newStandardSwitch(gossip = gossip)) var switch = newStandardSwitch(gossip = gossip)
var gossip = GossipSub(switch.pubSub.get())
gossip.parameters.floodPublish = false
result.add(switch)
proc subscribeNodes*(nodes: seq[Switch]) {.async.} = proc subscribeNodes*(nodes: seq[Switch]) {.async.} =
var dials: seq[Future[void]] var dials: seq[Future[void]]