From a52763cc6dfa92b5a6ad374ba7b802f57eed8ac7 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 7 Jul 2020 18:33:05 -0600 Subject: [PATCH] fix publishing (#250) * use var semantics to optimize table access * wip... lvalues don't work properly sadly... * big publish refactor, replenish and balance * fix internal tests * use g.peers for fanout (todo: don't include flood peers) * exclude non gossip from fanout * internal test fixes * fix flood tests * fix test's trypublish * test interop fixes * make sure to not remove peers from gossip table * restore old replenishFanout * cleanups * Cleanup resources (#246) * consolidate reading in lpstream * remove debug echo * tune log level * add channel cleanup and cancelation handling * cancelation handling * cancelation handling * cancelation handling * cancelation handling * cleanup and cancelation handling * cancelation handling * cancelation * tests * rename isConnected to connected * remove testing trace * comment out debug stacktraces * explicit raises * restore trace vs debug in gossip * improve fanout replenish behavior further * cleanup stale peers more eaguerly * synchronize connection cleanup and small refactor * close client first and call parent second * disconnect failed peers on publish * check for publish result * fix tests * fix tests * always call close Co-authored-by: Giovanni Petrantoni --- libp2p/errors.nim | 2 +- libp2p/protocols/pubsub/floodsub.nim | 54 ++--- libp2p/protocols/pubsub/gossipsub.nim | 300 +++++++++++++------------ libp2p/protocols/pubsub/pubsub.nim | 152 ++++++++----- libp2p/protocols/pubsub/pubsubpeer.nim | 67 +++--- libp2p/protocols/secure/secure.nim | 12 +- libp2p/stream/chronosstream.nim | 13 +- libp2p/switch.nim | 143 +++++++----- tests/pubsub/testfloodsub.nim | 16 +- tests/pubsub/testgossipinternal.nim | 10 +- tests/pubsub/testgossipsub.nim | 74 +++--- tests/testinterop.nim | 2 +- 12 files changed, 482 insertions(+), 363 deletions(-) diff --git a/libp2p/errors.nim b/libp2p/errors.nim index f59ab93..ed541fb 100644 --- a/libp2p/errors.nim +++ b/libp2p/errors.nim @@ -59,7 +59,7 @@ template tryAndWarn*(message: static[string]; body: untyped): untyped = try: body except CancelledError as exc: - raise exc # TODO: why catch and re-raise? + raise exc except CatchableError as exc: warn "An exception has ocurred, enable trace logging for details", name = exc.name, msg = message trace "Exception details", exc = exc.msg diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index adf2b7d..f629270 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -47,14 +47,14 @@ method subscribeTopic*(f: FloodSub, # unsubscribe the peer from the topic f.floodsub[topic].excl(peerId) -method handleDisconnect*(f: FloodSub, peer: PubSubPeer) {.async.} = - await procCall PubSub(f).handleDisconnect(peer) - +method handleDisconnect*(f: FloodSub, peer: PubSubPeer) = ## handle peer disconnects for t in toSeq(f.floodsub.keys): if t in f.floodsub: f.floodsub[t].excl(peer.id) + procCall PubSub(f).handleDisconnect(peer) + method rpcHandler*(f: FloodSub, peer: PubSubPeer, rpcMsgs: seq[RPCMsg]) {.async.} = @@ -86,18 +86,20 @@ method rpcHandler*(f: FloodSub, trace "calling handler for message", topicId = t, localPeer = f.peerInfo.id, fromPeer = msg.fromPeer.pretty - await h(t, msg.data) # trigger user provided handler + + try: + await h(t, msg.data) # trigger user provided handler + except CatchableError as exc: + trace "exception in message handler", exc = exc.msg # forward the message to all peers interested in it - var sent: seq[Future[void]] - # start the future but do not wait yet - for p in toSendPeers: - if p in f.peers and f.peers[p].id != peer.id: - sent.add(f.peers[p].send(@[RPCMsg(messages: m.messages)])) + let (published, failed) = await f.sendHelper(toSendPeers, m.messages) + for p in failed: + let peer = f.peers.getOrDefault(p) + if not(isNil(peer)): + f.handleDisconnect(peer) # cleanup failed peers - # wait for all the futures now - sent = await allFinished(sent) - checkFutures(sent) + trace "forwared message to peers", peers = published.len method init*(f: FloodSub) = proc handler(conn: Connection, proto: string) {.async.} = @@ -111,15 +113,16 @@ method init*(f: FloodSub) = f.handler = handler f.codec = FloodSubCodec -method subscribeToPeer*(p: FloodSub, - conn: Connection) {.async.} = - await procCall PubSub(p).subscribeToPeer(conn) +method subscribePeer*(p: FloodSub, + conn: Connection) = + procCall PubSub(p).subscribePeer(conn) asyncCheck p.handleConn(conn, FloodSubCodec) method publish*(f: FloodSub, topic: string, - data: seq[byte]) {.async.} = - await procCall PubSub(f).publish(topic, data) + data: seq[byte]): Future[int] {.async.} = + # base returns always 0 + discard await procCall PubSub(f).publish(topic, data) if data.len <= 0 or topic.len <= 0: trace "topic or data missing, skipping publish" @@ -131,19 +134,18 @@ method publish*(f: FloodSub, trace "publishing on topic", name = topic let msg = Message.init(f.peerInfo, data, topic, f.sign) - var sent: seq[Future[void]] # start the future but do not wait yet - for p in f.floodsub.getOrDefault(topic): - if p in f.peers: - trace "publishing message", name = topic, peer = p, data = data.shortLog - sent.add(f.peers[p].send(@[RPCMsg(messages: @[msg])])) - - # wait for all the futures now - sent = await allFinished(sent) - checkFutures(sent) + let (published, failed) = await f.sendHelper(f.floodsub.getOrDefault(topic), @[msg]) + for p in failed: + let peer = f.peers.getOrDefault(p) + f.handleDisconnect(peer) # cleanup failed peers libp2p_pubsub_messages_published.inc(labelValues = [topic]) + trace "published message to peers", peers = published.len, + msg = msg.shortLog() + return published.len + method unsubscribe*(f: FloodSub, topics: seq[TopicPair]) {.async.} = await procCall PubSub(f).unsubscribe(topics) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 9498266..f6a4a2e 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -56,9 +56,17 @@ type heartbeatRunning: bool heartbeatLock: AsyncLock # heartbeat lock to prevent two consecutive concurrent heartbeats -declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", labels = ["topic"]) -declareGauge(libp2p_gossipsub_peers_per_topic_fanout, "gossipsub peers per topic in fanout", labels = ["topic"]) -declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub, "gossipsub peers per topic in gossipsub", labels = ["topic"]) +declareGauge(libp2p_gossipsub_peers_per_topic_mesh, + "gossipsub peers per topic in mesh", + labels = ["topic"]) + +declareGauge(libp2p_gossipsub_peers_per_topic_fanout, + "gossipsub peers per topic in fanout", + labels = ["topic"]) + +declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub, + "gossipsub peers per topic in gossipsub", + labels = ["topic"]) method init*(g: GossipSub) = proc handler(conn: Connection, proto: string) {.async.} = @@ -72,7 +80,7 @@ method init*(g: GossipSub) = g.handler = handler g.codec = GossipSubCodec -proc replenishFanout(g: GossipSub, topic: string) {.async.} = +proc replenishFanout(g: GossipSub, topic: string) = ## get fanout peers for a topic trace "about to replenish fanout" if topic notin g.fanout: @@ -80,16 +88,54 @@ proc replenishFanout(g: GossipSub, topic: string) {.async.} = if g.fanout.getOrDefault(topic).len < GossipSubDLo: trace "replenishing fanout", peers = g.fanout.getOrDefault(topic).len - if topic in g.gossipsub: + if topic in toSeq(g.gossipsub.keys): for p in g.gossipsub.getOrDefault(topic): if not g.fanout[topic].containsOrIncl(p): if g.fanout.getOrDefault(topic).len == GossipSubD: break libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout.getOrDefault(topic).len.int64, labelValues = [topic]) + .set(g.fanout.getOrDefault(topic).len.int64, + labelValues = [topic]) + trace "fanout replenished with peers", peers = g.fanout.getOrDefault(topic).len +template moveToMeshHelper(g: GossipSub, + topic: string, + table: Table[string, HashSet[string]]) = + ## move peers from `table` into `mesh` + ## + var peerIds = toSeq(table.getOrDefault(topic)) + + logScope: + topic = topic + meshPeers = g.mesh.getOrDefault(topic).len + peers = peerIds.len + + shuffle(peerIds) + for id in peerIds: + if g.mesh.getOrDefault(topic).len > GossipSubD: + break + + trace "gathering peers for mesh" + if topic notin table: + continue + + trace "getting peers", topic, + peers = peerIds.len + + table[topic].excl(id) # always exclude + if id in g.mesh[topic]: + continue # we already have this peer in the mesh, try again + + if id in g.peers: + let p = g.peers[id] + if p.connected: + # send a graft message to the peer + await p.sendGraft(@[topic]) + g.mesh[topic].incl(id) + trace "got peer", peer = id + proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = try: trace "about to rebalance mesh" @@ -100,69 +146,47 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = if g.mesh.getOrDefault(topic).len < GossipSubDlo: trace "replenishing mesh", topic # replenish the mesh if we're below GossipSubDlo - while g.mesh.getOrDefault(topic).len < GossipSubD: - trace "gathering peers", peers = g.mesh.getOrDefault(topic).len - await sleepAsync(1.millis) # don't starve the event loop - var id: string - if topic in g.fanout and g.fanout.getOrDefault(topic).len > 0: - trace "getting peer from fanout", topic, - peers = g.fanout.getOrDefault(topic).len - id = sample(toSeq(g.fanout.getOrDefault(topic))) - g.fanout[topic].excl(id) + # move fanout nodes first + g.moveToMeshHelper(topic, g.fanout) - if id in g.fanout[topic]: - continue # we already have this peer in the mesh, try again + # move gossipsub nodes second + g.moveToMeshHelper(topic, g.gossipsub) - trace "got fanout peer", peer = id - elif topic in g.gossipsub and g.gossipsub.getOrDefault(topic).len > 0: - trace "getting peer from gossipsub", topic, - peers = g.gossipsub.getOrDefault(topic).len + if g.mesh.getOrDefault(topic).len > GossipSubDhi: + # prune peers if we've gone over + var mesh = toSeq(g.mesh.getOrDefault(topic)) + shuffle(mesh) - id = sample(toSeq(g.gossipsub[topic])) - g.gossipsub[topic].excl(id) - - if id in g.mesh[topic]: - continue # we already have this peer in the mesh, try again - - trace "got gossipsub peer", peer = id - else: - trace "no more peers" + trace "about to prune mesh", mesh = mesh.len + for id in mesh: + if g.mesh.getOrDefault(topic).len <= GossipSubD: break - g.mesh[topic].incl(id) - if id in g.peers: - let p = g.peers[id] - # send a graft message to the peer - await p.sendGraft(@[topic]) - - # prune peers if we've gone over - if g.mesh.getOrDefault(topic).len > GossipSubDhi: - trace "about to prune mesh", mesh = g.mesh.getOrDefault(topic).len - while g.mesh.getOrDefault(topic).len > GossipSubD: trace "pruning peers", peers = g.mesh[topic].len - let id = toSeq(g.mesh[topic])[rand(0.. val: dropping.add(topic) g.fanout.del(topic) + trace "dropping fanout topic", topic for topic in dropping: g.lastFanoutPubSub.del(topic) @@ -181,36 +206,46 @@ proc dropFanoutPeers(g: GossipSub) {.async.} = proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} = ## gossip iHave messages to peers + ## + + trace "getting gossip peers (iHave)" let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys)) + let controlMsg = ControlMessage() for topic in topics: - let mesh: HashSet[string] = g.mesh.getOrDefault(topic) - let fanout: HashSet[string] = g.fanout.getOrDefault(topic) + var allPeers = toSeq(g.gossipsub.getOrDefault(topic)) + shuffle(allPeers) + + let mesh = g.mesh.getOrDefault(topic) + let fanout = g.fanout.getOrDefault(topic) let gossipPeers = mesh + fanout let mids = g.mcache.window(topic) - if mids.len > 0: - let ihave = ControlIHave(topicID: topic, - messageIDs: toSeq(mids)) + if mids.len <= 0: + continue - if topic notin g.gossipsub: - trace "topic not in gossip array, skipping", topicID = topic + let ihave = ControlIHave(topicID: topic, + messageIDs: toSeq(mids)) + + if topic notin g.gossipsub: + trace "topic not in gossip array, skipping", topicID = topic + continue + + for id in allPeers: + if result.len >= GossipSubD: + trace "got gossip peers", peers = result.len + break + + if allPeers.len == 0: + trace "no peers for topic, skipping", topicID = topic + break + + if id in gossipPeers: continue - while result.len < GossipSubD: - if g.gossipsub.getOrDefault(topic).len == 0: - trace "no peers for topic, skipping", topicID = topic - break + if id notin result: + result[id] = controlMsg - let id = toSeq(g.gossipsub.getOrDefault(topic)).sample() - if id in g.gossipsub.getOrDefault(topic): - g.gossipsub[topic].excl(id) - if id notin gossipPeers: - if id notin result: - result[id] = ControlMessage() - result[id].ihave.add(ihave) - - libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub.getOrDefault(topic).len.int64, labelValues = [topic]) + result[id].ihave.add(ihave) proc heartbeat(g: GossipSub) {.async.} = while g.heartbeatRunning: @@ -221,6 +256,11 @@ proc heartbeat(g: GossipSub) {.async.} = await g.rebalanceMesh(t) await g.dropFanoutPeers() + + # replenish known topics to the fanout + for t in toSeq(g.fanout.keys): + g.replenishFanout(t) + let peers = g.getGossipPeers() var sent: seq[Future[void]] for peer in peers.keys: @@ -236,11 +276,9 @@ proc heartbeat(g: GossipSub) {.async.} = await sleepAsync(1.seconds) -method handleDisconnect*(g: GossipSub, peer: PubSubPeer) {.async.} = +method handleDisconnect*(g: GossipSub, peer: PubSubPeer) = ## handle peer disconnects - trace "peer disconnected", peer=peer.id - - await procCall FloodSub(g).handleDisconnect(peer) + procCall FloodSub(g).handleDisconnect(peer) for t in toSeq(g.gossipsub.keys): if t in g.gossipsub: @@ -249,9 +287,6 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) {.async.} = libp2p_gossipsub_peers_per_topic_gossipsub .set(g.gossipsub.getOrDefault(t).len.int64, labelValues = [t]) - # mostly for metrics - await procCall PubSub(g).subscribeTopic(t, false, peer.id) - for t in toSeq(g.mesh.keys): if t in g.mesh: g.mesh[t].excl(peer.id) @@ -266,9 +301,9 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) {.async.} = libp2p_gossipsub_peers_per_topic_fanout .set(g.fanout[t].len.int64, labelValues = [t]) -method subscribeToPeer*(p: GossipSub, - conn: Connection) {.async.} = - await procCall PubSub(p).subscribeToPeer(conn) +method subscribePeer*(p: GossipSub, + conn: Connection) = + procCall PubSub(p).subscribePeer(conn) asyncCheck p.handleConn(conn, GossipSubCodec) method subscribeTopic*(g: GossipSub, @@ -290,8 +325,11 @@ method subscribeTopic*(g: GossipSub, g.gossipsub[topic].excl(peerId) libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub.getOrDefault(topic).len.int64, labelValues = [topic]) + .set(g.gossipsub[topic].len.int64, labelValues = [topic]) + trace "gossip peers", peers = g.gossipsub[topic].len, topic + + # also rebalance current topic if we are subbed to if topic in g.topics: await g.rebalanceMesh(topic) @@ -385,7 +423,6 @@ method rpcHandler*(g: GossipSub, continue for t in msg.topicIDs: # for every topic in the message - await g.rebalanceMesh(t) # gather peers for each topic if t in g.floodsub: toSendPeers.incl(g.floodsub[t]) # get all floodsub peers for topic @@ -397,29 +434,19 @@ method rpcHandler*(g: GossipSub, trace "calling handler for message", topicId = t, localPeer = g.peerInfo.id, fromPeer = msg.fromPeer.pretty - await h(t, msg.data) # trigger user provided handler + try: + await h(t, msg.data) # trigger user provided handler + except CatchableError as exc: + trace "exception in message handler", exc = exc.msg # forward the message to all peers interested in it - for p in toSendPeers: - if p in g.peers: - let id = g.peers[p].peerInfo.peerId - trace "about to forward message to peer", peerId = id, msgs = m.messages + let (published, failed) = await g.sendHelper(toSendPeers, m.messages) + for p in failed: + let peer = g.peers.getOrDefault(p) + if not(isNil(peer)): + g.handleDisconnect(peer) # cleanup failed peers - if id == peer.peerInfo.peerId: - trace "not forwarding message to originator", peerId = id - continue - - let msgs = m.messages.filterIt( - # don't forward to message originator - id != it.fromPeer - ) - - var sent: seq[Future[void]] - if msgs.len > 0: - trace "forwarding message to", peerId = id - sent.add(g.peers[p].send(@[RPCMsg(messages: msgs)])) - sent = await allFinished(sent) - checkFutures(sent) + trace "forwared message to peers", peers = published.len var respControl: ControlMessage if m.control.isSome: @@ -457,53 +484,49 @@ method unsubscribe*(g: GossipSub, method publish*(g: GossipSub, topic: string, - data: seq[byte]) {.async.} = - await procCall PubSub(g).publish(topic, data) + data: seq[byte]): Future[int] {.async.} = + # base returns always 0 + discard await procCall PubSub(g).publish(topic, data) trace "about to publish message on topic", name = topic, data = data.shortLog + var peers: HashSet[string] + if topic.len <= 0: # data could be 0/empty + return 0 - # TODO: we probably don't need to try multiple times - if data.len > 0 and topic.len > 0: - var peers = g.mesh.getOrDefault(topic) - for _ in 0..<5: # try to get peers up to 5 times - if peers.len > 0: - break + if topic in g.topics: # if we're subscribed use the mesh + peers = g.mesh.getOrDefault(topic) + else: # not subscribed, send to fanout peers + # try optimistically + peers = g.fanout.getOrDefault(topic) + if peers.len == 0: + # ok we had nothing.. let's try replenish inline + g.replenishFanout(topic) + peers = g.fanout.getOrDefault(topic) - if topic in g.topics: # if we're subscribed to the topic attempt to build a mesh - await g.rebalanceMesh(topic) - peers = g.mesh.getOrDefault(topic) - else: # send to fanout peers - await g.replenishFanout(topic) - if topic in g.fanout: - peers = g.fanout.getOrDefault(topic) - # set the fanout expiry time - g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) + let + msg = Message.init(g.peerInfo, data, topic, g.sign) + msgId = g.msgIdProvider(msg) - # wait a second between tries - await sleepAsync(1.seconds) + trace "created new message", msg - let - msg = Message.init(g.peerInfo, data, topic, g.sign) - msgId = g.msgIdProvider(msg) + trace "publishing on topic", name = topic, peers = peers + if msgId notin g.mcache: + g.mcache.put(msgId, msg) - trace "created new message", msg - var sent: seq[Future[void]] - for p in peers: - if p == g.peerInfo.id: - continue - - trace "publishing on topic", name = topic - if msgId notin g.mcache: - g.mcache.put(msgId, msg) - - if p in g.peers: - sent.add(g.peers[p].send(@[RPCMsg(messages: @[msg])])) - checkFutures(await allFinished(sent)) + let (published, failed) = await g.sendHelper(peers, @[msg]) + for p in failed: + let peer = g.peers.getOrDefault(p) + g.handleDisconnect(peer) # cleanup failed peers + if published.len > 0: libp2p_pubsub_messages_published.inc(labelValues = [topic]) + trace "published message to peers", peers = published.len, + msg = msg.shortLog() + return published.len + method start*(g: GossipSub) {.async.} = - debug "gossipsub start" + trace "gossipsub start" ## start pubsub ## start long running/repeating procedures @@ -518,7 +541,7 @@ method start*(g: GossipSub) {.async.} = g.heartbeatLock.release() method stop*(g: GossipSub) {.async.} = - debug "gossipsub stop" + trace "gossipsub stop" ## stop pubsub ## stop long running tasks @@ -528,8 +551,9 @@ method stop*(g: GossipSub) {.async.} = # stop heartbeat interval g.heartbeatRunning = false if not g.heartbeatFut.finished: - debug "awaiting last heartbeat" + trace "awaiting last heartbeat" await g.heartbeatFut + trace "heartbeat stopped" g.heartbeatLock.release() diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 434edc1..d30bec7 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -28,9 +28,10 @@ declareGauge(libp2p_pubsub_topics, "pubsub subscribed topics") declareCounter(libp2p_pubsub_validation_success, "pubsub successfully validated messages") declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messages") declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"]) -declareGauge(libp2p_pubsub_peers_per_topic, "pubsub peers per topic", labels = ["topic"]) type + SendRes = tuple[published: seq[string], failed: seq[string]] # keep private + TopicHandler* = proc(topic: string, data: seq[byte]): Future[void] {.gcsafe.} @@ -58,6 +59,18 @@ type observers: ref seq[PubSubObserver] # ref as in smart_ptr msgIdProvider*: MsgIdProvider # Turn message into message id (not nil) +method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} = + ## handle peer disconnects + ## + if peer.id in p.peers: + trace "deleting peer", peer = peer.id, stack = getStackTrace() + p.peers[peer.id] = nil + p.peers.del(peer.id) + + # metrics + libp2p_pubsub_peers.set(p.peers.len.int64) + trace "peer disconnected", peer = peer.id + proc sendSubs*(p: PubSub, peer: PubSubPeer, topics: seq[string], @@ -74,16 +87,26 @@ proc sendSubs*(p: PubSub, topicName = t msg.subscriptions.add(SubOpts(topic: t, subscribe: subscribe)) - await peer.send(@[msg]) + try: + # wait for a connection before publishing + # this happens when + if not peer.onConnect.isSet: + trace "awaiting send connection" + await peer.onConnect.wait() + + await peer.send(@[msg]) + except CancelledError as exc: + p.handleDisconnect(peer) + raise exc + except CatchableError as exc: + trace "unable to send subscriptions", exc = exc.msg + p.handleDisconnect(peer) method subscribeTopic*(p: PubSub, topic: string, subscribe: bool, peerId: string) {.base, async.} = - if subscribe: - libp2p_pubsub_peers_per_topic.inc(labelValues = [topic]) - else: - libp2p_pubsub_peers_per_topic.dec(labelValues = [topic]) + discard method rpcHandler*(p: PubSub, peer: PubSubPeer, @@ -98,23 +121,6 @@ method rpcHandler*(p: PubSub, trace "about to subscribe to topic", topicId = s.topic await p.subscribeTopic(s.topic, s.subscribe, peer.id) -method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.async, base.} = - ## handle peer disconnects - if peer.id in p.peers: - p.peers.del(peer.id) - - # metrics - libp2p_pubsub_peers.set(p.peers.len.int64) - -proc cleanUpHelper(p: PubSub, peer: PubSubPeer) {.async.} = - try: - await p.cleanupLock.acquire() - peer.refs.dec() # decrement refcount - if peer.refs <= 0: - await p.handleDisconnect(peer) - finally: - p.cleanupLock.release() - proc getPeer(p: PubSub, peerInfo: PeerInfo, proto: string): PubSubPeer = @@ -123,26 +129,13 @@ proc getPeer(p: PubSub, # create new pubsub peer let peer = newPubSubPeer(peerInfo, proto) - trace "created new pubsub peer", peerId = peer.id - - # metrics + trace "created new pubsub peer", peerId = peer.id, stack = getStackTrace() p.peers[peer.id] = peer - peer.refs.inc # increment reference count peer.observers = p.observers libp2p_pubsub_peers.set(p.peers.len.int64) return peer -proc internalCleanup(p: PubSub, conn: Connection) {.async.} = - # handle connection close - if isNil(conn): - return - - var peer = p.getPeer(conn.peerInfo, p.codec) - await conn.closeEvent.wait() - trace "pubsub conn closed, cleaning up peer", peer = conn.peerInfo.id - await p.cleanUpHelper(peer) - method handleConn*(p: PubSub, conn: Connection, proto: string) {.base, async.} = @@ -157,41 +150,46 @@ method handleConn*(p: PubSub, ## that we're interested in ## + if isNil(conn.peerInfo): + trace "no valid PeerId for peer" + await conn.close() + return + + proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = + # call pubsub rpc handler + await p.rpcHandler(peer, msgs) + + let peer = p.getPeer(conn.peerInfo, proto) + let topics = toSeq(p.topics.keys) + if topics.len > 0: + await p.sendSubs(peer, topics, true) + try: - if isNil(conn.peerInfo): - trace "no valid PeerId for peer" - await conn.close() - return - - proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = - # call pubsub rpc handler - await p.rpcHandler(peer, msgs) - - asyncCheck p.internalCleanup(conn) - let peer = p.getPeer(conn.peerInfo, proto) - let topics = toSeq(p.topics.keys) - if topics.len > 0: - await p.sendSubs(peer, topics, true) - peer.handler = handler await peer.handle(conn) # spawn peer read loop - trace "pubsub peer handler ended, cleaning up" + trace "pubsub peer handler ended", peer = peer.id except CancelledError as exc: - await conn.close() raise exc except CatchableError as exc: trace "exception ocurred in pubsub handle", exc = exc.msg + finally: + p.handleDisconnect(peer) await conn.close() -method subscribeToPeer*(p: PubSub, - conn: Connection) {.base, async.} = +method subscribePeer*(p: PubSub, conn: Connection) {.base.} = if not(isNil(conn)): let peer = p.getPeer(conn.peerInfo, p.codec) - trace "setting connection for peer", peerId = conn.peerInfo.id + trace "subscribing to peer", peerId = conn.peerInfo.id if not peer.connected: peer.conn = conn - asyncCheck p.internalCleanup(conn) +method unsubscribePeer*(p: PubSub, peerInfo: PeerInfo) {.base, async.} = + let peer = p.getPeer(peerInfo, p.codec) + trace "unsubscribing from peer", peerId = $peerInfo + if not(isNil(peer.conn)): + await peer.conn.close() + + p.handleDisconnect(peer) proc connected*(p: PubSub, peer: PeerInfo): bool = let peer = p.getPeer(peer, p.codec) @@ -231,16 +229,46 @@ method subscribe*(p: PubSub, p.topics[topic].handler.add(handler) - for peer in p.peers.values: + for peer in toSeq(p.peers.values): await p.sendSubs(peer, @[topic], true) # metrics libp2p_pubsub_topics.inc() +proc sendHelper*(p: PubSub, + sendPeers: HashSet[string], + msgs: seq[Message]): Future[SendRes] {.async.} = + var sent: seq[tuple[id: string, fut: Future[void]]] + for sendPeer in sendPeers: + # avoid sending to self + if sendPeer == p.peerInfo.id: + continue + + let peer = p.peers.getOrDefault(sendPeer) + if isNil(peer): + continue + + trace "sending messages to peer", peer = peer.id, msgs + sent.add((id: peer.id, fut: peer.send(@[RPCMsg(messages: msgs)]))) + + var published: seq[string] + var failed: seq[string] + let futs = await allFinished(sent.mapIt(it.fut)) + for s in futs: + let f = sent.filterIt(it.fut == s) + if f.len > 0: + if s.failed: + trace "sending messages to peer failed", peer = f[0].id + failed.add(f[0].id) + else: + trace "sending messages to peer succeeded", peer = f[0].id + published.add(f[0].id) + + return (published, failed) + method publish*(p: PubSub, topic: string, - data: seq[byte]) {.base, async.} = - # TODO: Should throw indicating success/failure + data: seq[byte]): Future[int] {.base, async.} = ## publish to a ``topic`` if p.triggerSelf and topic in p.topics: for h in p.topics[topic].handler: @@ -255,6 +283,8 @@ method publish*(p: PubSub, # more cleanup though debug "Could not write to pubsub connection", msg = exc.msg + return 0 + method initPubSub*(p: PubSub) {.base.} = ## perform pubsub initialization p.observers = new(seq[PubSubObserver]) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index ee9851b..1bda8ad 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -32,15 +32,14 @@ type onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].} PubSubPeer* = ref object of RootObj - proto: string # the protocol that this peer joined from + proto*: string # the protocol that this peer joined from sendConn: Connection peerInfo*: PeerInfo handler*: RPCHandler topics*: seq[string] sentRpcCache: TimedCache[string] # cache for already sent messages recvdRpcCache: TimedCache[string] # cache for already received messages - refs*: int # refcount of the connections this peer is handling - onConnect: AsyncEvent + onConnect*: AsyncEvent observers*: ref seq[PubSubObserver] # ref as in smart_ptr RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} @@ -56,6 +55,9 @@ proc `conn=`*(p: PubSubPeer, conn: Connection) = p.sendConn = conn p.onConnect.fire() +proc conn*(p: PubSubPeer): Connection = + p.sendConn + proc recvObservers(p: PubSubPeer, msg: var RPCMsg) = # trigger hooks if not(isNil(p.observers)) and p.observers[].len > 0: @@ -100,10 +102,17 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = trace "exiting pubsub peer read loop", peer = p.id await conn.close() + except CancelledError as exc: + raise exc except CatchableError as exc: trace "Exception occurred in PubSubPeer.handle", exc = exc.msg + raise exc proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = + logScope: + peer = p.id + msgs = $msgs + for m in msgs.items: trace "sending msgs to peer", toPeer = p.id, msgs = $msgs @@ -122,38 +131,29 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id]) continue - proc sendToRemote() {.async.} = - try: - trace "about to send message", peer = p.id, - encoded = digest - if not p.onConnect.isSet: - await p.onConnect.wait() + try: + trace "about to send message", peer = p.id, + encoded = digest + if p.connected: # this can happen if the remote disconnected + trace "sending encoded msgs to peer", peer = p.id, + encoded = encoded.buffer.shortLog + await p.sendConn.writeLp(encoded.buffer) + p.sentRpcCache.put(digest) - if p.connected: # this can happen if the remote disconnected - trace "sending encoded msgs to peer", peer = p.id, - encoded = encoded.buffer.shortLog - await p.sendConn.writeLp(encoded.buffer) - p.sentRpcCache.put(digest) + for m in msgs: + for mm in m.messages: + for t in mm.topicIDs: + # metrics + libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t]) - for m in msgs: - for mm in m.messages: - for t in mm.topicIDs: - # metrics - libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t]) + except CatchableError as exc: + trace "unable to send to remote", exc = exc.msg + if not(isNil(p.sendConn)): + await p.sendConn.close() + p.sendConn = nil + p.onConnect.clear() - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "unable to send to remote", exc = exc.msg - if not(isNil(p.sendConn)): - await p.sendConn.close() - p.sendConn = nil - p.onConnect.clear() - - # if no connection has been set, - # queue messages until a connection - # becomes available - asyncCheck sendToRemote() + raise exc proc sendMsg*(p: PubSubPeer, peerId: PeerID, @@ -172,6 +172,9 @@ proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async.} = trace "sending prune msg to peer", peer = p.id, topicID = topic await p.send(@[RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))]) +proc `$`*(p: PubSubPeer): string = + p.id + proc newPubSubPeer*(peerInfo: PeerInfo, proto: string): PubSubPeer = new result diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index e48025a..235de6e 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -55,7 +55,9 @@ method handshake(s: Secure, initiator: bool): Future[SecureConn] {.async, base.} = doAssert(false, "Not implemented!") -proc handleConn*(s: Secure, conn: Connection, initiator: bool): Future[Connection] {.async, gcsafe.} = +proc handleConn*(s: Secure, + conn: Connection, + initiator: bool): Future[Connection] {.async, gcsafe.} = var sconn = await s.handshake(conn, initiator) conn.closeEvent.wait() @@ -71,7 +73,8 @@ method init*(s: Secure) {.gcsafe.} = proc handle(conn: Connection, proto: string) {.async, gcsafe.} = trace "handling connection upgrade", proto try: - # We don't need the result but we definitely need to await the handshake + # We don't need the result but we + # definitely need to await the handshake discard await s.handleConn(conn, false) trace "connection secured" except CancelledError as exc: @@ -84,7 +87,10 @@ method init*(s: Secure) {.gcsafe.} = s.handler = handle -method secure*(s: Secure, conn: Connection, initiator: bool): Future[Connection] {.async, base, gcsafe.} = +method secure*(s: Secure, + conn: Connection, + initiator: bool): + Future[Connection] {.async, base, gcsafe.} = result = await s.handleConn(conn, initiator) method readOnce*(s: SecureConn, diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index 4ff0d03..312ae23 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -7,6 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +import oids import chronos, chronicles import connection, ../utility @@ -40,7 +41,6 @@ template withExceptions(body: untyped) = except TransportError: # TODO https://github.com/status-im/nim-chronos/pull/99 raise newLPStreamEOFError() - # raise (ref LPStreamError)(msg: exc.msg, parent: exc) method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} = if s.atEof: @@ -73,11 +73,18 @@ method atEof*(s: ChronosStream): bool {.inline.} = method close*(s: ChronosStream) {.async.} = try: if not s.isClosed: - await procCall Connection(s).close() + trace "shutting down chronos stream", address = $s.client.remoteAddress(), + oid = s.oid - trace "shutting down chronos stream", address = $s.client.remoteAddress(), oid = s.oid + # TODO: the sequence here matters + # don't move it after the connections + # close bellow if not s.client.closed(): await s.client.closeWait() + await procCall Connection(s).close() + + except CancelledError as exc: + raise exc except CatchableError as exc: trace "error closing chronosstream", exc = exc.msg diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 0ec74ad..418b0ca 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -12,7 +12,8 @@ import tables, options, strformat, sets, - algorithm + algorithm, + oids import chronos, chronicles, @@ -78,6 +79,7 @@ type secureManagers*: seq[Secure] pubSub*: Option[PubSub] dialLock: Table[string, AsyncLock] + cleanUpLock: Table[string, AsyncLock] proc newNoPubSubException(): ref NoPubSubException {.inline.} = result = newException(NoPubSubException, "no pubsub provided!") @@ -86,7 +88,7 @@ proc newTooManyConnections(): ref TooManyConnections {.inline.} = result = newException(TooManyConnections, "too many connections for peer") proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} -proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} +proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} proc selectConn(s: Switch, peerInfo: PeerInfo): Connection = ## select the "best" connection according to some criteria @@ -159,6 +161,9 @@ proc storeConn(s: Switch, newSeq[MuxerHolder]()) .add(MuxerHolder(muxer: muxer, handle: handle, dir: dir)) + trace "storred connection", connections = s.connections.len + libp2p_peers.set(s.connections.len.int64) + proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = if s.secureManagers.len <= 0: raise newException(CatchableError, "No secure managers registered!") @@ -252,45 +257,56 @@ proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} = if isNil(conn): return - defer: - await conn.close() - libp2p_peers.set(s.connections.len.int64) - if isNil(conn.peerInfo): return let id = conn.peerInfo.id - trace "cleaning up connection for peer", peerId = id - if id in s.muxed: - let muxerHolder = s.muxed[id] - .filterIt( - it.muxer.connection == conn - ) - - if muxerHolder.len > 0: - await muxerHolder[0].muxer.close() - if not(isNil(muxerHolder[0].handle)): - await muxerHolder[0].handle + let lock = s.cleanUpLock.mgetOrPut(id, newAsyncLock()) + try: + await lock.acquire() + trace "cleaning up connection for peer", peerId = id if id in s.muxed: - s.muxed[id].keepItIf( - it.muxer.connection != conn + let muxerHolder = s.muxed[id] + .filterIt( + it.muxer.connection == conn + ) + + if muxerHolder.len > 0: + await muxerHolder[0].muxer.close() + if not(isNil(muxerHolder[0].handle)): + await muxerHolder[0].handle + + if id in s.muxed: + s.muxed[id].keepItIf( + it.muxer.connection != conn + ) + + if s.muxed[id].len == 0: + s.muxed.del(id) + + if s.pubSub.isSome: + await s.pubSub.get() + .unsubscribePeer(conn.peerInfo) + + if id in s.connections: + s.connections[id].keepItIf( + it.conn != conn ) - if s.muxed[id].len == 0: - s.muxed.del(id) + if s.connections[id].len == 0: + s.connections.del(id) - if id in s.connections: - s.connections[id].keepItIf( - it.conn != conn - ) + # TODO: Investigate cleanupConn() always called twice for one peer. + if not(conn.peerInfo.isClosed()): + conn.peerInfo.close() + finally: + await conn.close() - if s.connections[id].len == 0: - s.connections.del(id) + if lock.locked(): + lock.release() - # TODO: Investigate cleanupConn() always called twice for one peer. - if not(conn.peerInfo.isClosed()): - conn.peerInfo.close() + libp2p_peers.set(s.connections.len.int64) proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} = let connections = s.connections.getOrDefault(peer.id) @@ -323,7 +339,6 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g raise newException(CatchableError, "unable to mux connection, stopping upgrade") - libp2p_peers.set(s.connections.len.int64) trace "succesfully upgraded outgoing connection", uoid = sconn.oid return sconn @@ -375,8 +390,8 @@ proc internalConnect(s: Switch, raise newException(CatchableError, "can't dial self!") let id = peer.id - let lock = s.dialLock.mgetOrPut(id, newAsyncLock()) var conn: Connection + let lock = s.dialLock.mgetOrPut(id, newAsyncLock()) defer: if lock.locked(): @@ -436,29 +451,48 @@ proc internalConnect(s: Switch, doAssert(conn.peerInfo.id in s.connections, "connection not tracked!") - trace "dial succesfull", oid = conn.oid - await s.subscribeToPeer(peer) + trace "dial succesfull", oid = $conn.oid, + peer = $conn.peerInfo + + await s.subscribePeer(peer) return conn proc connect*(s: Switch, peer: PeerInfo) {.async.} = - var conn = await s.internalConnect(peer) + discard await s.internalConnect(peer) proc dial*(s: Switch, peer: PeerInfo, proto: string): Future[Connection] {.async.} = - var conn = await s.internalConnect(peer) + let conn = await s.internalConnect(peer) let stream = await s.getMuxedStream(peer) - if isNil(stream): - await conn.close() - raise newException(CatchableError, "Couldn't get muxed stream") - trace "Attempting to select remote", proto = proto, oid = conn.oid - if not await s.ms.select(stream, proto): - await stream.close() - raise newException(CatchableError, "Unable to select sub-protocol " & proto) + proc cleanup() {.async.} = + if not(isNil(stream)): + await stream.close() - return stream + if not(isNil(conn)): + await conn.close() + + try: + if isNil(stream): + await conn.close() + raise newException(CatchableError, "Couldn't get muxed stream") + + trace "Attempting to select remote", proto = proto, oid = conn.oid + if not await s.ms.select(stream, proto): + await stream.close() + raise newException(CatchableError, "Unable to select sub-protocol " & proto) + + return stream + except CancelledError as exc: + trace "dial canceled" + await cleanup() + raise exc + except CatchableError as exc: + trace "error dialing" + await cleanup() + raise exc proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} = if isNil(proto.handler): @@ -527,7 +561,7 @@ proc stop*(s: Switch) {.async.} = trace "switch stopped" -proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = +proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = ## Subscribe to pub sub peer if s.pubSub.isSome and not(s.pubSub.get().connected(peerInfo)): trace "about to subscribe to pubsub peer", peer = peerInfo.shortLog() @@ -554,7 +588,7 @@ proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = await stream.close() return - await s.pubSub.get().subscribeToPeer(stream) + s.pubSub.get().subscribePeer(stream) proc subscribe*(s: Switch, topic: string, handler: TopicHandler): Future[void] = @@ -564,7 +598,7 @@ proc subscribe*(s: Switch, topic: string, retFuture.fail(newNoPubSubException()) return retFuture - result = s.pubSub.get().subscribe(topic, handler) + return s.pubSub.get().subscribe(topic, handler) proc unsubscribe*(s: Switch, topics: seq[TopicPair]): Future[void] = ## unsubscribe from topics @@ -573,16 +607,16 @@ proc unsubscribe*(s: Switch, topics: seq[TopicPair]): Future[void] = retFuture.fail(newNoPubSubException()) return retFuture - result = s.pubSub.get().unsubscribe(topics) + return s.pubSub.get().unsubscribe(topics) -proc publish*(s: Switch, topic: string, data: seq[byte]): Future[void] = +proc publish*(s: Switch, topic: string, data: seq[byte]): Future[int] = # pubslish to pubsub topic if s.pubSub.isNone: - var retFuture = newFuture[void]("Switch.publish") + var retFuture = newFuture[int]("Switch.publish") retFuture.fail(newNoPubSubException()) return retFuture - result = s.pubSub.get().publish(topic, data) + return s.pubSub.get().publish(topic, data) proc addValidator*(s: Switch, topics: varargs[string], @@ -608,8 +642,6 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = if not(isNil(stream)): await stream.close() - trace "got new muxer" - try: # once we got a muxed connection, attempt to # identify it @@ -622,14 +654,15 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = # store muxer and muxed connection await s.storeConn(muxer, Direction.In) - libp2p_peers.set(s.connections.len.int64) muxer.connection.closeEvent.wait() .addCallback do(udata: pointer): asyncCheck s.cleanupConn(muxer.connection) + trace "got new muxer", peer = $muxer.connection.peerInfo + # try establishing a pubsub connection - await s.subscribeToPeer(muxer.connection.peerInfo) + await s.subscribePeer(muxer.connection.peerInfo) except CancelledError as exc: await muxer.close() diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index 119c51e..f679897 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -59,7 +59,7 @@ suite "FloodSub": await nodes[1].subscribe("foobar", handler) await waitSub(nodes[0], nodes[1], "foobar") - await nodes[0].publish("foobar", "Hello!".toBytes()) + check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0 result = await completionFut.wait(5.seconds) @@ -90,7 +90,7 @@ suite "FloodSub": await nodes[0].subscribe("foobar", handler) await waitSub(nodes[1], nodes[0], "foobar") - await nodes[1].publish("foobar", "Hello!".toBytes()) + check (await nodes[1].publish("foobar", "Hello!".toBytes())) > 0 result = await completionFut.wait(5.seconds) @@ -125,7 +125,7 @@ suite "FloodSub": nodes[1].addValidator("foobar", validator) - await nodes[0].publish("foobar", "Hello!".toBytes()) + check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0 check (await handlerFut) == true await allFuturesThrowing( @@ -159,7 +159,7 @@ suite "FloodSub": nodes[1].addValidator("foobar", validator) - await nodes[0].publish("foobar", "Hello!".toBytes()) + discard await nodes[0].publish("foobar", "Hello!".toBytes()) await allFuturesThrowing( nodes[0].stop(), @@ -197,8 +197,8 @@ suite "FloodSub": nodes[1].addValidator("foo", "bar", validator) - await nodes[0].publish("foo", "Hello!".toBytes()) - await nodes[0].publish("bar", "Hello!".toBytes()) + check (await nodes[0].publish("foo", "Hello!".toBytes())) > 0 + check (await nodes[0].publish("bar", "Hello!".toBytes())) > 0 await allFuturesThrowing( nodes[0].stop(), @@ -249,7 +249,7 @@ suite "FloodSub": subs &= waitSub(nodes[i], nodes[y], "foobar") await allFuturesThrowing(subs) - var pubs: seq[Future[void]] + var pubs: seq[Future[int]] for i in 0.. 0, "waitSub timeout!") +template tryPublish(call: untyped, require: int, wait: Duration = 1.seconds, times: int = 10): untyped = + var + limit = times + pubs = 0 + while pubs < require and limit > 0: + pubs = pubs + call + await sleepAsync(wait) + limit.dec() + if limit == 0: + doAssert(false, "Failed to publish!") + suite "GossipSub": teardown: for tracker in testTrackers(): @@ -63,9 +74,7 @@ suite "GossipSub": await subscribeNodes(nodes) await nodes[0].subscribe("foobar", handler) - await waitSub(nodes[1], nodes[0], "foobar") await nodes[1].subscribe("foobar", handler) - await waitSub(nodes[0], nodes[1], "foobar") var validatorFut = newFuture[bool]() proc validator(topic: string, @@ -76,7 +85,7 @@ suite "GossipSub": result = true nodes[1].addValidator("foobar", validator) - await nodes[0].publish("foobar", "Hello!".toBytes()) + tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 result = (await validatorFut) and (await handlerFut) await allFuturesThrowing( @@ -100,17 +109,16 @@ suite "GossipSub": await subscribeNodes(nodes) await nodes[1].subscribe("foobar", handler) - await waitSub(nodes[0], nodes[1], "foobar") var validatorFut = newFuture[bool]() proc validator(topic: string, message: Message): Future[bool] {.async.} = - validatorFut.complete(true) result = false + validatorFut.complete(true) nodes[1].addValidator("foobar", validator) - await nodes[0].publish("foobar", "Hello!".toBytes()) + tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 result = await validatorFut await allFuturesThrowing( @@ -134,10 +142,9 @@ suite "GossipSub": awaiters.add((await nodes[1].start())) await subscribeNodes(nodes) + await nodes[1].subscribe("foo", handler) - await waitSub(nodes[0], nodes[1], "foo") await nodes[1].subscribe("bar", handler) - await waitSub(nodes[0], nodes[1], "bar") var passed, failed: Future[bool] = newFuture[bool]() proc validator(topic: string, @@ -151,8 +158,8 @@ suite "GossipSub": false nodes[1].addValidator("foo", "bar", validator) - await nodes[0].publish("foo", "Hello!".toBytes()) - await nodes[0].publish("bar", "Hello!".toBytes()) + tryPublish await nodes[0].publish("foo", "Hello!".toBytes()), 1 + tryPublish await nodes[0].publish("bar", "Hello!".toBytes()), 1 result = ((await passed) and (await failed) and (await handlerFut)) await allFuturesThrowing( @@ -170,7 +177,8 @@ suite "GossipSub": var nodes: seq[Switch] = newSeq[Switch]() for i in 0..<2: - nodes.add newStandardSwitch(gossip = true, secureManagers = [SecureProtocol.Noise]) + nodes.add newStandardSwitch(gossip = true, + secureManagers = [SecureProtocol.Noise]) var awaitters: seq[Future[void]] for node in nodes: @@ -178,7 +186,7 @@ suite "GossipSub": await subscribeNodes(nodes) await nodes[1].subscribe("foobar", handler) - await sleepAsync(1.seconds) + await sleepAsync(10.seconds) let gossip1 = GossipSub(nodes[0].pubSub.get()) let gossip2 = GossipSub(nodes[1].pubSub.get()) @@ -272,7 +280,7 @@ suite "GossipSub": nodes[1].pubsub.get().addObserver(obs1) nodes[0].pubsub.get().addObserver(obs2) - await nodes[0].publish("foobar", "Hello!".toBytes()) + tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 var gossipSub1: GossipSub = GossipSub(nodes[0].pubSub.get()) @@ -287,7 +295,7 @@ suite "GossipSub": await nodes[1].stop() await allFuturesThrowing(wait) - # result = observed == 2 + check observed == 2 result = true check: @@ -310,7 +318,7 @@ suite "GossipSub": await nodes[1].subscribe("foobar", handler) await waitSub(nodes[0], nodes[1], "foobar") - await nodes[0].publish("foobar", "Hello!".toBytes()) + tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 result = await passed @@ -328,7 +336,9 @@ suite "GossipSub": var runs = 10 for i in 0..= runs: seenFut.complete() - subs.add(allFutures(dialer.subscribe("foobar", handler), - waitSub(nodes[0], dialer, "foobar"))) + subs &= dialer.subscribe("foobar", handler) await allFuturesThrowing(subs) - await wait(nodes[0].publish("foobar", - cast[seq[byte]]("from node " & - nodes[1].peerInfo.id)), - 1.minutes) + tryPublish await wait(nodes[0].publish("foobar", + cast[seq[byte]]("from node " & + nodes[1].peerInfo.id)), + 1.minutes), runs, 5.seconds await wait(seenFut, 2.minutes) check: seen.len >= runs for k, v in seen.pairs: - check: v == 1 + check: v >= 1 await allFuturesThrowing(nodes.mapIt(it.stop())) await allFuturesThrowing(awaitters) @@ -377,10 +386,12 @@ suite "GossipSub": var runs = 10 for i in 0..= runs for k, v in seen.pairs: - check: v == 1 + check: v >= 1 await allFuturesThrowing(nodes.mapIt(it.stop())) await allFuturesThrowing(awaitters) diff --git a/tests/testinterop.nim b/tests/testinterop.nim index 988846e..fc3bef4 100644 --- a/tests/testinterop.nim +++ b/tests/testinterop.nim @@ -151,7 +151,7 @@ proc testPubSubNodePublish(gossip: bool = false, proc publisher() {.async.} = while not finished: - await nativeNode.publish(testTopic, msgData) + discard await nativeNode.publish(testTopic, msgData) await sleepAsync(500.millis) await wait(publisher(), 5.minutes) # should be plenty of time