diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index 535de0c8a..c74a81fd8 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -123,7 +123,7 @@ proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.asy trace "handle: starting multistream handling", handshaked = active var handshaked = active try: - while not conn.closed: + while not conn.atEof: var ms = string.fromBytes(await conn.readLp(1024)) validateSuffix(ms) diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 4671c227b..abe7faffa 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -82,7 +82,7 @@ template withEOFExceptions(body: untyped): untyped = proc cleanupTimer(s: LPChannel) {.async.} = ## cleanup timers if not isNil(s.timerTaskFut) and not s.timerTaskFut.finished: - await s.timerTaskFut.cancelAndWait() + s.timerTaskFut.cancel() proc closeMessage(s: LPChannel) {.async.} = logScope: diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 708a15b59..98cf462d3 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -117,7 +117,7 @@ method handle*(m: Mplex) {.async, gcsafe.} = trace "stopping mplex main loop", oid = $m.oid await m.close() - while not m.connection.closed: + while not m.connection.atEof: trace "waiting for data", oid = $m.oid let (id, msgType, data) = await m.connection.readMsg() trace "read message from connection", id = id, @@ -169,7 +169,10 @@ method handle*(m: Mplex) {.async, gcsafe.} = trace "pushing data to channel" if data.len > MaxMsgSize: + warn "attempting to send a packet larger than allowed", allowed = MaxMsgSize, + sending = data.len raise newLPStreamLimitError() + await channel.pushTo(data) of MessageType.CloseIn, MessageType.CloseOut: diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 6df139e0c..d0adce3e6 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -97,7 +97,7 @@ method rpcHandler*(f: FloodSub, trace "exception in message handler", exc = exc.msg # forward the message to all peers interested in it - let published = await f.publishHelper(toSendPeers, m.messages) + let published = await f.publishHelper(toSendPeers, m.messages, DefaultSendTimeout) trace "forwared message to peers", peers = published @@ -120,9 +120,10 @@ method subscribePeer*(p: FloodSub, method publish*(f: FloodSub, topic: string, - data: seq[byte]): Future[int] {.async.} = + data: seq[byte], + timeout: Duration = InfiniteDuration): Future[int] {.async.} = # base returns always 0 - discard await procCall PubSub(f).publish(topic, data) + discard await procCall PubSub(f).publish(topic, data, timeout) if data.len <= 0 or topic.len <= 0: trace "topic or data missing, skipping publish" @@ -137,7 +138,7 @@ method publish*(f: FloodSub, let msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign) # start the future but do not wait yet - let published = await f.publishHelper(f.floodsub.getOrDefault(topic), @[msg]) + let published = await f.publishHelper(f.floodsub.getOrDefault(topic), @[msg], timeout) libp2p_pubsub_messages_published.inc(labelValues = [topic]) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 969adfd21..9b3d0e4a5 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -189,16 +189,14 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} = let gossipPeers = mesh + fanout let mids = g.mcache.window(topic) - if mids.len <= 0: + if not mids.len > 0: continue - let ihave = ControlIHave(topicID: topic, - messageIDs: toSeq(mids)) - if topic notin g.gossipsub: trace "topic not in gossip array, skipping", topicID = topic continue + let ihave = ControlIHave(topicID: topic, messageIDs: toSeq(mids)) for peer in allPeers: if result.len >= GossipSubD: trace "got gossip peers", peers = result.len @@ -422,7 +420,7 @@ method rpcHandler*(g: GossipSub, trace "exception in message handler", exc = exc.msg # forward the message to all peers interested in it - let published = await g.publishHelper(toSendPeers, m.messages) + let published = await g.publishHelper(toSendPeers, m.messages, DefaultSendTimeout) trace "forwared message to peers", peers = published @@ -436,9 +434,15 @@ method rpcHandler*(g: GossipSub, let messages = g.handleIWant(peer, control.iwant) if respControl.graft.len > 0 or respControl.prune.len > 0 or - respControl.ihave.len > 0 or respControl.iwant.len > 0: - await peer.send( - RPCMsg(control: some(respControl), messages: messages)) + respControl.ihave.len > 0: + try: + info "sending control message", msg = respControl + await peer.send( + RPCMsg(control: some(respControl), messages: messages)) + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "exception forwarding control messages", exc = exc.msg method subscribe*(g: GossipSub, topic: string, @@ -476,9 +480,10 @@ method unsubscribeAll*(g: GossipSub, topic: string) {.async.} = method publish*(g: GossipSub, topic: string, - data: seq[byte]): Future[int] {.async.} = + data: seq[byte], + timeout: Duration = InfiniteDuration): Future[int] {.async.} = # base returns always 0 - discard await procCall PubSub(g).publish(topic, data) + discard await procCall PubSub(g).publish(topic, data, timeout) trace "publishing message on topic", topic, data = data.shortLog var peers: HashSet[PubSubPeer] @@ -512,7 +517,7 @@ method publish*(g: GossipSub, if msgId notin g.mcache: g.mcache.put(msgId, msg) - let published = await g.publishHelper(peers, @[msg]) + let published = await g.publishHelper(peers, @[msg], timeout) if published > 0: libp2p_pubsub_messages_published.inc(labelValues = [topic]) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 8f1822f7d..f6f6de0e0 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -30,8 +30,6 @@ declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messag declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"]) type - SendRes = tuple[published: seq[string], failed: seq[string]] # keep private - TopicHandler* = proc(topic: string, data: seq[byte]): Future[void] {.gcsafe.} @@ -51,7 +49,7 @@ type peerInfo*: PeerInfo # this peer's info topics*: Table[string, Topic] # local topics peers*: Table[string, PubSubPeer] # peerid to peer map - conns*: Table[PeerInfo, HashSet[Connection]] # peers connections + conns*: Table[PeerInfo, HashSet[Connection]] # peers connections triggerSelf*: bool # trigger own local handler on publish verifySignature*: bool # enable signature verification sign*: bool # enable message signing @@ -66,6 +64,7 @@ method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} = ## if not(isNil(peer)) and peer.peerInfo notin p.conns: trace "deleting peer", peer = peer.id + peer.onConnect.fire() # Make sure all pending sends are unblocked p.peers.del(peer.id) trace "peer disconnected", peer = peer.id @@ -95,24 +94,7 @@ proc sendSubs*(p: PubSub, topics: seq[string], subscribe: bool) {.async.} = ## send subscriptions to remote peer - - try: - # wait for a connection before publishing - # this happens when - if not peer.onConnect.isSet: - trace "awaiting send connection" - await peer.onConnect.wait() - - await peer.sendSubOpts(topics, subscribe) - except CancelledError as exc: - if not(isNil(peer)) and not(isNil(peer.conn)): - await peer.conn.close() - - raise exc - except CatchableError as exc: - trace "unable to send subscriptions", exc = exc.msg - if not(isNil(peer)) and not(isNil(peer.conn)): - await peer.conn.close() + asyncCheck peer.sendSubOpts(topics, subscribe) method subscribeTopic*(p: PubSub, topic: string, @@ -147,7 +129,6 @@ proc getOrCreatePeer(p: PubSub, p.peers[peer.id] = peer peer.observers = p.observers - # metrics libp2p_pubsub_peers.set(p.peers.len.int64) return peer @@ -281,9 +262,11 @@ method subscribe*(p: PubSub, # metrics libp2p_pubsub_topics.set(p.topics.len.int64) -proc sendHelper*(p: PubSub, - sendPeers: HashSet[PubSubPeer], - msgs: seq[Message]): Future[SendRes] {.async.} = +proc publishHelper*(p: PubSub, + sendPeers: HashSet[PubSubPeer], + msgs: seq[Message], + timeout: Duration): Future[int] {.async.} = + # send messages and cleanup failed peers var sent: seq[tuple[id: string, fut: Future[void]]] for sendPeer in sendPeers: # avoid sending to self @@ -291,7 +274,7 @@ proc sendHelper*(p: PubSub, continue trace "sending messages to peer", peer = sendPeer.id, msgs - sent.add((id: sendPeer.id, fut: sendPeer.send(RPCMsg(messages: msgs)))) + sent.add((id: sendPeer.id, fut: sendPeer.send(RPCMsg(messages: msgs), timeout))) var published: seq[string] var failed: seq[string] @@ -306,13 +289,6 @@ proc sendHelper*(p: PubSub, trace "sending messages to peer succeeded", peer = f[0].id published.add(f[0].id) - return (published, failed) - -proc publishHelper*(p: PubSub, - sendPeers: HashSet[PubSubPeer], - msgs: seq[Message]): Future[int] {.async.} = - # send messages and cleanup failed peers - let (published, failed) = await p.sendHelper(sendPeers, msgs) for f in failed: let peer = p.peers.getOrDefault(f) if not(isNil(peer)) and not(isNil(peer.conn)): @@ -322,7 +298,8 @@ proc publishHelper*(p: PubSub, method publish*(p: PubSub, topic: string, - data: seq[byte]): Future[int] {.base, async.} = + data: seq[byte], + timeout: Duration = InfiniteDuration): Future[int] {.base, async.} = ## publish to a ``topic`` if p.triggerSelf and topic in p.topics: for h in p.topics[topic].handler: diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index f223ee06c..0b58614d0 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -26,6 +26,10 @@ declareCounter(libp2p_pubsub_received_messages, "number of messages received", l declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"]) declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"]) +const + DefaultReadTimeout* = 1.minutes + DefaultSendTimeout* = 10.seconds + type PubSubObserver* = ref object onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].} @@ -81,9 +85,9 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = debug "starting pubsub read loop for peer", closed = conn.closed try: try: - while not conn.closed: + while not conn.atEof: trace "waiting for data", closed = conn.closed - let data = await conn.readLp(64 * 1024) + let data = await conn.readLp(64 * 1024).wait(DefaultReadTimeout) let digest = $(sha256.digest(data)) trace "read data from peer", data = data.shortLog if digest in p.recvdRpcCache: @@ -119,7 +123,10 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = trace "Exception occurred in PubSubPeer.handle", exc = exc.msg raise exc -proc send*(p: PubSubPeer, msg: RPCMsg) {.async.} = +proc send*( + p: PubSubPeer, + msg: RPCMsg, + timeout: Duration = DefaultSendTimeout) {.async.} = logScope: peer = p.id msg = shortLog(msg) @@ -132,7 +139,7 @@ proc send*(p: PubSubPeer, msg: RPCMsg) {.async.} = let encoded = encodeRpcMsg(mm) if encoded.len <= 0: - trace "empty message, skipping" + info "empty message, skipping" return logScope: @@ -144,21 +151,36 @@ proc send*(p: PubSubPeer, msg: RPCMsg) {.async.} = libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id]) return - try: + proc sendToRemote() {.async.} = + logScope: + peer = p.id + msg = shortLog(msg) + trace "about to send message" + + if not p.onConnect.isSet: + await p.onConnect.wait() + if p.connected: # this can happen if the remote disconnected trace "sending encoded msgs to peer" await p.sendConn.writeLp(encoded) p.sentRpcCache.put(digest) + trace "sent pubsub message to remote" for x in mm.messages: for t in x.topicIDs: # metrics libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t]) + let sendFut = sendToRemote() + try: + await sendFut.wait(timeout) except CatchableError as exc: trace "unable to send to remote", exc = exc.msg + if not sendFut.finished: + sendFut.cancel() + if not(isNil(p.sendConn)): await p.sendConn.close() p.sendConn = nil @@ -166,21 +188,41 @@ proc send*(p: PubSubPeer, msg: RPCMsg) {.async.} = raise exc -proc sendSubOpts*(p: PubSubPeer, topics: seq[string], subscribe: bool): Future[void] = +proc sendSubOpts*(p: PubSubPeer, topics: seq[string], subscribe: bool) {.async.} = trace "sending subscriptions", peer = p.id, subscribe, topicIDs = topics - p.send(RPCMsg( - subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it)))) + try: + await p.send(RPCMsg( + subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))), + # the long timeout is mostly for cases where + # the connection is flaky at the beggingin + timeout = 3.minutes) + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "exception sending subscriptions", exc = exc.msg -proc sendGraft*(p: PubSubPeer, topics: seq[string]): Future[void] = +proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async.} = trace "sending graft to peer", peer = p.id, topicIDs = topics - p.send(RPCMsg(control: some( - ControlMessage(graft: topics.mapIt(ControlGraft(topicID: it)))))) -proc sendPrune*(p: PubSubPeer, topics: seq[string]): Future[void] = + try: + await p.send(RPCMsg(control: some( + ControlMessage(graft: topics.mapIt(ControlGraft(topicID: it)))))) + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "exception sending grafts", exc = exc.msg + +proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async.} = trace "sending prune to peer", peer = p.id, topicIDs = topics - p.send(RPCMsg(control: some( - ControlMessage(prune: topics.mapIt(ControlPrune(topicID: it)))))) + + try: + await p.send(RPCMsg(control: some( + ControlMessage(prune: topics.mapIt(ControlPrune(topicID: it)))))) + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "exception sending prunes", exc = exc.msg proc `$`*(p: PubSubPeer): string = p.id diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 4cb305707..254ee0ca1 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -97,14 +97,16 @@ proc triggerHooks(s: Switch, peer: PeerInfo, cycle: Lifecycle) {.async, gcsafe.} proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} +proc subscribePeerInternal(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} proc cleanupPubSubPeer(s: Switch, conn: Connection) {.async.} = try: await conn.closeEvent.wait() + trace "about to cleanup pubsub peer" if s.pubSub.isSome: let fut = s.pubsubMonitors.getOrDefault(conn.peerInfo.peerId) if not(isNil(fut)) and not(fut.finished): - await fut.cancelAndWait() + fut.cancel() await s.pubSub.get().unsubscribePeer(conn.peerInfo) except CancelledError as exc: @@ -235,7 +237,7 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g return sconn proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = - trace "upgrading incoming connection", conn = $conn, oid = conn.oid + trace "upgrading incoming connection", conn = $conn, oid = $conn.oid let ms = newMultistream() # secure incoming connections @@ -244,7 +246,7 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = {.async, gcsafe, closure.} = var sconn: Connection - trace "Securing connection", oid = conn.oid + trace "Securing connection", oid = $conn.oid let secure = s.secureManagers.filterIt(it.codec == proto)[0] try: @@ -356,8 +358,8 @@ proc internalConnect(s: Switch, trace "dial successful", oid = $conn.oid, peer = $conn.peerInfo - await s.subscribePeer(peer) asyncCheck s.cleanupPubSubPeer(conn) + asyncCheck s.subscribePeer(conn.peerInfo) trace "got connection", oid = $conn.oid, direction = $conn.dir, @@ -391,7 +393,7 @@ proc dial*(s: Switch, oid = $conn.oid if not await s.ms.select(stream, proto): await stream.close() - raise newException(CatchableError, "Unable to select sub-protocol " & proto) + raise newException(CatchableError, "Unable to select sub-protocol" & proto) return stream except CancelledError as exc: @@ -445,7 +447,7 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = if s.pubSub.isSome: await s.pubSub.get().start() - info "started libp2p node", peer = $s.peerInfo, addrs = s.peerInfo.addrs + debug "started libp2p node", peer = $s.peerInfo, addrs = s.peerInfo.addrs result = startFuts # listen for incoming connections proc stop*(s: Switch) {.async.} = @@ -472,7 +474,9 @@ proc stop*(s: Switch) {.async.} = proc subscribePeerInternal(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = ## Subscribe to pub sub peer - if s.pubSub.isSome and not(s.pubSub.get().connected(peerInfo)): + ## + + if s.pubSub.isSome and not s.pubSub.get().connected(peerInfo): trace "about to subscribe to pubsub peer", peer = peerInfo.shortLog() var stream: Connection try: @@ -483,6 +487,7 @@ proc subscribePeerInternal(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = if not await s.ms.select(stream, s.pubSub.get().codec): if not(isNil(stream)): + trace "couldn't select pubsub", codec = s.pubSub.get().codec await stream.close() return @@ -504,23 +509,17 @@ proc pubsubMonitor(s: Switch, peer: PeerInfo) {.async.} = ## pubsub connection as well ## - var tries = 0 - var backoffFactor = 5 # up to ~10 mins - var backoff = 1.seconds - while s.isConnected(peer) and - tries < MaxPubsubReconnectAttempts: + while s.isConnected(peer): try: - debug "subscribing to pubsub peer", peer = $peer - await s.subscribePeerInternal(peer) + debug "subscribing to pubsub peer", peer = $peer + await s.subscribePeerInternal(peer) except CancelledError as exc: raise exc except CatchableError as exc: trace "exception in pubsub monitor", peer = $peer, exc = exc.msg finally: - debug "awaiting backoff period before reconnecting", peer = $peer, backoff, tries - await sleepAsync(backoff) # allow the peer to cooldown - backoff = backoff * backoffFactor - tries.inc() + debug "sleeping before trying pubsub peer", peer = $peer + await sleepAsync(1.seconds) # allow the peer to cooldown trace "exiting pubsub monitor", peer = $peer @@ -528,6 +527,8 @@ proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = if peerInfo.peerId notin s.pubsubMonitors: s.pubsubMonitors[peerInfo.peerId] = s.pubsubMonitor(peerInfo) + result = s.pubsubMonitors.getOrDefault(peerInfo.peerId) + proc subscribe*(s: Switch, topic: string, handler: TopicHandler) {.async.} = ## subscribe to a pubsub topic @@ -554,14 +555,17 @@ proc unsubscribeAll*(s: Switch, topic: string) {.async.} = await s.pubSub.get().unsubscribeAll(topic) -proc publish*(s: Switch, topic: string, data: seq[byte]): Future[int] {.async.} = +proc publish*(s: Switch, + topic: string, + data: seq[byte], + timeout: Duration = InfiniteDuration): Future[int] {.async.} = ## pubslish to pubsub topic ## if s.pubSub.isNone: raise newNoPubSubException() - return await s.pubSub.get().publish(topic, data) + return await s.pubSub.get().publish(topic, data, timeout) proc addValidator*(s: Switch, topics: varargs[string], @@ -611,8 +615,8 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = asyncCheck s.triggerHooks(muxer.connection.peerInfo, Lifecycle.Upgraded) # try establishing a pubsub connection - await s.subscribePeer(muxer.connection.peerInfo) asyncCheck s.cleanupPubSubPeer(muxer.connection) + asyncCheck s.subscribePeer(muxer.connection.peerInfo) except CancelledError as exc: await muxer.close() diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index b8bff2713..3d8fdad16 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -15,7 +15,9 @@ import utils, ../../libp2p/[errors, switch, stream/connection, + stream/bufferstream, crypto/crypto, + protocols/pubsub/pubsubpeer, protocols/pubsub/pubsub, protocols/pubsub/floodsub, protocols/pubsub/rpc/messages, @@ -218,6 +220,45 @@ suite "FloodSub": check: waitFor(runTests()) == true + test "FloodSub publish should fail on timeout": + proc runTests(): Future[bool] {.async.} = + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + discard + + var nodes = generateNodes(2) + var awaiters: seq[Future[void]] + awaiters.add((await nodes[0].start())) + awaiters.add((await nodes[1].start())) + + let subscribes = await subscribeNodes(nodes) + await nodes[1].subscribe("foobar", handler) + await waitSub(nodes[0], nodes[1], "foobar") + + let pubsub = nodes[0].pubSub.get() + let peer = pubsub.peers[nodes[1].peerInfo.id] + + peer.conn = Connection(newBufferStream( + proc (data: seq[byte]) {.async, gcsafe.} = + await sleepAsync(10.seconds) + ,size = 0)) + + let in10millis = Moment.fromNow(10.millis) + let sent = await nodes[0].publish("foobar", "Hello!".toBytes(), 10.millis) + + check Moment.now() >= in10millis + check sent == 0 + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop()) + + await allFuturesThrowing(subscribes) + await allFuturesThrowing(awaiters) + result = true + + check: + waitFor(runTests()) == true + test "FloodSub multiple peers, no self trigger": proc runTests(): Future[bool] {.async.} = var runs = 10 diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index f496508b5..5b1bedaec 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -16,8 +16,10 @@ import utils, ../../libp2p/[errors, peerid, peerinfo, stream/connection, + stream/bufferstream, crypto/crypto, protocols/pubsub/pubsub, + protocols/pubsub/pubsubpeer, protocols/pubsub/gossipsub, protocols/pubsub/peertable, protocols/pubsub/rpc/messages] @@ -200,6 +202,45 @@ suite "GossipSub": check: waitFor(runTests()) == true + test "GossipSub publish should fail on timeout": + proc runTests(): Future[bool] {.async.} = + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + discard + + var nodes = generateNodes(2, gossip = true) + var awaiters: seq[Future[void]] + awaiters.add((await nodes[0].start())) + awaiters.add((await nodes[1].start())) + + let subscribes = await subscribeNodes(nodes) + await nodes[1].subscribe("foobar", handler) + await waitSub(nodes[0], nodes[1], "foobar") + + let pubsub = nodes[0].pubSub.get() + let peer = pubsub.peers[nodes[1].peerInfo.id] + + peer.conn = Connection(newBufferStream( + proc (data: seq[byte]) {.async, gcsafe.} = + await sleepAsync(10.seconds) + , size = 0)) + + let in10millis = Moment.fromNow(10.millis) + let sent = await nodes[0].publish("foobar", "Hello!".toBytes(), 10.millis) + + check Moment.now() >= in10millis + check sent == 0 + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop()) + + await allFuturesThrowing(subscribes) + await allFuturesThrowing(awaiters) + result = true + + check: + waitFor(runTests()) == true + test "e2e - GossipSub should add remote peer topic subscriptions": proc testBasicGossipSub(): Future[bool] {.async.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = diff --git a/tests/testmultistream.nim b/tests/testmultistream.nim index 7f944b6cf..4e1c241bd 100644 --- a/tests/testmultistream.nim +++ b/tests/testmultistream.nim @@ -54,6 +54,7 @@ method write*(s: TestSelectStream, msg: seq[byte]) {.async, gcsafe.} = discard method close(s: TestSelectStream) {.async, gcsafe.} = s.isClosed = true + s.isEof = true proc newTestSelectStream(): TestSelectStream = new result @@ -104,6 +105,7 @@ method write*(s: TestLsStream, msg: seq[byte]) {.async, gcsafe.} = method close(s: TestLsStream) {.async, gcsafe.} = s.isClosed = true + s.isEof = true proc newTestLsStream(ls: LsHandler): TestLsStream {.gcsafe.} = new result @@ -157,6 +159,7 @@ method write*(s: TestNaStream, msg: seq[byte]) {.async, gcsafe.} = method close(s: TestNaStream) {.async, gcsafe.} = s.isClosed = true + s.isEof = true proc newTestNaStream(na: NaHandler): TestNaStream = new result @@ -234,6 +237,7 @@ suite "Multistream select": let conn = newTestNaStream(testNaHandler) proc testNaHandler(msg: string): Future[void] {.async, gcsafe.} = + echo msg check msg == Na await conn.close()