gossipsub fixes (#186)

This commit is contained in:
Dmitriy Ryajov 2020-05-21 14:24:20 -06:00 committed by GitHub
parent ba53c08b3c
commit 9132f16927
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 357 additions and 270 deletions

View File

@ -86,6 +86,139 @@ method init(g: GossipSub) =
g.handler = handler g.handler = handler
g.codec = GossipSubCodec g.codec = GossipSubCodec
proc replenishFanout(g: GossipSub, topic: string) {.async.} =
## get fanout peers for a topic
trace "about to replenish fanout"
if topic notin g.fanout:
g.fanout[topic] = initHashSet[string]()
if g.fanout[topic].len < GossipSubDLo:
trace "replenishing fanout", peers = g.fanout[topic].len
if topic in g.gossipsub:
for p in g.gossipsub[topic]:
if not g.fanout[topic].containsOrIncl(p):
if g.fanout[topic].len == GossipSubD:
break
trace "fanout replenished with peers", peers = g.fanout[topic].len
proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
try:
trace "about to rebalance mesh"
# create a mesh topic that we're subscribing to
if topic notin g.mesh:
g.mesh[topic] = initHashSet[string]()
if g.mesh[topic].len < GossipSubDlo:
trace "replenishing mesh"
# replenish the mesh if we're below GossipSubDlo
while g.mesh[topic].len < GossipSubD:
trace "gathering peers", peers = g.mesh[topic].len
var id: string
if topic in g.fanout and g.fanout[topic].len > 0:
id = sample(toSeq(g.fanout[topic]))
g.fanout[topic].excl(id)
trace "got fanout peer", peer = id
elif topic in g.gossipsub and g.gossipsub[topic].len > 0:
id = sample(toSeq(g.gossipsub[topic]))
g.gossipsub[topic].excl(id)
trace "got gossipsub peer", peer = id
else:
trace "no more peers"
break
g.mesh[topic].incl(id)
if id in g.peers:
let p = g.peers[id]
# send a graft message to the peer
await p.sendGraft(@[topic])
# prune peers if we've gone over
if g.mesh[topic].len > GossipSubDhi:
trace "pruning mesh"
while g.mesh[topic].len > GossipSubD:
trace "pruning peers", peers = g.mesh[topic].len
let id = toSeq(g.mesh[topic])[rand(0..<g.mesh[topic].len)]
g.mesh[topic].excl(id)
let p = g.peers[id]
# send a graft message to the peer
await p.sendPrune(@[topic])
trace "mesh balanced, got peers", peers = g.mesh[topic].len
except CatchableError as exc:
trace "exception occured rebalancing mes", exc = exc.msg
proc dropFanoutPeers(g: GossipSub) {.async.} =
# drop peers that we haven't published to in
# GossipSubFanoutTTL seconds
var dropping = newSeq[string]()
for topic, val in g.lastFanoutPubSub:
if Moment.now > val:
dropping.add(topic)
g.fanout.del(topic)
for topic in dropping:
g.lastFanoutPubSub.del(topic)
proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
## gossip iHave messages to peers
let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys))
for topic in topics:
let mesh: HashSet[string] =
if topic in g.mesh:
g.mesh[topic]
else:
initHashSet[string]()
let fanout: HashSet[string] =
if topic in g.fanout:
g.fanout[topic]
else:
initHashSet[string]()
let gossipPeers = mesh + fanout
let mids = g.mcache.window(topic)
if mids.len > 0:
let ihave = ControlIHave(topicID: topic,
messageIDs: toSeq(mids))
if topic notin g.gossipsub:
trace "topic not in gossip array, skipping", topicID = topic
continue
while result.len < GossipSubD:
if not (g.gossipsub[topic].len > 0):
trace "no peers for topic, skipping", topicID = topic
break
let id = toSeq(g.gossipsub[topic]).sample()
g.gossipsub[topic].excl(id)
if id notin gossipPeers:
if id notin result:
result[id] = ControlMessage()
result[id].ihave.add(ihave)
proc heartbeat(g: GossipSub) {.async.} =
try:
await g.heartbeatLock.acquire()
trace "running heartbeat"
await sleepAsync(GossipSubHeartbeatInitialDelay)
for t in g.mesh.keys:
await g.rebalanceMesh(t)
await g.dropFanoutPeers()
let peers = g.getGossipPeers()
for peer in peers.keys:
await g.peers[peer].send(@[RPCMsg(control: some(peers[peer]))])
g.mcache.shift() # shift the cache
except CatchableError as exc:
trace "exception ocurred in gossipsub heartbeat", exc = exc.msg
finally:
g.heartbeatLock.release()
method handleDisconnect(g: GossipSub, peer: PubSubPeer) {.async.} = method handleDisconnect(g: GossipSub, peer: PubSubPeer) {.async.} =
## handle peer disconnects ## handle peer disconnects
trace "peer disconnected", peer=peer.id trace "peer disconnected", peer=peer.id
@ -148,19 +281,22 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
if prune.topicID in g.mesh: if prune.topicID in g.mesh:
g.mesh[prune.topicID].excl(peer.id) g.mesh[prune.topicID].excl(peer.id)
proc handleIHave(g: GossipSub, peer: PubSubPeer, ihaves: seq[ proc handleIHave(g: GossipSub,
ControlIHave]): ControlIWant = peer: PubSubPeer,
ihaves: seq[ControlIHave]): ControlIWant =
for ihave in ihaves: for ihave in ihaves:
trace "processing ihave message", peer = peer.id, trace "processing ihave message", peer = peer.id,
topicID = ihave.topicID topicID = ihave.topicID,
msgs = ihave.messageIDs
if ihave.topicID in g.mesh: if ihave.topicID in g.mesh:
for m in ihave.messageIDs: for m in ihave.messageIDs:
if m notin g.seen: if m notin g.seen:
result.messageIDs.add(m) result.messageIDs.add(m)
proc handleIWant(g: GossipSub, peer: PubSubPeer, iwants: seq[ proc handleIWant(g: GossipSub,
ControlIWant]): seq[Message] = peer: PubSubPeer,
iwants: seq[ControlIWant]): seq[Message] =
for iwant in iwants: for iwant in iwants:
for mid in iwant.messageIDs: for mid in iwant.messageIDs:
trace "processing iwant message", peer = peer.id, trace "processing iwant message", peer = peer.id,
@ -176,7 +312,7 @@ method rpcHandler(g: GossipSub,
for m in rpcMsgs: # for all RPC messages for m in rpcMsgs: # for all RPC messages
if m.messages.len > 0: # if there are any messages if m.messages.len > 0: # if there are any messages
var toSendPeers: HashSet[string] = initHashSet[string]() var toSendPeers: HashSet[string]
for msg in m.messages: # for every message for msg in m.messages: # for every message
trace "processing message with id", msg = msg.msgId trace "processing message with id", msg = msg.msgId
if msg.msgId in g.seen: if msg.msgId in g.seen:
@ -199,7 +335,7 @@ method rpcHandler(g: GossipSub,
continue continue
for t in msg.topicIDs: # for every topic in the message for t in msg.topicIDs: # for every topic in the message
await g.rebalanceMesh(t) # gather peers for each topic
if t in g.floodsub: if t in g.floodsub:
toSendPeers.incl(g.floodsub[t]) # get all floodsub peers for topic toSendPeers.incl(g.floodsub[t]) # get all floodsub peers for topic
@ -218,20 +354,23 @@ method rpcHandler(g: GossipSub,
for p in toSendPeers: for p in toSendPeers:
if p in g.peers: if p in g.peers:
let id = g.peers[p].peerInfo.peerId let id = g.peers[p].peerInfo.peerId
trace "about to forward message to peer", peerId = id trace "about to forward message to peer", peerId = id, msgs = m.messages
if id != peer.peerInfo.peerId: if id == peer.peerInfo.peerId:
let msgs = m.messages.filterIt( trace "not forwarding message to originator", peerId = id
# don't forward to message originator continue
id != it.fromPeerId()
)
var sent: seq[Future[void]] let msgs = m.messages.filterIt(
if msgs.len > 0: # don't forward to message originator
trace "forwarding message to", peerId = id id != it.fromPeerId()
sent.add(g.peers[p].send(@[RPCMsg(messages: msgs)])) )
sent = await allFinished(sent)
checkFutures(sent) var sent: seq[Future[void]]
if msgs.len > 0:
trace "forwarding message to", peerId = id
sent.add(g.peers[p].send(@[RPCMsg(messages: msgs)]))
sent = await allFinished(sent)
checkFutures(sent)
var respControl: ControlMessage var respControl: ControlMessage
if m.control.isSome: if m.control.isSome:
@ -248,135 +387,11 @@ method rpcHandler(g: GossipSub,
respControl.ihave.len > 0 or respControl.iwant.len > 0: respControl.ihave.len > 0 or respControl.iwant.len > 0:
await peer.send(@[RPCMsg(control: some(respControl), messages: messages)]) await peer.send(@[RPCMsg(control: some(respControl), messages: messages)])
proc replenishFanout(g: GossipSub, topic: string) {.async.} =
## get fanout peers for a topic
trace "about to replenish fanout"
if topic notin g.fanout:
g.fanout[topic] = initHashSet[string]()
if g.fanout[topic].len < GossipSubDLo:
trace "replenishing fanout", peers = g.fanout[topic].len
if topic in g.gossipsub:
for p in g.gossipsub[topic]:
if not g.fanout[topic].containsOrIncl(p):
if g.fanout[topic].len == GossipSubD:
break
trace "fanout replenished with peers", peers = g.fanout[topic].len
proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
trace "about to rebalance mesh"
# create a mesh topic that we're subscribing to
if topic notin g.mesh:
g.mesh[topic] = initHashSet[string]()
if g.mesh[topic].len < GossipSubDlo:
trace "replenishing mesh"
# replenish the mesh if we're below GossipSubDlo
while g.mesh[topic].len < GossipSubD:
trace "gathering peers", peers = g.mesh[topic].len
var id: string
if topic in g.fanout and g.fanout[topic].len > 0:
id = g.fanout[topic].pop()
trace "got fanout peer", peer = id
elif topic in g.gossipsub and g.gossipsub[topic].len > 0:
id = g.gossipsub[topic].pop()
trace "got gossipsub peer", peer = id
else:
trace "no more peers"
break
g.mesh[topic].incl(id)
if id in g.peers:
let p = g.peers[id]
# send a graft message to the peer
await p.sendGraft(@[topic])
# prune peers if we've gone over
if g.mesh[topic].len > GossipSubDhi:
trace "pruning mesh"
while g.mesh[topic].len > GossipSubD:
trace "pruning peers", peers = g.mesh[topic].len
let id = toSeq(g.mesh[topic])[rand(0..<g.mesh[topic].len)]
g.mesh[topic].excl(id)
let p = g.peers[id]
# send a graft message to the peer
await p.sendPrune(@[topic])
trace "mesh balanced, got peers", peers = g.mesh[topic].len
proc dropFanoutPeers(g: GossipSub) {.async.} =
# drop peers that we haven't published to in
# GossipSubFanoutTTL seconds
var dropping = newSeq[string]()
for topic, val in g.lastFanoutPubSub:
if Moment.now > val:
dropping.add(topic)
g.fanout.del(topic)
for topic in dropping:
g.lastFanoutPubSub.del(topic)
proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
## gossip iHave messages to peers
let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys))
for topic in topics:
let mesh: HashSet[string] =
if topic in g.mesh:
g.mesh[topic]
else:
initHashSet[string]()
let fanout: HashSet[string] =
if topic in g.fanout:
g.fanout[topic]
else:
initHashSet[string]()
let gossipPeers = mesh + fanout
let mids = g.mcache.window(topic)
let ihave = ControlIHave(topicID: topic,
messageIDs: toSeq(mids))
if topic notin g.gossipsub:
trace "topic not in gossip array, skipping", topicID = topic
continue
while result.len < GossipSubD:
if not (g.gossipsub[topic].len > 0):
trace "no peers for topic, skipping", topicID = topic
break
let id = toSeq(g.gossipsub[topic]).sample()
g.gossipsub[topic].excl(id)
if id notin gossipPeers:
if id notin result:
result[id] = ControlMessage()
result[id].ihave.add(ihave)
proc heartbeat(g: GossipSub) {.async.} =
await g.heartbeatLock.acquire()
trace "running heartbeat"
await sleepAsync(GossipSubHeartbeatInitialDelay)
for t in g.mesh.keys:
await g.rebalanceMesh(t)
await g.dropFanoutPeers()
let peers = g.getGossipPeers()
for peer in peers.keys:
await g.peers[peer].send(@[RPCMsg(control: some(peers[peer]))])
g.mcache.shift() # shift the cache
g.heartbeatLock.release()
method subscribe*(g: GossipSub, method subscribe*(g: GossipSub,
topic: string, topic: string,
handler: TopicHandler) {.async.} = handler: TopicHandler) {.async.} =
await procCall PubSub(g).subscribe(topic, handler) await procCall PubSub(g).subscribe(topic, handler)
asyncCheck g.rebalanceMesh(topic) await g.rebalanceMesh(topic)
method unsubscribe*(g: GossipSub, method unsubscribe*(g: GossipSub,
topics: seq[TopicPair]) {.async.} = topics: seq[TopicPair]) {.async.} =
@ -447,6 +462,7 @@ method stop*(g: GossipSub) {.async.} =
method initPubSub(g: GossipSub) = method initPubSub(g: GossipSub) =
procCall FloodSub(g).initPubSub() procCall FloodSub(g).initPubSub()
randomize()
g.mcache = newMCache(GossipSubHistoryGossip, GossipSubHistoryLength) g.mcache = newMCache(GossipSubHistoryGossip, GossipSubHistoryLength)
g.mesh = initTable[string, HashSet[string]]() # meshes - topic to peer g.mesh = initTable[string, HashSet[string]]() # meshes - topic to peer
g.fanout = initTable[string, HashSet[string]]() # fanout - topic to peer g.fanout = initTable[string, HashSet[string]]() # fanout - topic to peer

View File

@ -199,14 +199,13 @@ method subscribe*(p: PubSub,
method publish*(p: PubSub, method publish*(p: PubSub,
topic: string, topic: string,
data: seq[byte]) {.base, async.} = data: seq[byte]) {.base, async.} =
# TODO: Should return bool indicating success/failure
## publish to a ``topic`` ## publish to a ``topic``
if p.triggerSelf and topic in p.topics: if p.triggerSelf and topic in p.topics:
for h in p.topics[topic].handler: for h in p.topics[topic].handler:
trace "triggering handler", topicID = topic trace "triggering handler", topicID = topic
try: try:
await h(topic, data) await h(topic, data)
except LPStreamEOFError:
trace "Ignoring EOF while writing"
except CancelledError as exc: except CancelledError as exc:
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:

View File

@ -80,7 +80,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
try: try:
for m in msgs.items: for m in msgs.items:
trace "sending msgs to peer", toPeer = p.id trace "sending msgs to peer", toPeer = p.id, msgs = msgs
let encoded = encodeRpcMsg(m) let encoded = encodeRpcMsg(m)
# trigger hooks # trigger hooks
if not(isNil(p.observers)) and p.observers[].len > 0: if not(isNil(p.observers)) and p.observers[].len > 0:
@ -98,25 +98,32 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
continue continue
proc sendToRemote() {.async.} = proc sendToRemote() {.async.} =
trace "sending encoded msgs to peer", peer = p.id, trace "about send message", peer = p.id,
encoded = encoded.buffer.shortLog encoded = digest
await p.sendConn.writeLp(encoded.buffer) await p.onConnect.wait()
p.sentRpcCache.put(digest) try:
trace "sending encoded msgs to peer", peer = p.id,
encoded = encoded.buffer.shortLog
await p.sendConn.writeLp(encoded.buffer)
p.sentRpcCache.put(digest)
except CatchableError as exc:
trace "unable to send to remote", exc = exc.msg
if not(isNil(p.sendConn)):
await p.sendConn.close()
p.sendConn = nil
p.onConnect.clear()
# if no connection has been set, # if no connection has been set,
# queue messages untill a connection # queue messages untill a connection
# becomes available # becomes available
if p.isConnected: asyncCheck sendToRemote()
await sendToRemote()
return
p.onConnect.wait().addCallback do (udata: pointer):
asyncCheck sendToRemote()
trace "enqueued message to send at a later time", peer = p.id,
encoded = digest
except CatchableError as exc: except CatchableError as exc:
trace "Exception occurred in PubSubPeer.send", exc = exc.msg trace "Exception occurred in PubSubPeer.send", exc = exc.msg
if not(isNil(p.sendConn)):
await p.sendConn.close()
p.sendConn = nil
p.onConnect.clear()
proc sendMsg*(p: PubSubPeer, proc sendMsg*(p: PubSubPeer,
peerId: PeerID, peerId: PeerID,

View File

@ -49,19 +49,18 @@ proc decodeIHave*(pb: var ProtoBuffer): seq[ControlIHave] {.gcsafe.} =
while true: while true:
var control: ControlIHave var control: ControlIHave
if pb.enterSubMessage() > 0: if pb.getString(1, control.topicID) < 0:
if pb.getString(1, control.topicID) < 0: trace "topic field missing from ihave msg"
trace "topic field missing from ihave msg" break
trace "read topic field", topicID = control.topicID
while true:
var mid: string
if pb.getString(2, mid) < 0:
break break
trace "read messageID field", mid = mid
trace "read topic field", topicID = control.topicID control.messageIDs.add(mid)
while true:
var mid: string
if pb.getString(2, mid) < 0:
break
trace "read messageID field", mid = mid
control.messageIDs.add(mid)
result.add(control) result.add(control)
@ -70,15 +69,16 @@ proc encodeIWant*(iwant: ControlIWant, pb: var ProtoBuffer) {.gcsafe.} =
pb.write(initProtoField(1, mid)) pb.write(initProtoField(1, mid))
proc decodeIWant*(pb: var ProtoBuffer): seq[ControlIWant] {.gcsafe.} = proc decodeIWant*(pb: var ProtoBuffer): seq[ControlIWant] {.gcsafe.} =
trace "decoding ihave msg" trace "decoding iwant msg"
while pb.enterSubMessage() > 0: var control: ControlIWant
while true:
var mid: string var mid: string
var iWant: ControlIWant if pb.getString(1, mid) < 0:
while pb.getString(1, mid) > 0: break
trace "read messageID field", mid = mid control.messageIDs.add(mid)
iWant.messageIDs.add(mid) trace "read messageID field", mid = mid
result.add(iWant) result.add(control)
proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} = proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} =
if control.ihave.len > 0: if control.ihave.len > 0:
@ -128,13 +128,13 @@ proc decodeControl*(pb: var ProtoBuffer): Option[ControlMessage] {.gcsafe.} =
trace "no submessage found in Control msg" trace "no submessage found in Control msg"
break break
of 1: of 1:
control.ihave = pb.decodeIHave() control.ihave &= pb.decodeIHave()
of 2: of 2:
control.iwant = pb.decodeIWant() control.iwant &= pb.decodeIWant()
of 3: of 3:
control.graft = pb.decodeGraft() control.graft &= pb.decodeGraft()
of 4: of 4:
control.prune = pb.decodePrune() control.prune &= pb.decodePrune()
else: else:
raise newException(CatchableError, "message type not recognized") raise newException(CatchableError, "message type not recognized")

View File

@ -53,9 +53,11 @@ method init*(s: Secure) {.gcsafe.} =
method secure*(s: Secure, conn: Connection, initiator: bool): Future[Connection] {.async, base, gcsafe.} = method secure*(s: Secure, conn: Connection, initiator: bool): Future[Connection] {.async, base, gcsafe.} =
try: try:
result = await s.handleConn(conn, initiator) result = await s.handleConn(conn, initiator)
except CancelledError as exc:
raise exc
except CatchableError as exc: except CatchableError as exc:
warn "securing connection failed", msg = exc.msg warn "securing connection failed", msg = exc.msg
await conn.close() return nil
method readExactly*(s: SecureConn, method readExactly*(s: SecureConn,
pbytes: pointer, pbytes: pointer,

View File

@ -207,10 +207,10 @@ suite "FloodSub":
test "FloodSub multiple peers, no self trigger": test "FloodSub multiple peers, no self trigger":
proc runTests(): Future[bool] {.async.} = proc runTests(): Future[bool] {.async.} =
var passed = 0 var runs = 10
var futs = newSeq[(Future[void], TopicHandler, ref int)](10) var futs = newSeq[(Future[void], TopicHandler, ref int)](runs)
for i in 0..<10: for i in 0..<runs:
closureScope: closureScope:
var var
fut = newFuture[void]() fut = newFuture[void]()
@ -226,28 +226,28 @@ suite "FloodSub":
) )
var nodes: seq[Switch] = newSeq[Switch]() var nodes: seq[Switch] = newSeq[Switch]()
for i in 0..<10: for i in 0..<runs:
nodes.add newStandardSwitch() nodes.add newStandardSwitch()
var awaitters: seq[Future[void]] var awaitters: seq[Future[void]]
for i in 0..<10: for i in 0..<runs:
awaitters.add(await nodes[i].start()) awaitters.add(await nodes[i].start())
await subscribeNodes(nodes) await subscribeNodes(nodes)
for i in 0..<10: for i in 0..<runs:
await nodes[i].subscribe("foobar", futs[i][1]) await nodes[i].subscribe("foobar", futs[i][1])
var subs: seq[Future[void]] var subs: seq[Future[void]]
for i in 0..<10: for i in 0..<runs:
for y in 0..<10: for y in 0..<runs:
if y != i: if y != i:
subs &= waitSub(nodes[i], nodes[y], "foobar") subs &= waitSub(nodes[i], nodes[y], "foobar")
await allFuturesThrowing(subs) await allFuturesThrowing(subs)
var pubs: seq[Future[void]] var pubs: seq[Future[void]]
for i in 0..<10: for i in 0..<runs:
pubs &= nodes[i].publish("foobar", cast[seq[byte]]("Hello!")) pubs &= nodes[i].publish("foobar", cast[seq[byte]]("Hello!"))
await allFuturesThrowing(pubs) await allFuturesThrowing(pubs)
@ -261,10 +261,10 @@ suite "FloodSub":
test "FloodSub multiple peers, with self trigger": test "FloodSub multiple peers, with self trigger":
proc runTests(): Future[bool] {.async.} = proc runTests(): Future[bool] {.async.} =
var passed = 0 var runs = 10
var futs = newSeq[(Future[void], TopicHandler, ref int)](10) var futs = newSeq[(Future[void], TopicHandler, ref int)](runs)
for i in 0..<10: for i in 0..<runs:
closureScope: closureScope:
var var
fut = newFuture[void]() fut = newFuture[void]()
@ -274,34 +274,34 @@ suite "FloodSub":
(proc(topic: string, data: seq[byte]) {.async, gcsafe.} = (proc(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar" check topic == "foobar"
inc counter[] inc counter[]
if counter[] == 10: if counter[] == runs:
fut.complete()), fut.complete()),
counter counter
) )
var nodes: seq[Switch] = newSeq[Switch]() var nodes: seq[Switch] = newSeq[Switch]()
for i in 0..<10: for i in 0..<runs:
nodes.add newStandardSwitch(triggerSelf = true) nodes.add newStandardSwitch(triggerSelf = true)
var awaitters: seq[Future[void]] var awaitters: seq[Future[void]]
for i in 0..<10: for i in 0..<runs:
awaitters.add(await nodes[i].start()) awaitters.add(await nodes[i].start())
await subscribeNodes(nodes) await subscribeNodes(nodes)
for i in 0..<10: for i in 0..<runs:
await nodes[i].subscribe("foobar", futs[i][1]) await nodes[i].subscribe("foobar", futs[i][1])
var subs: seq[Future[void]] var subs: seq[Future[void]]
for i in 0..<10: for i in 0..<runs:
for y in 0..<10: for y in 0..<runs:
if y != i: if y != i:
subs &= waitSub(nodes[i], nodes[y], "foobar") subs &= waitSub(nodes[i], nodes[y], "foobar")
await allFuturesThrowing(subs) await allFuturesThrowing(subs)
var pubs: seq[Future[void]] var pubs: seq[Future[void]]
for i in 0..<10: for i in 0..<runs:
pubs &= nodes[i].publish("foobar", cast[seq[byte]]("Hello!")) pubs &= nodes[i].publish("foobar", cast[seq[byte]]("Hello!"))
await allFuturesThrowing(pubs) await allFuturesThrowing(pubs)
@ -310,5 +310,6 @@ suite "FloodSub":
await allFuturesThrowing(awaitters) await allFuturesThrowing(awaitters)
result = true result = true
check: check:
waitFor(runTests()) == true waitFor(runTests()) == true

View File

@ -3,6 +3,7 @@ include ../../libp2p/protocols/pubsub/gossipsub
{.used.} {.used.}
import unittest import unittest
import stew/byteutils
import ../../libp2p/errors import ../../libp2p/errors
import ../../libp2p/stream/bufferstream import ../../libp2p/stream/bufferstream
@ -11,24 +12,27 @@ import ../helpers
type type
TestGossipSub = ref object of GossipSub TestGossipSub = ref object of GossipSub
proc noop(data: seq[byte]) {.async, gcsafe.} = discard
suite "GossipSub internal": suite "GossipSub internal":
teardown: teardown:
for tracker in testTrackers(): for tracker in testTrackers():
# echo tracker.dump()
check tracker.isLeaked() == false check tracker.isLeaked() == false
test "`rebalanceMesh` Degree Lo": test "`rebalanceMesh` Degree Lo":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA).get())) PeerInfo.init(PrivateKey.random(ECDSA).get()))
let topic = "foobar" let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[string]() gossipSub.mesh[topic] = initHashSet[string]()
var conns = newSeq[Connection]() var conns = newSeq[Connection]()
for i in 0..<15: for i in 0..<15:
let conn = newConnection(newBufferStream()) let conn = newConnection(newBufferStream(noop))
conns &= conn conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(RSA).get()) let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get())
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].conn = conn gossipSub.peers[peerInfo.id].conn = conn
@ -48,16 +52,16 @@ suite "GossipSub internal":
test "`rebalanceMesh` Degree Hi": test "`rebalanceMesh` Degree Hi":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA).get())) PeerInfo.init(PrivateKey.random(ECDSA).get()))
let topic = "foobar" let topic = "foobar"
gossipSub.gossipsub[topic] = initHashSet[string]() gossipSub.gossipsub[topic] = initHashSet[string]()
var conns = newSeq[Connection]() var conns = newSeq[Connection]()
for i in 0..<15: for i in 0..<15:
let conn = newConnection(newBufferStream()) let conn = newConnection(newBufferStream(noop))
conns &= conn conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(RSA).get()) let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get())
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].conn = conn gossipSub.peers[peerInfo.id].conn = conn
@ -77,7 +81,7 @@ suite "GossipSub internal":
test "`replenishFanout` Degree Lo": test "`replenishFanout` Degree Lo":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA).get())) PeerInfo.init(PrivateKey.random(ECDSA).get()))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard discard
@ -87,9 +91,9 @@ suite "GossipSub internal":
var conns = newSeq[Connection]() var conns = newSeq[Connection]()
for i in 0..<15: for i in 0..<15:
let conn = newConnection(newBufferStream()) let conn = newConnection(newBufferStream(noop))
conns &= conn conns &= conn
var peerInfo = PeerInfo.init(PrivateKey.random(RSA).get()) var peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get())
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler gossipSub.peers[peerInfo.id].handler = handler
@ -109,20 +113,21 @@ suite "GossipSub internal":
test "`dropFanoutPeers` drop expired fanout topics": test "`dropFanoutPeers` drop expired fanout topics":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA).get())) PeerInfo.init(PrivateKey.random(ECDSA).get()))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard discard
let topic = "foobar" let topic = "foobar"
gossipSub.fanout[topic] = initHashSet[string]() gossipSub.fanout[topic] = initHashSet[string]()
gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(100.millis) gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(1.millis)
await sleepAsync(5.millis) # allow the topic to expire
var conns = newSeq[Connection]() var conns = newSeq[Connection]()
for i in 0..<6: for i in 0..<6:
let conn = newConnection(newBufferStream()) let conn = newConnection(newBufferStream(noop))
conns &= conn conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(RSA).get()) let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get())
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler gossipSub.peers[peerInfo.id].handler = handler
@ -143,7 +148,7 @@ suite "GossipSub internal":
test "`dropFanoutPeers` leave unexpired fanout topics": test "`dropFanoutPeers` leave unexpired fanout topics":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA).get())) PeerInfo.init(PrivateKey.random(ECDSA).get()))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard discard
@ -152,14 +157,15 @@ suite "GossipSub internal":
let topic2 = "foobar2" let topic2 = "foobar2"
gossipSub.fanout[topic1] = initHashSet[string]() gossipSub.fanout[topic1] = initHashSet[string]()
gossipSub.fanout[topic2] = initHashSet[string]() gossipSub.fanout[topic2] = initHashSet[string]()
gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(100.millis) gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(1.millis)
gossipSub.lastFanoutPubSub[topic2] = Moment.fromNow(1.minutes) gossipSub.lastFanoutPubSub[topic2] = Moment.fromNow(1.minutes)
await sleepAsync(5.millis) # allow the topic to expire
var conns = newSeq[Connection]() var conns = newSeq[Connection]()
for i in 0..<6: for i in 0..<6:
let conn = newConnection(newBufferStream()) let conn = newConnection(newBufferStream(noop))
conns &= conn conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(RSA).get()) let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get())
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler gossipSub.peers[peerInfo.id].handler = handler
@ -183,7 +189,7 @@ suite "GossipSub internal":
test "`getGossipPeers` - should gather up to degree D non intersecting peers": test "`getGossipPeers` - should gather up to degree D non intersecting peers":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA).get())) PeerInfo.init(PrivateKey.random(ECDSA).get()))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard discard
@ -193,10 +199,12 @@ suite "GossipSub internal":
gossipSub.fanout[topic] = initHashSet[string]() gossipSub.fanout[topic] = initHashSet[string]()
gossipSub.gossipsub[topic] = initHashSet[string]() gossipSub.gossipsub[topic] = initHashSet[string]()
var conns = newSeq[Connection]() var conns = newSeq[Connection]()
# geerate mesh and fanout peers
for i in 0..<30: for i in 0..<30:
let conn = newConnection(newBufferStream()) let conn = newConnection(newBufferStream(noop))
conns &= conn conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(RSA).get()) let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get())
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler gossipSub.peers[peerInfo.id].handler = handler
@ -205,17 +213,27 @@ suite "GossipSub internal":
else: else:
gossipSub.mesh[topic].incl(peerInfo.id) gossipSub.mesh[topic].incl(peerInfo.id)
# generate gossipsub (free standing) peers
for i in 0..<15: for i in 0..<15:
let conn = newConnection(newBufferStream()) let conn = newConnection(newBufferStream(noop))
conns &= conn conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(RSA).get()) let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get())
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler gossipSub.peers[peerInfo.id].handler = handler
gossipSub.gossipsub[topic].incl(peerInfo.id) gossipSub.gossipsub[topic].incl(peerInfo.id)
# generate messages
for i in 0..5:
let conn = newConnection(newBufferStream(noop))
conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get())
conn.peerInfo = peerInfo
let msg = newMessage(peerInfo, ("HELLO" & $i).toBytes(), topic, false)
gossipSub.mcache.put(msg)
check gossipSub.fanout[topic].len == 15 check gossipSub.fanout[topic].len == 15
check gossipSub.fanout[topic].len == 15 check gossipSub.mesh[topic].len == 15
check gossipSub.gossipsub[topic].len == 15 check gossipSub.gossipsub[topic].len == 15
let peers = gossipSub.getGossipPeers() let peers = gossipSub.getGossipPeers()
@ -234,7 +252,7 @@ suite "GossipSub internal":
test "`getGossipPeers` - should not crash on missing topics in mesh": test "`getGossipPeers` - should not crash on missing topics in mesh":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA).get())) PeerInfo.init(PrivateKey.random(ECDSA).get()))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard discard
@ -244,9 +262,9 @@ suite "GossipSub internal":
gossipSub.gossipsub[topic] = initHashSet[string]() gossipSub.gossipsub[topic] = initHashSet[string]()
var conns = newSeq[Connection]() var conns = newSeq[Connection]()
for i in 0..<30: for i in 0..<30:
let conn = newConnection(newBufferStream()) let conn = newConnection(newBufferStream(noop))
conns &= conn conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(RSA).get()) let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get())
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler gossipSub.peers[peerInfo.id].handler = handler
@ -255,6 +273,58 @@ suite "GossipSub internal":
else: else:
gossipSub.gossipsub[topic].incl(peerInfo.id) gossipSub.gossipsub[topic].incl(peerInfo.id)
# generate messages
for i in 0..5:
let conn = newConnection(newBufferStream(noop))
conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get())
conn.peerInfo = peerInfo
let msg = newMessage(peerInfo, ("HELLO" & $i).toBytes(), topic, false)
gossipSub.mcache.put(msg)
let peers = gossipSub.getGossipPeers()
check peers.len == GossipSubD
await allFuturesThrowing(conns.mapIt(it.close()))
result = true
check:
waitFor(testRun()) == true
test "`getGossipPeers` - should not crash on missing topics in fanout":
proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(ECDSA).get()))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard
let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[string]()
gossipSub.gossipsub[topic] = initHashSet[string]()
var conns = newSeq[Connection]()
for i in 0..<30:
let conn = newConnection(newBufferStream(noop))
conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get())
conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler
if i mod 2 == 0:
gossipSub.mesh[topic].incl(peerInfo.id)
else:
gossipSub.gossipsub[topic].incl(peerInfo.id)
# generate messages
for i in 0..5:
let conn = newConnection(newBufferStream(noop))
conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get())
conn.peerInfo = peerInfo
let msg = newMessage(peerInfo, ("HELLO" & $i).toBytes(), topic, false)
gossipSub.mcache.put(msg)
let peers = gossipSub.getGossipPeers() let peers = gossipSub.getGossipPeers()
check peers.len == GossipSubD check peers.len == GossipSubD
@ -268,41 +338,7 @@ suite "GossipSub internal":
test "`getGossipPeers` - should not crash on missing topics in gossip": test "`getGossipPeers` - should not crash on missing topics in gossip":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA).get())) PeerInfo.init(PrivateKey.random(ECDSA).get()))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard
let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[string]()
gossipSub.gossipsub[topic] = initHashSet[string]()
var conns = newSeq[Connection]()
for i in 0..<30:
let conn = newConnection(newBufferStream())
conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(RSA).get())
conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler
if i mod 2 == 0:
gossipSub.mesh[topic].incl(peerInfo.id)
else:
gossipSub.gossipsub[topic].incl(peerInfo.id)
let peers = gossipSub.getGossipPeers()
check peers.len == GossipSubD
await allFuturesThrowing(conns.mapIt(it.close()))
result = true
check:
waitFor(testRun()) == true
test "`getGossipPeers` - should not crash on missing topics in gossip":
proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA).get()))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard discard
@ -312,9 +348,9 @@ suite "GossipSub internal":
gossipSub.fanout[topic] = initHashSet[string]() gossipSub.fanout[topic] = initHashSet[string]()
var conns = newSeq[Connection]() var conns = newSeq[Connection]()
for i in 0..<30: for i in 0..<30:
let conn = newConnection(newBufferStream()) let conn = newConnection(newBufferStream(noop))
conns &= conn conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(RSA).get()) let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get())
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler gossipSub.peers[peerInfo.id].handler = handler
@ -323,6 +359,15 @@ suite "GossipSub internal":
else: else:
gossipSub.fanout[topic].incl(peerInfo.id) gossipSub.fanout[topic].incl(peerInfo.id)
# generate messages
for i in 0..5:
let conn = newConnection(newBufferStream(noop))
conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get())
conn.peerInfo = peerInfo
let msg = newMessage(peerInfo, ("bar" & $i).toBytes(), topic, false)
gossipSub.mcache.put(msg)
let peers = gossipSub.getGossipPeers() let peers = gossipSub.getGossipPeers()
check peers.len == 0 check peers.len == 0

View File

@ -318,12 +318,13 @@ suite "GossipSub":
proc runTests(): Future[bool] {.async.} = proc runTests(): Future[bool] {.async.} =
var nodes: seq[Switch] = newSeq[Switch]() var nodes: seq[Switch] = newSeq[Switch]()
var awaitters: seq[Future[void]] var awaitters: seq[Future[void]]
var runs = 10
for i in 0..<11: for i in 0..<runs:
nodes.add newStandardSwitch(triggerSelf = true, gossip = true) nodes.add newStandardSwitch(triggerSelf = true, gossip = true)
awaitters.add((await nodes[i].start())) awaitters.add((await nodes[i].start()))
await subscribeNodes(nodes) await subscribeRandom(nodes)
var seen: Table[string, int] var seen: Table[string, int]
var subs: seq[Future[void]] var subs: seq[Future[void]]
@ -337,10 +338,11 @@ suite "GossipSub":
seen[dialerNode.peerInfo.id] = 0 seen[dialerNode.peerInfo.id] = 0
seen[dialerNode.peerInfo.id].inc seen[dialerNode.peerInfo.id].inc
check topic == "foobar" check topic == "foobar"
if not seenFut.finished() and seen.len == 10: if not seenFut.finished() and seen.len >= runs:
seenFut.complete() seenFut.complete()
subs.add(allFutures(dialer.subscribe("foobar", handler), waitSub(nodes[0], dialer, "foobar"))) subs.add(allFutures(dialer.subscribe("foobar", handler),
waitSub(nodes[0], dialer, "foobar")))
await allFuturesThrowing(subs) await allFuturesThrowing(subs)
@ -350,7 +352,7 @@ suite "GossipSub":
1.minutes) 1.minutes)
await wait(seenFut, 2.minutes) await wait(seenFut, 2.minutes)
check: seen.len >= 10 check: seen.len >= runs
for k, v in seen.pairs: for k, v in seen.pairs:
check: v == 1 check: v == 1

View File

@ -1,7 +1,10 @@
import random
import chronos import chronos
import ../../libp2p/standard_setup import ../../libp2p/standard_setup
export standard_setup export standard_setup
randomize()
proc generateNodes*(num: Natural, gossip: bool = false): seq[Switch] = proc generateNodes*(num: Natural, gossip: bool = false): seq[Switch] =
for i in 0..<num: for i in 0..<num:
result.add(newStandardSwitch(gossip = gossip)) result.add(newStandardSwitch(gossip = gossip))
@ -13,3 +16,15 @@ proc subscribeNodes*(nodes: seq[Switch]) {.async.} =
if dialer.peerInfo.peerId != node.peerInfo.peerId: if dialer.peerInfo.peerId != node.peerInfo.peerId:
dials.add(dialer.connect(node.peerInfo)) dials.add(dialer.connect(node.peerInfo))
await allFutures(dials) await allFutures(dials)
proc subscribeRandom*(nodes: seq[Switch]) {.async.} =
var dials: seq[Future[void]]
for dialer in nodes:
var dialed: seq[string]
while dialed.len < nodes.len - 1:
let node = sample(nodes)
if node.peerInfo.id notin dialed:
if dialer.peerInfo.id != node.peerInfo.id:
dials.add(dialer.connect(node.peerInfo))
dialed &= node.peerInfo.id
await allFutures(dials)