diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim
index 382db1fd0..3562c84e8 100644
--- a/libp2p/protocols/pubsub/floodsub.nim
+++ b/libp2p/protocols/pubsub/floodsub.nim
@@ -63,50 +63,38 @@ method unsubscribePeer*(f: FloodSub, peer: PeerID) =
 
 method rpcHandler*(f: FloodSub,
                    peer: PubSubPeer,
-                   rpcMsgs: seq[RPCMsg]) {.async.} =
-  await procCall PubSub(f).rpcHandler(peer, rpcMsgs)
+                   rpcMsg: RPCMsg) {.async.} =
+  await procCall PubSub(f).rpcHandler(peer, rpcMsg)
 
-  for m in rpcMsgs: # for all RPC messages
-    if m.messages.len > 0: # if there are any messages
-      var toSendPeers = initHashSet[PubSubPeer]()
-      for msg in m.messages: # for every message
-        let msgId = f.msgIdProvider(msg)
-        logScope: msgId
+  if rpcMsg.messages.len > 0: # if there are any messages
+    var toSendPeers = initHashSet[PubSubPeer]()
+    for msg in rpcMsg.messages: # for every message
+      let msgId = f.msgIdProvider(msg)
+      logScope: msgId
 
-        if msgId notin f.seen:
-          f.seen.put(msgId) # add the message to the seen cache
+      if msgId notin f.seen:
+        f.seen.put(msgId) # add the message to the seen cache
 
-          if f.verifySignature and not msg.verify(peer.peerId):
-            trace "dropping message due to failed signature verification"
-            continue
+        if f.verifySignature and not msg.verify(peer.peerId):
+          trace "dropping message due to failed signature verification"
+          continue
 
-          if not (await f.validate(msg)):
-            trace "dropping message due to failed validation"
-            continue
+        if not (await f.validate(msg)):
+          trace "dropping message due to failed validation"
+          continue
 
-          for t in msg.topicIDs: # for every topic in the message
-            if t in f.floodsub:
-              toSendPeers.incl(f.floodsub[t]) # get all the peers interested in this topic
-            if t in f.topics: # check that we're subscribed to it
-              for h in f.topics[t].handler:
-                trace "calling handler for message", topicId = t,
-                                                     localPeer = f.peerInfo.id,
-                                                     fromPeer = msg.fromPeer.pretty
+        for t in msg.topicIDs: # for every topic in the message
+          if t in f.floodsub:
+            toSendPeers.incl(f.floodsub[t]) # get all the peers interested in this topic
 
-                try:
-                  await h(t, msg.data) # trigger user provided handler
-                except CancelledError as exc:
-                  raise exc
-                except CatchableError as exc:
-                  trace "exception in message handler", exc = exc.msg
+          await handleData(f, t, msg.data)
 
-      # forward the message to all peers interested in it
-      let published = await f.broadcast(
-        toSeq(toSendPeers),
-        RPCMsg(messages: m.messages),
-        DefaultSendTimeout)
+    # forward the message to all peers interested in it
+    f.broadcast(
+      toSeq(toSendPeers),
+      RPCMsg(messages: rpcMsg.messages))
 
-      trace "forwared message to peers", peers = published
+    trace "forwarded message to peers", peers = toSendPeers.len
 
 method init*(f: FloodSub) =
   proc handler(conn: Connection, proto: string) {.async.} =
@@ -122,14 +110,13 @@ method init*(f: FloodSub) =
 
 method publish*(f: FloodSub,
                 topic: string,
-                data: seq[byte],
-                timeout: Duration = InfiniteDuration): Future[int] {.async.} =
+                data: seq[byte]): Future[int] {.async.} =
   # base returns always 0
-  discard await procCall PubSub(f).publish(topic, data, timeout)
+  discard await procCall PubSub(f).publish(topic, data)
 
   if data.len <= 0 or topic.len <= 0:
     trace "topic or data missing, skipping publish"
-    return
+    return 0
 
   if topic notin f.floodsub:
     trace "missing peers for topic, skipping publish"
@@ -137,33 +124,34 @@ method publish*(f: FloodSub,
 
   trace "publishing on topic", name = topic
   inc f.msgSeqno
-  let msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign)
+  let
+    msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign)
+    peers = toSeq(f.floodsub.getOrDefault(topic))
 
   # start the future but do not wait yet
-  let published = await f.broadcast(
-    toSeq(f.floodsub.getOrDefault(topic)),
-    RPCMsg(messages: @[msg]),
-    timeout)
+  f.broadcast(
+    peers,
+    RPCMsg(messages: @[msg]))
 
   when defined(libp2p_expensive_metrics):
     libp2p_pubsub_messages_published.inc(labelValues = [topic])
 
-  trace "published message to peers", peers = published,
+  trace "published message to peers", peers = peers.len,
                                       msg = msg.shortLog()
 
-  return published
+  return peers.len
 
 method unsubscribe*(f: FloodSub,
                     topics: seq[TopicPair]) {.async.} =
   await procCall PubSub(f).unsubscribe(topics)
 
   for p in f.peers.values:
-    discard await f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false)
+    f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false)
 
 method unsubscribeAll*(f: FloodSub, topic: string) {.async.} =
   await procCall PubSub(f).unsubscribeAll(topic)
 
   for p in f.peers.values:
-    discard await f.sendSubs(p, @[topic], false)
+    f.sendSubs(p, @[topic], false)
 
 method initPubSub*(f: FloodSub) =
   procCall PubSub(f).initPubSub()
diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim
index fbda35dcc..3714cfe7d 100644
--- a/libp2p/protocols/pubsub/gossipsub.nim
+++ b/libp2p/protocols/pubsub/gossipsub.nim
@@ -157,10 +157,10 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
   # Send changes to peers after table updates to avoid stale state
   if grafts.len > 0:
     let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))
-    discard await g.broadcast(grafts, graft, DefaultSendTimeout)
+    g.broadcast(grafts, graft)
 
   if prunes.len > 0:
     let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
-    discard await g.broadcast(prunes, prune, DefaultSendTimeout)
+    g.broadcast(prunes, prune)
 
 proc dropFanoutPeers(g: GossipSub) =
   # drop peers that we haven't published to in
@@ -229,14 +229,11 @@ proc heartbeat(g: GossipSub) {.async.} =
         g.replenishFanout(t)
 
       let peers = g.getGossipPeers()
-      var sent: seq[Future[bool]]
       for peer, control in peers:
        g.peers.withValue(peer.peerId, pubsubPeer) do:
-          sent &= g.send(
+          g.send(
             pubsubPeer[],
-            RPCMsg(control: some(control)),
-            DefaultSendTimeout)
-      checkFutures(await allFinished(sent))
+            RPCMsg(control: some(control)))
 
       g.mcache.shift() # shift the cache
     except CancelledError as exc:
@@ -379,89 +376,68 @@ proc handleIWant(g: GossipSub,
 
 method rpcHandler*(g: GossipSub,
                    peer: PubSubPeer,
-                   rpcMsgs: seq[RPCMsg]) {.async.} =
-  await procCall PubSub(g).rpcHandler(peer, rpcMsgs)
+                   rpcMsg: RPCMsg) {.async.} =
+  await procCall PubSub(g).rpcHandler(peer, rpcMsg)
 
-  for m in rpcMsgs: # for all RPC messages
-    if m.messages.len > 0: # if there are any messages
-      var toSendPeers: HashSet[PubSubPeer]
-      for msg in m.messages: # for every message
-        let msgId = g.msgIdProvider(msg)
-        logScope: msgId
+  if rpcMsg.messages.len > 0: # if there are any messages
+    var toSendPeers: HashSet[PubSubPeer]
+    for msg in rpcMsg.messages: # for every message
+      let msgId = g.msgIdProvider(msg)
+      logScope: msgId
 
-        if msgId in g.seen:
-          trace "message already processed, skipping"
-          continue
+      if msgId in g.seen:
+        trace "message already processed, skipping"
+        continue
 
-        trace "processing message"
+      trace "processing message"
 
-        g.seen.put(msgId) # add the message to the seen cache
+      g.seen.put(msgId) # add the message to the seen cache
 
-        if g.verifySignature and not msg.verify(peer.peerId):
-          trace "dropping message due to failed signature verification"
-          continue
+      if g.verifySignature and not msg.verify(peer.peerId):
+        trace "dropping message due to failed signature verification"
+        continue
 
-        if not (await g.validate(msg)):
-          trace "dropping message due to failed validation"
-          continue
+      if not (await g.validate(msg)):
+        trace "dropping message due to failed validation"
+        continue
 
-        # this shouldn't happen
-        if g.peerInfo.peerId == msg.fromPeer:
-          trace "skipping messages from self"
-          continue
+      # this shouldn't happen
+      if g.peerInfo.peerId == msg.fromPeer:
+        trace "skipping messages from self"
+        continue
 
-        for t in msg.topicIDs: # for every topic in the message
-          if t in g.floodsub:
-            toSendPeers.incl(g.floodsub[t]) # get all floodsub peers for topic
+      for t in msg.topicIDs: # for every topic in the message
+        if t in g.floodsub:
+          toSendPeers.incl(g.floodsub[t]) # get all floodsub peers for topic
 
-          if t in g.mesh:
-            toSendPeers.incl(g.mesh[t]) # get all mesh peers for topic
+        if t in g.mesh:
+          toSendPeers.incl(g.mesh[t]) # get all mesh peers for topic
 
-          if t in g.topics: # if we're subscribed to the topic
-            for h in g.topics[t].handler:
-              trace "calling handler for message", topicId = t,
-                                                   localPeer = g.peerInfo.id,
-                                                   fromPeer = msg.fromPeer.pretty
-              try:
-                await h(t, msg.data) # trigger user provided handler
-              except CancelledError as exc:
-                raise exc
-              except CatchableError as exc:
-                trace "exception in message handler", exc = exc.msg
+        await handleData(g, t, msg.data)
 
-      # forward the message to all peers interested in it
-      let published = await g.broadcast(
-        toSeq(toSendPeers),
-        RPCMsg(messages: m.messages),
-        DefaultSendTimeout)
+    # forward the message to all peers interested in it
+    g.broadcast(
+      toSeq(toSendPeers),
+      RPCMsg(messages: rpcMsg.messages))
 
-      trace "forwared message to peers", peers = published
+    trace "forwarded message to peers", peers = toSendPeers.len
+
+  if rpcMsg.control.isSome:
+    let control = rpcMsg.control.get()
+    g.handlePrune(peer, control.prune)
 
     var respControl: ControlMessage
-    if m.control.isSome:
-      let control = m.control.get()
-      g.handlePrune(peer, control.prune)
+    respControl.iwant.add(g.handleIHave(peer, control.ihave))
+    respControl.prune.add(g.handleGraft(peer, control.graft))
+    let messages = g.handleIWant(peer, control.iwant)
 
-      respControl.iwant.add(g.handleIHave(peer, control.ihave))
-      respControl.prune.add(g.handleGraft(peer, control.graft))
-      let messages = g.handleIWant(peer, control.iwant)
+    if respControl.graft.len > 0 or respControl.prune.len > 0 or
+      respControl.ihave.len > 0 or messages.len > 0:
 
-      if respControl.graft.len > 0 or respControl.prune.len > 0 or
-        respControl.ihave.len > 0:
-        try:
-          info "sending control message", msg = respControl
-          let sent = await g.send(
-            peer,
-            RPCMsg(control: some(respControl), messages: messages),
-            DefaultSendTimeout)
-
-          if not sent:
-            g.unsubscribePeer(peer.peerId)
-
-        except CancelledError as exc:
-          raise exc
-        except CatchableError as exc:
-          trace "exception forwarding control messages", exc = exc.msg
+      debug "sending control message", msg = shortLog(respControl)
+      g.send(
+        peer,
+        RPCMsg(control: some(respControl), messages: messages))
 
 method subscribe*(g: GossipSub,
                   topic: string,
@@ -481,7 +457,7 @@ method unsubscribe*(g: GossipSub,
       g.mesh.del(topic)
 
       let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
-      discard await g.broadcast(toSeq(peers), prune, DefaultSendTimeout)
+      g.broadcast(toSeq(peers), prune)
 
 method unsubscribeAll*(g: GossipSub, topic: string) {.async.} =
   await procCall PubSub(g).unsubscribeAll(topic)
 
@@ -491,14 +467,13 @@ method unsubscribeAll*(g: GossipSub, topic: string) {.async.} =
     g.mesh.del(topic)
 
     let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
-    discard await g.broadcast(toSeq(peers), prune, DefaultSendTimeout)
+    g.broadcast(toSeq(peers), prune)
 
 method publish*(g: GossipSub,
                 topic: string,
-                data: seq[byte],
-                timeout: Duration = InfiniteDuration): Future[int] {.async.} =
+                data: seq[byte]): Future[int] {.async.} =
   # base returns always 0
-  discard await procCall PubSub(g).publish(topic, data, timeout)
+  discard await procCall PubSub(g).publish(topic, data)
 
   trace "publishing message on topic", topic, data = data.shortLog
 
   if topic.len <= 0: # data could be 0/empty
@@ -533,14 +508,14 @@ method publish*(g: GossipSub,
 
   g.mcache.put(msgId, msg)
 
   if peers.len > 0:
-    let published = await g.broadcast(toSeq(peers), RPCMsg(messages: @[msg]), timeout)
+    g.broadcast(toSeq(peers), RPCMsg(messages: @[msg]))
     when defined(libp2p_expensive_metrics):
-      if published > 0:
+      if peers.len > 0:
         libp2p_pubsub_messages_published.inc(labelValues = [topic])
 
-    trace "published message to peers", peers = published,
+    trace "published message to peers", peers = peers.len,
                                         msg = msg.shortLog()
-    return published
+    return peers.len
   else:
     debug "No peers for gossip message", topic, msg = msg.shortLog()
     return 0
diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim
index 8c4a6dff2..e3c242010 100644
--- a/libp2p/protocols/pubsub/pubsub.nim
+++ b/libp2p/protocols/pubsub/pubsub.nim
@@ -66,54 +66,38 @@ method unsubscribePeer*(p: PubSub, peerId: PeerID) {.base.} =
   ##
 
   trace "unsubscribing pubsub peer", peer = $peerId
-  if peerId in p.peers:
-    p.peers.del(peerId)
+  p.peers.del(peerId)
 
   libp2p_pubsub_peers.set(p.peers.len.int64)
 
 proc send*(
   p: PubSub,
   peer: PubSubPeer,
-  msg: RPCMsg,
-  timeout: Duration): Future[bool] {.async.} =
+  msg: RPCMsg) =
   ## send to remote peer
   ##
 
   trace "sending pubsub message to peer", peer = $peer, msg = shortLog(msg)
-  try:
-    await peer.send(msg, timeout)
-    return true
-  except CancelledError as exc:
-    raise exc
-  except CatchableError as exc:
-    trace "exception sending pubsub message to peer",
-      peer = $peer, msg = shortLog(msg)
-    p.unsubscribePeer(peer.peerId)
+  peer.send(msg)
 
 proc broadcast*(
   p: PubSub,
-  sendPeers: seq[PubSubPeer],
-  msg: RPCMsg,
-  timeout: Duration): Future[int] {.async.} =
-  ## send messages and cleanup failed peers
+  sendPeers: openArray[PubSubPeer],
+  msg: RPCMsg) = # raises: [Defect]
+  ## send messages to the given peers - fire and forget
+  ##
 
   trace "broadcasting messages to peers",
     peers = sendPeers.len, message = shortLog(msg)
-  let sent = await allFinished(
-    sendPeers.mapIt( p.send(it, msg, timeout) ))
-  return sent.filterIt( it.finished and it.read ).len
+  for peer in sendPeers:
+    p.send(peer, msg)
 
 proc sendSubs*(p: PubSub,
                peer: PubSubPeer,
               topics: seq[string],
-               subscribe: bool): Future[bool] =
+               subscribe: bool) =
   ## send subscriptions to remote peer
-  p.send(
-    peer,
-    RPCMsg(
-      subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))),
-    DefaultSendTimeout)
+  p.send(peer, RPCMsg.withSubs(topics, subscribe))
 
 method subscribeTopic*(p: PubSub,
                        topic: string,
@@ -124,16 +108,14 @@ method subscribeTopic*(p: PubSub,
 
 method rpcHandler*(p: PubSub,
                    peer: PubSubPeer,
-                   rpcMsgs: seq[RPCMsg]) {.async, base.} =
+                   rpcMsg: RPCMsg) {.async, base.} =
   ## handle rpc messages
-  trace "processing RPC message", peer = peer.id, msgs = rpcMsgs.len
+  logScope: peer = peer.id
 
-  for m in rpcMsgs: # for all RPC messages
-    trace "processing messages", msg = m.shortLog
-    if m.subscriptions.len > 0: # if there are any subscriptions
-      for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic
-        trace "about to subscribe to topic", topicId = s.topic
-        p.subscribeTopic(s.topic, s.subscribe, peer)
+  trace "processing RPC message", msg = rpcMsg.shortLog
+  for s in rpcMsg.subscriptions: # subscribe/unsubscribe the peer for each topic
+    trace "about to subscribe to topic", topicId = s.topic
+    p.subscribeTopic(s.topic, s.subscribe, peer)
 
 proc getOrCreatePeer*(
   p: PubSub,
@@ -142,16 +124,36 @@ proc getOrCreatePeer*(
   if peer in p.peers:
     return p.peers[peer]
 
+  proc getConn(): Future[(Connection, RPCMsg)] {.async.} =
+    let conn = await p.switch.dial(peer, proto)
+    return (conn, RPCMsg.withSubs(toSeq(p.topics.keys), true))
+
   # create new pubsub peer
-  let pubSubPeer = newPubSubPeer(peer, p.switch, proto)
+  let pubSubPeer = newPubSubPeer(peer, getConn, proto)
   trace "created new pubsub peer", peerId = $peer
 
   p.peers[peer] = pubSubPeer
   pubSubPeer.observers = p.observers
 
   libp2p_pubsub_peers.set(p.peers.len.int64)
+
+  pubsubPeer.connect()
+
   return pubSubPeer
 
+proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] {.async.} =
+  if topic notin p.topics: return # Not subscribed
+
+  for h in p.topics[topic].handler:
+    trace "triggering handler", topicID = topic
+    try:
+      await h(topic, data)
+    except CancelledError as exc:
+      raise exc
+    except CatchableError as exc:
+      # Handlers should never raise exceptions
+      warn "Error in topic handler", msg = exc.msg
+
+
 method handleConn*(p: PubSub,
                    conn: Connection,
                    proto: string) {.base, async.} =
@@ -171,15 +173,13 @@ method handleConn*(p: PubSub,
     await conn.close()
     return
 
-  proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
+  proc handler(peer: PubSubPeer, msg: RPCMsg): Future[void] =
     # call pubsub rpc handler
-    await p.rpcHandler(peer, msgs)
+    p.rpcHandler(peer, msg)
+
+  let peer = p.getOrCreatePeer(conn.peerInfo.peerId, proto)
 
   try:
-    let peer = p.getOrCreatePeer(conn.peerInfo.peerId, proto)
-    if p.topics.len > 0:
-      discard await p.sendSubs(peer, toSeq(p.topics.keys), true)
-
     peer.handler = handler
     await peer.handle(conn) # spawn peer read loop
     trace "pubsub peer handler ended", peer = peer.id
@@ -195,15 +195,7 @@ method subscribePeer*(p: PubSub, peer: PeerID) {.base.} =
   ## messages
   ##
 
-  let pubsubPeer = p.getOrCreatePeer(peer, p.codec)
-  if p.topics.len > 0:
-    # TODO sendSubs may raise, but doing asyncCheck here causes the exception
-    # to escape to the poll loop.
-    # With a bit of luck, it may be harmless to ignore exceptions here -
-    # some cleanup is eventually done in PubSubPeer.send
-    asyncCheck p.sendSubs(pubsubPeer, toSeq(p.topics.keys), true)
-
-  pubsubPeer.subscribed = true
+  discard p.getOrCreatePeer(peer, p.codec)
 
 method unsubscribe*(p: PubSub,
                     topics: seq[TopicPair]) {.base, async.} =
@@ -249,32 +241,18 @@ method subscribe*(p: PubSub,
 
   p.topics[topic].handler.add(handler)
 
-  var sent: seq[Future[bool]]
-  for peer in toSeq(p.peers.values):
-    sent.add(p.sendSubs(peer, @[topic], true))
-
-  checkFutures(await allFinished(sent))
+  for _, peer in p.peers:
+    p.sendSubs(peer, @[topic], true)
 
   # metrics
   libp2p_pubsub_topics.set(p.topics.len.int64)
 
 method publish*(p: PubSub,
                 topic: string,
-                data: seq[byte],
-                timeout: Duration = InfiniteDuration): Future[int] {.base, async.} =
+                data: seq[byte]): Future[int] {.base, async.} =
   ## publish to a ``topic``
-  if p.triggerSelf and topic in p.topics:
-    for h in p.topics[topic].handler:
-      trace "triggering handler", topicID = topic
-      try:
-        await h(topic, data)
-      except CancelledError as exc:
-        raise exc
-      except CatchableError as exc:
-        # TODO these exceptions are ignored since it's likely that if writes are
-        # are failing, the underlying connection is already closed - this needs
-        # more cleanup though
-        debug "Could not write to pubsub connection", msg = exc.msg
+  if p.triggerSelf:
+    await handleData(p, topic, data)
 
   return 0
diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim
index 15004d7bc..1d8a0ef5c 100644
--- a/libp2p/protocols/pubsub/pubsubpeer.nim
+++ b/libp2p/protocols/pubsub/pubsubpeer.nim
@@ -28,27 +28,26 @@ when defined(libp2p_expensive_metrics):
   declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
   declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])
 
-const
-  DefaultSendTimeout* = 10.seconds
-
 type
   PubSubObserver* = ref object
     onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
     onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
 
+  GetConn* = proc(): Future[(Connection, RPCMsg)] {.gcsafe.}
+
   PubSubPeer* = ref object of RootObj
-    switch*: Switch # switch instance to dial peers
+    getConn*: GetConn # callback to establish a new send connection
     codec*: string # the protocol that this peer joined from
     sendConn: Connection # cached send connection
+    connections*: seq[Connection] # connections to this peer
     peerId*: PeerID
     handler*: RPCHandler
     sentRpcCache: TimedCache[string] # cache for already sent messages
     recvdRpcCache: TimedCache[string] # cache for already received messages
     observers*: ref seq[PubSubObserver] # ref as in smart_ptr
-    subscribed*: bool # are we subscribed to this peer
     dialLock: AsyncLock
 
-  RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
+  RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe.}
 
 func hash*(p: PubSubPeer): Hash =
   # int is either 32/64, so intptr basically, pubsubpeer is a ref
@@ -78,13 +77,15 @@ proc sendObservers(p: PubSubPeer, msg: var RPCMsg) =
 
 proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
   logScope:
+    oid = $conn.oid
     peer = p.id
+    closed = conn.closed
 
-  debug "starting pubsub read loop for peer", closed = conn.closed
+  debug "starting pubsub read loop"
   try:
     try:
       while not conn.atEof:
-        trace "waiting for data", closed = conn.closed
+        trace "waiting for data"
         let data = await conn.readLp(64 * 1024)
         let digest = $(sha256.digest(data))
         trace "read data from peer", data = data.shortLog
@@ -111,10 +112,9 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
             # metrics
             libp2p_pubsub_received_messages.inc(labelValues = [p.id, t])
 
-        await p.handler(p, @[msg])
+        await p.handler(p, msg)
         p.recvdRpcCache.put(digest)
     finally:
-      debug "exiting pubsub peer read loop"
       await conn.close()
 
       if p.sendConn == conn:
@@ -124,9 +124,14 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
     raise exc
   except CatchableError as exc:
     trace "Exception occurred in PubSubPeer.handle", exc = exc.msg
+  finally:
+    debug "exiting pubsub read loop"
 
 proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} =
-  # get a cached send connection or create a new one
+  ## get a cached send connection or create a new one - will return nil if
+  ## getting a new connection fails
+  ##
+
   block: # check if there's an existing connection that can be reused
     let current = p.sendConn
@@ -147,7 +152,8 @@ proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} =
   # and later close one of them, other implementations such as rust-libp2p
   # become deaf to our messages (potentially due to the clean-up associated
   # with closing connections). To prevent this, we use a lock that ensures
-  # that only a single dial will be performed for each peer.
+  # that only a single dial will be performed for each peer, and that the
+  # subscription table is sent every time we reconnect.
   #
   # Nevertheless, this approach is still quite problematic because the gossip
   # sends and their respective dials may be started from the mplex read loop.
@@ -172,24 +178,35 @@ proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} =
       return current
 
     # Grab a new send connection
-    let newConn = await p.switch.dial(p.peerId, p.codec) # ...and here
+    let (newConn, handshake) = await p.getConn() # ...and here
    if newConn.isNil:
       return nil
 
+    trace "Sending handshake", oid = $newConn.oid, handshake = shortLog(handshake)
+    await newConn.writeLp(encodeRpcMsg(handshake))
+
     trace "Caching new send connection", oid = $newConn.oid
     p.sendConn = newConn
     asyncCheck p.handle(newConn) # start a read loop on the new connection
     return newConn
-
+  except CancelledError as exc:
+    raise exc
+  except CatchableError as exc:
+    return nil
   finally:
     if p.dialLock.locked:
       p.dialLock.release()
 
-proc send*(
-  p: PubSubPeer,
-  msg: RPCMsg,
-  timeout: Duration = DefaultSendTimeout) {.async.} =
+proc connectImpl*(p: PubSubPeer) {.async.} =
+  try:
+    discard await getSendConn(p)
+  except CatchableError as exc:
+    debug "Could not connect to pubsub peer", err = exc.msg
+
+proc connect*(p: PubSubPeer) =
+  asyncCheck(connectImpl(p))
+
+proc sendImpl(p: PubSubPeer, msg: RPCMsg) {.async.} =
   doAssert(not isNil(p), "pubsubpeer nil!")
 
   logScope:
@@ -217,16 +234,15 @@
       libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id])
     return
 
-  var conn: Connection
+  var conn = await p.getSendConn()
   try:
     trace "about to send message"
-    conn = await p.getSendConn()
-
     if conn == nil:
       debug "Couldn't get send connection, dropping message"
       return
+
     trace "sending encoded msgs to peer", connId = $conn.oid
-    await conn.writeLp(encoded).wait(timeout)
+    await conn.writeLp(encoded)
     p.sentRpcCache.put(digest)
     trace "sent pubsub message to remote", connId = $conn.oid
@@ -238,22 +254,32 @@
   except CatchableError as exc:
+    # Because we detach the send call from the currently executing task using
+    # asyncCheck, no exceptions may leak out of it
     trace "unable to send to remote", exc = exc.msg
 
     # Next time sendConn is used, it will have its close flag set and thus
     # will be recycled
     if not isNil(conn):
-      await conn.close()
+      await conn.close() # This will clean up the send connection
 
-    raise exc
+    if exc is CancelledError: # TODO not handled
+      debug "Send cancelled"
+
+    # We'll ask for a new send connection whenever possible
+    if p.sendConn == conn:
+      p.sendConn = nil
+
+proc send*(p: PubSubPeer, msg: RPCMsg) =
+  asyncCheck sendImpl(p, msg)
 
 proc `$`*(p: PubSubPeer): string =
-  p.id
+  $p.peerId
 
 proc newPubSubPeer*(peerId: PeerID,
-                    switch: Switch,
+                    getConn: GetConn,
                     codec: string): PubSubPeer =
   new result
-  result.switch = switch
+  result.getConn = getConn
   result.codec = codec
   result.peerId = peerId
   result.sentRpcCache = newTimedCache[string](2.minutes)
diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim
index 9f62f5bcb..3930de5a0 100644
--- a/libp2p/protocols/pubsub/rpc/messages.nim
+++ b/libp2p/protocols/pubsub/rpc/messages.nim
@@ -50,6 +50,11 @@ type
     messages*: seq[Message]
     control*: Option[ControlMessage]
 
+func withSubs*(
+    T: type RPCMsg, topics: openArray[string], subscribe: bool): T =
+  T(
+    subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it)))
+
 func shortLog*(s: ControlIHave): auto =
   (
     topicID: s.topicID.shortLog,
diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim
index 5cd33c1e6..25840e498 100644
--- a/libp2p/stream/bufferstream.nim
+++ b/libp2p/stream/bufferstream.nim
@@ -43,7 +43,7 @@ logScope:
   topics = "bufferstream"
 
 const
-  DefaultBufferSize* = 102400
+  DefaultBufferSize* = 128
 
 const
   BufferStreamTrackerName* = "libp2p.bufferstream"
diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim
index 779e9fca2..5d48b1905 100644
--- a/tests/pubsub/testgossipinternal.nim
+++ b/tests/pubsub/testgossipinternal.nim
@@ -16,6 +16,13 @@ type
 
 proc noop(data: seq[byte]) {.async, gcsafe.} = discard
 
+proc getPubSubPeer(p: TestGossipSub, peerId: PeerID): auto =
+  proc getConn(): Future[(Connection, RPCMsg)] {.async.} =
+    let conn = await p.switch.dial(peerId, GossipSubCodec)
+    return (conn, RPCMsg.withSubs(toSeq(p.topics.keys), true))
+
+  newPubSubPeer(peerId, getConn, GossipSubCodec)
+
 proc randomPeerInfo(): PeerInfo =
   PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
 
@@ -39,7 +46,7 @@ suite "GossipSub internal":
       conns &= conn
       let peerInfo = randomPeerInfo()
       conn.peerInfo = peerInfo
-      let peer = newPubSubPeer(peerInfo.peerId, gossipSub.switch, GossipSubCodec)
+      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
       gossipSub.peers[peerInfo.peerId] = peer
       gossipSub.mesh[topic].incl(peer)
 
@@ -69,7 +76,7 @@ suite "GossipSub internal":
       conns &= conn
       let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
       conn.peerInfo = peerInfo
-      let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
+      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
       gossipSub.peers[peerInfo.peerId] = peer
       gossipSub.mesh[topic].incl(peer)
 
@@ -89,7 +96,7 @@ suite "GossipSub internal":
   proc testRun(): Future[bool] {.async.} =
     let gossipSub = TestGossipSub.init(newStandardSwitch())
 
-    proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
+    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
       discard
 
     let topic = "foobar"
@@ -101,7 +108,7 @@ suite "GossipSub internal":
      conns &= conn
       var peerInfo = randomPeerInfo()
       conn.peerInfo = peerInfo
-      let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
+      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
       peer.handler = handler
       gossipSub.gossipsub[topic].incl(peer)
 
@@ -121,7 +128,7 @@ suite "GossipSub internal":
   proc testRun(): Future[bool] {.async.} =
     let gossipSub = TestGossipSub.init(newStandardSwitch())
 
-    proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
+    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
       discard
 
     let topic = "foobar"
@@ -135,7 +142,7 @@ suite "GossipSub internal":
       conns &= conn
       let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
       conn.peerInfo = peerInfo
-      let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
+      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
       peer.handler = handler
       gossipSub.fanout[topic].incl(peer)
 
@@ -156,7 +163,7 @@ suite "GossipSub internal":
   proc testRun(): Future[bool] {.async.} =
     let gossipSub = TestGossipSub.init(newStandardSwitch())
 
-    proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
+    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
       discard
 
     let topic1 = "foobar1"
@@ -173,7 +180,7 @@ suite "GossipSub internal":
       conns &= conn
       let peerInfo = randomPeerInfo()
       conn.peerInfo = peerInfo
-      let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
+      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
       peer.handler = handler
       gossipSub.fanout[topic1].incl(peer)
       gossipSub.fanout[topic2].incl(peer)
 
@@ -197,7 +204,7 @@ suite "GossipSub internal":
   proc testRun(): Future[bool] {.async.} =
     let gossipSub = TestGossipSub.init(newStandardSwitch())
 
-    proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
+    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
       discard
 
     let topic = "foobar"
@@ -212,7 +219,7 @@ suite "GossipSub internal":
      conns &= conn
       let peerInfo = randomPeerInfo()
       conn.peerInfo = peerInfo
-      let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
+      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
       peer.handler = handler
       if i mod 2 == 0:
         gossipSub.fanout[topic].incl(peer)
 
@@ -225,7 +232,7 @@ suite "GossipSub internal":
       conns &= conn
       let peerInfo = randomPeerInfo()
       conn.peerInfo = peerInfo
-      let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
+      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
       peer.handler = handler
       gossipSub.gossipsub[topic].incl(peer)
 
@@ -262,7 +269,7 @@ suite "GossipSub internal":
   proc testRun(): Future[bool] {.async.} =
     let gossipSub = TestGossipSub.init(newStandardSwitch())
 
-    proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
+    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
       discard
 
     let topic = "foobar"
@@ -274,7 +281,7 @@ suite "GossipSub internal":
       conns &= conn
       let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
-      let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
+      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
       peer.handler = handler
       if i mod 2 == 0:
         gossipSub.fanout[topic].incl(peer)
 
@@ -307,7 +314,7 @@ suite "GossipSub internal":
   proc testRun(): Future[bool] {.async.} =
     let gossipSub = TestGossipSub.init(newStandardSwitch())
 
-    proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
+    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
       discard
 
     let topic = "foobar"
@@ -319,7 +326,7 @@ suite "GossipSub internal":
       conns &= conn
       let peerInfo = randomPeerInfo()
       conn.peerInfo = peerInfo
-      let peer = newPubSubPeer(peerInfo.peerId, gossipSub.switch, GossipSubCodec)
+      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
       peer.handler = handler
       if i mod 2 == 0:
         gossipSub.mesh[topic].incl(peer)
 
@@ -352,7 +359,7 @@ suite "GossipSub internal":
   proc testRun(): Future[bool] {.async.} =
     let gossipSub = TestGossipSub.init(newStandardSwitch())
 
-    proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
+    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
       discard
 
     let topic = "foobar"
@@ -364,7 +371,7 @@ suite "GossipSub internal":
      conns &= conn
       let peerInfo = randomPeerInfo()
       conn.peerInfo = peerInfo
-      let peer = newPubSubPeer(peerInfo.peerId, gossipSub.switch, GossipSubCodec)
+      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
       peer.handler = handler
       if i mod 2 == 0:
         gossipSub.mesh[topic].incl(peer)
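
Note for reviewers: the core change in this diff is that send/broadcast no longer return futures, counts, or take timeouts - writes are detached with asyncCheck and all failure handling stays inside sendImpl. A minimal, self-contained sketch of that pattern follows; FakePeer, FakeMsg, and the echo logging are illustrative stand-ins rather than the real libp2p types, and only chronos (the async runtime nim-libp2p uses) is assumed:

import chronos

type
  FakeMsg = object
    data: string

  FakePeer = ref object
    name: string

proc sendImpl(p: FakePeer, msg: FakeMsg) {.async.} =
  # All failures must be handled here: once the task is detached via
  # asyncCheck, no exception may leak back to the caller.
  try:
    await sleepAsync(10.milliseconds) # stand-in for conn.writeLp(encoded)
    echo "sent to ", p.name, ": ", msg.data
  except CatchableError as exc:
    echo "send to ", p.name, " failed: ", exc.msg

proc send(p: FakePeer, msg: FakeMsg) =
  # Fire and forget: no Future, no timeout, no result to await - a slow or
  # dead peer can no longer block the caller (e.g. the gossip heartbeat).
  asyncCheck sendImpl(p, msg)

proc broadcast(peers: openArray[FakePeer], msg: FakeMsg) =
  # Mirrors the new PubSub.broadcast: queue one send per peer, return at once.
  for peer in peers:
    send(peer, msg)

when isMainModule:
  broadcast([FakePeer(name: "a"), FakePeer(name: "b")], FakeMsg(data: "hi"))
  waitFor sleepAsync(50.milliseconds) # give the detached sends time to finish

The trade-off is that publish can now only report how many sends were queued (peers.len), not how many were actually delivered.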