gossipsub / floodsub fixes (#348)

* mcache fixes

* remove timed cache - the window shifting already removes old messages
* ref -> object
* avoid unnecessary allocations with `[]` operator

* simplify init

* fix several gossipsub/floodsub issues

* floodsub, gossipsub: don't rebroadcast messages that fail validation
(!)
* floodsub, gossipsub: don't crash when unsubscribing from unknown
topics (!)
* gossipsub: don't send message to peers that are not interested in the
topic, when messages don't share topic list
* floodsub: don't repeat all messages for each message when
rebroadcasting
* floodsub: allow sending empty data
* floodsub: fix inefficient unsubscribe
* sync floodsub/gossipsub logging
* gossipsub: include incoming messages in mcache (!)
* gossipsub: don't rebroadcast already-seen messages (!)
* pubsubpeer: remove incoming/outgoing seen caches - these are already
handled in gossipsub, floodsub and will cause trouble when peers try to
resubscribe / regraft topics (because control messages will have same
digest)
* timedcache: reimplement without timers (fixes timer leaks and extreme
inefficiency due to per-message closures, futures etc)
* timedcache: ref -> obj
This commit is contained in:
Jacek Sieka 2020-09-04 08:10:32 +02:00 committed by GitHub
parent c0bc73ddac
commit 5819c6a9a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 250 additions and 257 deletions

View File

@ -7,16 +7,17 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import sequtils, tables, sets, strutils
import std/[sequtils, sets, tables]
import chronos, chronicles, metrics
import pubsub,
pubsubpeer,
timedcache,
peertable,
rpc/[messages, message],
import ./pubsub,
./pubsubpeer,
./timedcache,
./peertable,
./rpc/[message, messages],
../../stream/connection,
../../peerid,
../../peerinfo
../../peerinfo,
../../utility
logScope:
topics = "floodsub"
@ -49,15 +50,13 @@ method subscribeTopic*(f: FloodSub,
method unsubscribePeer*(f: FloodSub, peer: PeerID) =
## handle peer disconnects
##
trace "unsubscribing floodsub peer", peer = $peer
let pubSubPeer = f.peers.getOrDefault(peer)
if pubSubPeer.isNil:
return
for t in toSeq(f.floodsub.keys):
if t in f.floodsub:
f.floodsub[t].excl(pubSubPeer)
for _, v in f.floodsub.mpairs():
v.excl(pubSubPeer)
procCall PubSub(f).unsubscribePeer(peer)
@ -66,35 +65,34 @@ method rpcHandler*(f: FloodSub,
rpcMsg: RPCMsg) {.async.} =
await procCall PubSub(f).rpcHandler(peer, rpcMsg)
if rpcMsg.messages.len > 0: # if there are any messages
var toSendPeers = initHashSet[PubSubPeer]()
for msg in rpcMsg.messages: # for every message
let msgId = f.msgIdProvider(msg)
logScope: msgId
logScope:
msgId
peer = peer.id
if msgId notin f.seen:
f.seen.put(msgId) # add the message to the seen cache
if f.seen.put(msgId):
trace "Dropping already-seen message"
continue
if f.verifySignature and not msg.verify(peer.peerId):
trace "dropping message due to failed signature verification"
debug "Dropping message due to failed signature verification"
continue
if not (await f.validate(msg)):
trace "dropping message due to failed validation"
trace "Dropping message due to failed validation"
continue
var toSendPeers = initHashSet[PubSubPeer]()
for t in msg.topicIDs: # for every topic in the message
if t in f.floodsub:
toSendPeers.incl(f.floodsub[t]) # get all the peers interested in this topic
f.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
await handleData(f, t, msg.data)
# forward the message to all peers interested in it
f.broadcast(
toSeq(toSendPeers),
RPCMsg(messages: rpcMsg.messages))
trace "forwared message to peers", peers = toSendPeers.len
# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
f.broadcast(toSeq(toSendPeers), RPCMsg(messages: @[msg]))
trace "Forwared message to peers", peers = toSendPeers.len
method init*(f: FloodSub) =
proc handler(conn: Connection, proto: string) {.async.} =
@ -114,30 +112,41 @@ method publish*(f: FloodSub,
# base returns always 0
discard await procCall PubSub(f).publish(topic, data)
if data.len <= 0 or topic.len <= 0:
trace "topic or data missing, skipping publish"
logScope: topic
trace "Publishing message on topic", data = data.shortLog
if topic.len <= 0: # data could be 0/empty
debug "Empty topic, skipping publish"
return 0
if topic notin f.floodsub:
trace "missing peers for topic, skipping publish"
return
let peers = toSeq(f.floodsub.getOrDefault(topic))
if peers.len == 0:
debug "No peers for topic, skipping publish"
return 0
trace "publishing on topic", name = topic
inc f.msgSeqno
let
msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign)
peers = toSeq(f.floodsub.getOrDefault(topic))
msgId = f.msgIdProvider(msg)
# start the future but do not wait yet
f.broadcast(
peers,
RPCMsg(messages: @[msg]))
logScope: msgId
trace "Created new message", msg = shortLog(msg), peers = peers.len
if f.seen.put(msgId):
# custom msgid providers might cause this
trace "Dropping already-seen message"
return 0
# Try to send to all peers that are known to be interested
f.broadcast(peers, RPCMsg(messages: @[msg]))
when defined(libp2p_expensive_metrics):
libp2p_pubsub_messages_published.inc(labelValues = [topic])
trace "published message to peers", peers = peers.len,
msg = msg.shortLog()
trace "Published message to peers"
return peers.len
method unsubscribe*(f: FloodSub,
@ -156,5 +165,5 @@ method unsubscribeAll*(f: FloodSub, topic: string) {.async.} =
method initPubSub*(f: FloodSub) =
procCall PubSub(f).initPubSub()
f.floodsub = initTable[string, HashSet[PubSubPeer]]()
f.seen = newTimedCache[string](2.minutes)
f.seen = TimedCache[string].init(2.minutes)
f.init()

View File

@ -7,20 +7,19 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/[tables, sets, options, sequtils, random]
import std/[options, random, sequtils, sets, tables]
import chronos, chronicles, metrics
import pubsub,
floodsub,
pubsubpeer,
peertable,
mcache,
timedcache,
rpc/[messages, message],
import ./pubsub,
./floodsub,
./pubsubpeer,
./peertable,
./mcache,
./timedcache,
./rpc/[messages, message],
../protocol,
../../peerinfo,
../../stream/connection,
../../peerinfo,
../../peerid,
../../errors,
../../utility
logScope:
@ -379,47 +378,36 @@ method rpcHandler*(g: GossipSub,
rpcMsg: RPCMsg) {.async.} =
await procCall PubSub(g).rpcHandler(peer, rpcMsg)
if rpcMsg.messages.len > 0: # if there are any messages
var toSendPeers: HashSet[PubSubPeer]
for msg in rpcMsg.messages: # for every message
let msgId = g.msgIdProvider(msg)
logScope: msgId
logScope:
msgId
peer = peer.id
if msgId in g.seen:
trace "message already processed, skipping"
if g.seen.put(msgId):
trace "Dropping already-seen message"
continue
trace "processing message"
g.seen.put(msgId) # add the message to the seen cache
g.mcache.put(msgId, msg)
if g.verifySignature and not msg.verify(peer.peerId):
trace "dropping message due to failed signature verification"
debug "Dropping message due to failed signature verification"
continue
if not (await g.validate(msg)):
trace "dropping message due to failed validation"
continue
# this shouldn't happen
if g.peerInfo.peerId == msg.fromPeer:
trace "skipping messages from self"
trace "Dropping message due to failed validation"
continue
var toSendPeers = initHashSet[PubSubPeer]()
for t in msg.topicIDs: # for every topic in the message
if t in g.floodsub:
toSendPeers.incl(g.floodsub[t]) # get all floodsub peers for topic
if t in g.mesh:
toSendPeers.incl(g.mesh[t]) # get all mesh peers for topic
g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
g.mesh.withValue(t, peers): toSendPeers.incl(peers[])
await handleData(g, t, msg.data)
# forward the message to all peers interested in it
g.broadcast(
toSeq(toSendPeers),
RPCMsg(messages: rpcMsg.messages))
# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSeq(toSendPeers), RPCMsg(messages: @[msg]))
trace "forwared message to peers", peers = toSendPeers.len
if rpcMsg.control.isSome:
@ -451,12 +439,13 @@ method unsubscribe*(g: GossipSub,
for (topic, handler) in topics:
# delete from mesh only if no handlers are left
if g.topics[topic].handler.len <= 0:
if topic notin g.topics:
if topic in g.mesh:
let peers = g.mesh.getOrDefault(topic)
let peers = g.mesh[topic]
g.mesh.del(topic)
let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
let prune = RPCMsg(
control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
g.broadcast(toSeq(peers), prune)
method unsubscribeAll*(g: GossipSub, topic: string) {.async.} =
@ -474,9 +463,12 @@ method publish*(g: GossipSub,
data: seq[byte]): Future[int] {.async.} =
# base returns always 0
discard await procCall PubSub(g).publish(topic, data)
trace "publishing message on topic", topic, data = data.shortLog
logScope: topic
trace "Publishing message on topic", data = data.shortLog
if topic.len <= 0: # data could be 0/empty
debug "Empty topic, skipping publish"
return 0
var peers: HashSet[PubSubPeer]
@ -497,28 +489,34 @@ method publish*(g: GossipSub,
# time
g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL)
if peers.len == 0:
debug "No peers for topic, skipping publish"
return 0
inc g.msgSeqno
let
msg = Message.init(g.peerInfo, data, topic, g.msgSeqno, g.sign)
msgId = g.msgIdProvider(msg)
trace "created new message", msg, topic, peers = peers.len
logScope: msgId
trace "Created new message", msg = shortLog(msg), peers = peers.len
if g.seen.put(msgId):
# custom msgid providers might cause this
trace "Dropping already-seen message"
return 0
if msgId notin g.mcache:
g.mcache.put(msgId, msg)
if peers.len > 0:
g.broadcast(toSeq(peers), RPCMsg(messages: @[msg]))
when defined(libp2p_expensive_metrics):
if peers.len > 0:
libp2p_pubsub_messages_published.inc(labelValues = [topic])
trace "published message to peers", peers = peers.len,
msg = msg.shortLog()
trace "Published message to peers"
return peers.len
else:
debug "No peers for gossip message", topic, msg = msg.shortLog()
return 0
method start*(g: GossipSub) {.async.} =
trace "gossipsub start"
@ -556,7 +554,7 @@ method initPubSub*(g: GossipSub) =
procCall FloodSub(g).initPubSub()
randomize()
g.mcache = newMCache(GossipSubHistoryGossip, GossipSubHistoryLength)
g.mcache = MCache.init(GossipSubHistoryGossip, GossipSubHistoryLength)
g.mesh = initTable[string, HashSet[PubSubPeer]]() # meshes - topic to peer
g.fanout = initTable[string, HashSet[PubSubPeer]]() # fanout - topic to peer
g.gossipsub = initTable[string, HashSet[PubSubPeer]]()# topic to peer map of all gossipsub peers

View File

@ -7,69 +7,57 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import chronos, chronicles
import tables, options, sets, sequtils
import rpc/[messages], timedcache
import std/[sets, tables, options]
import rpc/[messages]
export sets, tables, messages, options
type
CacheEntry* = object
mid*: string
msg*: Message
topicIDs*: seq[string]
MCache* = ref object of RootObj
msgs*: TimedCache[Message]
MCache* = object of RootObj
msgs*: Table[string, Message]
history*: seq[seq[CacheEntry]]
historySize*: Natural
windowSize*: Natural
proc get*(c: MCache, mid: string): Option[Message] =
func get*(c: MCache, mid: string): Option[Message] =
result = none(Message)
if mid in c.msgs:
result = some(c.msgs[mid])
proc contains*(c: MCache, mid: string): bool =
func contains*(c: MCache, mid: string): bool =
c.get(mid).isSome
proc put*(c: MCache, msgId: string, msg: Message) =
proc handler(key: string, val: Message) {.gcsafe.} =
## make sure we remove the message from history
## to keep things consisten
c.history.applyIt(
it.filterIt(it.mid != msgId)
)
func put*(c: var MCache, msgId: string, msg: Message) =
if msgId notin c.msgs:
c.msgs.put(msgId, msg, handler = handler)
c.history[0].add(CacheEntry(mid: msgId, msg: msg))
c.msgs[msgId] = msg
c.history[0].add(CacheEntry(mid: msgId, topicIDs: msg.topicIDs))
proc window*(c: MCache, topic: string): HashSet[string] =
func window*(c: MCache, topic: string): HashSet[string] =
result = initHashSet[string]()
let len =
if c.windowSize > c.history.len:
c.history.len
else:
c.windowSize
let
len = min(c.windowSize, c.history.len)
if c.history.len > 0:
for slot in c.history[0..<len]:
for entry in slot:
for t in entry.msg.topicIDs:
for i in 0..<len:
for entry in c.history[i]:
for t in entry.topicIDs:
if t == topic:
result.incl(entry.mid)
break
proc shift*(c: MCache) =
while c.history.len > c.historySize:
func shift*(c: var MCache) =
for entry in c.history.pop():
c.msgs.del(entry.mid)
c.history.insert(@[])
proc newMCache*(window: Natural, history: Natural): MCache =
new result
result.historySize = history
result.windowSize = window
result.history = newSeq[seq[CacheEntry]]()
result.history.add(@[]) # initialize with empty slot
result.msgs = newTimedCache[Message](2.minutes)
func init*(T: type MCache, window, history: Natural): T =
T(
history: newSeq[seq[CacheEntry]](history),
historySize: history,
windowSize: window
)

View File

@ -201,14 +201,15 @@ method unsubscribe*(p: PubSub,
topics: seq[TopicPair]) {.base, async.} =
## unsubscribe from a list of ``topic`` strings
for t in topics:
for i, h in p.topics[t.topic].handler:
p.topics.withValue(t.topic, subs):
for i, h in subs[].handler:
if h == t.handler:
p.topics[t.topic].handler.del(i)
subs[].handler.del(i)
# make sure we delete the topic if
# no more handlers are left
if p.topics[t.topic].handler.len <= 0:
p.topics.del(t.topic)
if subs.handler.len <= 0:
p.topics.del(t.topic) # careful, invalidates subs
# metrics
libp2p_pubsub_topics.set(p.topics.len.int64)

View File

@ -10,8 +10,6 @@
import std/[hashes, options, strutils, tables]
import chronos, chronicles, nimcrypto/sha2, metrics
import rpc/[messages, message, protobuf],
timedcache,
../../switch,
../../peerid,
../../peerinfo,
../../stream/connection,
@ -42,8 +40,6 @@ type
connections*: seq[Connection] # connections to this peer
peerId*: PeerID
handler*: RPCHandler
sentRpcCache: TimedCache[string] # cache for already sent messages
recvdRpcCache: TimedCache[string] # cache for already received messages
observers*: ref seq[PubSubObserver] # ref as in smart_ptr
dialLock: AsyncLock
@ -87,33 +83,24 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
while not conn.atEof:
trace "waiting for data"
let data = await conn.readLp(64 * 1024)
let digest = $(sha256.digest(data))
trace "read data from peer", data = data.shortLog
if digest in p.recvdRpcCache:
when defined(libp2p_expensive_metrics):
libp2p_pubsub_skipped_received_messages.inc(labelValues = [p.id])
trace "message already received, skipping"
continue
var rmsg = decodeRpcMsg(data)
if rmsg.isErr():
notice "failed to decode msg from peer"
break
var msg = rmsg.get()
trace "decoded msg from peer", msg = msg.shortLog
trace "decoded msg from peer", msg = rmsg.get().shortLog
# trigger hooks
p.recvObservers(msg)
p.recvObservers(rmsg.get())
when defined(libp2p_expensive_metrics):
for m in msg.messages:
for m in rmsg.get().messages:
for t in m.topicIDs:
# metrics
libp2p_pubsub_received_messages.inc(labelValues = [p.id, t])
await p.handler(p, msg)
p.recvdRpcCache.put(digest)
await p.handler(p, rmsg.get())
finally:
await conn.close()
@ -227,13 +214,6 @@ proc sendImpl(p: PubSubPeer, msg: RPCMsg) {.async.} =
logScope:
encoded = shortLog(encoded)
let digest = $(sha256.digest(encoded))
if digest in p.sentRpcCache:
trace "message already sent to peer, skipping"
when defined(libp2p_expensive_metrics):
libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id])
return
var conn = await p.getSendConn()
try:
trace "about to send message"
@ -243,8 +223,6 @@ proc sendImpl(p: PubSubPeer, msg: RPCMsg) {.async.} =
trace "sending encoded msgs to peer", connId = $conn.oid
await conn.writeLp(encoded)
p.sentRpcCache.put(digest)
trace "sent pubsub message to remote", connId = $conn.oid
when defined(libp2p_expensive_metrics):
@ -282,6 +260,4 @@ proc newPubSubPeer*(peerId: PeerID,
result.getConn = getConn
result.codec = codec
result.peerId = peerId
result.sentRpcCache = newTimedCache[string](2.minutes)
result.recvdRpcCache = newTimedCache[string](2.minutes)
result.dialLock = newAsyncLock()

View File

@ -7,73 +7,59 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import tables
import chronos, chronicles
import std/[heapqueue, sets]
logScope:
topics = "timedcache"
import chronos/timer
const Timeout* = 10.seconds # default timeout in ms
type
ExpireHandler*[V] = proc(key: string, val: V) {.gcsafe.}
TimedEntry*[V] = object of RootObj
val: V
handler: ExpireHandler[V]
TimedEntry*[K] = ref object of RootObj
key: K
expiresAt: Moment
TimedCache*[V] = ref object of RootObj
cache*: Table[string, TimedEntry[V]]
onExpire*: ExpireHandler[V]
timeout*: Duration
TimedCache*[K] = object of RootObj
expiries: HeapQueue[TimedEntry[K]]
entries: HashSet[K]
timeout: Duration
# TODO: This belong in chronos, temporary left here until chronos is updated
proc addTimer*(at: Duration, cb: CallbackFunc, udata: pointer = nil) =
## Arrange for the callback ``cb`` to be called at the given absolute
## timestamp ``at``. You can also pass ``udata`` to callback.
addTimer(Moment.fromNow(at), cb, udata)
func `<`*(a, b: TimedEntry): bool =
a.expiresAt < b.expiresAt
proc put*[V](t: TimedCache[V],
key: string,
val: V = "",
timeout: Duration,
handler: ExpireHandler[V] = nil) =
trace "adding entry to timed cache", key = key
t.cache[key] = TimedEntry[V](val: val, handler: handler)
func expire*(t: var TimedCache, now: Moment = Moment.now()) =
while t.expiries.len() > 0 and t.expiries[0].expiresAt < now:
t.entries.excl(t.expiries.pop().key)
addTimer(
timeout,
proc (arg: pointer = nil) {.gcsafe.} =
trace "deleting expired entry from timed cache", key = key
if key in t.cache:
let entry = t.cache[key]
t.cache.del(key)
if not isNil(entry.handler):
entry.handler(key, entry.val)
func del*[K](t: var TimedCache[K], key: K): bool =
# Removes existing key from cache, returning false if it was not present
if not t.entries.missingOrExcl(key):
for i in 0..<t.expiries.len:
if t.expiries[i].key == key:
t.expiries.del(i)
break
true
else:
false
func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
# Puts k in cache, returning true if the item was already present and false
# otherwise. If the item was already present, its expiry timer will be
# refreshed.
t.expire(now)
var res = t.del(k) # Refresh existing item
t.entries.incl(k)
t.expiries.push(TimedEntry[K](key: k, expiresAt: now + t.timeout))
res
func contains*[K](t: TimedCache[K], k: K): bool =
k in t.entries
func init*[K](T: type TimedCache[K], timeout: Duration = Timeout): T =
T(
expiries: initHeapQueue[TimedEntry[K]](),
entries: initHashSet[K](),
timeout: timeout
)
proc put*[V](t: TimedCache[V],
key: string,
val: V = "",
handler: ExpireHandler[V] = nil) =
t.put(key, val, t.timeout, handler)
proc contains*[V](t: TimedCache[V], key: string): bool =
t.cache.contains(key)
proc del*[V](t: TimedCache[V], key: string) =
trace "deleting entry from timed cache", key = key
t.cache.del(key)
proc get*[V](t: TimedCache[V], key: string): V =
t.cache[key].val
proc `[]`*[V](t: TimedCache[V], key: string): V =
t.get(key)
proc `[]=`*[V](t: TimedCache[V], key: string, val: V): V =
t.put(key, val)
proc newTimedCache*[V](timeout: Duration = Timeout): TimedCache[V] =
new result
result.cache = initTable[string, TimedEntry[V]]()
result.timeout = timeout

View File

@ -15,14 +15,14 @@ proc randomPeerID(): PeerID =
suite "MCache":
test "put/get":
var mCache = newMCache(3, 5)
var mCache = MCache.init(3, 5)
var msg = Message(fromPeer: randomPeerID(), seqno: "12345".toBytes())
let msgId = defaultMsgIdProvider(msg)
mCache.put(msgId, msg)
check mCache.get(msgId).isSome and mCache.get(msgId).get() == msg
test "window":
var mCache = newMCache(3, 5)
var mCache = MCache.init(3, 5)
for i in 0..<3:
var msg = Message(fromPeer: randomPeerID(),
@ -43,7 +43,7 @@ suite "MCache":
check mCache.get(id).get().topicIDs[0] == "foo"
test "shift - shift 1 window at a time":
var mCache = newMCache(1, 5)
var mCache = MCache.init(1, 5)
for i in 0..<3:
var msg = Message(fromPeer: randomPeerID(),
@ -73,7 +73,7 @@ suite "MCache":
check mCache.window("baz").len == 0
test "shift - 2 windows at a time":
var mCache = newMCache(1, 5)
var mCache = MCache.init(1, 5)
for i in 0..<3:
var msg = Message(fromPeer: randomPeerID(),

View File

@ -4,4 +4,5 @@ import testgossipinternal,
testfloodsub,
testgossipsub,
testmcache,
testtimedcache,
testmessage

View File

@ -0,0 +1,34 @@
{.used.}
import std/unittest
import chronos/timer
import ../../libp2p/protocols/pubsub/timedcache
suite "TimedCache":
test "put/get":
var cache = TimedCache[int].init(5.seconds)
let now = Moment.now()
check:
not cache.put(1, now)
not cache.put(2, now + 3.seconds)
check:
1 in cache
2 in cache
check: not cache.put(3, now + 6.seconds) # expires 1
check:
1 notin cache
2 in cache
3 in cache
check:
cache.put(2, now + 7.seconds) # refreshes 2
not cache.put(4, now + 12.seconds) # expires 3
check:
2 in cache
3 notin cache
4 in cache