diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index c5aa3736f..a4f0a638e 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit c5aa3736f96e4d66f6aa653a2351ded74b7d21a9 +Subproject commit a4f0a638e718f05ecec01ae3a6ad2838714e7e40 diff --git a/waku/incentivization/eligibility_manager.nim b/waku/incentivization/eligibility_manager.nim index 3343f7186..da8280da3 100644 --- a/waku/incentivization/eligibility_manager.nim +++ b/waku/incentivization/eligibility_manager.nim @@ -13,7 +13,8 @@ type EligibilityManager* = ref object # FIXME: make web3 private? proc init*( T: type EligibilityManager, ethClient: string ): Future[EligibilityManager] {.async.} = - return EligibilityManager(web3: await newWeb3(ethClient), seenTxIds: initHashSet[TxHash]()) + return + EligibilityManager(web3: await newWeb3(ethClient), seenTxIds: initHashSet[TxHash]()) # TODO: handle error if web3 instance is not established # Clean up the web3 instance diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 6894f5578..ba04b6b00 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -518,6 +518,42 @@ proc connectedPeers*( return (inPeers, outPeers) +proc getStreamByPeerIdAndProtocol*( + pm: PeerManager, peerId: PeerId, protocol: string +): Future[Result[Connection, string]] {.async.} = + ## Establishes a new stream to the given peer and protocol or returns the existing stream, if any. + ## Notice that the "Connection" type represents a stream within a transport connection + ## (we will need to adapt this term.) + + let peerIdsMuxers: Table[PeerId, seq[Muxer]] = pm.switch.connManager.getConnections() + if not peerIdsMuxers.contains(peerId): + return err("peerId not found in connManager: " & $peerId) + + let muxers = peerIdsMuxers[peerId] + + var streams = newSeq[Connection](0) + for m in muxers: + for s in m.getStreams(): + ## getStreams is defined in nim-libp2p + streams.add(s) + + ## Try to get the opened streams for the given protocol + let streamsOfInterest = streams.filterIt( + it.protocol == protocol and not LPStream(it).isClosed and + not LPStream(it).isClosedRemotely + ) + + if streamsOfInterest.len > 0: + ## In theory there should be one stream per protocol. Then we just pick up the 1st + return ok(streamsOfInterest[0]) + + ## There isn't still a stream. Let's dial to create one + let streamRes = await pm.dialPeer(peerId, protocol) + if streamRes.isNone(): + return err("getStreamByPeerIdProto no connection to peer: " & $peerId) + + return ok(streamRes.get()) + proc connectToRelayPeers*(pm: PeerManager) {.async.} = var (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec) let totalRelayPeers = inRelayPeers.len + outRelayPeers.len diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index 22504488e..d8b79ab67 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -172,29 +172,15 @@ proc pushToPeer( ): Future[Result[void, string]] {.async.} = debug "pushing message to subscribed peer", peerId = shortLog(peerId) - if not wf.peerManager.wakuPeerStore.hasPeer(peerId, WakuFilterPushCodec): - # Check that peer has not been removed from peer store - error "no addresses for peer", peerId = shortLog(peerId) - return err("no addresses for peer: " & $peerId) + let stream = ( + await wf.peerManager.getStreamByPeerIdAndProtocol(peerId, WakuFilterPushCodec) + ).valueOr: + error "pushToPeer failed", error + return err("pushToPeer failed: " & $error) - let conn = - if wf.peerConnections.contains(peerId): - wf.peerConnections[peerId] - else: - ## we never pushed a message before, let's dial then - let connRes = await wf.peerManager.dialPeer(peerId, WakuFilterPushCodec) - if connRes.isNone(): - ## We do not remove this peer, but allow the underlying peer manager - ## to do so if it is deemed necessary - error "pushToPeer no connection to peer", peerId = shortLog(peerId) - return err("pushToPeer no connection to peer: " & shortLog(peerId)) + await stream.writeLp(buffer) - let newConn = connRes.get() - wf.peerConnections[peerId] = newConn - newConn - - await conn.writeLp(buffer) - debug "published successful", peerId = shortLog(peerId), conn + debug "published successful", peerId = shortLog(peerId), stream waku_service_network_bytes.inc( amount = buffer.len().int64, labelValues = [WakuFilterPushCodec, "out"] )