feat: further filter improvements (#1617)

This commit is contained in:
Hanno Cornelius 2023-03-22 10:32:53 +02:00 committed by GitHub
parent ac56e1dcdd
commit d920b973b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 53 additions and 20 deletions

View File

@ -255,9 +255,9 @@ suite "Waku Filter - subscription maintenance":
switch.peerStore[ProtoBook][peerId1] = @[WakuFilterPushCodec]
switch.peerStore[ProtoBook][peerId2] = @[WakuFilterPushCodec]
switch.peerStore[ProtoBook][peerId3] = @[WakuFilterPushCodec]
require wakuFilter.handleSubscribeRequest(peerId1, filterSubscribeRequest).isOk()
require wakuFilter.handleSubscribeRequest(peerId2, filterSubscribeRequest).isOk()
require wakuFilter.handleSubscribeRequest(peerId3, filterSubscribeRequest).isOk()
require wakuFilter.handleSubscribeRequest(peerId1, filterSubscribeRequest).statusCode == 200
require wakuFilter.handleSubscribeRequest(peerId2, filterSubscribeRequest).statusCode == 200
require wakuFilter.handleSubscribeRequest(peerId3, filterSubscribeRequest).statusCode == 200
# Then
check:

View File

@ -23,6 +23,9 @@ import
logScope:
topics = "waku filter"
const
MaxContentTopicsPerRequest = 30
type
WakuFilter* = ref object of LPProtocol
subscriptions*: FilterSubscriptions # a mapping of peer ids to a sequence of filter criteria
@ -41,6 +44,9 @@ proc subscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic],
if pubsubTopic.isNone() or contentTopics.len() == 0:
return err(FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified"))
if contentTopics.len() > MaxContentTopicsPerRequest:
return err(FilterSubscribeError.badRequest("exceeds maximum content topics: " & $MaxContentTopicsPerRequest))
let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))
trace "subscribing peer to filter criteria", peerId=peerId, filterCriteria=filterCriteria
@ -64,6 +70,9 @@ proc unsubscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic
if pubsubTopic.isNone() or contentTopics.len() == 0:
return err(FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified"))
if contentTopics.len() > MaxContentTopicsPerRequest:
return err(FilterSubscribeError.badRequest("exceeds maximum content topics: " & $MaxContentTopicsPerRequest))
let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))
trace "unsubscribing peer from filter criteria", peerId=peerId, filterCriteria=filterCriteria
@ -100,15 +109,24 @@ proc handleSubscribeRequest*(wf: WakuFilter, peerId: PeerId, request: FilterSubs
var subscribeResult: FilterSubscribeResult
case request.filterSubscribeType
of FilterSubscribeType.SUBSCRIBER_PING:
subscribeResult = wf.pingSubscriber(peerId)
of FilterSubscribeType.SUBSCRIBE:
subscribeResult = wf.subscribe(peerId, request.pubsubTopic, request.contentTopics)
of FilterSubscribeType.UNSUBSCRIBE:
subscribeResult = wf.unsubscribe(peerId, request.pubsubTopic, request.contentTopics)
of FilterSubscribeType.UNSUBSCRIBE_ALL:
subscribeResult = wf.unsubscribeAll(peerId)
let requestStartTime = Moment.now()
block:
## Handle subscribe request
case request.filterSubscribeType
of FilterSubscribeType.SUBSCRIBER_PING:
subscribeResult = wf.pingSubscriber(peerId)
of FilterSubscribeType.SUBSCRIBE:
subscribeResult = wf.subscribe(peerId, request.pubsubTopic, request.contentTopics)
of FilterSubscribeType.UNSUBSCRIBE:
subscribeResult = wf.unsubscribe(peerId, request.pubsubTopic, request.contentTopics)
of FilterSubscribeType.UNSUBSCRIBE_ALL:
subscribeResult = wf.unsubscribeAll(peerId)
let
requestDuration = Moment.now() - requestStartTime
requestDurationSec = requestDuration.milliseconds.float / 1000 # Duration in seconds with millisecond precision floating point
waku_filter_request_duration_seconds.observe(requestDurationSec, labelValues = [$request.filterSubscribeType])
if subscribeResult.isErr():
return FilterSubscribeResponse(
@ -161,19 +179,31 @@ proc maintainSubscriptions*(wf: WakuFilter) =
wf.subscriptions.removePeers(peersToRemove)
const MessagePushTimeout = 20.seconds
proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessage) {.async.} =
trace "handling message", pubsubTopic=pubsubTopic, message=message
let subscribedPeers = wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic)
if subscribedPeers.len() == 0:
trace "no subscribed peers found", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic
return
let handleMessageStartTime = Moment.now()
let messagePush = MessagePush(
pubsubTopic: pubsubTopic,
wakuMessage: message)
block:
## Find subscribers and push message to them
let subscribedPeers = wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic)
if subscribedPeers.len() == 0:
trace "no subscribed peers found", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic
return
await wf.pushToPeers(subscribedPeers, messagePush)
let messagePush = MessagePush(
pubsubTopic: pubsubTopic,
wakuMessage: message)
if not await wf.pushToPeers(subscribedPeers, messagePush).withTimeout(MessagePushTimeout):
debug "timed out pushing message to peers", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic
waku_filter_errors.inc(labelValues = [pushTimeoutFailure])
let
handleMessageDuration = Moment.now() - handleMessageStartTime
handleMessageDurationSec = handleMessageDuration.milliseconds.float / 1000 # Duration in seconds with millisecond precision floating point
waku_filter_handle_message_duration_seconds.observe(handleMessageDurationSec)
proc initProtocolHandler(wf: WakuFilter) =

View File

@ -9,6 +9,8 @@ export metrics
declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"]
declarePublicGauge waku_filter_requests, "number of filter subscribe requests received", ["type"]
declarePublicHistogram waku_filter_request_duration_seconds, "duration of filter subscribe requests", ["type"]
declarePublicHistogram waku_filter_handle_message_duration_seconds, "duration to push message to filter subscribers"
# Error types (metric label values)
const
@ -16,3 +18,4 @@ const
decodeRpcFailure* = "decode_rpc_failure"
requestIdMismatch* = "request_id_mismatch"
errorResponse* = "error_response"
pushTimeoutFailure* = "push_timeout_failure"