chore: refactor filter to react when the remote peer closes the stream (#3281)
Gives better control when the remote peer closes the WakuFilterPushCodec stream. For example, go-waku closes the stream after every received message, whereas js-waku keeps the stream open. Therefore, we support both scenarios.
parent 8a7e602543
commit ce7f09a35f
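The crux of the change sits in the peer-manager hunk below: a cached stream is reused only when it speaks the requested protocol and neither side has closed it. A minimal sketch of that predicate, factored into a hypothetical helper (the boolean expression is lifted from the diff; the helper name and imports are assumptions):

```nim
import libp2p/stream/[connection, lpstream]

proc isReusable(s: Connection, protocol: string): bool =
  # Reuse a stream only when it matches the protocol and is still open on
  # both ends. go-waku closes the stream after every pushed message, so
  # isClosedRemotely is the flag that catches that case; js-waku keeps the
  # stream open, so reuse succeeds.
  s.protocol == protocol and not LPStream(s).isClosed and
    not LPStream(s).isClosedRemotely
```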
vendor/nim-libp2p (vendored submodule)
@@ -1 +1 @@
-Subproject commit c5aa3736f96e4d66f6aa653a2351ded74b7d21a9
+Subproject commit a4f0a638e718f05ecec01ae3a6ad2838714e7e40
@@ -13,7 +13,8 @@ type EligibilityManager* = ref object # FIXME: make web3 private?
 proc init*(
     T: type EligibilityManager, ethClient: string
 ): Future[EligibilityManager] {.async.} =
-  return EligibilityManager(web3: await newWeb3(ethClient), seenTxIds: initHashSet[TxHash]())
+  return
+    EligibilityManager(web3: await newWeb3(ethClient), seenTxIds: initHashSet[TxHash]())
   # TODO: handle error if web3 instance is not established
 
 # Clean up the web3 instance
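The TODO above is left open by this commit. A minimal sketch of one way to address it, assuming newWeb3 raises a CatchableError when the endpoint is unreachable and that callers would accept a Result; the name initChecked and the surrounding imports are hypothetical, not part of the diff:

```nim
import std/sets
import chronos, results, web3

proc initChecked*(
    T: type EligibilityManager, ethClient: string
): Future[Result[EligibilityManager, string]] {.async.} =
  # Hypothetical variant of init that surfaces web3 connection errors
  # to the caller instead of letting the exception propagate.
  try:
    let web3 = await newWeb3(ethClient)
    return ok(EligibilityManager(web3: web3, seenTxIds: initHashSet[TxHash]()))
  except CatchableError as e:
    return err("could not establish web3 instance: " & e.msg)
```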
@@ -518,6 +518,42 @@ proc connectedPeers*(
 
   return (inPeers, outPeers)
 
+proc getStreamByPeerIdAndProtocol*(
+    pm: PeerManager, peerId: PeerId, protocol: string
+): Future[Result[Connection, string]] {.async.} =
+  ## Establishes a new stream to the given peer and protocol or returns the existing stream, if any.
+  ## Notice that the "Connection" type represents a stream within a transport connection
+  ## (we will need to adapt this term.)
+
+  let peerIdsMuxers: Table[PeerId, seq[Muxer]] = pm.switch.connManager.getConnections()
+  if not peerIdsMuxers.contains(peerId):
+    return err("peerId not found in connManager: " & $peerId)
+
+  let muxers = peerIdsMuxers[peerId]
+
+  var streams = newSeq[Connection](0)
+  for m in muxers:
+    for s in m.getStreams():
+      ## getStreams is defined in nim-libp2p
+      streams.add(s)
+
+  ## Try to get the opened streams for the given protocol
+  let streamsOfInterest = streams.filterIt(
+    it.protocol == protocol and not LPStream(it).isClosed and
+      not LPStream(it).isClosedRemotely
+  )
+
+  if streamsOfInterest.len > 0:
+    ## In theory there should be one stream per protocol. Then we just pick up the 1st
+    return ok(streamsOfInterest[0])
+
+  ## There isn't still a stream. Let's dial to create one
+  let streamRes = await pm.dialPeer(peerId, protocol)
+  if streamRes.isNone():
+    return err("getStreamByPeerIdProto no connection to peer: " & $peerId)
+
+  return ok(streamRes.get())
+
 proc connectToRelayPeers*(pm: PeerManager) {.async.} =
   var (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
   let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
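For context, a hedged sketch of a call site for the new helper; `pm`, `peerId`, and `payload` are assumed to be in scope, and the pushToPeer hunk below applies the same pattern to the filter protocol:

```nim
let streamRes = await pm.getStreamByPeerIdAndProtocol(peerId, WakuFilterPushCodec)
if streamRes.isErr():
  error "no usable stream to peer", error = streamRes.error
else:
  # writeLp length-prefixes the buffer before sending it on the stream
  await streamRes.get().writeLp(payload)
```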
|
@@ -172,29 +172,15 @@ proc pushToPeer(
 ): Future[Result[void, string]] {.async.} =
   debug "pushing message to subscribed peer", peerId = shortLog(peerId)
 
-  if not wf.peerManager.wakuPeerStore.hasPeer(peerId, WakuFilterPushCodec):
-    # Check that peer has not been removed from peer store
-    error "no addresses for peer", peerId = shortLog(peerId)
-    return err("no addresses for peer: " & $peerId)
-
-  let conn =
-    if wf.peerConnections.contains(peerId):
-      wf.peerConnections[peerId]
-    else:
-      ## we never pushed a message before, let's dial then
-      let connRes = await wf.peerManager.dialPeer(peerId, WakuFilterPushCodec)
-      if connRes.isNone():
-        ## We do not remove this peer, but allow the underlying peer manager
-        ## to do so if it is deemed necessary
-        error "pushToPeer no connection to peer", peerId = shortLog(peerId)
-        return err("pushToPeer no connection to peer: " & shortLog(peerId))
-
-      let newConn = connRes.get()
-      wf.peerConnections[peerId] = newConn
-      newConn
-
-  await conn.writeLp(buffer)
-
-  debug "published successful", peerId = shortLog(peerId), conn
+  let stream = (
+    await wf.peerManager.getStreamByPeerIdAndProtocol(peerId, WakuFilterPushCodec)
+  ).valueOr:
+    error "pushToPeer failed", error
+    return err("pushToPeer failed: " & $error)
+
+  await stream.writeLp(buffer)
+
+  debug "published successful", peerId = shortLog(peerId), stream
 
   waku_service_network_bytes.inc(
     amount = buffer.len().int64, labelValues = [WakuFilterPushCodec, "out"]
   )
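To see how this covers both clients named in the commit message, consider two consecutive pushes to the same peer. This is illustrative only: pushToPeer is module-private, and `remotePeer` and `buffer` are assumed names:

```nim
# First push: no stream exists yet, so getStreamByPeerIdAndProtocol dials one.
discard await wf.pushToPeer(remotePeer, buffer)

# js-waku keeps the stream open, so the second push reuses it. go-waku
# closes the stream after each message; it then shows up as
# isClosedRemotely, is filtered out, and a fresh stream is dialed.
discard await wf.pushToPeer(remotePeer, buffer)
```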