mirror of https://github.com/waku-org/nwaku.git
filter: enhancements in subscription management
* waku_filter_v2: idiomatic way to run the periodic subscription manager
* filter subscriptions: add more debug logs
* filter: make sure the custom start and stop procs are called
* make sure the filter protocol is started if it is mounted
* filter: dial the push connection on subscribe only
This commit is contained in:
parent 1b532e8ab9
commit 3af3b2bb90
@@ -429,7 +429,6 @@ proc mountFilter*(
     some(rateLimitSetting),
   )

-  if node.started:
   try:
     await node.wakuFilter.start()
   except CatchableError:
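Note: with the node.started guard removed above, the filter protocol is started as part of mounting it, and a start failure is contained inside mountFilter by the existing try/except. A minimal chronos sketch of that guarded-start shape (DummyProtocol and mountDummy are illustrative stand-ins, not nwaku types):

import chronos

# Illustrative stand-in for a mounted protocol; not the real WakuFilter type.
type DummyProtocol = ref object
  started: bool

proc start(p: DummyProtocol) {.async.} =
  p.started = true

proc mountDummy() {.async.} =
  let proto = DummyProtocol()
  # Start right away as part of mounting; a failure is logged and swallowed
  # so it cannot propagate out of the mount proc.
  try:
    await proto.start()
  except CatchableError:
    echo "failed to start protocol: ", getCurrentExceptionMsg()
  echo "mounted, started = ", proto.started

waitFor mountDummy()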
@@ -2,7 +2,13 @@
 {.push raises: [].}

-import std/options, chronicles, chronos, libp2p/protocols/protocol, bearssl/rand
+import
+  std/options,
+  chronicles,
+  chronos,
+  libp2p/protocols/protocol,
+  bearssl/rand,
+  stew/byteutils
 import
   ../node/peer_manager,
   ../node/delivery_monitor/subscriptions_observer,

@@ -101,6 +107,7 @@ proc sendSubscribeRequest(
 proc ping*(
     wfc: WakuFilterClient, servicePeer: RemotePeerInfo
 ): Future[FilterSubscribeResult] {.async.} =
+  debug "sending ping", servicePeer = shortLog($servicePeer)
   let requestId = generateRequestId(wfc.rng)
   let filterSubscribeRequest = FilterSubscribeRequest.ping(requestId)

@@ -170,17 +177,23 @@ proc initProtocolHandler(wfc: WakuFilterClient) =
   proc handler(conn: Connection, proto: string) {.async.} =
     let buf = await conn.readLp(int(DefaultMaxPushSize))

-    let decodeRes = MessagePush.decode(buf)
-    if decodeRes.isErr():
-      error "Failed to decode message push", peerId = conn.peerId
+    let msgPush = MessagePush.decode(buf).valueOr:
+      error "Failed to decode message push", peerId = conn.peerId, error = $error
       waku_filter_errors.inc(labelValues = [decodeRpcFailure])
       return

-    let messagePush = decodeRes.value #TODO: toAPI() split here
-    trace "Received message push", peerId = conn.peerId, messagePush
+    let msg_hash =
+      computeMessageHash(msgPush.pubsubTopic, msgPush.wakuMessage).to0xHex()
+
+    debug "Received message push",
+      peerId = conn.peerId,
+      msg_hash,
+      payload = shortLog(msgPush.wakuMessage.payload),
+      pubsubTopic = msgPush.pubsubTopic,
+      content_topic = msgPush.wakuMessage.contentTopic

     for handler in wfc.pushHandlers:
-      asyncSpawn handler(messagePush.pubsubTopic, messagePush.wakuMessage)
+      asyncSpawn handler(msgPush.pubsubTopic, msgPush.wakuMessage)

     # Protocol specifies no response for now
     return
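The rewritten decode path above uses the valueOr idiom from the nim-results library: on error the attached block runs with `error` injected and must leave the scope, otherwise the unwrapped value is bound directly. A small self-contained sketch of the idiom, assuming the results package is available (parsePort is a hypothetical helper, not nwaku code):

import std/strutils
import results

# Hypothetical helper that can fail, used only to demonstrate the idiom.
proc parsePort(s: string): Result[int, string] =
  try:
    return ok(parseInt(s))
  except ValueError:
    return err("not a number: " & s)

proc startListener(portStr: string) =
  # On error, the block runs with `error` injected and we leave the proc,
  # mirroring `let msgPush = MessagePush.decode(buf).valueOr: ... return`.
  let port = parsePort(portStr).valueOr:
    echo "invalid port: ", error
    return
  echo "listening on port ", port

startListener("8080") # listening on port 8080
startListener("oops") # invalid port: not a number: oops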
@@ -25,15 +25,15 @@ type WakuFilter* = ref object of LPProtocol
   subscriptions*: FilterSubscriptions
   # a mapping of peer ids to a sequence of filter criteria
   peerManager: PeerManager
-  maintenanceTask: TimerCallback
   messageCache: TimedCache[string]
   peerRequestRateLimiter*: PerPeerRateLimiter
+  subscriptionsManagerFut: Future[void]

 proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
-  trace "pinging subscriber", peerId = peerId
+  debug "pinging subscriber", peerId = peerId

   if not wf.subscriptions.isSubscribed(peerId):
-    debug "pinging peer has no subscriptions", peerId = peerId
+    error "pinging peer has no subscriptions", peerId = peerId
     return err(FilterSubscribeError.notFound())

   wf.subscriptions.refreshSubscription(peerId)

@@ -45,14 +45,16 @@ proc subscribe(
     peerId: PeerID,
     pubsubTopic: Option[PubsubTopic],
     contentTopics: seq[ContentTopic],
-): FilterSubscribeResult =
+): Future[FilterSubscribeResult] {.async.} =
   # TODO: check if this condition is valid???
   if pubsubTopic.isNone() or contentTopics.len == 0:
+    error "pubsubTopic and contentTopics must be specified", peerId = peerId
     return err(
       FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified")
     )

   if contentTopics.len > MaxContentTopicsPerRequest:
+    error "exceeds maximum content topics", peerId = peerId
     return err(
       FilterSubscribeError.badRequest(
         "exceeds maximum content topics: " & $MaxContentTopicsPerRequest

@@ -61,12 +63,14 @@ proc subscribe(

   let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))

-  trace "subscribing peer to filter criteria",
+  debug "subscribing peer to filter criteria",
     peerId = peerId, filterCriteria = filterCriteria

-  wf.subscriptions.addSubscription(peerId, filterCriteria).isOkOr:
+  (await wf.subscriptions.addSubscription(peerId, filterCriteria, wf.peerManager)).isOkOr:
     return err(FilterSubscribeError.serviceUnavailable(error))

+  debug "correct subscription", peerId = peerId
+
   ok()

 proc unsubscribe(

@@ -76,11 +80,13 @@ proc unsubscribe(
     contentTopics: seq[ContentTopic],
 ): FilterSubscribeResult =
   if pubsubTopic.isNone() or contentTopics.len == 0:
+    error "pubsubTopic and contentTopics must be specified", peerId = peerId
     return err(
       FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified")
     )

   if contentTopics.len > MaxContentTopicsPerRequest:
+    error "exceeds maximum content topics", peerId = peerId
     return err(
       FilterSubscribeError.badRequest(
         "exceeds maximum content topics: " & $MaxContentTopicsPerRequest

@@ -93,27 +99,31 @@ proc unsubscribe(
     peerId = peerId, filterCriteria = filterCriteria

   wf.subscriptions.removeSubscription(peerId, filterCriteria).isOkOr:
+    error "failed to remove subscription", error = $error
     return err(FilterSubscribeError.notFound())

   ## Note: do not remove from peerRequestRateLimiter to prevent trick with subscribe/unsubscribe loop
   ## We remove only if peerManager removes the peer
+  debug "correct unsubscription", peerId = peerId
+
   ok()

-proc unsubscribeAll(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
+proc unsubscribeAll(
+    wf: WakuFilter, peerId: PeerID
+): Future[FilterSubscribeResult] {.async.} =
   if not wf.subscriptions.isSubscribed(peerId):
     debug "unsubscribing peer has no subscriptions", peerId = peerId
     return err(FilterSubscribeError.notFound())

   debug "removing peer subscription", peerId = peerId
-  wf.subscriptions.removePeer(peerId)
+  await wf.subscriptions.removePeer(peerId)
   wf.subscriptions.cleanUp()

   ok()

 proc handleSubscribeRequest*(
     wf: WakuFilter, peerId: PeerId, request: FilterSubscribeRequest
-): FilterSubscribeResponse =
+): Future[FilterSubscribeResponse] {.async.} =
   info "received filter subscribe request", peerId = peerId, request = request
   waku_filter_requests.inc(labelValues = [$request.filterSubscribeType])

@@ -127,12 +137,13 @@ proc handleSubscribeRequest*(
   of FilterSubscribeType.SUBSCRIBER_PING:
     subscribeResult = wf.pingSubscriber(peerId)
   of FilterSubscribeType.SUBSCRIBE:
-    subscribeResult = wf.subscribe(peerId, request.pubsubTopic, request.contentTopics)
+    subscribeResult =
+      await wf.subscribe(peerId, request.pubsubTopic, request.contentTopics)
   of FilterSubscribeType.UNSUBSCRIBE:
     subscribeResult =
       wf.unsubscribe(peerId, request.pubsubTopic, request.contentTopics)
   of FilterSubscribeType.UNSUBSCRIBE_ALL:
-    subscribeResult = wf.unsubscribeAll(peerId)
+    subscribeResult = await wf.unsubscribeAll(peerId)

   let
     requestDuration = Moment.now() - requestStartTime

@@ -143,6 +154,7 @@ proc handleSubscribeRequest*(
   )

   if subscribeResult.isErr():
+    error "subscription request error", peerId = shortLog(peerId), request = request
     return FilterSubscribeResponse(
       requestId: request.requestId,
       statusCode: subscribeResult.error.kind.uint32,

@@ -152,22 +164,19 @@ proc handleSubscribeRequest*(
   return FilterSubscribeResponse.ok(request.requestId)

 proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
-  trace "pushing message to subscribed peer", peer_id = shortLog(peer)
+  debug "pushing message to subscribed peer", peerId = shortLog(peer)

   if not wf.peerManager.wakuPeerStore.hasPeer(peer, WakuFilterPushCodec):
     # Check that peer has not been removed from peer store
-    error "no addresses for peer", peer_id = shortLog(peer)
+    error "no addresses for peer", peerId = shortLog(peer)
     return

-  ## TODO: Check if dial is necessary always???
-  let conn = await wf.peerManager.dialPeer(peer, WakuFilterPushCodec)
-  if conn.isNone():
-    ## We do not remove this peer, but allow the underlying peer manager
-    ## to do so if it is deemed necessary
-    error "no connection to peer", peer_id = shortLog(peer)
+  let conn = wf.subscriptions.getConnectionByPeerId(peer).valueOr:
+    error "could not get connection by peer id", error = $error
     return

-  await conn.get().writeLp(buffer)
+  await conn.writeLp(buffer)
+  debug "published successful", peerId = shortLog(peer)
   waku_service_network_bytes.inc(
     amount = buffer.len().int64, labelValues = [WakuFilterPushCodec, "out"]
   )

@@ -181,15 +190,17 @@ proc pushToPeers(

   ## it's also refresh expire of msghash, that's why update cache every time, even if it has a value.
   if wf.messageCache.put(msgHash, Moment.now()):
-    notice "duplicate message found, not-pushing message to subscribed peers",
+    error "duplicate message found, not-pushing message to subscribed peers",
       pubsubTopic = messagePush.pubsubTopic,
       contentTopic = messagePush.wakuMessage.contentTopic,
+      payload = shortLog(messagePush.wakuMessage.payload),
       target_peer_ids = targetPeerIds,
       msg_hash = msgHash
   else:
     notice "pushing message to subscribed peers",
       pubsubTopic = messagePush.pubsubTopic,
       contentTopic = messagePush.wakuMessage.contentTopic,
+      payload = shortLog(messagePush.wakuMessage.payload),
       target_peer_ids = targetPeerIds,
       msg_hash = msgHash

@@ -201,19 +212,19 @@ proc pushToPeers(
     pushFuts.add(pushFut)
   await allFutures(pushFuts)

-proc maintainSubscriptions*(wf: WakuFilter) =
-  trace "maintaining subscriptions"
+proc maintainSubscriptions*(wf: WakuFilter) {.async.} =
+  debug "maintaining subscriptions"

   ## Remove subscriptions for peers that have been removed from peer store
   var peersToRemove: seq[PeerId]
   for peerId in wf.subscriptions.peersSubscribed.keys:
     if not wf.peerManager.wakuPeerStore.hasPeer(peerId, WakuFilterPushCodec):
-      debug "peer has been removed from peer store, removing subscription",
+      debug "peer has been removed from peer store, we will remove subscription",
         peerId = peerId
       peersToRemove.add(peerId)

   if peersToRemove.len > 0:
-    wf.subscriptions.removePeers(peersToRemove)
+    await wf.subscriptions.removePeers(peersToRemove)
     wf.peerRequestRateLimiter.unregister(peersToRemove)

   wf.subscriptions.cleanUp()

@@ -227,7 +238,7 @@ proc handleMessage*(
 ) {.async.} =
   let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()

-  trace "handling message", pubsubTopic = pubsubTopic, msg_hash = msgHash
+  debug "handling message", pubsubTopic = pubsubTopic, msg_hash = msgHash

   let handleMessageStartTime = Moment.now()

@@ -236,7 +247,7 @@ proc handleMessage*(
     let subscribedPeers =
       wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic)
     if subscribedPeers.len == 0:
-      trace "no subscribed peers found",
+      error "no subscribed peers found",
         pubsubTopic = pubsubTopic,
         contentTopic = message.contentTopic,
         msg_hash = msgHash

@@ -270,7 +281,7 @@ proc handleMessage*(

 proc initProtocolHandler(wf: WakuFilter) =
   proc handler(conn: Connection, proto: string) {.async.} =
-    trace "filter subscribe request handler triggered", peer_id = shortLog(conn.peerId)
+    debug "filter subscribe request handler triggered", peerId = shortLog(conn.peerId)

     var response: FilterSubscribeResponse

@@ -290,13 +301,13 @@ proc initProtocolHandler(wf: WakuFilter) =

       let request = decodeRes.value #TODO: toAPI() split here

-      response = wf.handleSubscribeRequest(conn.peerId, request)
+      response = await wf.handleSubscribeRequest(conn.peerId, request)

       debug "sending filter subscribe response",
         peer_id = shortLog(conn.peerId), response = response
     do:
       debug "filter request rejected due rate limit exceeded",
-        peerId = conn.peerId, limit = $wf.peerRequestRateLimiter.setting
+        peerId = shortLog(conn.peerId), limit = $wf.peerRequestRateLimiter.setting
       response = FilterSubscribeResponse(
         requestId: "N/A",
         statusCode: FilterSubscribeErrorKind.TOO_MANY_REQUESTS.uint32,

@@ -319,7 +330,7 @@ proc new*(
     rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
 ): T =
   let wf = WakuFilter(
-    subscriptions: FilterSubscriptions.init(
+    subscriptions: FilterSubscriptions.new(
       subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer
     ),
     peerManager: peerManager,

@@ -331,28 +342,19 @@ proc new*(
   setServiceLimitMetric(WakuFilterSubscribeCodec, rateLimitSetting)
   return wf

-const MaintainSubscriptionsInterval* = 1.minutes
+proc periodicSubscriptionsMaintenance(wf: WakuFilter) {.async.} =
+  const MaintainSubscriptionsInterval = 1.minutes
+  debug "starting to maintain subscriptions"
+  while true:
+    await wf.maintainSubscriptions()
+    await sleepAsync(MaintainSubscriptionsInterval)

-proc startMaintainingSubscriptions(wf: WakuFilter, interval: Duration) =
-  trace "starting to maintain subscriptions"
-  var maintainSubs: CallbackFunc
-  maintainSubs = CallbackFunc(
-    proc(udata: pointer) {.gcsafe.} =
-      maintainSubscriptions(wf)
-      wf.maintenanceTask = setTimer(Moment.fromNow(interval), maintainSubs)
-  )
-
-  wf.maintenanceTask = setTimer(Moment.fromNow(interval), maintainSubs)
-
-method start*(wf: WakuFilter) {.async, base.} =
+proc start*(wf: WakuFilter) {.async.} =
   debug "starting filter protocol"
-  wf.startMaintainingSubscriptions(MaintainSubscriptionsInterval)
-
   await procCall LPProtocol(wf).start()
+  wf.subscriptionsManagerFut = wf.periodicSubscriptionsMaintenance()

-method stop*(wf: WakuFilter) {.async, base.} =
+proc stop*(wf: WakuFilter) {.async.} =
   debug "stopping filter protocol"
-  if not wf.maintenanceTask.isNil():
-    wf.maintenanceTask.clearTimer()
+  await wf.subscriptionsManagerFut.cancelAndWait()

   await procCall LPProtocol(wf).stop()
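The lifecycle change above replaces the self-rescheduling TimerCallback with a plain async loop whose Future is stored on the object and cancelled on stop. A minimal sketch of that chronos idiom (Manager and its procs are illustrative names, not the nwaku API):

import chronos

type Manager = ref object
  maintenanceFut: Future[void]

proc maintain(m: Manager) {.async.} =
  # Placeholder for the real per-iteration maintenance work.
  echo "maintenance tick"

proc periodicMaintenance(m: Manager) {.async.} =
  const interval = 1.seconds
  while true:
    await m.maintain()
    await sleepAsync(interval)

proc start(m: Manager) =
  # Keep the Future so the loop can be cancelled later.
  m.maintenanceFut = m.periodicMaintenance()

proc stop(m: Manager) {.async.} =
  # cancelAndWait interrupts the pending sleep and waits for the loop to finish.
  await m.maintenanceFut.cancelAndWait()

let m = Manager()
m.start()
waitFor sleepAsync(2500.milliseconds) # let a few ticks run
waitFor m.stop()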
@@ -1,7 +1,18 @@
 {.push raises: [].}

-import std/[sets, tables], chronicles, chronos, libp2p/peerid, stew/shims/sets
-import ../waku_core, ../utils/tableutils
+import
+  std/[options, sets, tables, sequtils],
+  chronicles,
+  chronos,
+  libp2p/peerid,
+  libp2p/stream/connection,
+  stew/shims/sets
+import
+  ../waku_core,
+  ../utils/tableutils,
+  ../common/rate_limit/setting,
+  ../node/peer_manager,
+  ./common

 logScope:
   topics = "waku filter subscriptions"

@@ -20,16 +31,16 @@ type

   SubscribedPeers* = HashSet[PeerID] # a sequence of peer ids

-  PeerData* = tuple[lastSeen: Moment, criteriaCount: uint]
+  PeerData* = tuple[lastSeen: Moment, criteriaCount: uint, connection: Connection]

-  FilterSubscriptions* = object
+  FilterSubscriptions* = ref object
     peersSubscribed*: Table[PeerID, PeerData]
     subscriptions: Table[FilterCriterion, SubscribedPeers]
     subscriptionTimeout: Duration
     maxPeers: uint
     maxCriteriaPerPeer: uint

-proc init*(
+proc new*(
     T: type FilterSubscriptions,
     subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec,
     maxFilterPeers: uint32 = MaxFilterPeers,

@@ -44,7 +55,7 @@ proc init*(
     maxCriteriaPerPeer: maxFilterCriteriaPerPeer,
   )

-proc isSubscribed*(s: var FilterSubscriptions, peerId: PeerID): bool =
+proc isSubscribed*(s: FilterSubscriptions, peerId: PeerID): bool =
   s.peersSubscribed.withValue(peerId, data):
     return Moment.now() - data.lastSeen <= s.subscriptionTimeout

@@ -54,7 +65,7 @@ proc subscribedPeerCount*(s: FilterSubscriptions): uint =
   return cast[uint](s.peersSubscribed.len)

 proc getPeerSubscriptions*(
-    s: var FilterSubscriptions, peerId: PeerID
+    s: FilterSubscriptions, peerId: PeerID
 ): seq[FilterCriterion] =
   ## Get all pubsub-content topics a peer is subscribed to
   var subscribedContentTopics: seq[FilterCriterion] = @[]

@@ -69,7 +80,7 @@ proc getPeerSubscriptions*(
   return subscribedContentTopics

 proc findSubscribedPeers*(
-    s: var FilterSubscriptions, pubsubTopic: PubsubTopic, contentTopic: ContentTopic
+    s: FilterSubscriptions, pubsubTopic: PubsubTopic, contentTopic: ContentTopic
 ): seq[PeerID] =
   let filterCriterion: FilterCriterion = (pubsubTopic, contentTopic)

@@ -82,15 +93,36 @@ proc findSubscribedPeers*(

   return foundPeers

-proc removePeer*(s: var FilterSubscriptions, peerId: PeerID) =
+proc removePeer*(s: FilterSubscriptions, peerId: PeerID) {.async.} =
   ## Remove all subscriptions for a given peer
+  debug "removePeer",
+    currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)), peerId = peerId
+
+  s.peersSubscribed.withValue(peerId, peerData):
+    debug "closing connection with peer", peerId = shortLog(peerId)
+    await peerData.connection.close()
+
   s.peersSubscribed.del(peerId)

-proc removePeers*(s: var FilterSubscriptions, peerIds: seq[PeerID]) =
-  ## Remove all subscriptions for a given list of peers
-  s.peersSubscribed.keepItIf(key notin peerIds)
+  debug "removePeer after deletion",
+    currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)), peerId = peerId

-proc cleanUp*(fs: var FilterSubscriptions) =
+proc removePeers*(s: FilterSubscriptions, peerIds: seq[PeerID]) {.async.} =
+  ## Remove all subscriptions for a given list of peers
+  debug "removePeers",
+    currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)),
+    peerIds = peerIds.mapIt(shortLog(it))
+
+  for peer in peerIds:
+    await s.removePeer(peer)
+
+  debug "removePeers after deletion",
+    currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)),
+    peerIds = peerIds.mapIt(shortLog(it))
+
+proc cleanUp*(fs: FilterSubscriptions) =
+  debug "cleanUp", currentPeerIds = toSeq(fs.peersSubscribed.keys).mapIt(shortLog(it))
+
   ## Remove all subscriptions for peers that have not been seen for a while
   let now = Moment.now()
   fs.peersSubscribed.keepItIf(now - val.lastSeen <= fs.subscriptionTimeout)
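cleanUp above keeps only peers whose lastSeen falls within the subscription timeout (keepItIf comes from nwaku's tableutils). A rough equivalent using only std/tables and chronos, with a simplified peer record instead of the real PeerData tuple:

import std/tables
import chronos

type PeerRecord = object
  lastSeen: Moment

proc cleanUpExpired(peers: var Table[string, PeerRecord], timeout: Duration) =
  # Collect expired keys first, then delete them, since the table must not
  # be mutated while it is being iterated.
  let now = Moment.now()
  var expired: seq[string]
  for peerId, rec in peers:
    if now - rec.lastSeen > timeout:
      expired.add(peerId)
  for peerId in expired:
    peers.del(peerId)

var peers = initTable[string, PeerRecord]()
peers["peer-1"] = PeerRecord(lastSeen: Moment.now())
cleanUpExpired(peers, 5.minutes)
echo peers.len # still 1: peer-1 was seen within the timeout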
@@ -101,14 +133,23 @@ proc cleanUp*(fs: var FilterSubscriptions) =

   fs.subscriptions.keepItIf(val.len > 0)

+  debug "after cleanUp",
+    currentPeerIds = toSeq(fs.peersSubscribed.keys).mapIt(shortLog(it))
+
 proc refreshSubscription*(s: var FilterSubscriptions, peerId: PeerID) =
   s.peersSubscribed.withValue(peerId, data):
     data.lastSeen = Moment.now()

 proc addSubscription*(
-    s: var FilterSubscriptions, peerId: PeerID, filterCriteria: FilterCriteria
-): Result[void, string] =
+    s: FilterSubscriptions,
+    peerId: PeerID,
+    filterCriteria: FilterCriteria,
+    peerManager: PeerManager,
+): Future[Result[void, string]] {.async.} =
   ## Add a subscription for a given peer
+  ##
+  ## The peerManager is needed to establish the first Connection through
+  ## /vac/waku/filter-push/2.0.0-beta1
   var peerData: ptr PeerData

   s.peersSubscribed.withValue(peerId, data):

@@ -120,9 +161,17 @@ proc addSubscription*(
   do:
     ## not yet subscribed
     if cast[uint](s.peersSubscribed.len) >= s.maxPeers:
-      return err("node has reached maximum number of subscriptions")
+      return err("node has reached maximum number of subscriptions: " & $(s.maxPeers))

+    let connRes = await peerManager.dialPeer(peerId, WakuFilterPushCodec)
+    if connRes.isNone():
+      ## We do not remove this peer, but allow the underlying peer manager
+      ## to do so if it is deemed necessary
+      return err("addSubscription no connection to peer: " & shortLog(peerId))
+
-    let newPeerData: PeerData = (lastSeen: Moment.now(), criteriaCount: 0)
+    let newPeerData: PeerData =
+      (lastSeen: Moment.now(), criteriaCount: 0, connection: connRes.get())
     peerData = addr(s.peersSubscribed.mgetOrPut(peerId, newPeerData))

   for filterCriterion in filterCriteria:

@@ -134,7 +183,7 @@ proc addSubscription*(
     return ok()

 proc removeSubscription*(
-    s: var FilterSubscriptions, peerId: PeerID, filterCriteria: FilterCriteria
+    s: FilterSubscriptions, peerId: PeerID, filterCriteria: FilterCriteria
 ): Result[void, string] =
   ## Remove a subscription for a given peer

@@ -156,3 +205,12 @@ proc removeSubscription*(
       return ok()
   do:
     return err("Peer has no subscriptions")
+
+proc getConnectionByPeerId*(
+    s: FilterSubscriptions, peerId: PeerID
+): Result[Connection, string] =
+  if not s.peersSubscribed.hasKey(peerId):
+    return err("peer not subscribed: " & shortLog(peerId))
+
+  let peerData = s.peersSubscribed.getOrDefault(peerId)
+  return ok(peerData.connection)
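With getConnectionByPeerId in place, the subscription store owns one push connection per subscriber: it is dialed once in addSubscription, looked up on every push, and closed in removePeer. A stripped-down sketch of that ownership model, with a dummy connection type standing in for libp2p's Connection and string keys instead of PeerID (assumes the nim-results package):

import std/[options, tables]
import results

type
  FakeConnection = ref object # stand-in for libp2p's Connection
    closed: bool
  PeerEntry = object
    connection: FakeConnection

proc dial(peerId: string): Option[FakeConnection] =
  # Pretend the dial always succeeds; the real code goes through PeerManager.
  some(FakeConnection())

var subscribers = initTable[string, PeerEntry]()

proc addSubscriber(peerId: string): Result[void, string] =
  # Dial once, when the subscription is created.
  let conn = dial(peerId)
  if conn.isNone():
    return err("no connection to peer: " & peerId)
  subscribers[peerId] = PeerEntry(connection: conn.get())
  return ok()

proc connectionFor(peerId: string): Result[FakeConnection, string] =
  # Reused on every push instead of dialing again.
  if peerId notin subscribers:
    return err("peer not subscribed: " & peerId)
  return ok(subscribers[peerId].connection)

proc removeSubscriber(peerId: string) =
  # Mark the cached connection closed when the subscription goes away.
  if peerId in subscribers:
    subscribers[peerId].connection.closed = true
    subscribers.del(peerId)

discard addSubscriber("peer-1")
echo connectionFor("peer-1").isOk() # true
removeSubscriber("peer-1")
echo connectionFor("peer-1").isOk() # false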
@@ -71,6 +71,7 @@ proc request*(
     await conn.writeLP(rpc.encode().buffer)
     buffer = await conn.readLp(DefaultMaxRpcSize.int)
   except CatchableError as exc:
+    error "exception when handling peer exchange request", error = getCurrentExceptionMsg()
     waku_px_errors.inc(labelValues = [exc.msg])
     callResult = (
       status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,

@@ -81,10 +82,12 @@ proc request*(
   await conn.closeWithEof()

   if callResult.status_code != PeerExchangeResponseStatusCode.SUCCESS:
+    error "peer exchange request failed", status_code = callResult.status_code
     return err(callResult)

   let decodedBuff = PeerExchangeRpc.decode(buffer)
   if decodedBuff.isErr():
+    error "peer exchange request error decoding buffer", error = $decodedBuff.error
     return err(
       (
         status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE,

@@ -92,6 +95,7 @@ proc request*(
       )
     )
   if decodedBuff.get().response.status_code != PeerExchangeResponseStatusCode.SUCCESS:
+    error "peer exchange request error", status_code = decodedBuff.get().response.status_code
     return err(
       (
         status_code: decodedBuff.get().response.status_code,

@@ -107,6 +111,7 @@ proc request*(
   try:
     let connOpt = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec)
     if connOpt.isNone():
+      error "error in request connOpt is none"
       return err(
         (
           status_code: PeerExchangeResponseStatusCode.DIAL_FAILURE,

@@ -115,6 +120,7 @@ proc request*(
       )
     return await wpx.request(numPeers, connOpt.get())
   except CatchableError:
+    error "peer exchange request exception", error = getCurrentExceptionMsg()
     return err(
       (
         status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE,

@@ -128,6 +134,7 @@ proc request*(
   let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
   if peerOpt.isNone():
     waku_px_errors.inc(labelValues = [peerNotFoundFailure])
+    error "peer exchange error peerOpt is none"
     return err(
       (
         status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,

@@ -144,6 +151,7 @@ proc respond(
   try:
     await conn.writeLP(rpc.encode().buffer)
   except CatchableError as exc:
+    error "exception when trying to send a respond", error = getCurrentExceptionMsg()
     waku_px_errors.inc(labelValues = [exc.msg])
     return err(
       (

@@ -165,6 +173,7 @@ proc respondError(
   try:
     await conn.writeLP(rpc.encode().buffer)
   except CatchableError as exc:
+    error "exception when trying to send a respond", error = getCurrentExceptionMsg()
     waku_px_errors.inc(labelValues = [exc.msg])
     return err(
       (

@@ -192,15 +201,15 @@ proc getEnrsFromCache(

 proc poolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool =
   if peer.origin != Discv5:
-    trace "peer not from discv5", peer = $peer, origin = $peer.origin
+    debug "peer not from discv5", peer = $peer, origin = $peer.origin
     return false

   if peer.enr.isNone():
-    trace "peer has no ENR", peer = $peer
+    debug "peer has no ENR", peer = $peer
     return false

   if cluster.isSome() and peer.enr.get().isClusterMismatched(cluster.get()):
-    trace "peer has mismatching cluster", peer = $peer
+    debug "peer has mismatching cluster", peer = $peer
     return false

   return true

@@ -218,6 +227,7 @@ proc populateEnrCache(wpx: WakuPeerExchange) =

   # swap cache for new
   wpx.enrCache = newEnrCache
+  debug "ENR cache populated"

 proc updatePxEnrCache(wpx: WakuPeerExchange) {.async.} =
   # try more aggressively to fill the cache at startup

@@ -237,6 +247,7 @@ proc initProtocolHandler(wpx: WakuPeerExchange) =
     try:
       buffer = await conn.readLp(DefaultMaxRpcSize.int)
     except CatchableError as exc:
+      error "exception when handling px request", error = getCurrentExceptionMsg()
       waku_px_errors.inc(labelValues = [exc.msg])

       (

@@ -260,8 +271,8 @@ proc initProtocolHandler(wpx: WakuPeerExchange) =
         error "Failed to respond with BAD_REQUEST:", error = $error
       return

-    trace "peer exchange request received"
     let enrs = wpx.getEnrsFromCache(decBuf.get().request.numPeers)
+    debug "peer exchange request received", enrs = $enrs
     (await wpx.respond(enrs, conn)).isErrOr:
       waku_px_peers_sent.inc(enrs.len().int64())
     do: