fix publishing (#250)

* use var semantics to optimize table access

* wip... lvalues don't work properly sadly...

* big publish refactor, replenish and balance

* fix internal tests

* use g.peers for fanout (todo: don't include flood peers)

* exclude non gossip from fanout

* internal test fixes

* fix flood tests

* fix test's trypublish

* test interop fixes

* make sure to not remove peers from gossip table

* restore old replenishFanout

* cleanups

* Cleanup resources (#246)

* consolidate reading in lpstream

* remove debug echo

* tune log level

* add channel cleanup and cancelation handling

* cancelation handling

* cancelation handling

* cancelation handling

* cancelation handling

* cleanup and cancelation handling

* cancelation handling

* cancelation

* tests

* rename isConnected to connected

* remove testing trace

* comment out debug stacktraces

* explicit raises

* restore trace vs debug in gossip

* improve fanout replenish behavior further

* cleanup stale peers more eagerly

* synchronize connection cleanup and small refactor

* close client first and call parent second

* disconnect failed peers on publish

* check for publish result

* fix tests

* fix tests

* always call close

Co-authored-by: Giovanni Petrantoni <giovanni@fragcolor.xyz>
Dmitriy Ryajov 2020-07-07 18:33:05 -06:00 committed by GitHub
parent 775cab414a
commit a52763cc6d
12 changed files with 482 additions and 363 deletions

utility.nim

@@ -59,7 +59,7 @@ template tryAndWarn*(message: static[string]; body: untyped): untyped =
   try:
     body
   except CancelledError as exc:
-    raise exc # TODO: why catch and re-raise?
+    raise exc
   except CatchableError as exc:
     warn "An exception has ocurred, enable trace logging for details", name = exc.name, msg = message
     trace "Exception details", exc = exc.msg

floodsub.nim

@@ -47,14 +47,14 @@ method subscribeTopic*(f: FloodSub,
     # unsubscribe the peer from the topic
     f.floodsub[topic].excl(peerId)
 
-method handleDisconnect*(f: FloodSub, peer: PubSubPeer) {.async.} =
-  await procCall PubSub(f).handleDisconnect(peer)
-
+method handleDisconnect*(f: FloodSub, peer: PubSubPeer) =
   ## handle peer disconnects
   for t in toSeq(f.floodsub.keys):
     if t in f.floodsub:
       f.floodsub[t].excl(peer.id)
 
+  procCall PubSub(f).handleDisconnect(peer)
+
 method rpcHandler*(f: FloodSub,
                    peer: PubSubPeer,
                    rpcMsgs: seq[RPCMsg]) {.async.} =
@@ -86,18 +86,20 @@ method rpcHandler*(f: FloodSub,
              trace "calling handler for message", topicId = t,
                                                   localPeer = f.peerInfo.id,
                                                   fromPeer = msg.fromPeer.pretty
-              await h(t, msg.data) # trigger user provided handler
+              try:
+                await h(t, msg.data) # trigger user provided handler
+              except CatchableError as exc:
+                trace "exception in message handler", exc = exc.msg
 
       # forward the message to all peers interested in it
-      var sent: seq[Future[void]]
-      # start the future but do not wait yet
-      for p in toSendPeers:
-        if p in f.peers and f.peers[p].id != peer.id:
-          sent.add(f.peers[p].send(@[RPCMsg(messages: m.messages)]))
-
-      # wait for all the futures now
-      sent = await allFinished(sent)
-      checkFutures(sent)
+      let (published, failed) = await f.sendHelper(toSendPeers, m.messages)
+      for p in failed:
+        let peer = f.peers.getOrDefault(p)
+        if not(isNil(peer)):
+          f.handleDisconnect(peer) # cleanup failed peers
+
+      trace "forwared message to peers", peers = published.len
 
 method init*(f: FloodSub) =
   proc handler(conn: Connection, proto: string) {.async.} =
@@ -111,15 +113,16 @@ method init*(f: FloodSub) =
   f.handler = handler
   f.codec = FloodSubCodec
 
-method subscribeToPeer*(p: FloodSub,
-                        conn: Connection) {.async.} =
-  await procCall PubSub(p).subscribeToPeer(conn)
+method subscribePeer*(p: FloodSub,
+                      conn: Connection) =
+  procCall PubSub(p).subscribePeer(conn)
   asyncCheck p.handleConn(conn, FloodSubCodec)
 
 method publish*(f: FloodSub,
                 topic: string,
-                data: seq[byte]) {.async.} =
-  await procCall PubSub(f).publish(topic, data)
+                data: seq[byte]): Future[int] {.async.} =
+  # base returns always 0
+  discard await procCall PubSub(f).publish(topic, data)
 
   if data.len <= 0 or topic.len <= 0:
     trace "topic or data missing, skipping publish"
@@ -131,19 +134,18 @@ method publish*(f: FloodSub,
   trace "publishing on topic", name = topic
   let msg = Message.init(f.peerInfo, data, topic, f.sign)
 
-  var sent: seq[Future[void]]
   # start the future but do not wait yet
-  for p in f.floodsub.getOrDefault(topic):
-    if p in f.peers:
-      trace "publishing message", name = topic, peer = p, data = data.shortLog
-      sent.add(f.peers[p].send(@[RPCMsg(messages: @[msg])]))
-
-  # wait for all the futures now
-  sent = await allFinished(sent)
-  checkFutures(sent)
+  let (published, failed) = await f.sendHelper(f.floodsub.getOrDefault(topic), @[msg])
+  for p in failed:
+    let peer = f.peers.getOrDefault(p)
+    f.handleDisconnect(peer) # cleanup failed peers
 
   libp2p_pubsub_messages_published.inc(labelValues = [topic])
 
+  trace "published message to peers", peers = published.len,
+                                      msg = msg.shortLog()
+  return published.len
+
 method unsubscribe*(f: FloodSub,
                     topics: seq[TopicPair]) {.async.} =
   await procCall PubSub(f).unsubscribe(topics)
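Note: a minimal, self-contained sketch of the contract the new publish signature introduces above — the return value counts receiving peers, and 0 means "no peers reachable", not an error. The publish proc below is an illustrative stand-in, not the library API:

  import chronos

  # stand-in for FloodSub/GossipSub publish; only the Future[int]
  # contract matters here
  proc publish(topic: string, data: seq[byte]): Future[int] {.async.} =
    return 0

  proc demo() {.async.} =
    let sent = await publish("foobar", @[1'u8, 2, 3])
    if sent == 0:
      echo "no peers received the message; callers may retry later"

  waitFor demo()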

gossipsub.nim

@@ -56,9 +56,17 @@ type
     heartbeatRunning: bool
     heartbeatLock: AsyncLock # heartbeat lock to prevent two consecutive concurrent heartbeats
 
-declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", labels = ["topic"])
-declareGauge(libp2p_gossipsub_peers_per_topic_fanout, "gossipsub peers per topic in fanout", labels = ["topic"])
-declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub, "gossipsub peers per topic in gossipsub", labels = ["topic"])
+declareGauge(libp2p_gossipsub_peers_per_topic_mesh,
+             "gossipsub peers per topic in mesh",
+             labels = ["topic"])
+
+declareGauge(libp2p_gossipsub_peers_per_topic_fanout,
+             "gossipsub peers per topic in fanout",
+             labels = ["topic"])
+
+declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub,
+             "gossipsub peers per topic in gossipsub",
+             labels = ["topic"])
 
 method init*(g: GossipSub) =
   proc handler(conn: Connection, proto: string) {.async.} =
@@ -72,7 +80,7 @@ method init*(g: GossipSub) =
   g.handler = handler
   g.codec = GossipSubCodec
 
-proc replenishFanout(g: GossipSub, topic: string) {.async.} =
+proc replenishFanout(g: GossipSub, topic: string) =
   ## get fanout peers for a topic
   trace "about to replenish fanout"
   if topic notin g.fanout:
@@ -80,16 +88,54 @@ proc replenishFanout(g: GossipSub, topic: string) {.async.} =
   if g.fanout.getOrDefault(topic).len < GossipSubDLo:
     trace "replenishing fanout", peers = g.fanout.getOrDefault(topic).len
-    if topic in g.gossipsub:
+    if topic in toSeq(g.gossipsub.keys):
       for p in g.gossipsub.getOrDefault(topic):
         if not g.fanout[topic].containsOrIncl(p):
           if g.fanout.getOrDefault(topic).len == GossipSubD:
             break
 
   libp2p_gossipsub_peers_per_topic_fanout
-    .set(g.fanout.getOrDefault(topic).len.int64, labelValues = [topic])
+    .set(g.fanout.getOrDefault(topic).len.int64,
+         labelValues = [topic])
+
   trace "fanout replenished with peers", peers = g.fanout.getOrDefault(topic).len
 
+template moveToMeshHelper(g: GossipSub,
+                          topic: string,
+                          table: Table[string, HashSet[string]]) =
+  ## move peers from `table` into `mesh`
+  ##
+  var peerIds = toSeq(table.getOrDefault(topic))
+
+  logScope:
+    topic = topic
+    meshPeers = g.mesh.getOrDefault(topic).len
+    peers = peerIds.len
+
+  shuffle(peerIds)
+  for id in peerIds:
+    if g.mesh.getOrDefault(topic).len > GossipSubD:
+      break
+
+    trace "gathering peers for mesh"
+    if topic notin table:
+      continue
+
+    trace "getting peers", topic,
+                           peers = peerIds.len
+
+    table[topic].excl(id) # always exclude
+    if id in g.mesh[topic]:
+      continue # we already have this peer in the mesh, try again
+
+    if id in g.peers:
+      let p = g.peers[id]
+      if p.connected:
+        # send a graft message to the peer
+        await p.sendGraft(@[topic])
+        g.mesh[topic].incl(id)
+        trace "got peer", peer = id
 proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
   try:
     trace "about to rebalance mesh"
@@ -100,69 +146,47 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
     if g.mesh.getOrDefault(topic).len < GossipSubDlo:
       trace "replenishing mesh", topic
       # replenish the mesh if we're below GossipSubDlo
-      while g.mesh.getOrDefault(topic).len < GossipSubD:
-        trace "gathering peers", peers = g.mesh.getOrDefault(topic).len
-        await sleepAsync(1.millis) # don't starve the event loop
-        var id: string
-        if topic in g.fanout and g.fanout.getOrDefault(topic).len > 0:
-          trace "getting peer from fanout", topic,
-                                            peers = g.fanout.getOrDefault(topic).len
-
-          id = sample(toSeq(g.fanout.getOrDefault(topic)))
-          g.fanout[topic].excl(id)
-
-          if id in g.fanout[topic]:
-            continue # we already have this peer in the mesh, try again
-
-          trace "got fanout peer", peer = id
-        elif topic in g.gossipsub and g.gossipsub.getOrDefault(topic).len > 0:
-          trace "getting peer from gossipsub", topic,
-                                               peers = g.gossipsub.getOrDefault(topic).len
-
-          id = sample(toSeq(g.gossipsub[topic]))
-          g.gossipsub[topic].excl(id)
-
-          if id in g.mesh[topic]:
-            continue # we already have this peer in the mesh, try again
-
-          trace "got gossipsub peer", peer = id
-        else:
-          trace "no more peers"
-          break
-
-        g.mesh[topic].incl(id)
-        if id in g.peers:
-          let p = g.peers[id]
-          # send a graft message to the peer
-          await p.sendGraft(@[topic])
-
-    # prune peers if we've gone over
-    if g.mesh.getOrDefault(topic).len > GossipSubDhi:
-      trace "about to prune mesh", mesh = g.mesh.getOrDefault(topic).len
-      while g.mesh.getOrDefault(topic).len > GossipSubD:
+      # move fanout nodes first
+      g.moveToMeshHelper(topic, g.fanout)
+
+      # move gossipsub nodes second
+      g.moveToMeshHelper(topic, g.gossipsub)
+
+    if g.mesh.getOrDefault(topic).len > GossipSubDhi:
+      # prune peers if we've gone over
+      var mesh = toSeq(g.mesh.getOrDefault(topic))
+      shuffle(mesh)
+
+      trace "about to prune mesh", mesh = mesh.len
+      for id in mesh:
+        if g.mesh.getOrDefault(topic).len <= GossipSubD:
+          break
+
         trace "pruning peers", peers = g.mesh[topic].len
-        let id = toSeq(g.mesh[topic])[rand(0..<g.mesh[topic].len)]
-        g.mesh[topic].excl(id)
-
         let p = g.peers[id]
         # send a graft message to the peer
         await p.sendPrune(@[topic])
+
+        g.mesh[topic].excl(id)
 
     libp2p_gossipsub_peers_per_topic_gossipsub
-      .set(g.gossipsub.getOrDefault(topic).len.int64, labelValues = [topic])
+      .set(g.gossipsub.getOrDefault(topic).len.int64,
+           labelValues = [topic])
 
     libp2p_gossipsub_peers_per_topic_fanout
-      .set(g.fanout.getOrDefault(topic).len.int64, labelValues = [topic])
+      .set(g.fanout.getOrDefault(topic).len.int64,
+           labelValues = [topic])
 
     libp2p_gossipsub_peers_per_topic_mesh
-      .set(g.mesh.getOrDefault(topic).len.int64, labelValues = [topic])
+      .set(g.mesh.getOrDefault(topic).len.int64,
+           labelValues = [topic])
 
     trace "mesh balanced, got peers", peers = g.mesh.getOrDefault(topic).len,
                                       topicId = topic
   except CancelledError as exc:
     raise exc
   except CatchableError as exc:
-    trace "exception occurred re-balancing mesh", exc = exc.msg
+    warn "exception occurred re-balancing mesh", exc = exc.msg
 
 proc dropFanoutPeers(g: GossipSub) {.async.} =
   # drop peers that we haven't published to in
@@ -172,6 +196,7 @@ proc dropFanoutPeers(g: GossipSub) {.async.} =
     if Moment.now > val:
       dropping.add(topic)
       g.fanout.del(topic)
+      trace "dropping fanout topic", topic
 
   for topic in dropping:
     g.lastFanoutPubSub.del(topic)
@@ -181,36 +206,46 @@ proc dropFanoutPeers(g: GossipSub) {.async.} =
 proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
   ## gossip iHave messages to peers
+  ##
+
+  trace "getting gossip peers (iHave)"
   let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys))
+  let controlMsg = ControlMessage()
   for topic in topics:
-    let mesh: HashSet[string] = g.mesh.getOrDefault(topic)
-    let fanout: HashSet[string] = g.fanout.getOrDefault(topic)
+    var allPeers = toSeq(g.gossipsub.getOrDefault(topic))
+    shuffle(allPeers)
+
+    let mesh = g.mesh.getOrDefault(topic)
+    let fanout = g.fanout.getOrDefault(topic)
 
     let gossipPeers = mesh + fanout
     let mids = g.mcache.window(topic)
-    if mids.len > 0:
-      let ihave = ControlIHave(topicID: topic,
-                               messageIDs: toSeq(mids))
-
-      if topic notin g.gossipsub:
-        trace "topic not in gossip array, skipping", topicID = topic
-        continue
-
-      while result.len < GossipSubD:
-        if g.gossipsub.getOrDefault(topic).len == 0:
-          trace "no peers for topic, skipping", topicID = topic
-          break
-
-        let id = toSeq(g.gossipsub.getOrDefault(topic)).sample()
-        if id in g.gossipsub.getOrDefault(topic):
-          g.gossipsub[topic].excl(id)
-
-        if id notin gossipPeers:
-          if id notin result:
-            result[id] = ControlMessage()
-          result[id].ihave.add(ihave)
-
-    libp2p_gossipsub_peers_per_topic_gossipsub
-      .set(g.gossipsub.getOrDefault(topic).len.int64, labelValues = [topic])
+    if mids.len <= 0:
+      continue
+
+    let ihave = ControlIHave(topicID: topic,
+                             messageIDs: toSeq(mids))
+
+    if topic notin g.gossipsub:
+      trace "topic not in gossip array, skipping", topicID = topic
+      continue
+
+    for id in allPeers:
+      if result.len >= GossipSubD:
+        trace "got gossip peers", peers = result.len
+        break
+
+      if allPeers.len == 0:
+        trace "no peers for topic, skipping", topicID = topic
+        break
+
+      if id in gossipPeers:
+        continue
+
+      if id notin result:
+        result[id] = controlMsg
+
+      result[id].ihave.add(ihave)
 
 proc heartbeat(g: GossipSub) {.async.} =
   while g.heartbeatRunning:
@@ -221,6 +256,11 @@ proc heartbeat(g: GossipSub) {.async.} =
         await g.rebalanceMesh(t)
 
       await g.dropFanoutPeers()
+
+      # replenish known topics to the fanout
+      for t in toSeq(g.fanout.keys):
+        g.replenishFanout(t)
+
       let peers = g.getGossipPeers()
       var sent: seq[Future[void]]
       for peer in peers.keys:
@@ -236,11 +276,9 @@ proc heartbeat(g: GossipSub) {.async.} =
     await sleepAsync(1.seconds)
 
-method handleDisconnect*(g: GossipSub, peer: PubSubPeer) {.async.} =
+method handleDisconnect*(g: GossipSub, peer: PubSubPeer) =
   ## handle peer disconnects
-  trace "peer disconnected", peer=peer.id
-
-  await procCall FloodSub(g).handleDisconnect(peer)
+  procCall FloodSub(g).handleDisconnect(peer)
 
   for t in toSeq(g.gossipsub.keys):
     if t in g.gossipsub:
@@ -249,9 +287,6 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) =
     libp2p_gossipsub_peers_per_topic_gossipsub
       .set(g.gossipsub.getOrDefault(t).len.int64, labelValues = [t])
 
-    # mostly for metrics
-    await procCall PubSub(g).subscribeTopic(t, false, peer.id)
-
   for t in toSeq(g.mesh.keys):
     if t in g.mesh:
       g.mesh[t].excl(peer.id)
@@ -266,9 +301,9 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) =
     libp2p_gossipsub_peers_per_topic_fanout
       .set(g.fanout[t].len.int64, labelValues = [t])
 
-method subscribeToPeer*(p: GossipSub,
-                        conn: Connection) {.async.} =
-  await procCall PubSub(p).subscribeToPeer(conn)
+method subscribePeer*(p: GossipSub,
+                      conn: Connection) =
+  procCall PubSub(p).subscribePeer(conn)
   asyncCheck p.handleConn(conn, GossipSubCodec)
 
 method subscribeTopic*(g: GossipSub,
@@ -290,8 +325,11 @@ method subscribeTopic*(g: GossipSub,
     g.gossipsub[topic].excl(peerId)
 
   libp2p_gossipsub_peers_per_topic_gossipsub
-    .set(g.gossipsub.getOrDefault(topic).len.int64, labelValues = [topic])
+    .set(g.gossipsub[topic].len.int64, labelValues = [topic])
+
+  trace "gossip peers", peers = g.gossipsub[topic].len, topic
 
+  # also rebalance current topic if we are subbed to
   if topic in g.topics:
     await g.rebalanceMesh(topic)
 
@@ -385,7 +423,6 @@ method rpcHandler*(g: GossipSub,
         continue
 
       for t in msg.topicIDs: # for every topic in the message
-        await g.rebalanceMesh(t) # gather peers for each topic
         if t in g.floodsub:
           toSendPeers.incl(g.floodsub[t]) # get all floodsub peers for topic
@@ -397,29 +434,19 @@ method rpcHandler*(g: GossipSub,
              trace "calling handler for message", topicId = t,
                                                   localPeer = g.peerInfo.id,
                                                   fromPeer = msg.fromPeer.pretty
-              await h(t, msg.data) # trigger user provided handler
+              try:
+                await h(t, msg.data) # trigger user provided handler
+              except CatchableError as exc:
+                trace "exception in message handler", exc = exc.msg
 
       # forward the message to all peers interested in it
-      for p in toSendPeers:
-        if p in g.peers:
-          let id = g.peers[p].peerInfo.peerId
-          trace "about to forward message to peer", peerId = id, msgs = m.messages
-
-          if id == peer.peerInfo.peerId:
-            trace "not forwarding message to originator", peerId = id
-            continue
-
-          let msgs = m.messages.filterIt(
-            # don't forward to message originator
-            id != it.fromPeer
-          )
-
-          var sent: seq[Future[void]]
-          if msgs.len > 0:
-            trace "forwarding message to", peerId = id
-            sent.add(g.peers[p].send(@[RPCMsg(messages: msgs)]))
-          sent = await allFinished(sent)
-          checkFutures(sent)
+      let (published, failed) = await g.sendHelper(toSendPeers, m.messages)
+      for p in failed:
+        let peer = g.peers.getOrDefault(p)
+        if not(isNil(peer)):
+          g.handleDisconnect(peer) # cleanup failed peers
+
+      trace "forwared message to peers", peers = published.len
 
     var respControl: ControlMessage
     if m.control.isSome:
@@ -457,53 +484,49 @@ method unsubscribe*(g: GossipSub,
 method publish*(g: GossipSub,
                 topic: string,
-                data: seq[byte]) {.async.} =
-  await procCall PubSub(g).publish(topic, data)
+                data: seq[byte]): Future[int] {.async.} =
+  # base returns always 0
+  discard await procCall PubSub(g).publish(topic, data)
+
   trace "about to publish message on topic", name = topic,
                                              data = data.shortLog
 
-  # TODO: we probably don't need to try multiple times
-  if data.len > 0 and topic.len > 0:
-    var peers = g.mesh.getOrDefault(topic)
-    for _ in 0..<5: # try to get peers up to 5 times
-      if peers.len > 0:
-        break
-
-      if topic in g.topics: # if we're subscribed to the topic attempt to build a mesh
-        await g.rebalanceMesh(topic)
-        peers = g.mesh.getOrDefault(topic)
-      else: # send to fanout peers
-        await g.replenishFanout(topic)
-        if topic in g.fanout:
-          peers = g.fanout.getOrDefault(topic)
-          # set the fanout expiry time
-          g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL)
-
-      # wait a second between tries
-      await sleepAsync(1.seconds)
-
-    let
-      msg = Message.init(g.peerInfo, data, topic, g.sign)
-      msgId = g.msgIdProvider(msg)
-    trace "created new message", msg
-    var sent: seq[Future[void]]
-    for p in peers:
-      if p == g.peerInfo.id:
-        continue
-
-      trace "publishing on topic", name = topic
-      if msgId notin g.mcache:
-        g.mcache.put(msgId, msg)
-
-      if p in g.peers:
-        sent.add(g.peers[p].send(@[RPCMsg(messages: @[msg])]))
-    checkFutures(await allFinished(sent))
-
+  var peers: HashSet[string]
+  if topic.len <= 0: # data could be 0/empty
+    return 0
+
+  if topic in g.topics: # if we're subscribed use the mesh
+    peers = g.mesh.getOrDefault(topic)
+  else: # not subscribed, send to fanout peers
+    # try optimistically
+    peers = g.fanout.getOrDefault(topic)
+    if peers.len == 0:
+      # ok we had nothing.. let's try replenish inline
+      g.replenishFanout(topic)
+      peers = g.fanout.getOrDefault(topic)
+
+  let
+    msg = Message.init(g.peerInfo, data, topic, g.sign)
+    msgId = g.msgIdProvider(msg)
+
+  trace "created new message", msg
+
+  trace "publishing on topic", name = topic, peers = peers
+  if msgId notin g.mcache:
+    g.mcache.put(msgId, msg)
+
+  let (published, failed) = await g.sendHelper(peers, @[msg])
+  for p in failed:
+    let peer = g.peers.getOrDefault(p)
+    g.handleDisconnect(peer) # cleanup failed peers
+
+  if published.len > 0:
     libp2p_pubsub_messages_published.inc(labelValues = [topic])
 
+  trace "published message to peers", peers = published.len,
+                                      msg = msg.shortLog()
+  return published.len
+
 method start*(g: GossipSub) {.async.} =
-  debug "gossipsub start"
+  trace "gossipsub start"
 
   ## start pubsub
   ## start long running/repeating procedures
@@ -518,7 +541,7 @@ method start*(g: GossipSub) {.async.} =
   g.heartbeatLock.release()
 
 method stop*(g: GossipSub) {.async.} =
-  debug "gossipsub stop"
+  trace "gossipsub stop"
 
   ## stop pubsub
   ## stop long running tasks
@@ -528,8 +551,9 @@ method stop*(g: GossipSub) {.async.} =
   # stop heartbeat interval
   g.heartbeatRunning = false
   if not g.heartbeatFut.finished:
-    debug "awaiting last heartbeat"
+    trace "awaiting last heartbeat"
     await g.heartbeatFut
+    trace "heartbeat stopped"
 
   g.heartbeatLock.release()

pubsub.nim

@@ -28,9 +28,10 @@ declareGauge(libp2p_pubsub_topics, "pubsub subscribed topics")
 declareCounter(libp2p_pubsub_validation_success, "pubsub successfully validated messages")
 declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messages")
 declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"])
-declareGauge(libp2p_pubsub_peers_per_topic, "pubsub peers per topic", labels = ["topic"])
 
 type
+  SendRes = tuple[published: seq[string], failed: seq[string]] # keep private
+
   TopicHandler* = proc(topic: string,
                        data: seq[byte]): Future[void] {.gcsafe.}
@@ -58,6 +59,18 @@ type
     observers: ref seq[PubSubObserver] # ref as in smart_ptr
     msgIdProvider*: MsgIdProvider # Turn message into message id (not nil)
 
+method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} =
+  ## handle peer disconnects
+  ##
+  if peer.id in p.peers:
+    trace "deleting peer", peer = peer.id, stack = getStackTrace()
+    p.peers[peer.id] = nil
+    p.peers.del(peer.id)
+
+  # metrics
+  libp2p_pubsub_peers.set(p.peers.len.int64)
+  trace "peer disconnected", peer = peer.id
+
 proc sendSubs*(p: PubSub,
                peer: PubSubPeer,
                topics: seq[string],
@@ -74,16 +87,26 @@ proc sendSubs*(p: PubSub,
       topicName = t
     msg.subscriptions.add(SubOpts(topic: t, subscribe: subscribe))
 
-  await peer.send(@[msg])
+  try:
+    # wait for a connection before publishing
+    # this happens when
+    if not peer.onConnect.isSet:
+      trace "awaiting send connection"
+      await peer.onConnect.wait()
+
+    await peer.send(@[msg])
+  except CancelledError as exc:
+    p.handleDisconnect(peer)
+    raise exc
+  except CatchableError as exc:
+    trace "unable to send subscriptions", exc = exc.msg
+    p.handleDisconnect(peer)
 
 method subscribeTopic*(p: PubSub,
                        topic: string,
                        subscribe: bool,
                        peerId: string) {.base, async.} =
-  if subscribe:
-    libp2p_pubsub_peers_per_topic.inc(labelValues = [topic])
-  else:
-    libp2p_pubsub_peers_per_topic.dec(labelValues = [topic])
+  discard
 
 method rpcHandler*(p: PubSub,
                    peer: PubSubPeer,
@@ -98,23 +121,6 @@ method rpcHandler*(p: PubSub,
       trace "about to subscribe to topic", topicId = s.topic
       await p.subscribeTopic(s.topic, s.subscribe, peer.id)
 
-method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.async, base.} =
-  ## handle peer disconnects
-  if peer.id in p.peers:
-    p.peers.del(peer.id)
-
-  # metrics
-  libp2p_pubsub_peers.set(p.peers.len.int64)
-
-proc cleanUpHelper(p: PubSub, peer: PubSubPeer) {.async.} =
-  try:
-    await p.cleanupLock.acquire()
-    peer.refs.dec() # decrement refcount
-    if peer.refs <= 0:
-      await p.handleDisconnect(peer)
-  finally:
-    p.cleanupLock.release()
-
 proc getPeer(p: PubSub,
              peerInfo: PeerInfo,
              proto: string): PubSubPeer =
@@ -123,26 +129,13 @@ proc getPeer(p: PubSub,
   # create new pubsub peer
   let peer = newPubSubPeer(peerInfo, proto)
-  trace "created new pubsub peer", peerId = peer.id
+  trace "created new pubsub peer", peerId = peer.id, stack = getStackTrace()
 
-  # metrics
   p.peers[peer.id] = peer
-  peer.refs.inc # increment reference count
   peer.observers = p.observers
   libp2p_pubsub_peers.set(p.peers.len.int64)
   return peer
 
-proc internalCleanup(p: PubSub, conn: Connection) {.async.} =
-  # handle connection close
-  if isNil(conn):
-    return
-
-  var peer = p.getPeer(conn.peerInfo, p.codec)
-  await conn.closeEvent.wait()
-  trace "pubsub conn closed, cleaning up peer", peer = conn.peerInfo.id
-  await p.cleanUpHelper(peer)
-
 method handleConn*(p: PubSub,
                    conn: Connection,
                    proto: string) {.base, async.} =
@@ -157,41 +150,46 @@ method handleConn*(p: PubSub,
   ## that we're interested in
   ##
 
-  if isNil(conn.peerInfo):
-    trace "no valid PeerId for peer"
-    await conn.close()
-    return
-
-  proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
-    # call pubsub rpc handler
-    await p.rpcHandler(peer, msgs)
-
-  asyncCheck p.internalCleanup(conn)
-  let peer = p.getPeer(conn.peerInfo, proto)
-  let topics = toSeq(p.topics.keys)
-  if topics.len > 0:
-    await p.sendSubs(peer, topics, true)
-
   try:
+    if isNil(conn.peerInfo):
+      trace "no valid PeerId for peer"
+      await conn.close()
+      return
+
+    proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
+      # call pubsub rpc handler
+      await p.rpcHandler(peer, msgs)
+
+    let peer = p.getPeer(conn.peerInfo, proto)
+    let topics = toSeq(p.topics.keys)
+    if topics.len > 0:
+      await p.sendSubs(peer, topics, true)
+
     peer.handler = handler
     await peer.handle(conn) # spawn peer read loop
-    trace "pubsub peer handler ended, cleaning up"
+    trace "pubsub peer handler ended", peer = peer.id
   except CancelledError as exc:
+    await conn.close()
     raise exc
   except CatchableError as exc:
     trace "exception ocurred in pubsub handle", exc = exc.msg
+  finally:
+    p.handleDisconnect(peer)
     await conn.close()
 
-method subscribeToPeer*(p: PubSub,
-                        conn: Connection) {.base, async.} =
+method subscribePeer*(p: PubSub, conn: Connection) {.base.} =
   if not(isNil(conn)):
     let peer = p.getPeer(conn.peerInfo, p.codec)
-    trace "setting connection for peer", peerId = conn.peerInfo.id
+    trace "subscribing to peer", peerId = conn.peerInfo.id
     if not peer.connected:
       peer.conn = conn
 
-  asyncCheck p.internalCleanup(conn)
+method unsubscribePeer*(p: PubSub, peerInfo: PeerInfo) {.base, async.} =
+  let peer = p.getPeer(peerInfo, p.codec)
+  trace "unsubscribing from peer", peerId = $peerInfo
+  if not(isNil(peer.conn)):
+    await peer.conn.close()
+
+  p.handleDisconnect(peer)
 
 proc connected*(p: PubSub, peer: PeerInfo): bool =
   let peer = p.getPeer(peer, p.codec)
@@ -231,16 +229,46 @@ method subscribe*(p: PubSub,
   p.topics[topic].handler.add(handler)
 
-  for peer in p.peers.values:
+  for peer in toSeq(p.peers.values):
     await p.sendSubs(peer, @[topic], true)
 
   # metrics
   libp2p_pubsub_topics.inc()
 
+proc sendHelper*(p: PubSub,
+                 sendPeers: HashSet[string],
+                 msgs: seq[Message]): Future[SendRes] {.async.} =
+  var sent: seq[tuple[id: string, fut: Future[void]]]
+  for sendPeer in sendPeers:
+    # avoid sending to self
+    if sendPeer == p.peerInfo.id:
+      continue
+
+    let peer = p.peers.getOrDefault(sendPeer)
+    if isNil(peer):
+      continue
+
+    trace "sending messages to peer", peer = peer.id, msgs
+    sent.add((id: peer.id, fut: peer.send(@[RPCMsg(messages: msgs)])))
+
+  var published: seq[string]
+  var failed: seq[string]
+  let futs = await allFinished(sent.mapIt(it.fut))
+  for s in futs:
+    let f = sent.filterIt(it.fut == s)
+    if f.len > 0:
+      if s.failed:
+        trace "sending messages to peer failed", peer = f[0].id
+        failed.add(f[0].id)
+      else:
+        trace "sending messages to peer succeeded", peer = f[0].id
+        published.add(f[0].id)
+
+  return (published, failed)
+
 method publish*(p: PubSub,
                 topic: string,
-                data: seq[byte]) {.base, async.} =
-  # TODO: Should throw indicating success/failure
+                data: seq[byte]): Future[int] {.base, async.} =
   ## publish to a ``topic``
   if p.triggerSelf and topic in p.topics:
     for h in p.topics[topic].handler:
@@ -255,6 +283,8 @@ method publish*(p: PubSub,
         # more cleanup though
         debug "Could not write to pubsub connection", msg = exc.msg
 
+  return 0
+
 method initPubSub*(p: PubSub) {.base.} =
   ## perform pubsub initialization
   p.observers = new(seq[PubSubObserver])
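Note: sendHelper above awaits every send with allFinished and then partitions peers by the state of their futures, so one failed send no longer aborts the whole batch. The same idea reduced to a self-contained sketch (trySend is a toy stand-in for PubSubPeer.send):

  import chronos, sequtils

  proc trySend(ok: bool): Future[void] {.async.} =
    if not ok:
      raise newException(CatchableError, "send failed")

  proc demo() {.async.} =
    let sent = @[(id: "a", fut: trySend(true)),
                 (id: "b", fut: trySend(false))]
    # allFinished completes once every future has finished, failed or not
    discard await allFinished(sent.mapIt(it.fut))
    let failed = sent.filterIt(it.fut.failed).mapIt(it.id)
    let published = sent.filterIt(not it.fut.failed).mapIt(it.id)
    echo "published: ", published, " failed: ", failed

  waitFor demo()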

pubsubpeer.nim

@@ -32,15 +32,14 @@ type
     onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
 
   PubSubPeer* = ref object of RootObj
-    proto: string # the protocol that this peer joined from
+    proto*: string # the protocol that this peer joined from
     sendConn: Connection
     peerInfo*: PeerInfo
    handler*: RPCHandler
     topics*: seq[string]
     sentRpcCache: TimedCache[string] # cache for already sent messages
     recvdRpcCache: TimedCache[string] # cache for already received messages
-    refs*: int # refcount of the connections this peer is handling
-    onConnect: AsyncEvent
+    onConnect*: AsyncEvent
     observers*: ref seq[PubSubObserver] # ref as in smart_ptr
 
   RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
@@ -56,6 +55,9 @@ proc `conn=`*(p: PubSubPeer, conn: Connection) =
     p.sendConn = conn
     p.onConnect.fire()
 
+proc conn*(p: PubSubPeer): Connection =
+  p.sendConn
+
 proc recvObservers(p: PubSubPeer, msg: var RPCMsg) =
   # trigger hooks
   if not(isNil(p.observers)) and p.observers[].len > 0:
@@ -100,10 +102,17 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
 
     trace "exiting pubsub peer read loop", peer = p.id
     await conn.close()
+  except CancelledError as exc:
+    raise exc
   except CatchableError as exc:
     trace "Exception occurred in PubSubPeer.handle", exc = exc.msg
+    raise exc
 
 proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
+  logScope:
+    peer = p.id
+    msgs = $msgs
+
   for m in msgs.items:
     trace "sending msgs to peer", toPeer = p.id, msgs = $msgs
@@ -122,38 +131,29 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
       libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id])
       continue
 
-    proc sendToRemote() {.async.} =
-      try:
-        trace "about to send message", peer = p.id,
-                                       encoded = digest
-        if not p.onConnect.isSet:
-          await p.onConnect.wait()
-
-        if p.connected: # this can happen if the remote disconnected
-          trace "sending encoded msgs to peer", peer = p.id,
-                                                encoded = encoded.buffer.shortLog
-          await p.sendConn.writeLp(encoded.buffer)
-          p.sentRpcCache.put(digest)
-
-          for m in msgs:
-            for mm in m.messages:
-              for t in mm.topicIDs:
-                # metrics
-                libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t])
-
-      except CancelledError as exc:
-        raise exc
-      except CatchableError as exc:
-        trace "unable to send to remote", exc = exc.msg
-        if not(isNil(p.sendConn)):
-          await p.sendConn.close()
-          p.sendConn = nil
-          p.onConnect.clear()
-
-    # if no connection has been set,
-    # queue messages until a connection
-    # becomes available
-    asyncCheck sendToRemote()
+    try:
+      trace "about to send message", peer = p.id,
+                                     encoded = digest
+      if p.connected: # this can happen if the remote disconnected
+        trace "sending encoded msgs to peer", peer = p.id,
+                                              encoded = encoded.buffer.shortLog
+        await p.sendConn.writeLp(encoded.buffer)
+        p.sentRpcCache.put(digest)
+
+        for m in msgs:
+          for mm in m.messages:
+            for t in mm.topicIDs:
+              # metrics
+              libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t])
+
+    except CatchableError as exc:
+      trace "unable to send to remote", exc = exc.msg
+      if not(isNil(p.sendConn)):
+        await p.sendConn.close()
+        p.sendConn = nil
+        p.onConnect.clear()
+
+      raise exc
 
 proc sendMsg*(p: PubSubPeer,
               peerId: PeerID,
@@ -172,6 +172,9 @@ proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async.} =
     trace "sending prune msg to peer", peer = p.id, topicID = topic
     await p.send(@[RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))])
 
+proc `$`*(p: PubSubPeer): string =
+  p.id
+
 proc newPubSubPeer*(peerInfo: PeerInfo,
                     proto: string): PubSubPeer =
   new result
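Note: onConnect is now exported so that pubsub code (e.g. sendSubs) can park until `conn=` attaches a send connection and fires the event. The gating pattern in a runnable sketch (names are illustrative):

  import chronos

  proc demo() {.async.} =
    let onConnect = newAsyncEvent()

    proc sender() {.async.} =
      if not onConnect.isSet:
        await onConnect.wait() # park until a connection is attached
      echo "connection ready, sending"

    let fut = sender()
    await sleepAsync(10.milliseconds)
    onConnect.fire() # what `conn=` does once sendConn is set
    await fut

  waitFor demo()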

secure.nim

@@ -55,7 +55,9 @@ method handshake(s: Secure,
                  initiator: bool): Future[SecureConn] {.async, base.} =
   doAssert(false, "Not implemented!")
 
-proc handleConn*(s: Secure, conn: Connection, initiator: bool): Future[Connection] {.async, gcsafe.} =
+proc handleConn*(s: Secure,
+                 conn: Connection,
+                 initiator: bool): Future[Connection] {.async, gcsafe.} =
   var sconn = await s.handshake(conn, initiator)
 
   conn.closeEvent.wait()
@@ -71,7 +73,8 @@ method init*(s: Secure) {.gcsafe.} =
   proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
     trace "handling connection upgrade", proto
     try:
-      # We don't need the result but we definitely need to await the handshake
+      # We don't need the result but we
+      # definitely need to await the handshake
       discard await s.handleConn(conn, false)
       trace "connection secured"
     except CancelledError as exc:
@@ -84,7 +87,10 @@ method init*(s: Secure) {.gcsafe.} =
 
   s.handler = handle
 
-method secure*(s: Secure, conn: Connection, initiator: bool): Future[Connection] {.async, base, gcsafe.} =
+method secure*(s: Secure,
+               conn: Connection,
+               initiator: bool):
+               Future[Connection] {.async, base, gcsafe.} =
   result = await s.handleConn(conn, initiator)
 
 method readOnce*(s: SecureConn,

chronosstream.nim

@@ -7,6 +7,7 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.
 
+import oids
 import chronos, chronicles
 import connection, ../utility
 
@@ -40,7 +41,6 @@ template withExceptions(body: untyped) =
   except TransportError:
     # TODO https://github.com/status-im/nim-chronos/pull/99
     raise newLPStreamEOFError()
-    # raise (ref LPStreamError)(msg: exc.msg, parent: exc)
 
 method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
   if s.atEof:
@@ -73,11 +73,18 @@ method atEof*(s: ChronosStream): bool {.inline.} =
 method close*(s: ChronosStream) {.async.} =
   try:
     if not s.isClosed:
-      await procCall Connection(s).close()
+      trace "shutting down chronos stream", address = $s.client.remoteAddress(),
+                                            oid = s.oid
 
-      trace "shutting down chronos stream", address = $s.client.remoteAddress(), oid = s.oid
+      # TODO: the sequence here matters
+      # don't move it after the connections
+      # close bellow
       if not s.client.closed():
         await s.client.closeWait()
 
+      await procCall Connection(s).close()
+  except CancelledError as exc:
+    raise exc
   except CatchableError as exc:
     trace "error closing chronosstream", exc = exc.msg

switch.nim

@@ -12,7 +12,8 @@ import tables,
        options,
        strformat,
        sets,
-       algorithm
+       algorithm,
+       oids
 
 import chronos,
        chronicles,
@@ -78,6 +79,7 @@ type
     secureManagers*: seq[Secure]
     pubSub*: Option[PubSub]
     dialLock: Table[string, AsyncLock]
+    cleanUpLock: Table[string, AsyncLock]
 
 proc newNoPubSubException(): ref NoPubSubException {.inline.} =
   result = newException(NoPubSubException, "no pubsub provided!")
@@ -86,7 +88,7 @@ proc newTooManyConnections(): ref TooManyConnections {.inline.} =
   result = newException(TooManyConnections, "too many connections for peer")
 
 proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.}
-proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.}
+proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.}
 
 proc selectConn(s: Switch, peerInfo: PeerInfo): Connection =
   ## select the "best" connection according to some criteria
@@ -159,6 +161,9 @@ proc storeConn(s: Switch,
       newSeq[MuxerHolder]())
     .add(MuxerHolder(muxer: muxer, handle: handle, dir: dir))
 
+  trace "storred connection", connections = s.connections.len
+  libp2p_peers.set(s.connections.len.int64)
+
 proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
   if s.secureManagers.len <= 0:
     raise newException(CatchableError, "No secure managers registered!")
@@ -252,45 +257,56 @@ proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} =
   if isNil(conn):
     return
 
-  defer:
-    await conn.close()
-    libp2p_peers.set(s.connections.len.int64)
-
   if isNil(conn.peerInfo):
     return
 
   let id = conn.peerInfo.id
-  trace "cleaning up connection for peer", peerId = id
-
-  if id in s.muxed:
-    let muxerHolder = s.muxed[id]
-      .filterIt(
-        it.muxer.connection == conn
-      )
-
-    if muxerHolder.len > 0:
-      await muxerHolder[0].muxer.close()
-      if not(isNil(muxerHolder[0].handle)):
-        await muxerHolder[0].handle
-
+  let lock = s.cleanUpLock.mgetOrPut(id, newAsyncLock())
+
+  try:
+    await lock.acquire()
+    trace "cleaning up connection for peer", peerId = id
     if id in s.muxed:
-      s.muxed[id].keepItIf(
-        it.muxer.connection != conn
-      )
-
-      if s.muxed[id].len == 0:
-        s.muxed.del(id)
-
-  if id in s.connections:
-    s.connections[id].keepItIf(
-      it.conn != conn
-    )
-
-    if s.connections[id].len == 0:
-      s.connections.del(id)
-
-  # TODO: Investigate cleanupConn() always called twice for one peer.
-  if not(conn.peerInfo.isClosed()):
-    conn.peerInfo.close()
+      let muxerHolder = s.muxed[id]
+        .filterIt(
+          it.muxer.connection == conn
+        )
+
+      if muxerHolder.len > 0:
+        await muxerHolder[0].muxer.close()
+        if not(isNil(muxerHolder[0].handle)):
+          await muxerHolder[0].handle
+
+      if id in s.muxed:
+        s.muxed[id].keepItIf(
+          it.muxer.connection != conn
+        )
+
+        if s.muxed[id].len == 0:
+          s.muxed.del(id)
+
+    if s.pubSub.isSome:
+      await s.pubSub.get()
+        .unsubscribePeer(conn.peerInfo)
+
+    if id in s.connections:
+      s.connections[id].keepItIf(
+        it.conn != conn
+      )
+
+      if s.connections[id].len == 0:
+        s.connections.del(id)
+
+    # TODO: Investigate cleanupConn() always called twice for one peer.
+    if not(conn.peerInfo.isClosed()):
+      conn.peerInfo.close()
+  finally:
+    await conn.close()
+
+    if lock.locked():
+      lock.release()
+
+    libp2p_peers.set(s.connections.len.int64)
 
 proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} =
   let connections = s.connections.getOrDefault(peer.id)
@@ -323,7 +339,6 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g
     raise newException(CatchableError,
       "unable to mux connection, stopping upgrade")
 
-  libp2p_peers.set(s.connections.len.int64)
   trace "succesfully upgraded outgoing connection", uoid = sconn.oid
   return sconn
 
@@ -375,8 +390,8 @@ proc internalConnect(s: Switch,
     raise newException(CatchableError, "can't dial self!")
 
   let id = peer.id
-  let lock = s.dialLock.mgetOrPut(id, newAsyncLock())
   var conn: Connection
+  let lock = s.dialLock.mgetOrPut(id, newAsyncLock())
 
   defer:
     if lock.locked():
@@ -436,29 +451,48 @@ proc internalConnect(s: Switch,
   doAssert(conn.peerInfo.id in s.connections,
            "connection not tracked!")
 
-  trace "dial succesfull", oid = conn.oid
-  await s.subscribeToPeer(peer)
+  trace "dial succesfull", oid = $conn.oid,
+                           peer = $conn.peerInfo
+
+  await s.subscribePeer(peer)
   return conn
 
 proc connect*(s: Switch, peer: PeerInfo) {.async.} =
-  var conn = await s.internalConnect(peer)
+  discard await s.internalConnect(peer)
 
 proc dial*(s: Switch,
           peer: PeerInfo,
          proto: string):
          Future[Connection] {.async.} =
-  var conn = await s.internalConnect(peer)
+  let conn = await s.internalConnect(peer)
   let stream = await s.getMuxedStream(peer)
-  if isNil(stream):
-    await conn.close()
-    raise newException(CatchableError, "Couldn't get muxed stream")
 
-  trace "Attempting to select remote", proto = proto, oid = conn.oid
-  if not await s.ms.select(stream, proto):
-    await stream.close()
-    raise newException(CatchableError, "Unable to select sub-protocol " & proto)
-
-  return stream
+  proc cleanup() {.async.} =
+    if not(isNil(stream)):
+      await stream.close()
+
+    if not(isNil(conn)):
+      await conn.close()
+
+  try:
+    if isNil(stream):
+      await conn.close()
+      raise newException(CatchableError, "Couldn't get muxed stream")
+
+    trace "Attempting to select remote", proto = proto, oid = conn.oid
+    if not await s.ms.select(stream, proto):
+      await stream.close()
+      raise newException(CatchableError, "Unable to select sub-protocol " & proto)
+
+    return stream
+  except CancelledError as exc:
+    trace "dial canceled"
+    await cleanup()
+    raise exc
+  except CatchableError as exc:
+    trace "error dialing"
+    await cleanup()
+    raise exc
 
 proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} =
   if isNil(proto.handler):
@@ -527,7 +561,7 @@ proc stop*(s: Switch) {.async.} =
 
   trace "switch stopped"
 
-proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
+proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
   ## Subscribe to pub sub peer
   if s.pubSub.isSome and not(s.pubSub.get().connected(peerInfo)):
     trace "about to subscribe to pubsub peer", peer = peerInfo.shortLog()
@@ -554,7 +588,7 @@ proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
       await stream.close()
       return
 
-    await s.pubSub.get().subscribeToPeer(stream)
+    s.pubSub.get().subscribePeer(stream)
 
 proc subscribe*(s: Switch, topic: string,
                 handler: TopicHandler): Future[void] =
@@ -564,7 +598,7 @@ proc subscribe*(s: Switch, topic: string,
     retFuture.fail(newNoPubSubException())
     return retFuture
 
-  result = s.pubSub.get().subscribe(topic, handler)
+  return s.pubSub.get().subscribe(topic, handler)
 
 proc unsubscribe*(s: Switch, topics: seq[TopicPair]): Future[void] =
   ## unsubscribe from topics
@@ -573,16 +607,16 @@ proc unsubscribe*(s: Switch, topics: seq[TopicPair]): Future[void] =
     retFuture.fail(newNoPubSubException())
     return retFuture
 
-  result = s.pubSub.get().unsubscribe(topics)
+  return s.pubSub.get().unsubscribe(topics)
 
-proc publish*(s: Switch, topic: string, data: seq[byte]): Future[void] =
+proc publish*(s: Switch, topic: string, data: seq[byte]): Future[int] =
   # pubslish to pubsub topic
   if s.pubSub.isNone:
-    var retFuture = newFuture[void]("Switch.publish")
+    var retFuture = newFuture[int]("Switch.publish")
    retFuture.fail(newNoPubSubException())
     return retFuture
 
-  result = s.pubSub.get().publish(topic, data)
+  return s.pubSub.get().publish(topic, data)
 
 proc addValidator*(s: Switch,
                    topics: varargs[string],
@@ -608,8 +642,6 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
     if not(isNil(stream)):
       await stream.close()
 
-  trace "got new muxer"
-
   try:
     # once we got a muxed connection, attempt to
    # identify it
@@ -622,14 +654,15 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
 
     # store muxer and muxed connection
     await s.storeConn(muxer, Direction.In)
-    libp2p_peers.set(s.connections.len.int64)
 
     muxer.connection.closeEvent.wait()
       .addCallback do(udata: pointer):
        asyncCheck s.cleanupConn(muxer.connection)
 
+    trace "got new muxer", peer = $muxer.connection.peerInfo
+
     # try establishing a pubsub connection
-    await s.subscribeToPeer(muxer.connection.peerInfo)
+    await s.subscribePeer(muxer.connection.peerInfo)
 
   except CancelledError as exc:
     await muxer.close()
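Note: cleanupConn now serializes concurrent cleanups for the same peer id through cleanUpLock. The per-id AsyncLock pattern, reduced to a self-contained sketch (cleanup is a toy body; mgetOrPut and AsyncLock are the real APIs used above):

  import chronos, tables

  var locks: Table[string, AsyncLock]

  proc cleanup(id: string) {.async.} =
    # one lock per id: cleanups for the same peer run one at a time
    let lock = locks.mgetOrPut(id, newAsyncLock())
    await lock.acquire()
    try:
      echo "cleaning ", id
    finally:
      if lock.locked():
        lock.release()

  waitFor allFutures(cleanup("peer1"), cleanup("peer1"))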

testfloodsub.nim

@@ -59,7 +59,7 @@ suite "FloodSub":
       await nodes[1].subscribe("foobar", handler)
       await waitSub(nodes[0], nodes[1], "foobar")
 
-      await nodes[0].publish("foobar", "Hello!".toBytes())
+      check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0
 
       result = await completionFut.wait(5.seconds)
 
@@ -90,7 +90,7 @@ suite "FloodSub":
       await nodes[0].subscribe("foobar", handler)
       await waitSub(nodes[1], nodes[0], "foobar")
 
-      await nodes[1].publish("foobar", "Hello!".toBytes())
+      check (await nodes[1].publish("foobar", "Hello!".toBytes())) > 0
 
       result = await completionFut.wait(5.seconds)
 
@@ -125,7 +125,7 @@ suite "FloodSub":
 
       nodes[1].addValidator("foobar", validator)
 
-      await nodes[0].publish("foobar", "Hello!".toBytes())
+      check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0
 
       check (await handlerFut) == true
       await allFuturesThrowing(
@@ -159,7 +159,7 @@ suite "FloodSub":
 
       nodes[1].addValidator("foobar", validator)
 
-      await nodes[0].publish("foobar", "Hello!".toBytes())
+      discard await nodes[0].publish("foobar", "Hello!".toBytes())
 
       await allFuturesThrowing(
         nodes[0].stop(),
@@ -197,8 +197,8 @@ suite "FloodSub":
 
       nodes[1].addValidator("foo", "bar", validator)
 
-      await nodes[0].publish("foo", "Hello!".toBytes())
-      await nodes[0].publish("bar", "Hello!".toBytes())
+      check (await nodes[0].publish("foo", "Hello!".toBytes())) > 0
+      check (await nodes[0].publish("bar", "Hello!".toBytes())) > 0
 
       await allFuturesThrowing(
         nodes[0].stop(),
@@ -249,7 +249,7 @@ suite "FloodSub":
           subs &= waitSub(nodes[i], nodes[y], "foobar")
       await allFuturesThrowing(subs)
 
-      var pubs: seq[Future[void]]
+      var pubs: seq[Future[int]]
       for i in 0..<runs:
         pubs &= nodes[i].publish("foobar", "Hello!".toBytes())
       await allFuturesThrowing(pubs)
@@ -303,7 +303,7 @@ suite "FloodSub":
          subs &= waitSub(nodes[i], nodes[y], "foobar")
       await allFuturesThrowing(subs)
 
-      var pubs: seq[Future[void]]
+      var pubs: seq[Future[int]]
       for i in 0..<runs:
         pubs &= nodes[i].publish("foobar", "Hello!".toBytes())
       await allFuturesThrowing(pubs)

testgossipinternal.nim

@@ -57,7 +57,8 @@ suite "GossipSub internal":
       let gossipSub = newPubSub(TestGossipSub, randomPeerInfo())
 
       let topic = "foobar"
-      gossipSub.gossipsub[topic] = initHashSet[string]()
+      gossipSub.mesh[topic] = initHashSet[string]()
+      gossipSub.topics[topic] = Topic() # has to be in topics to rebalance
 
       var conns = newSeq[Connection]()
       for i in 0..<15:
@@ -67,9 +68,9 @@ suite "GossipSub internal":
         conn.peerInfo = peerInfo
         gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
         gossipSub.peers[peerInfo.id].conn = conn
-        gossipSub.gossipsub[topic].incl(peerInfo.id)
+        gossipSub.mesh[topic].incl(peerInfo.id)
 
-      check gossipSub.gossipsub[topic].len == 15
+      check gossipSub.mesh[topic].len == 15
       await gossipSub.rebalanceMesh(topic)
       check gossipSub.mesh[topic].len == GossipSubD
 
@@ -98,10 +99,11 @@ suite "GossipSub internal":
         conn.peerInfo = peerInfo
         gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
         gossipSub.peers[peerInfo.id].handler = handler
+        gossipSub.peers[peerInfo.id].topics &= topic
         gossipSub.gossipsub[topic].incl(peerInfo.id)
 
       check gossipSub.gossipsub[topic].len == 15
-      await gossipSub.replenishFanout(topic)
+      gossipSub.replenishFanout(topic)
       check gossipSub.fanout[topic].len == GossipSubD
 
       await allFuturesThrowing(conns.mapIt(it.close()))

View File

@@ -38,10 +38,21 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
         (not fsub.fanout.hasKey(key) or
          not fsub.fanout[key].contains(receiver.peerInfo.id)):
     trace "waitSub sleeping..."
-    await sleepAsync(100.millis)
+    await sleepAsync(1.seconds)
     dec ceil
     doAssert(ceil > 0, "waitSub timeout!")

+template tryPublish(call: untyped, require: int, wait: Duration = 1.seconds, times: int = 10): untyped =
+  var
+    limit = times
+    pubs = 0
+  while pubs < require and limit > 0:
+    pubs = pubs + call
+    await sleepAsync(wait)
+    limit.dec()
+  if limit == 0:
+    doAssert(false, "Failed to publish!")
+
 suite "GossipSub":
   teardown:
     for tracker in testTrackers():
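
Editor's note: tryPublish exists because a freshly connected gossipsub node may have no mesh or fanout peers yet, so a single publish can legitimately reach nobody. The template re-evaluates the publish expression until at least `require` peers have received it, sleeping `wait` between attempts and asserting after `times` failed rounds. Usage, as it appears in the tests below:

    # require at least one receiver, polling every second (the defaults):
    tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1

    # require three receivers, polling every five seconds:
    tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 3, 5.seconds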
@@ -63,9 +74,7 @@ suite "GossipSub":
       await subscribeNodes(nodes)
       await nodes[0].subscribe("foobar", handler)
-      await waitSub(nodes[1], nodes[0], "foobar")
       await nodes[1].subscribe("foobar", handler)
-      await waitSub(nodes[0], nodes[1], "foobar")
       var validatorFut = newFuture[bool]()
       proc validator(topic: string,
@@ -76,7 +85,7 @@ suite "GossipSub":
         result = true
       nodes[1].addValidator("foobar", validator)
-      await nodes[0].publish("foobar", "Hello!".toBytes())
+      tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
       result = (await validatorFut) and (await handlerFut)
       await allFuturesThrowing(
@@ -100,17 +109,16 @@ suite "GossipSub":
       await subscribeNodes(nodes)
       await nodes[1].subscribe("foobar", handler)
-      await waitSub(nodes[0], nodes[1], "foobar")
       var validatorFut = newFuture[bool]()
       proc validator(topic: string,
                      message: Message):
                      Future[bool] {.async.} =
-        validatorFut.complete(true)
         result = false
+        validatorFut.complete(true)
       nodes[1].addValidator("foobar", validator)
-      await nodes[0].publish("foobar", "Hello!".toBytes())
+      tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
       result = await validatorFut
       await allFuturesThrowing(
@@ -134,10 +142,9 @@ suite "GossipSub":
       awaiters.add((await nodes[1].start()))
       await subscribeNodes(nodes)
       await nodes[1].subscribe("foo", handler)
-      await waitSub(nodes[0], nodes[1], "foo")
       await nodes[1].subscribe("bar", handler)
-      await waitSub(nodes[0], nodes[1], "bar")
       var passed, failed: Future[bool] = newFuture[bool]()
       proc validator(topic: string,
@@ -151,8 +158,8 @@ suite "GossipSub":
           false
       nodes[1].addValidator("foo", "bar", validator)
-      await nodes[0].publish("foo", "Hello!".toBytes())
-      await nodes[0].publish("bar", "Hello!".toBytes())
+      tryPublish await nodes[0].publish("foo", "Hello!".toBytes()), 1
+      tryPublish await nodes[0].publish("bar", "Hello!".toBytes()), 1
       result = ((await passed) and (await failed) and (await handlerFut))
       await allFuturesThrowing(
@@ -170,7 +177,8 @@ suite "GossipSub":
       var nodes: seq[Switch] = newSeq[Switch]()
       for i in 0..<2:
-        nodes.add newStandardSwitch(gossip = true, secureManagers = [SecureProtocol.Noise])
+        nodes.add newStandardSwitch(gossip = true,
+                                    secureManagers = [SecureProtocol.Noise])
       var awaitters: seq[Future[void]]
       for node in nodes:
@@ -178,7 +186,7 @@ suite "GossipSub":
       await subscribeNodes(nodes)
       await nodes[1].subscribe("foobar", handler)
-      await sleepAsync(1.seconds)
+      await sleepAsync(10.seconds)
       let gossip1 = GossipSub(nodes[0].pubSub.get())
       let gossip2 = GossipSub(nodes[1].pubSub.get())
@@ -272,7 +280,7 @@ suite "GossipSub":
       nodes[1].pubsub.get().addObserver(obs1)
       nodes[0].pubsub.get().addObserver(obs2)
-      await nodes[0].publish("foobar", "Hello!".toBytes())
+      tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
       var gossipSub1: GossipSub = GossipSub(nodes[0].pubSub.get())
@@ -287,7 +295,7 @@ suite "GossipSub":
       await nodes[1].stop()
       await allFuturesThrowing(wait)
-      # result = observed == 2
+      check observed == 2
       result = true
       check:
@@ -310,7 +318,7 @@ suite "GossipSub":
       await nodes[1].subscribe("foobar", handler)
       await waitSub(nodes[0], nodes[1], "foobar")
-      await nodes[0].publish("foobar", "Hello!".toBytes())
+      tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
       result = await passed
@@ -328,7 +336,9 @@ suite "GossipSub":
       var runs = 10
       for i in 0..<runs:
-        nodes.add newStandardSwitch(triggerSelf = true, gossip = true, secureManagers = [SecureProtocol.Noise])
+        nodes.add newStandardSwitch(triggerSelf = true,
+                                    gossip = true,
+                                    secureManagers = [SecureProtocol.Noise])
         awaitters.add((await nodes[i].start()))
       await subscribeRandom(nodes)
@@ -348,20 +358,19 @@ suite "GossipSub":
         if not seenFut.finished() and seen.len >= runs:
           seenFut.complete()
-        subs.add(allFutures(dialer.subscribe("foobar", handler),
-                            waitSub(nodes[0], dialer, "foobar")))
+        subs &= dialer.subscribe("foobar", handler)
       await allFuturesThrowing(subs)
-      await wait(nodes[0].publish("foobar",
-                                  cast[seq[byte]]("from node " &
-                                                  nodes[1].peerInfo.id)),
-                 1.minutes)
+      tryPublish await wait(nodes[0].publish("foobar",
+                                             cast[seq[byte]]("from node " &
+                                                             nodes[1].peerInfo.id)),
+                            1.minutes), runs, 5.seconds
       await wait(seenFut, 2.minutes)
       check: seen.len >= runs
       for k, v in seen.pairs:
-        check: v == 1
+        check: v >= 1
       await allFuturesThrowing(nodes.mapIt(it.stop()))
       await allFuturesThrowing(awaitters)
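
Editor's note: relaxing `check: v == 1` to `check: v >= 1` follows from tryPublish — the payload may be re-published several times before enough peers are reachable, so a subscriber can see the same message more than once. A sketch of a duplicate-tolerant counting handler, assuming a seen table as in the test (gcsafe plumbing elided):

    import tables, chronos

    var seen: Table[string, int]
    proc handler(topic: string, data: seq[byte]) {.async.} =
      let sender = cast[string](data)   # payload is "from node <id>"
      if sender notin seen:
        seen[sender] = 0
      inc seen[sender]                  # duplicates just bump the count
    # afterwards: every sender must be seen at least once
    # for k, v in seen.pairs: doAssert v >= 1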
@@ -377,10 +386,12 @@ suite "GossipSub":
       var runs = 10
       for i in 0..<runs:
-        nodes.add newStandardSwitch(triggerSelf = true, gossip = true, secureManagers = [SecureProtocol.Secio])
+        nodes.add newStandardSwitch(triggerSelf = true,
+                                    gossip = true,
+                                    secureManagers = [SecureProtocol.Secio])
         awaitters.add((await nodes[i].start()))
-      await subscribeSparseNodes(nodes, 4)
+      await subscribeSparseNodes(nodes, 1) # TODO: figure out better sparse mesh
       var seen: Table[string, int]
       var subs: seq[Future[void]]
@@ -401,15 +412,16 @@ suite "GossipSub":
         subs &= dialer.subscribe("foobar", handler)
       await allFuturesThrowing(subs)
-      await wait(nodes[0].publish("foobar",
-                                  cast[seq[byte]]("from node " &
-                                                  nodes[1].peerInfo.id)),
-                 1.minutes)
+
+      tryPublish await wait(nodes[0].publish("foobar",
+                                             cast[seq[byte]]("from node " &
+                                                             nodes[1].peerInfo.id)),
+                            1.minutes), 3, 5.seconds
       await wait(seenFut, 5.minutes)
       check: seen.len >= runs
       for k, v in seen.pairs:
-        check: v == 1
+        check: v >= 1
       await allFuturesThrowing(nodes.mapIt(it.stop()))
       await allFuturesThrowing(awaitters)

View File

@@ -151,7 +151,7 @@ proc testPubSubNodePublish(gossip: bool = false,
     proc publisher() {.async.} =
       while not finished:
-        await nativeNode.publish(testTopic, msgData)
+        discard await nativeNode.publish(testTopic, msgData)
        await sleepAsync(500.millis)
     await wait(publisher(), 5.minutes) # should be plenty of time
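
Editor's note: publish returning Future[int] forces the interop publisher to consume the value; discard preserves the old fire-and-forget behavior. A stricter variant could react to an empty send, sketched here under the same test-local names (nativeNode, testTopic, msgData, finished):

    proc checkedPublisher() {.async.} =
      while not finished:
        let sent = await nativeNode.publish(testTopic, msgData)
        if sent == 0:
          echo "no receivers yet"       # early rounds may reach nobody
        await sleepAsync(500.millis)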