mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-14 08:57:14 +00:00
feat: new filter protocol increment - subscribe request handling (#1584)
* feat: new filter protocol service node * feat: add test and fix compilation errors * feat: more tests and minor fixes * chore: update waku/v2/protocol/waku_filter_v2/protocol.nim Co-authored-by: Lorenzo Delgado <lorenzo@status.im> * chore: add some convenience functions --------- Co-authored-by: Lorenzo Delgado <lorenzo@status.im>
This commit is contained in:
parent
a57bea299b
commit
ef7ce679df
239
tests/v2/waku_filter_v2/test_waku_filter_v2.nim
Normal file
239
tests/v2/waku_filter_v2/test_waku_filter_v2.nim
Normal file
@ -0,0 +1,239 @@
|
|||||||
|
{.used.}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/[options,sets,strutils,tables],
|
||||||
|
testutils/unittests,
|
||||||
|
chronos,
|
||||||
|
chronicles
|
||||||
|
import
|
||||||
|
../../../waku/v2/node/peer_manager,
|
||||||
|
../../../waku/v2/protocol/waku_filter_v2,
|
||||||
|
../../../waku/v2/protocol/waku_filter_v2/rpc,
|
||||||
|
../../../waku/v2/protocol/waku_message,
|
||||||
|
../testlib/common,
|
||||||
|
../testlib/waku2
|
||||||
|
|
||||||
|
proc newTestWakuFilter(switch: Switch): Future[WakuFilter] {.async.} =
|
||||||
|
let
|
||||||
|
peerManager = PeerManager.new(switch)
|
||||||
|
proto = WakuFilter.new(peerManager)
|
||||||
|
|
||||||
|
await proto.start()
|
||||||
|
switch.mount(proto)
|
||||||
|
|
||||||
|
return proto
|
||||||
|
|
||||||
|
proc generateRequestId(rng: ref HmacDrbgContext): string =
|
||||||
|
var bytes: array[10, byte]
|
||||||
|
hmacDrbgGenerate(rng[], bytes)
|
||||||
|
return toHex(bytes)
|
||||||
|
|
||||||
|
proc createRequest(filterSubscribeType: FilterSubscribeType, pubsubTopic = none(PubsubTopic), contentTopics = newSeq[ContentTopic]()): FilterSubscribeRequest =
|
||||||
|
let requestId = generateRequestId(rng)
|
||||||
|
|
||||||
|
return FilterSubscribeRequest(
|
||||||
|
requestId: requestId,
|
||||||
|
filterSubscribeType: filterSubscribeType,
|
||||||
|
pubsubTopic: pubsubTopic,
|
||||||
|
contentTopics: contentTopics
|
||||||
|
)
|
||||||
|
|
||||||
|
proc getSubscribedContentTopics(wakuFilter: WakuFilter, peerId: PeerId): seq[ContentTopic] =
|
||||||
|
var contentTopics: seq[ContentTopic]
|
||||||
|
for filterCriterion in wakuFilter.subscriptions[peerId]:
|
||||||
|
contentTopics.add(filterCriterion[1])
|
||||||
|
|
||||||
|
return contentTopics
|
||||||
|
|
||||||
|
suite "Waku Filter - handling subscribe requests":
|
||||||
|
|
||||||
|
asyncTest "simple subscribe and unsubscribe request":
|
||||||
|
# Given
|
||||||
|
let
|
||||||
|
switch = newStandardSwitch()
|
||||||
|
wakuFilter = await newTestWakuFilter(switch)
|
||||||
|
peerId = PeerId.random().get()
|
||||||
|
filterSubscribeRequest = createRequest(
|
||||||
|
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||||
|
pubsubTopic = some(DefaultPubsubTopic),
|
||||||
|
contentTopics = @[DefaultContentTopic]
|
||||||
|
)
|
||||||
|
filterUnsubscribeRequest = createRequest(
|
||||||
|
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
|
||||||
|
pubsubTopic = filterSubscribeRequest.pubsubTopic,
|
||||||
|
contentTopics = filterSubscribeRequest.contentTopics
|
||||||
|
)
|
||||||
|
|
||||||
|
# When
|
||||||
|
let response = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest)
|
||||||
|
|
||||||
|
# Then
|
||||||
|
check:
|
||||||
|
wakuFilter.subscriptions.len == 1
|
||||||
|
wakuFilter.subscriptions[peerId].len == 1
|
||||||
|
response.requestId == filterSubscribeRequest.requestId
|
||||||
|
response.statusCode == 200
|
||||||
|
response.statusDesc.get() == "OK"
|
||||||
|
|
||||||
|
# When
|
||||||
|
let response2 = wakuFilter.handleSubscribeRequest(peerId, filterUnsubscribeRequest)
|
||||||
|
|
||||||
|
# Then
|
||||||
|
check:
|
||||||
|
wakuFilter.subscriptions.len == 0 # peerId is removed from subscriptions
|
||||||
|
response2.requestId == filterUnsubscribeRequest.requestId
|
||||||
|
response2.statusCode == 200
|
||||||
|
response2.statusDesc.get() == "OK"
|
||||||
|
|
||||||
|
asyncTest "simple subscribe and unsubscribe all for multiple content topics":
|
||||||
|
# Given
|
||||||
|
let
|
||||||
|
switch = newStandardSwitch()
|
||||||
|
wakuFilter = await newTestWakuFilter(switch)
|
||||||
|
peerId = PeerId.random().get()
|
||||||
|
nonDefaultContentTopic = ContentTopic("/waku/2/non-default-waku/proto")
|
||||||
|
filterSubscribeRequest = createRequest(
|
||||||
|
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||||
|
pubsubTopic = some(DefaultPubsubTopic),
|
||||||
|
contentTopics = @[DefaultContentTopic, nonDefaultContentTopic]
|
||||||
|
)
|
||||||
|
filterUnsubscribeAllRequest = createRequest(
|
||||||
|
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE_ALL
|
||||||
|
)
|
||||||
|
|
||||||
|
# When
|
||||||
|
let response = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest)
|
||||||
|
|
||||||
|
# Then
|
||||||
|
check:
|
||||||
|
wakuFilter.subscriptions.len == 1
|
||||||
|
wakuFilter.subscriptions[peerId].len == 2
|
||||||
|
wakuFilter.getSubscribedContentTopics(peerId) == filterSubscribeRequest.contentTopics
|
||||||
|
response.requestId == filterSubscribeRequest.requestId
|
||||||
|
response.statusCode == 200
|
||||||
|
response.statusDesc.get() == "OK"
|
||||||
|
|
||||||
|
# When
|
||||||
|
let response2 = wakuFilter.handleSubscribeRequest(peerId, filterUnsubscribeAllRequest)
|
||||||
|
|
||||||
|
# Then
|
||||||
|
check:
|
||||||
|
wakuFilter.subscriptions.len == 0 # peerId is removed from subscriptions
|
||||||
|
response2.requestId == filterUnsubscribeAllRequest.requestId
|
||||||
|
response2.statusCode == 200
|
||||||
|
response2.statusDesc.get() == "OK"
|
||||||
|
|
||||||
|
asyncTest "subscribe and unsubscribe to multiple content topics":
|
||||||
|
# Given
|
||||||
|
let
|
||||||
|
switch = newStandardSwitch()
|
||||||
|
wakuFilter = await newTestWakuFilter(switch)
|
||||||
|
peerId = PeerId.random().get()
|
||||||
|
nonDefaultContentTopic = ContentTopic("/waku/2/non-default-waku/proto")
|
||||||
|
filterSubscribeRequest1 = createRequest(
|
||||||
|
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||||
|
pubsubTopic = some(DefaultPubsubTopic),
|
||||||
|
contentTopics = @[DefaultContentTopic]
|
||||||
|
)
|
||||||
|
filterSubscribeRequest2 = createRequest(
|
||||||
|
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||||
|
pubsubTopic = filterSubscribeRequest1.pubsubTopic,
|
||||||
|
contentTopics = @[nonDefaultContentTopic]
|
||||||
|
)
|
||||||
|
filterUnsubscribeRequest1 = createRequest(
|
||||||
|
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
|
||||||
|
pubsubTopic = filterSubscribeRequest1.pubsubTopic,
|
||||||
|
contentTopics = filterSubscribeRequest1.contentTopics
|
||||||
|
)
|
||||||
|
filterUnsubscribeRequest2 = createRequest(
|
||||||
|
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
|
||||||
|
pubsubTopic = filterSubscribeRequest2.pubsubTopic,
|
||||||
|
contentTopics = filterSubscribeRequest2.contentTopics
|
||||||
|
)
|
||||||
|
|
||||||
|
# When
|
||||||
|
let response1 = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest1)
|
||||||
|
|
||||||
|
# Then
|
||||||
|
check:
|
||||||
|
wakuFilter.subscriptions.len == 1
|
||||||
|
wakuFilter.subscriptions[peerId].len == 1
|
||||||
|
wakuFilter.getSubscribedContentTopics(peerId) == filterSubscribeRequest1.contentTopics
|
||||||
|
response1.requestId == filterSubscribeRequest1.requestId
|
||||||
|
response1.statusCode == 200
|
||||||
|
response1.statusDesc.get() == "OK"
|
||||||
|
|
||||||
|
# When
|
||||||
|
let response2 = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest2)
|
||||||
|
|
||||||
|
# Then
|
||||||
|
check:
|
||||||
|
wakuFilter.subscriptions.len == 1
|
||||||
|
wakuFilter.subscriptions[peerId].len == 2
|
||||||
|
wakuFilter.getSubscribedContentTopics(peerId) ==
|
||||||
|
filterSubscribeRequest1.contentTopics &
|
||||||
|
filterSubscribeRequest2.contentTopics
|
||||||
|
response2.requestId == filterSubscribeRequest2.requestId
|
||||||
|
response2.statusCode == 200
|
||||||
|
response2.statusDesc.get() == "OK"
|
||||||
|
|
||||||
|
# When
|
||||||
|
let response3 = wakuFilter.handleSubscribeRequest(peerId, filterUnsubscribeRequest1)
|
||||||
|
|
||||||
|
# Then
|
||||||
|
check:
|
||||||
|
wakuFilter.subscriptions.len == 1
|
||||||
|
wakuFilter.subscriptions[peerId].len == 1
|
||||||
|
wakuFilter.getSubscribedContentTopics(peerId) == filterSubscribeRequest2.contentTopics
|
||||||
|
response3.requestId == filterUnsubscribeRequest1.requestId
|
||||||
|
response3.statusCode == 200
|
||||||
|
response3.statusDesc.get() == "OK"
|
||||||
|
|
||||||
|
# When
|
||||||
|
let response4 = wakuFilter.handleSubscribeRequest(peerId, filterUnsubscribeRequest2)
|
||||||
|
|
||||||
|
# Then
|
||||||
|
check:
|
||||||
|
wakuFilter.subscriptions.len == 0 # peerId is removed from subscriptions
|
||||||
|
response4.requestId == filterUnsubscribeRequest2.requestId
|
||||||
|
response4.statusCode == 200
|
||||||
|
response4.statusDesc.get() == "OK"
|
||||||
|
|
||||||
|
asyncTest "ping subscriber":
|
||||||
|
# Given
|
||||||
|
let
|
||||||
|
switch = newStandardSwitch()
|
||||||
|
wakuFilter = await newTestWakuFilter(switch)
|
||||||
|
peerId = PeerId.random().get()
|
||||||
|
pingRequest = createRequest(
|
||||||
|
filterSubscribeType = FilterSubscribeType.SUBSCRIBER_PING
|
||||||
|
)
|
||||||
|
filterSubscribeRequest = createRequest(
|
||||||
|
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||||
|
pubsubTopic = some(DefaultPubsubTopic),
|
||||||
|
contentTopics = @[DefaultContentTopic]
|
||||||
|
)
|
||||||
|
|
||||||
|
# When
|
||||||
|
let response1 = wakuFilter.handleSubscribeRequest(peerId, pingRequest)
|
||||||
|
|
||||||
|
# Then
|
||||||
|
check:
|
||||||
|
response1.requestId == pingRequest.requestId
|
||||||
|
response1.statusCode == FilterSubscribeErrorKind.NOT_FOUND.uint32
|
||||||
|
response1.statusDesc.get().contains("peer has no subscriptions")
|
||||||
|
|
||||||
|
# When
|
||||||
|
let
|
||||||
|
response2 = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest)
|
||||||
|
response3 = wakuFilter.handleSubscribeRequest(peerId, pingRequest)
|
||||||
|
|
||||||
|
# Then
|
||||||
|
check:
|
||||||
|
response2.requestId == filterSubscribeRequest.requestId
|
||||||
|
response2.statusCode == 200
|
||||||
|
response2.statusDesc.get() == "OK"
|
||||||
|
response3.requestId == pingRequest.requestId
|
||||||
|
response3.statusCode == 200
|
||||||
|
response3.statusDesc.get() == "OK"
|
||||||
|
|
7
waku/v2/protocol/waku_filter_v2.nim
Normal file
7
waku/v2/protocol/waku_filter_v2.nim
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
import
|
||||||
|
./waku_filter_v2/common,
|
||||||
|
./waku_filter_v2/protocol
|
||||||
|
|
||||||
|
export
|
||||||
|
common,
|
||||||
|
protocol
|
52
waku/v2/protocol/waku_filter_v2/common.nim
Normal file
52
waku/v2/protocol/waku_filter_v2/common.nim
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import
|
||||||
|
stew/results
|
||||||
|
|
||||||
|
const
|
||||||
|
WakuFilterSubscribeCodec* = "/vac/waku/filter-subscribe/2.0.0-beta1"
|
||||||
|
WakuFilterPushCodec* = "/vac/waku/filter-push/2.0.0-beta1"
|
||||||
|
|
||||||
|
type
|
||||||
|
FilterSubscribeErrorKind* {.pure.} = enum
|
||||||
|
UNKNOWN = uint32(000)
|
||||||
|
BAD_REQUEST = uint32(400)
|
||||||
|
NOT_FOUND = uint32(404)
|
||||||
|
SERVICE_UNAVAILABLE = uint32(503)
|
||||||
|
|
||||||
|
FilterSubscribeError* = object
|
||||||
|
kind*: FilterSubscribeErrorKind
|
||||||
|
cause*: string
|
||||||
|
|
||||||
|
FilterSubscribeResult* = Result[void, FilterSubscribeError]
|
||||||
|
|
||||||
|
# Convenience functions
|
||||||
|
|
||||||
|
proc badRequest*(T: type FilterSubscribeError, cause = "bad request"): FilterSubscribeError =
|
||||||
|
FilterSubscribeError(
|
||||||
|
kind: FilterSubscribeErrorKind.BAD_REQUEST,
|
||||||
|
cause: cause)
|
||||||
|
|
||||||
|
proc notFound*(T: type FilterSubscribeError, cause = "peer has no subscriptions"): FilterSubscribeError =
|
||||||
|
FilterSubscribeError(
|
||||||
|
kind: FilterSubscribeErrorKind.NOT_FOUND,
|
||||||
|
cause: cause)
|
||||||
|
|
||||||
|
proc serviceUnavailable*(T: type FilterSubscribeError, cause = "service unavailable"): FilterSubscribeError =
|
||||||
|
FilterSubscribeError(
|
||||||
|
kind: FilterSubscribeErrorKind.SERVICE_UNAVAILABLE,
|
||||||
|
cause: cause)
|
||||||
|
|
||||||
|
proc `$`*(err: FilterSubscribeError): string =
|
||||||
|
case err.kind:
|
||||||
|
of FilterSubscribeErrorKind.BAD_REQUEST:
|
||||||
|
"BAD_REQUEST: " & err.cause
|
||||||
|
of FilterSubscribeErrorKind.NOT_FOUND:
|
||||||
|
"NOT_FOUND: " & err.cause
|
||||||
|
of FilterSubscribeErrorKind.SERVICE_UNAVAILABLE:
|
||||||
|
"SERVICE_UNAVAILABLE: " & err.cause
|
||||||
|
of FilterSubscribeErrorKind.UNKNOWN:
|
||||||
|
"UNKNOWN"
|
159
waku/v2/protocol/waku_filter_v2/protocol.nim
Normal file
159
waku/v2/protocol/waku_filter_v2/protocol.nim
Normal file
@ -0,0 +1,159 @@
|
|||||||
|
## Waku Filter protocol for subscribing and filtering messages
|
||||||
|
|
||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/[options,sequtils,sets,tables],
|
||||||
|
chronicles,
|
||||||
|
chronos,
|
||||||
|
libp2p/peerid,
|
||||||
|
libp2p/protocols/protocol
|
||||||
|
import
|
||||||
|
../../node/peer_manager,
|
||||||
|
../waku_message,
|
||||||
|
./common,
|
||||||
|
./protocol_metrics,
|
||||||
|
./rpc_codec,
|
||||||
|
./rpc
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "waku filter"
|
||||||
|
|
||||||
|
const
|
||||||
|
MaxSubscriptions* = 1000 # TODO make configurable
|
||||||
|
MaxCriteriaPerSubscription = 1000
|
||||||
|
|
||||||
|
type
|
||||||
|
FilterCriterion* = (PubsubTopic, ContentTopic) # a single filter criterion is fully defined by a pubsub topic and content topic
|
||||||
|
FilterCriteria* = HashSet[FilterCriterion] # a sequence of filter criteria
|
||||||
|
|
||||||
|
WakuFilter* = ref object of LPProtocol
|
||||||
|
subscriptions*: Table[PeerID, FilterCriteria] # a mapping of peer ids to a sequence of filter criteria
|
||||||
|
peerManager: PeerManager
|
||||||
|
|
||||||
|
proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
|
||||||
|
trace "pinging subscriber", peerId=peerId
|
||||||
|
|
||||||
|
if peerId notin wf.subscriptions:
|
||||||
|
debug "pinging peer has no subscriptions", peerId=peerId
|
||||||
|
return err(FilterSubscribeError.notFound())
|
||||||
|
|
||||||
|
ok()
|
||||||
|
|
||||||
|
proc subscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic], contentTopics: seq[ContentTopic]): FilterSubscribeResult =
|
||||||
|
if pubsubTopic.isNone() or contentTopics.len() == 0:
|
||||||
|
return err(FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified"))
|
||||||
|
|
||||||
|
let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))
|
||||||
|
|
||||||
|
trace "subscribing peer to filter criteria", peerId=peerId, filterCriteria=filterCriteria
|
||||||
|
|
||||||
|
if peerId in wf.subscriptions:
|
||||||
|
var peerSubscription = wf.subscriptions.mgetOrPut(peerId, initHashSet[FilterCriterion]())
|
||||||
|
if peerSubscription.len() + filterCriteria.len() >= MaxCriteriaPerSubscription:
|
||||||
|
return err(FilterSubscribeError.serviceUnavailable("peer has reached maximum number of filter criteria"))
|
||||||
|
|
||||||
|
peerSubscription.incl(filterCriteria)
|
||||||
|
wf.subscriptions[peerId] = peerSubscription
|
||||||
|
else:
|
||||||
|
if wf.subscriptions.len() >= MaxSubscriptions:
|
||||||
|
return err(FilterSubscribeError.serviceUnavailable("node has reached maximum number of subscriptions"))
|
||||||
|
debug "creating new subscription", peerId=peerId
|
||||||
|
wf.subscriptions[peerId] = filterCriteria
|
||||||
|
|
||||||
|
ok()
|
||||||
|
|
||||||
|
proc unsubscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic], contentTopics: seq[ContentTopic]): FilterSubscribeResult =
|
||||||
|
if pubsubTopic.isNone() or contentTopics.len() == 0:
|
||||||
|
return err(FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified"))
|
||||||
|
|
||||||
|
let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))
|
||||||
|
|
||||||
|
trace "unsubscribing peer from filter criteria", peerId=peerId, filterCriteria=filterCriteria
|
||||||
|
|
||||||
|
if peerId notin wf.subscriptions:
|
||||||
|
debug "unsubscibing peer has no subscriptions", peerId=peerId
|
||||||
|
return err(FilterSubscribeError.notFound())
|
||||||
|
|
||||||
|
var peerSubscription = wf.subscriptions.mgetOrPut(peerId, initHashSet[FilterCriterion]())
|
||||||
|
# TODO: consider error response if filter criteria does not exist
|
||||||
|
peerSubscription.excl(filterCriteria)
|
||||||
|
|
||||||
|
if peerSubscription.len() == 0:
|
||||||
|
debug "peer has no more subscriptions, removing subscription", peerId=peerId
|
||||||
|
wf.subscriptions.del(peerId)
|
||||||
|
else:
|
||||||
|
wf.subscriptions[peerId] = peerSubscription
|
||||||
|
|
||||||
|
ok()
|
||||||
|
|
||||||
|
proc unsubscribeAll(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
|
||||||
|
if peerId notin wf.subscriptions:
|
||||||
|
debug "unsubscibing peer has no subscriptions", peerId=peerId
|
||||||
|
return err(FilterSubscribeError.notFound())
|
||||||
|
|
||||||
|
debug "removing peer subscription", peerId=peerId
|
||||||
|
wf.subscriptions.del(peerId)
|
||||||
|
|
||||||
|
ok()
|
||||||
|
|
||||||
|
proc handleSubscribeRequest*(wf: WakuFilter, peerId: PeerId, request: FilterSubscribeRequest): FilterSubscribeResponse =
|
||||||
|
info "received filter subscribe request", peerId=peerId, request=request
|
||||||
|
waku_filter_requests.inc(labelValues = [$request.filterSubscribeType])
|
||||||
|
|
||||||
|
var subscribeResult: FilterSubscribeResult
|
||||||
|
|
||||||
|
case request.filterSubscribeType
|
||||||
|
of FilterSubscribeType.SUBSCRIBER_PING:
|
||||||
|
subscribeResult = wf.pingSubscriber(peerId)
|
||||||
|
of FilterSubscribeType.SUBSCRIBE:
|
||||||
|
subscribeResult = wf.subscribe(peerId, request.pubsubTopic, request.contentTopics)
|
||||||
|
of FilterSubscribeType.UNSUBSCRIBE:
|
||||||
|
subscribeResult = wf.unsubscribe(peerId, request.pubsubTopic, request.contentTopics)
|
||||||
|
of FilterSubscribeType.UNSUBSCRIBE_ALL:
|
||||||
|
subscribeResult = wf.unsubscribeAll(peerId)
|
||||||
|
|
||||||
|
if subscribeResult.isErr():
|
||||||
|
return FilterSubscribeResponse(
|
||||||
|
requestId: request.requestId,
|
||||||
|
statusCode: subscribeResult.error.kind.uint32,
|
||||||
|
statusDesc: some($subscribeResult.error)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
return FilterSubscribeResponse.ok(request.requestId)
|
||||||
|
|
||||||
|
proc handleMessage*(wf: WakuFilter, message: WakuMessage) =
|
||||||
|
raiseAssert "Unimplemented"
|
||||||
|
|
||||||
|
proc initProtocolHandler(wf: WakuFilter) =
|
||||||
|
|
||||||
|
proc handler(conn: Connection, proto: string) {.async.} =
|
||||||
|
let buf = await conn.readLp(MaxSubscribeSize)
|
||||||
|
|
||||||
|
let decodeRes = FilterSubscribeRequest.decode(buf)
|
||||||
|
if decodeRes.isErr():
|
||||||
|
error "Failed to decode filter subscribe request", peerId=conn.peerId
|
||||||
|
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
|
||||||
|
return
|
||||||
|
|
||||||
|
let request = decodeRes.value #TODO: toAPI() split here
|
||||||
|
|
||||||
|
let response = wf.handleSubscribeRequest(conn.peerId, request)
|
||||||
|
|
||||||
|
await conn.writeLp(response.encode().buffer) #TODO: toRPC() separation here
|
||||||
|
return
|
||||||
|
|
||||||
|
wf.handler = handler
|
||||||
|
wf.codec = WakuFilterSubscribeCodec
|
||||||
|
|
||||||
|
proc new*(T: type WakuFilter,
|
||||||
|
peerManager: PeerManager): T =
|
||||||
|
|
||||||
|
let wf = WakuFilter(
|
||||||
|
peerManager: peerManager
|
||||||
|
)
|
||||||
|
wf.initProtocolHandler()
|
||||||
|
wf
|
15
waku/v2/protocol/waku_filter_v2/protocol_metrics.nim
Normal file
15
waku/v2/protocol/waku_filter_v2/protocol_metrics.nim
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import metrics
|
||||||
|
|
||||||
|
export metrics
|
||||||
|
|
||||||
|
declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"]
|
||||||
|
declarePublicGauge waku_filter_requests, "number of filter subscribe requests received", ["type"]
|
||||||
|
|
||||||
|
# Error types (metric label values)
|
||||||
|
const
|
||||||
|
decodeRpcFailure* = "decode_rpc_failure"
|
44
waku/v2/protocol/waku_filter_v2/rpc.nim
Normal file
44
waku/v2/protocol/waku_filter_v2/rpc.nim
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/options
|
||||||
|
import
|
||||||
|
../waku_message
|
||||||
|
|
||||||
|
type
|
||||||
|
FilterSubscribeType* {.pure.} = enum
|
||||||
|
# Indicates the type of request from client to service node
|
||||||
|
SUBSCRIBER_PING = uint32(0)
|
||||||
|
SUBSCRIBE = uint32(1)
|
||||||
|
UNSUBSCRIBE = uint32(2)
|
||||||
|
UNSUBSCRIBE_ALL = uint32(3)
|
||||||
|
|
||||||
|
FilterSubscribeRequest* = object
|
||||||
|
# Request from client to service node
|
||||||
|
requestId*: string
|
||||||
|
filterSubscribeType*: FilterSubscribeType
|
||||||
|
pubsubTopic*: Option[PubsubTopic]
|
||||||
|
contentTopics*: seq[ContentTopic]
|
||||||
|
|
||||||
|
FilterSubscribeResponse* = object
|
||||||
|
# Response from service node to client
|
||||||
|
requestId*: string
|
||||||
|
statusCode*: uint32
|
||||||
|
statusDesc*: Option[string]
|
||||||
|
|
||||||
|
MessagePush* = object
|
||||||
|
# Message pushed from service node to client
|
||||||
|
wakuMessage*: WakuMessage
|
||||||
|
pubsubTopic*: Option[string]
|
||||||
|
|
||||||
|
# Convenience functions
|
||||||
|
|
||||||
|
proc ok*(T: type FilterSubscribeResponse, requestId: string, desc = "OK"): T =
|
||||||
|
FilterSubscribeResponse(
|
||||||
|
requestId: requestId,
|
||||||
|
statusCode: 200,
|
||||||
|
statusDesc: some(desc)
|
||||||
|
)
|
77
waku/v2/protocol/waku_filter_v2/rpc_codec.nim
Normal file
77
waku/v2/protocol/waku_filter_v2/rpc_codec.nim
Normal file
@ -0,0 +1,77 @@
|
|||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/options
|
||||||
|
import
|
||||||
|
../../../common/protobuf,
|
||||||
|
../waku_message,
|
||||||
|
./rpc
|
||||||
|
|
||||||
|
const
|
||||||
|
MaxSubscribeSize* = 10 * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
|
||||||
|
MaxPushSize* = 10 * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
|
||||||
|
|
||||||
|
proc encode*(rpc: FilterSubscribeRequest): ProtoBuffer =
|
||||||
|
var pb = initProtoBuffer()
|
||||||
|
|
||||||
|
pb.write3(1, rpc.requestId)
|
||||||
|
pb.write3(2, uint32(ord(rpc.filterSubscribeType)))
|
||||||
|
|
||||||
|
pb.write3(10, rpc.pubsubTopic)
|
||||||
|
|
||||||
|
for contentTopic in rpc.contentTopics:
|
||||||
|
pb.write3(11, contentTopic)
|
||||||
|
|
||||||
|
pb
|
||||||
|
|
||||||
|
proc decode*(T: type FilterSubscribeRequest, buffer: seq[byte]): ProtobufResult[T] =
|
||||||
|
let pb = initProtoBuffer(buffer)
|
||||||
|
var rpc = FilterSubscribeRequest()
|
||||||
|
|
||||||
|
if not ?pb.getField(1, rpc.requestId):
|
||||||
|
return err(ProtobufError.missingRequiredField("request_id"))
|
||||||
|
|
||||||
|
var filterSubscribeType: uint32
|
||||||
|
if not ?pb.getField(2, filterSubscribeType):
|
||||||
|
return err(ProtobufError.missingRequiredField("filter_subscribe_type"))
|
||||||
|
else:
|
||||||
|
rpc.filterSubscribeType = FilterSubscribeType(filterSubscribeType)
|
||||||
|
|
||||||
|
var pubsubTopic: PubsubTopic
|
||||||
|
if not ?pb.getField(10, pubsubTopic):
|
||||||
|
rpc.pubsubTopic = none(PubsubTopic)
|
||||||
|
else:
|
||||||
|
rpc.pubsubTopic = some(pubsubTopic)
|
||||||
|
|
||||||
|
discard ?pb.getRepeatedField(11, rpc.contentTopics)
|
||||||
|
|
||||||
|
ok(rpc)
|
||||||
|
|
||||||
|
proc encode*(rpc: FilterSubscribeResponse): ProtoBuffer =
|
||||||
|
var pb = initProtoBuffer()
|
||||||
|
|
||||||
|
pb.write3(1, rpc.requestId)
|
||||||
|
pb.write3(2, rpc.statusCode)
|
||||||
|
pb.write3(3, rpc.statusDesc)
|
||||||
|
|
||||||
|
pb
|
||||||
|
|
||||||
|
proc decode*(T: type FilterSubscribeResponse, buffer: seq[byte]): ProtobufResult[T] =
|
||||||
|
let pb = initProtoBuffer(buffer)
|
||||||
|
var rpc = FilterSubscribeResponse()
|
||||||
|
|
||||||
|
if not ?pb.getField(1, rpc.requestId):
|
||||||
|
return err(ProtobufError.missingRequiredField("request_id"))
|
||||||
|
|
||||||
|
if not ?pb.getField(2, rpc.statusCode):
|
||||||
|
return err(ProtobufError.missingRequiredField("status_code"))
|
||||||
|
|
||||||
|
if not ?pb.getField(3, rpc.statusDesc):
|
||||||
|
rpc.statusDesc = none(string)
|
||||||
|
else:
|
||||||
|
rpc.statusDesc = some(rpc.statusDesc.get())
|
||||||
|
|
||||||
|
ok(rpc)
|
Loading…
x
Reference in New Issue
Block a user