add back topics in peers, re-enable flood publish

This commit is contained in:
Giovanni Petrantoni 2020-07-04 12:15:35 +09:00
parent 028b390c7f
commit 466f3c9171
3 changed files with 18 additions and 8 deletions

View File

@ -517,12 +517,12 @@ method publish*(g: GossipSub,
var peers = g.explicitPeers
if topic.len > 0: # data could be 0/empty
# if g.parameters.floodPublish:
# for id, peer in g.peers:
# if peer.topics.find(topic) != -1 and
# peer.score() >= g.parameters.publishThreshold:
# debug "publish: including flood/high score peer", peer = id
# peers.incl(id)
if g.parameters.floodPublish:
for id, peer in g.peers:
if topic in peer.topics and
peer.score() >= g.parameters.publishThreshold:
debug "publish: including flood/high score peer", peer = id
peers.incl(id)
if topic in g.topics: # if we're subscribed use the mesh
peers = g.mesh.getOrDefault(topic)

View File

@ -81,9 +81,17 @@ method subscribeTopic*(p: PubSub,
subscribe: bool,
peerId: string) {.base, async.} =
var peer = p.peers.getOrDefault(peerId)
if isNil(peer) or isNil(peer.peerInfo): # should not happen
if subscribe:
warn "subscribeTopic but peer was unknown!"
warn "subscribeTopic (subscribe) but peer was unknown!", peer = peerId
assert(false, "subscribeTopic (subscribe) but peer was unknown!") # bad , stop here if debug
return
if subscribe:
peer.topics.incl(topic)
else:
peer.topics.excl(topic)
method rpcHandler*(p: PubSub,
peer: PubSubPeer,

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import options, hashes, strutils, tables, hashes
import options, hashes, strutils, tables, hashes, sets
import chronos, chronicles, nimcrypto/sha2, metrics
import rpc/[messages, message, protobuf],
timedcache,
@ -45,6 +45,7 @@ type
sendConn: Connection
peerInfo*: PeerInfo
handler*: RPCHandler
topics*: HashSet[string]
sentRpcCache: TimedCache[string] # cache for already sent messages
recvdRpcCache: TimedCache[string] # cache for already received messages
refs*: int # refcount of the connections this peer is handling
@ -192,3 +193,4 @@ proc newPubSubPeer*(peerInfo: PeerInfo,
result.sentRpcCache = newTimedCache[string](2.minutes)
result.recvdRpcCache = newTimedCache[string](2.minutes)
result.onConnect = newAsyncEvent()
result.topics = initHashSet[string]()