mirror of https://github.com/waku-org/nwaku.git
filter: dial push connection onsubscribe only
This commit is contained in:
parent
e493b588f3
commit
26a2112c5d
|
@ -45,7 +45,7 @@ proc subscribe(
|
|||
peerId: PeerID,
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
contentTopics: seq[ContentTopic],
|
||||
): FilterSubscribeResult =
|
||||
): Future[FilterSubscribeResult] {.async.} =
|
||||
# TODO: check if this condition is valid???
|
||||
if pubsubTopic.isNone() or contentTopics.len == 0:
|
||||
error "pubsubTopic and contentTopics must be specified", peerId = peerId
|
||||
|
@ -66,7 +66,7 @@ proc subscribe(
|
|||
debug "subscribing peer to filter criteria",
|
||||
peerId = peerId, filterCriteria = filterCriteria
|
||||
|
||||
wf.subscriptions.addSubscription(peerId, filterCriteria).isOkOr:
|
||||
(await wf.subscriptions.addSubscription(peerId, filterCriteria, wf.peerManager)).isOkOr:
|
||||
return err(FilterSubscribeError.serviceUnavailable(error))
|
||||
|
||||
debug "correct subscription", peerId = peerId
|
||||
|
@ -108,20 +108,22 @@ proc unsubscribe(
|
|||
|
||||
ok()
|
||||
|
||||
proc unsubscribeAll(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
|
||||
proc unsubscribeAll(
|
||||
wf: WakuFilter, peerId: PeerID
|
||||
): Future[FilterSubscribeResult] {.async.} =
|
||||
if not wf.subscriptions.isSubscribed(peerId):
|
||||
debug "unsubscribing peer has no subscriptions", peerId = peerId
|
||||
return err(FilterSubscribeError.notFound())
|
||||
|
||||
debug "removing peer subscription", peerId = peerId
|
||||
wf.subscriptions.removePeer(peerId)
|
||||
await wf.subscriptions.removePeer(peerId)
|
||||
wf.subscriptions.cleanUp()
|
||||
|
||||
ok()
|
||||
|
||||
proc handleSubscribeRequest*(
|
||||
wf: WakuFilter, peerId: PeerId, request: FilterSubscribeRequest
|
||||
): FilterSubscribeResponse =
|
||||
): Future[FilterSubscribeResponse] {.async.} =
|
||||
info "received filter subscribe request", peerId = peerId, request = request
|
||||
waku_filter_requests.inc(labelValues = [$request.filterSubscribeType])
|
||||
|
||||
|
@ -135,12 +137,13 @@ proc handleSubscribeRequest*(
|
|||
of FilterSubscribeType.SUBSCRIBER_PING:
|
||||
subscribeResult = wf.pingSubscriber(peerId)
|
||||
of FilterSubscribeType.SUBSCRIBE:
|
||||
subscribeResult = wf.subscribe(peerId, request.pubsubTopic, request.contentTopics)
|
||||
subscribeResult =
|
||||
await wf.subscribe(peerId, request.pubsubTopic, request.contentTopics)
|
||||
of FilterSubscribeType.UNSUBSCRIBE:
|
||||
subscribeResult =
|
||||
wf.unsubscribe(peerId, request.pubsubTopic, request.contentTopics)
|
||||
of FilterSubscribeType.UNSUBSCRIBE_ALL:
|
||||
subscribeResult = wf.unsubscribeAll(peerId)
|
||||
subscribeResult = await wf.unsubscribeAll(peerId)
|
||||
|
||||
let
|
||||
requestDuration = Moment.now() - requestStartTime
|
||||
|
@ -168,20 +171,11 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
|
|||
error "no addresses for peer", peerId = shortLog(peer)
|
||||
return
|
||||
|
||||
## TODO: Check if dial is necessary always???
|
||||
let conn = await wf.peerManager.dialPeer(peer, WakuFilterPushCodec)
|
||||
if conn.isNone():
|
||||
## We do not remove this peer, but allow the underlying peer manager
|
||||
## to do so if it is deemed necessary
|
||||
error "no connection to peer", peerId = shortLog(peer)
|
||||
let conn = wf.subscriptions.getConnectionByPeerId(peer).valueOr:
|
||||
error "could not get connection by peer id", error = $error
|
||||
return
|
||||
|
||||
defer:
|
||||
## TODO we need to perform a better resource management and avoid
|
||||
## creating a new stream and closing it for every single msg
|
||||
await conn.get().close()
|
||||
|
||||
await conn.get().writeLp(buffer)
|
||||
await conn.writeLp(buffer)
|
||||
debug "published successful", peerId = shortLog(peer)
|
||||
waku_service_network_bytes.inc(
|
||||
amount = buffer.len().int64, labelValues = [WakuFilterPushCodec, "out"]
|
||||
|
@ -218,7 +212,7 @@ proc pushToPeers(
|
|||
pushFuts.add(pushFut)
|
||||
await allFutures(pushFuts)
|
||||
|
||||
proc maintainSubscriptions*(wf: WakuFilter) =
|
||||
proc maintainSubscriptions*(wf: WakuFilter) {.async.} =
|
||||
debug "maintaining subscriptions"
|
||||
|
||||
## Remove subscriptions for peers that have been removed from peer store
|
||||
|
@ -230,7 +224,7 @@ proc maintainSubscriptions*(wf: WakuFilter) =
|
|||
peersToRemove.add(peerId)
|
||||
|
||||
if peersToRemove.len > 0:
|
||||
wf.subscriptions.removePeers(peersToRemove)
|
||||
await wf.subscriptions.removePeers(peersToRemove)
|
||||
wf.peerRequestRateLimiter.unregister(peersToRemove)
|
||||
|
||||
wf.subscriptions.cleanUp()
|
||||
|
@ -307,7 +301,7 @@ proc initProtocolHandler(wf: WakuFilter) =
|
|||
|
||||
let request = decodeRes.value #TODO: toAPI() split here
|
||||
|
||||
response = wf.handleSubscribeRequest(conn.peerId, request)
|
||||
response = await wf.handleSubscribeRequest(conn.peerId, request)
|
||||
|
||||
debug "sending filter subscribe response",
|
||||
peer_id = shortLog(conn.peerId), response = response
|
||||
|
@ -352,7 +346,7 @@ proc periodicSubscriptionsMaintenance(wf: WakuFilter) {.async.} =
|
|||
const MaintainSubscriptionsInterval = 1.minutes
|
||||
debug "starting to maintain subscriptions"
|
||||
while true:
|
||||
wf.maintainSubscriptions()
|
||||
await wf.maintainSubscriptions()
|
||||
await sleepAsync(MaintainSubscriptionsInterval)
|
||||
|
||||
proc start*(wf: WakuFilter) {.async.} =
|
||||
|
|
|
@ -1,13 +1,18 @@
|
|||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[sets, tables, sequtils],
|
||||
std/[options, sets, tables, sequtils],
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/peerid,
|
||||
libp2p/stream/connection,
|
||||
stew/shims/sets
|
||||
import ../waku_core, ../utils/tableutils, ../common/rate_limit/setting
|
||||
import
|
||||
../waku_core,
|
||||
../utils/tableutils,
|
||||
../common/rate_limit/setting,
|
||||
../node/peer_manager,
|
||||
./common
|
||||
|
||||
logScope:
|
||||
topics = "waku filter subscriptions"
|
||||
|
@ -26,9 +31,9 @@ type
|
|||
|
||||
SubscribedPeers* = HashSet[PeerID] # a sequence of peer ids
|
||||
|
||||
PeerData* = tuple[lastSeen: Moment, criteriaCount: uint]
|
||||
PeerData* = tuple[lastSeen: Moment, criteriaCount: uint, connection: Connection]
|
||||
|
||||
FilterSubscriptions* = object
|
||||
FilterSubscriptions* = ref object
|
||||
peersSubscribed*: Table[PeerID, PeerData]
|
||||
subscriptions: Table[FilterCriterion, SubscribedPeers]
|
||||
subscriptionTimeout: Duration
|
||||
|
@ -50,7 +55,7 @@ proc init*(
|
|||
maxCriteriaPerPeer: maxFilterCriteriaPerPeer,
|
||||
)
|
||||
|
||||
proc isSubscribed*(s: var FilterSubscriptions, peerId: PeerID): bool =
|
||||
proc isSubscribed*(s: FilterSubscriptions, peerId: PeerID): bool =
|
||||
s.peersSubscribed.withValue(peerId, data):
|
||||
return Moment.now() - data.lastSeen <= s.subscriptionTimeout
|
||||
|
||||
|
@ -60,7 +65,7 @@ proc subscribedPeerCount*(s: FilterSubscriptions): uint =
|
|||
return cast[uint](s.peersSubscribed.len)
|
||||
|
||||
proc getPeerSubscriptions*(
|
||||
s: var FilterSubscriptions, peerId: PeerID
|
||||
s: FilterSubscriptions, peerId: PeerID
|
||||
): seq[FilterCriterion] =
|
||||
## Get all pubsub-content topics a peer is subscribed to
|
||||
var subscribedContentTopics: seq[FilterCriterion] = @[]
|
||||
|
@ -75,7 +80,7 @@ proc getPeerSubscriptions*(
|
|||
return subscribedContentTopics
|
||||
|
||||
proc findSubscribedPeers*(
|
||||
s: var FilterSubscriptions, pubsubTopic: PubsubTopic, contentTopic: ContentTopic
|
||||
s: FilterSubscriptions, pubsubTopic: PubsubTopic, contentTopic: ContentTopic
|
||||
): seq[PeerID] =
|
||||
let filterCriterion: FilterCriterion = (pubsubTopic, contentTopic)
|
||||
|
||||
|
@ -88,29 +93,34 @@ proc findSubscribedPeers*(
|
|||
|
||||
return foundPeers
|
||||
|
||||
proc removePeer*(s: var FilterSubscriptions, peerId: PeerID) =
|
||||
proc removePeer*(s: FilterSubscriptions, peerId: PeerID) {.async.} =
|
||||
## Remove all subscriptions for a given peer
|
||||
debug "removePeer",
|
||||
currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)), peerId = peerId
|
||||
|
||||
## Remove all subscriptions for a given peer
|
||||
s.peersSubscribed.withValue(peerId, peerData):
|
||||
debug "closing connection with peer", peerId = shortLog(peerId)
|
||||
await peerData.connection.close()
|
||||
|
||||
s.peersSubscribed.del(peerId)
|
||||
|
||||
debug "removePeer after deletion",
|
||||
currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)), peerId = peerId
|
||||
|
||||
proc removePeers*(s: var FilterSubscriptions, peerIds: seq[PeerID]) =
|
||||
proc removePeers*(s: FilterSubscriptions, peerIds: seq[PeerID]) {.async.} =
|
||||
## Remove all subscriptions for a given list of peers
|
||||
debug "removePeers",
|
||||
currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)),
|
||||
peerIds = peerIds.mapIt(shortLog(it))
|
||||
|
||||
## Remove all subscriptions for a given list of peers
|
||||
s.peersSubscribed.keepItIf(key notin peerIds)
|
||||
for peer in peerIds:
|
||||
await s.removePeer(peer)
|
||||
|
||||
debug "removePeers after deletion",
|
||||
currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)),
|
||||
peerIds = peerIds.mapIt(shortLog(it))
|
||||
|
||||
proc cleanUp*(fs: var FilterSubscriptions) =
|
||||
proc cleanUp*(fs: FilterSubscriptions) =
|
||||
debug "cleanUp", currentPeerIds = toSeq(fs.peersSubscribed.keys).mapIt(shortLog(it))
|
||||
|
||||
## Remove all subscriptions for peers that have not been seen for a while
|
||||
|
@ -131,9 +141,15 @@ proc refreshSubscription*(s: var FilterSubscriptions, peerId: PeerID) =
|
|||
data.lastSeen = Moment.now()
|
||||
|
||||
proc addSubscription*(
|
||||
s: var FilterSubscriptions, peerId: PeerID, filterCriteria: FilterCriteria
|
||||
): Result[void, string] =
|
||||
s: FilterSubscriptions,
|
||||
peerId: PeerID,
|
||||
filterCriteria: FilterCriteria,
|
||||
peerManager: PeerManager,
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
## Add a subscription for a given peer
|
||||
##
|
||||
## The peerManager is needed to establish the first Connection through
|
||||
## /vac/waku/filter-push/2.0.0-beta1
|
||||
var peerData: ptr PeerData
|
||||
|
||||
s.peersSubscribed.withValue(peerId, data):
|
||||
|
@ -147,8 +163,14 @@ proc addSubscription*(
|
|||
if cast[uint](s.peersSubscribed.len) >= s.maxPeers:
|
||||
return err("node has reached maximum number of subscriptions: " & $(s.maxPeers))
|
||||
|
||||
let newPeerData: PeerData = (lastSeen: Moment.now(), criteriaCount: 0)
|
||||
peerData = addr(s.peersSubscribed.mgetOrPut(peerId, newPeerData))
|
||||
let connRes = await peerManager.dialPeer(peerId, WakuFilterPushCodec)
|
||||
if connRes.isNone():
|
||||
## We do not remove this peer, but allow the underlying peer manager
|
||||
## to do so if it is deemed necessary
|
||||
return err("addSubscription no connection to peer: " & shortLog(peerId))
|
||||
|
||||
let newPeerData: PeerData =
|
||||
(lastSeen: Moment.now(), criteriaCount: 0, connection: connRes.get())
|
||||
|
||||
for filterCriterion in filterCriteria:
|
||||
var peersOfSub = addr(s.subscriptions.mgetOrPut(filterCriterion, SubscribedPeers()))
|
||||
|
@ -159,7 +181,7 @@ proc addSubscription*(
|
|||
return ok()
|
||||
|
||||
proc removeSubscription*(
|
||||
s: var FilterSubscriptions, peerId: PeerID, filterCriteria: FilterCriteria
|
||||
s: FilterSubscriptions, peerId: PeerID, filterCriteria: FilterCriteria
|
||||
): Result[void, string] =
|
||||
## Remove a subscription for a given peer
|
||||
|
||||
|
@ -181,3 +203,12 @@ proc removeSubscription*(
|
|||
return ok()
|
||||
do:
|
||||
return err("Peer has no subscriptions")
|
||||
|
||||
proc getConnectionByPeerId*(
|
||||
s: FilterSubscriptions, peerId: PeerID
|
||||
): Result[Connection, string] =
|
||||
if not s.peersSubscribed.hasKey(peerId):
|
||||
return err("peer not subscribed: " & shortLog(peerId))
|
||||
|
||||
let peerData = s.peersSubscribed.getOrDefault(peerId)
|
||||
return ok(peerData.connection)
|
||||
|
|
Loading…
Reference in New Issue