diff --git a/tests/v2/waku_filter_v2/test_waku_filter_v2.nim b/tests/v2/waku_filter_v2/test_waku_filter_v2.nim new file mode 100644 index 000000000..d24601a0a --- /dev/null +++ b/tests/v2/waku_filter_v2/test_waku_filter_v2.nim @@ -0,0 +1,239 @@ +{.used.} + +import + std/[options,sets,strutils,tables], + testutils/unittests, + chronos, + chronicles +import + ../../../waku/v2/node/peer_manager, + ../../../waku/v2/protocol/waku_filter_v2, + ../../../waku/v2/protocol/waku_filter_v2/rpc, + ../../../waku/v2/protocol/waku_message, + ../testlib/common, + ../testlib/waku2 + +proc newTestWakuFilter(switch: Switch): Future[WakuFilter] {.async.} = + let + peerManager = PeerManager.new(switch) + proto = WakuFilter.new(peerManager) + + await proto.start() + switch.mount(proto) + + return proto + +proc generateRequestId(rng: ref HmacDrbgContext): string = + var bytes: array[10, byte] + hmacDrbgGenerate(rng[], bytes) + return toHex(bytes) + +proc createRequest(filterSubscribeType: FilterSubscribeType, pubsubTopic = none(PubsubTopic), contentTopics = newSeq[ContentTopic]()): FilterSubscribeRequest = + let requestId = generateRequestId(rng) + + return FilterSubscribeRequest( + requestId: requestId, + filterSubscribeType: filterSubscribeType, + pubsubTopic: pubsubTopic, + contentTopics: contentTopics + ) + +proc getSubscribedContentTopics(wakuFilter: WakuFilter, peerId: PeerId): seq[ContentTopic] = + var contentTopics: seq[ContentTopic] + for filterCriterion in wakuFilter.subscriptions[peerId]: + contentTopics.add(filterCriterion[1]) + + return contentTopics + +suite "Waku Filter - handling subscribe requests": + + asyncTest "simple subscribe and unsubscribe request": + # Given + let + switch = newStandardSwitch() + wakuFilter = await newTestWakuFilter(switch) + peerId = PeerId.random().get() + filterSubscribeRequest = createRequest( + filterSubscribeType = FilterSubscribeType.SUBSCRIBE, + pubsubTopic = some(DefaultPubsubTopic), + contentTopics = @[DefaultContentTopic] + ) + filterUnsubscribeRequest = createRequest( + filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE, + pubsubTopic = filterSubscribeRequest.pubsubTopic, + contentTopics = filterSubscribeRequest.contentTopics + ) + + # When + let response = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest) + + # Then + check: + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions[peerId].len == 1 + response.requestId == filterSubscribeRequest.requestId + response.statusCode == 200 + response.statusDesc.get() == "OK" + + # When + let response2 = wakuFilter.handleSubscribeRequest(peerId, filterUnsubscribeRequest) + + # Then + check: + wakuFilter.subscriptions.len == 0 # peerId is removed from subscriptions + response2.requestId == filterUnsubscribeRequest.requestId + response2.statusCode == 200 + response2.statusDesc.get() == "OK" + + asyncTest "simple subscribe and unsubscribe all for multiple content topics": + # Given + let + switch = newStandardSwitch() + wakuFilter = await newTestWakuFilter(switch) + peerId = PeerId.random().get() + nonDefaultContentTopic = ContentTopic("/waku/2/non-default-waku/proto") + filterSubscribeRequest = createRequest( + filterSubscribeType = FilterSubscribeType.SUBSCRIBE, + pubsubTopic = some(DefaultPubsubTopic), + contentTopics = @[DefaultContentTopic, nonDefaultContentTopic] + ) + filterUnsubscribeAllRequest = createRequest( + filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE_ALL + ) + + # When + let response = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest) + + # Then + check: + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions[peerId].len == 2 + wakuFilter.getSubscribedContentTopics(peerId) == filterSubscribeRequest.contentTopics + response.requestId == filterSubscribeRequest.requestId + response.statusCode == 200 + response.statusDesc.get() == "OK" + + # When + let response2 = wakuFilter.handleSubscribeRequest(peerId, filterUnsubscribeAllRequest) + + # Then + check: + wakuFilter.subscriptions.len == 0 # peerId is removed from subscriptions + response2.requestId == filterUnsubscribeAllRequest.requestId + response2.statusCode == 200 + response2.statusDesc.get() == "OK" + + asyncTest "subscribe and unsubscribe to multiple content topics": + # Given + let + switch = newStandardSwitch() + wakuFilter = await newTestWakuFilter(switch) + peerId = PeerId.random().get() + nonDefaultContentTopic = ContentTopic("/waku/2/non-default-waku/proto") + filterSubscribeRequest1 = createRequest( + filterSubscribeType = FilterSubscribeType.SUBSCRIBE, + pubsubTopic = some(DefaultPubsubTopic), + contentTopics = @[DefaultContentTopic] + ) + filterSubscribeRequest2 = createRequest( + filterSubscribeType = FilterSubscribeType.SUBSCRIBE, + pubsubTopic = filterSubscribeRequest1.pubsubTopic, + contentTopics = @[nonDefaultContentTopic] + ) + filterUnsubscribeRequest1 = createRequest( + filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE, + pubsubTopic = filterSubscribeRequest1.pubsubTopic, + contentTopics = filterSubscribeRequest1.contentTopics + ) + filterUnsubscribeRequest2 = createRequest( + filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE, + pubsubTopic = filterSubscribeRequest2.pubsubTopic, + contentTopics = filterSubscribeRequest2.contentTopics + ) + + # When + let response1 = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest1) + + # Then + check: + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions[peerId].len == 1 + wakuFilter.getSubscribedContentTopics(peerId) == filterSubscribeRequest1.contentTopics + response1.requestId == filterSubscribeRequest1.requestId + response1.statusCode == 200 + response1.statusDesc.get() == "OK" + + # When + let response2 = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest2) + + # Then + check: + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions[peerId].len == 2 + wakuFilter.getSubscribedContentTopics(peerId) == + filterSubscribeRequest1.contentTopics & + filterSubscribeRequest2.contentTopics + response2.requestId == filterSubscribeRequest2.requestId + response2.statusCode == 200 + response2.statusDesc.get() == "OK" + + # When + let response3 = wakuFilter.handleSubscribeRequest(peerId, filterUnsubscribeRequest1) + + # Then + check: + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions[peerId].len == 1 + wakuFilter.getSubscribedContentTopics(peerId) == filterSubscribeRequest2.contentTopics + response3.requestId == filterUnsubscribeRequest1.requestId + response3.statusCode == 200 + response3.statusDesc.get() == "OK" + + # When + let response4 = wakuFilter.handleSubscribeRequest(peerId, filterUnsubscribeRequest2) + + # Then + check: + wakuFilter.subscriptions.len == 0 # peerId is removed from subscriptions + response4.requestId == filterUnsubscribeRequest2.requestId + response4.statusCode == 200 + response4.statusDesc.get() == "OK" + + asyncTest "ping subscriber": + # Given + let + switch = newStandardSwitch() + wakuFilter = await newTestWakuFilter(switch) + peerId = PeerId.random().get() + pingRequest = createRequest( + filterSubscribeType = FilterSubscribeType.SUBSCRIBER_PING + ) + filterSubscribeRequest = createRequest( + filterSubscribeType = FilterSubscribeType.SUBSCRIBE, + pubsubTopic = some(DefaultPubsubTopic), + contentTopics = @[DefaultContentTopic] + ) + + # When + let response1 = wakuFilter.handleSubscribeRequest(peerId, pingRequest) + + # Then + check: + response1.requestId == pingRequest.requestId + response1.statusCode == FilterSubscribeErrorKind.NOT_FOUND.uint32 + response1.statusDesc.get().contains("peer has no subscriptions") + + # When + let + response2 = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest) + response3 = wakuFilter.handleSubscribeRequest(peerId, pingRequest) + + # Then + check: + response2.requestId == filterSubscribeRequest.requestId + response2.statusCode == 200 + response2.statusDesc.get() == "OK" + response3.requestId == pingRequest.requestId + response3.statusCode == 200 + response3.statusDesc.get() == "OK" + diff --git a/waku/v2/protocol/waku_filter_v2.nim b/waku/v2/protocol/waku_filter_v2.nim new file mode 100644 index 000000000..7a8984ad6 --- /dev/null +++ b/waku/v2/protocol/waku_filter_v2.nim @@ -0,0 +1,7 @@ +import + ./waku_filter_v2/common, + ./waku_filter_v2/protocol + +export + common, + protocol diff --git a/waku/v2/protocol/waku_filter_v2/common.nim b/waku/v2/protocol/waku_filter_v2/common.nim new file mode 100644 index 000000000..f4f4e4a33 --- /dev/null +++ b/waku/v2/protocol/waku_filter_v2/common.nim @@ -0,0 +1,52 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + stew/results + +const + WakuFilterSubscribeCodec* = "/vac/waku/filter-subscribe/2.0.0-beta1" + WakuFilterPushCodec* = "/vac/waku/filter-push/2.0.0-beta1" + +type + FilterSubscribeErrorKind* {.pure.} = enum + UNKNOWN = uint32(000) + BAD_REQUEST = uint32(400) + NOT_FOUND = uint32(404) + SERVICE_UNAVAILABLE = uint32(503) + + FilterSubscribeError* = object + kind*: FilterSubscribeErrorKind + cause*: string + + FilterSubscribeResult* = Result[void, FilterSubscribeError] + +# Convenience functions + +proc badRequest*(T: type FilterSubscribeError, cause = "bad request"): FilterSubscribeError = + FilterSubscribeError( + kind: FilterSubscribeErrorKind.BAD_REQUEST, + cause: cause) + +proc notFound*(T: type FilterSubscribeError, cause = "peer has no subscriptions"): FilterSubscribeError = + FilterSubscribeError( + kind: FilterSubscribeErrorKind.NOT_FOUND, + cause: cause) + +proc serviceUnavailable*(T: type FilterSubscribeError, cause = "service unavailable"): FilterSubscribeError = + FilterSubscribeError( + kind: FilterSubscribeErrorKind.SERVICE_UNAVAILABLE, + cause: cause) + +proc `$`*(err: FilterSubscribeError): string = + case err.kind: + of FilterSubscribeErrorKind.BAD_REQUEST: + "BAD_REQUEST: " & err.cause + of FilterSubscribeErrorKind.NOT_FOUND: + "NOT_FOUND: " & err.cause + of FilterSubscribeErrorKind.SERVICE_UNAVAILABLE: + "SERVICE_UNAVAILABLE: " & err.cause + of FilterSubscribeErrorKind.UNKNOWN: + "UNKNOWN" diff --git a/waku/v2/protocol/waku_filter_v2/protocol.nim b/waku/v2/protocol/waku_filter_v2/protocol.nim new file mode 100644 index 000000000..150306031 --- /dev/null +++ b/waku/v2/protocol/waku_filter_v2/protocol.nim @@ -0,0 +1,159 @@ +## Waku Filter protocol for subscribing and filtering messages + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[options,sequtils,sets,tables], + chronicles, + chronos, + libp2p/peerid, + libp2p/protocols/protocol +import + ../../node/peer_manager, + ../waku_message, + ./common, + ./protocol_metrics, + ./rpc_codec, + ./rpc + +logScope: + topics = "waku filter" + +const + MaxSubscriptions* = 1000 # TODO make configurable + MaxCriteriaPerSubscription = 1000 + +type + FilterCriterion* = (PubsubTopic, ContentTopic) # a single filter criterion is fully defined by a pubsub topic and content topic + FilterCriteria* = HashSet[FilterCriterion] # a sequence of filter criteria + + WakuFilter* = ref object of LPProtocol + subscriptions*: Table[PeerID, FilterCriteria] # a mapping of peer ids to a sequence of filter criteria + peerManager: PeerManager + +proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult = + trace "pinging subscriber", peerId=peerId + + if peerId notin wf.subscriptions: + debug "pinging peer has no subscriptions", peerId=peerId + return err(FilterSubscribeError.notFound()) + + ok() + +proc subscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic], contentTopics: seq[ContentTopic]): FilterSubscribeResult = + if pubsubTopic.isNone() or contentTopics.len() == 0: + return err(FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified")) + + let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it))) + + trace "subscribing peer to filter criteria", peerId=peerId, filterCriteria=filterCriteria + + if peerId in wf.subscriptions: + var peerSubscription = wf.subscriptions.mgetOrPut(peerId, initHashSet[FilterCriterion]()) + if peerSubscription.len() + filterCriteria.len() >= MaxCriteriaPerSubscription: + return err(FilterSubscribeError.serviceUnavailable("peer has reached maximum number of filter criteria")) + + peerSubscription.incl(filterCriteria) + wf.subscriptions[peerId] = peerSubscription + else: + if wf.subscriptions.len() >= MaxSubscriptions: + return err(FilterSubscribeError.serviceUnavailable("node has reached maximum number of subscriptions")) + debug "creating new subscription", peerId=peerId + wf.subscriptions[peerId] = filterCriteria + + ok() + +proc unsubscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic], contentTopics: seq[ContentTopic]): FilterSubscribeResult = + if pubsubTopic.isNone() or contentTopics.len() == 0: + return err(FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified")) + + let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it))) + + trace "unsubscribing peer from filter criteria", peerId=peerId, filterCriteria=filterCriteria + + if peerId notin wf.subscriptions: + debug "unsubscibing peer has no subscriptions", peerId=peerId + return err(FilterSubscribeError.notFound()) + + var peerSubscription = wf.subscriptions.mgetOrPut(peerId, initHashSet[FilterCriterion]()) + # TODO: consider error response if filter criteria does not exist + peerSubscription.excl(filterCriteria) + + if peerSubscription.len() == 0: + debug "peer has no more subscriptions, removing subscription", peerId=peerId + wf.subscriptions.del(peerId) + else: + wf.subscriptions[peerId] = peerSubscription + + ok() + +proc unsubscribeAll(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult = + if peerId notin wf.subscriptions: + debug "unsubscibing peer has no subscriptions", peerId=peerId + return err(FilterSubscribeError.notFound()) + + debug "removing peer subscription", peerId=peerId + wf.subscriptions.del(peerId) + + ok() + +proc handleSubscribeRequest*(wf: WakuFilter, peerId: PeerId, request: FilterSubscribeRequest): FilterSubscribeResponse = + info "received filter subscribe request", peerId=peerId, request=request + waku_filter_requests.inc(labelValues = [$request.filterSubscribeType]) + + var subscribeResult: FilterSubscribeResult + + case request.filterSubscribeType + of FilterSubscribeType.SUBSCRIBER_PING: + subscribeResult = wf.pingSubscriber(peerId) + of FilterSubscribeType.SUBSCRIBE: + subscribeResult = wf.subscribe(peerId, request.pubsubTopic, request.contentTopics) + of FilterSubscribeType.UNSUBSCRIBE: + subscribeResult = wf.unsubscribe(peerId, request.pubsubTopic, request.contentTopics) + of FilterSubscribeType.UNSUBSCRIBE_ALL: + subscribeResult = wf.unsubscribeAll(peerId) + + if subscribeResult.isErr(): + return FilterSubscribeResponse( + requestId: request.requestId, + statusCode: subscribeResult.error.kind.uint32, + statusDesc: some($subscribeResult.error) + ) + else: + return FilterSubscribeResponse.ok(request.requestId) + +proc handleMessage*(wf: WakuFilter, message: WakuMessage) = + raiseAssert "Unimplemented" + +proc initProtocolHandler(wf: WakuFilter) = + + proc handler(conn: Connection, proto: string) {.async.} = + let buf = await conn.readLp(MaxSubscribeSize) + + let decodeRes = FilterSubscribeRequest.decode(buf) + if decodeRes.isErr(): + error "Failed to decode filter subscribe request", peerId=conn.peerId + waku_filter_errors.inc(labelValues = [decodeRpcFailure]) + return + + let request = decodeRes.value #TODO: toAPI() split here + + let response = wf.handleSubscribeRequest(conn.peerId, request) + + await conn.writeLp(response.encode().buffer) #TODO: toRPC() separation here + return + + wf.handler = handler + wf.codec = WakuFilterSubscribeCodec + +proc new*(T: type WakuFilter, + peerManager: PeerManager): T = + + let wf = WakuFilter( + peerManager: peerManager + ) + wf.initProtocolHandler() + wf diff --git a/waku/v2/protocol/waku_filter_v2/protocol_metrics.nim b/waku/v2/protocol/waku_filter_v2/protocol_metrics.nim new file mode 100644 index 000000000..db7daad3b --- /dev/null +++ b/waku/v2/protocol/waku_filter_v2/protocol_metrics.nim @@ -0,0 +1,15 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import metrics + +export metrics + +declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"] +declarePublicGauge waku_filter_requests, "number of filter subscribe requests received", ["type"] + +# Error types (metric label values) +const + decodeRpcFailure* = "decode_rpc_failure" diff --git a/waku/v2/protocol/waku_filter_v2/rpc.nim b/waku/v2/protocol/waku_filter_v2/rpc.nim new file mode 100644 index 000000000..904471a24 --- /dev/null +++ b/waku/v2/protocol/waku_filter_v2/rpc.nim @@ -0,0 +1,44 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/options +import + ../waku_message + +type + FilterSubscribeType* {.pure.} = enum + # Indicates the type of request from client to service node + SUBSCRIBER_PING = uint32(0) + SUBSCRIBE = uint32(1) + UNSUBSCRIBE = uint32(2) + UNSUBSCRIBE_ALL = uint32(3) + + FilterSubscribeRequest* = object + # Request from client to service node + requestId*: string + filterSubscribeType*: FilterSubscribeType + pubsubTopic*: Option[PubsubTopic] + contentTopics*: seq[ContentTopic] + + FilterSubscribeResponse* = object + # Response from service node to client + requestId*: string + statusCode*: uint32 + statusDesc*: Option[string] + + MessagePush* = object + # Message pushed from service node to client + wakuMessage*: WakuMessage + pubsubTopic*: Option[string] + +# Convenience functions + +proc ok*(T: type FilterSubscribeResponse, requestId: string, desc = "OK"): T = + FilterSubscribeResponse( + requestId: requestId, + statusCode: 200, + statusDesc: some(desc) + ) diff --git a/waku/v2/protocol/waku_filter_v2/rpc_codec.nim b/waku/v2/protocol/waku_filter_v2/rpc_codec.nim new file mode 100644 index 000000000..f66b4f8af --- /dev/null +++ b/waku/v2/protocol/waku_filter_v2/rpc_codec.nim @@ -0,0 +1,77 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/options +import + ../../../common/protobuf, + ../waku_message, + ./rpc + +const + MaxSubscribeSize* = 10 * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead + MaxPushSize* = 10 * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead + +proc encode*(rpc: FilterSubscribeRequest): ProtoBuffer = + var pb = initProtoBuffer() + + pb.write3(1, rpc.requestId) + pb.write3(2, uint32(ord(rpc.filterSubscribeType))) + + pb.write3(10, rpc.pubsubTopic) + + for contentTopic in rpc.contentTopics: + pb.write3(11, contentTopic) + + pb + +proc decode*(T: type FilterSubscribeRequest, buffer: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buffer) + var rpc = FilterSubscribeRequest() + + if not ?pb.getField(1, rpc.requestId): + return err(ProtobufError.missingRequiredField("request_id")) + + var filterSubscribeType: uint32 + if not ?pb.getField(2, filterSubscribeType): + return err(ProtobufError.missingRequiredField("filter_subscribe_type")) + else: + rpc.filterSubscribeType = FilterSubscribeType(filterSubscribeType) + + var pubsubTopic: PubsubTopic + if not ?pb.getField(10, pubsubTopic): + rpc.pubsubTopic = none(PubsubTopic) + else: + rpc.pubsubTopic = some(pubsubTopic) + + discard ?pb.getRepeatedField(11, rpc.contentTopics) + + ok(rpc) + +proc encode*(rpc: FilterSubscribeResponse): ProtoBuffer = + var pb = initProtoBuffer() + + pb.write3(1, rpc.requestId) + pb.write3(2, rpc.statusCode) + pb.write3(3, rpc.statusDesc) + + pb + +proc decode*(T: type FilterSubscribeResponse, buffer: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buffer) + var rpc = FilterSubscribeResponse() + + if not ?pb.getField(1, rpc.requestId): + return err(ProtobufError.missingRequiredField("request_id")) + + if not ?pb.getField(2, rpc.statusCode): + return err(ProtobufError.missingRequiredField("status_code")) + + if not ?pb.getField(3, rpc.statusDesc): + rpc.statusDesc = none(string) + else: + rpc.statusDesc = some(rpc.statusDesc.get()) + + ok(rpc)