feat(filter-v2): new filter protocol increment - message handling and clients (#1600)

* feat: further filter v2 progress

* feat: filter client implementation

* chore: rename test modules

* feat: extend tests and minor improvements
This commit is contained in:
Hanno Cornelius 2023-03-20 13:19:53 +02:00 committed by GitHub
parent 2f3ba3d6d1
commit be446b9892
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 833 additions and 47 deletions

View File

@ -0,0 +1,375 @@
{.used.}
import
std/[options,tables],
testutils/unittests,
chronos,
chronicles,
libp2p/peerstore
import
../../../waku/v2/node/peer_manager,
../../../waku/v2/protocol/waku_filter_v2,
../../../waku/v2/protocol/waku_filter_v2/client,
../../../waku/v2/protocol/waku_filter_v2/rpc,
../../../waku/v2/protocol/waku_message,
../testlib/common,
../testlib/waku2
proc newTestWakuFilter(switch: Switch): Future[WakuFilter] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuFilter.new(peerManager)
await proto.start()
switch.mount(proto)
return proto
proc newTestWakuFilterClient(switch: Switch, messagePushHandler: MessagePushHandler): Future[WakuFilterClient] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuFilterClient.new(rng, messagePushHandler, peerManager)
await proto.start()
switch.mount(proto)
return proto
suite "Waku Filter - end to end":
asyncTest "ping":
# Given
var
voidHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) =
discard
let
serverSwitch = newStandardSwitch()
clientSwitch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(serverSwitch)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch, voidHandler)
# When
await allFutures(serverSwitch.start(), clientSwitch.start())
let response = await wakuFilterClient.ping(serverSwitch.peerInfo.toRemotePeerInfo())
# Then
check:
response.isErr() # Not subscribed
response.error().kind == FilterSubscribeErrorKind.NOT_FOUND
# When
let response2 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), DefaultPubsubTopic, @[DefaultContentTopic])
require response2.isOk()
let response3 = await wakuFilterClient.ping(serverSwitch.peerInfo.toRemotePeerInfo())
# Then
check:
response3.isOk() # Subscribed
# Teardown
await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop())
asyncTest "simple subscribe and unsubscribe request":
# Given
var
pushHandlerFuture = newFuture[(string, WakuMessage)]()
messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) =
pushHandlerFuture.complete((pubsubTopic, message))
let
serverSwitch = newStandardSwitch()
clientSwitch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(serverSwitch)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler)
clientPeerId = clientSwitch.peerInfo.peerId
pubsubTopic = DefaultPubsubTopic
contentTopics = @[DefaultContentTopic]
# When
await allFutures(serverSwitch.start(), clientSwitch.start())
let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics)
# Then
check:
response.isOk()
wakuFilter.subscriptions.len == 1
wakuFilter.subscriptions.hasKey(clientPeerId)
# When
let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic)
await wakuFilter.handleMessage(DefaultPubsubTopic, msg1)
require await pushHandlerFuture.withTimeout(3.seconds)
# Then
let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read()
check:
pushedMsgPubsubTopic == DefaultPubsubTopic
pushedMsg == msg1
# When
let response2 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics)
# Then
check:
response2.isOk()
wakuFilter.subscriptions.len == 0
# When
let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic)
pushHandlerFuture = newFuture[(string, WakuMessage)]() # Clear previous future
await wakuFilter.handleMessage(DefaultPubsubTopic, msg2)
# Then
check:
not (await pushHandlerFuture.withTimeout(2.seconds)) # No message should be pushed
# Teardown
await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop())
asyncTest "subscribe, unsubscribe multiple content topics":
# Given
var
pushHandlerFuture = newFuture[(string, WakuMessage)]()
messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) =
pushHandlerFuture.complete((pubsubTopic, message))
let
serverSwitch = newStandardSwitch()
clientSwitch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(serverSwitch)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler)
clientPeerId = clientSwitch.peerInfo.peerId
pubsubTopic = DefaultPubsubTopic
contentTopic2 = ContentTopic("/waku/2/non-default-content/proto")
contentTopics = @[DefaultContentTopic, contentTopic2]
# When
await allFutures(serverSwitch.start(), clientSwitch.start())
let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics)
# Then
check:
response.isOk()
wakuFilter.subscriptions.len == 1
wakuFilter.subscriptions.hasKey(clientPeerId)
# When
let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic)
await wakuFilter.handleMessage(DefaultPubsubTopic, msg1)
require await pushHandlerFuture.withTimeout(3.seconds)
# Then
let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read()
check:
pushedMsgPubsubTopic == DefaultPubsubTopic
pushedMsg == msg1
# When
let msg2 = fakeWakuMessage(contentTopic=contentTopic2)
pushHandlerFuture = newFuture[(string, WakuMessage)]() # Clear previous future
await wakuFilter.handleMessage(DefaultPubsubTopic, msg2)
require await pushHandlerFuture.withTimeout(3.seconds)
# Then
let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read()
check:
pushedMsgPubsubTopic2 == DefaultPubsubTopic
pushedMsg2 == msg2
# When
let response2 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, @[contentTopic2]) # Unsubscribe only one content topic
# Then
check:
response2.isOk()
wakuFilter.subscriptions.len == 1
# When
let msg3 = fakeWakuMessage(contentTopic=DefaultContentTopic)
pushHandlerFuture = newFuture[(string, WakuMessage)]() # Clear previous future
await wakuFilter.handleMessage(DefaultPubsubTopic, msg3)
require await pushHandlerFuture.withTimeout(3.seconds)
# Then
let (pushedMsgPubsubTopic3, pushedMsg3) = pushHandlerFuture.read()
check:
pushedMsgPubsubTopic3 == DefaultPubsubTopic
pushedMsg3 == msg3
# When
let msg4 = fakeWakuMessage(contentTopic=contentTopic2)
pushHandlerFuture = newFuture[(string, WakuMessage)]() # Clear previous future
await wakuFilter.handleMessage(DefaultPubsubTopic, msg4)
# Then
check:
not (await pushHandlerFuture.withTimeout(2.seconds)) # No message should be pushed
asyncTest "subscribe to multiple content topics and unsubscribe all":
# Given
var
pushHandlerFuture = newFuture[(string, WakuMessage)]()
messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) =
pushHandlerFuture.complete((pubsubTopic, message))
let
serverSwitch = newStandardSwitch()
clientSwitch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(serverSwitch)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler)
clientPeerId = clientSwitch.peerInfo.peerId
pubsubTopic = DefaultPubsubTopic
contentTopic2 = ContentTopic("/waku/2/non-default-content/proto")
contentTopics = @[DefaultContentTopic, contentTopic2]
# When
await allFutures(serverSwitch.start(), clientSwitch.start())
let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics)
# Then
check:
response.isOk()
wakuFilter.subscriptions.len == 1
wakuFilter.subscriptions.hasKey(clientPeerId)
# When
let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic)
await wakuFilter.handleMessage(DefaultPubsubTopic, msg1)
require await pushHandlerFuture.withTimeout(3.seconds)
# Then
let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read()
check:
pushedMsgPubsubTopic == DefaultPubsubTopic
pushedMsg == msg1
# When
let msg2 = fakeWakuMessage(contentTopic=contentTopic2)
pushHandlerFuture = newFuture[(string, WakuMessage)]() # Clear previous future
await wakuFilter.handleMessage(DefaultPubsubTopic, msg2)
require await pushHandlerFuture.withTimeout(3.seconds)
# Then
let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read()
check:
pushedMsgPubsubTopic2 == DefaultPubsubTopic
pushedMsg2 == msg2
# When
let response2 = await wakuFilterClient.unsubscribeAll(serverSwitch.peerInfo.toRemotePeerInfo())
# Then
check:
response2.isOk()
wakuFilter.subscriptions.len == 0
# When
let
msg3 = fakeWakuMessage(contentTopic=DefaultContentTopic)
msg4 = fakeWakuMessage(contentTopic=contentTopic2)
pushHandlerFuture = newFuture[(string, WakuMessage)]() # Clear previous future
await wakuFilter.handleMessage(DefaultPubsubTopic, msg3)
await wakuFilter.handleMessage(DefaultPubsubTopic, msg4)
# Then
check:
not (await pushHandlerFuture.withTimeout(2.seconds)) # Neither message should be pushed
# Teardown
await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop())
asyncTest "subscribe, unsubscribe multiple pubsub topics and content topics":
# Given
var
pushHandlerFuture = newFuture[(string, WakuMessage)]()
messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) =
pushHandlerFuture.complete((pubsubTopic, message))
let
serverSwitch = newStandardSwitch()
clientSwitch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(serverSwitch)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler)
clientPeerId = clientSwitch.peerInfo.peerId
pubsubTopic = DefaultPubsubTopic
pubsubTopic2 = PubsubTopic("/waku/2/non-default-pubsub/proto")
contentTopic2 = ContentTopic("/waku/2/non-default-content/proto")
contentTopics = @[DefaultContentTopic, contentTopic2]
## Step 1: We can subscribe to multiple pubsub topics and content topics
# When
await allFutures(serverSwitch.start(), clientSwitch.start())
let
response1 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics)
response2 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic2, contentTopics)
# Then
check:
response1.isOk()
response2.isOk()
wakuFilter.subscriptions.len == 1
wakuFilter.subscriptions.hasKey(clientPeerId)
## Step 2: We receive messages for multiple subscribed pubsub topics and content topics
# When
let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic)
await wakuFilter.handleMessage(DefaultPubsubTopic, msg1)
require await pushHandlerFuture.withTimeout(3.seconds)
# Then
let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read()
check:
pushedMsgPubsubTopic == DefaultPubsubTopic
pushedMsg == msg1
# When
let msg2 = fakeWakuMessage(contentTopic=contentTopic2)
pushHandlerFuture = newFuture[(string, WakuMessage)]() # Clear previous future
await wakuFilter.handleMessage(pubsubTopic2, msg2)
require await pushHandlerFuture.withTimeout(3.seconds)
# Then
let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read()
check:
pushedMsgPubsubTopic2 == pubsubTopic2
pushedMsg2 == msg2
## Step 3: We can selectively unsubscribe from pubsub topics and content topic(s)
# When
let response3 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic2, @[contentTopic2])
require response3.isOk()
let msg3 = fakeWakuMessage(contentTopic=contentTopic2)
pushHandlerFuture = newFuture[(string, WakuMessage)]() # Clear previous future
await wakuFilter.handleMessage(pubsubTopic2, msg3)
# Then
check:
not (await pushHandlerFuture.withTimeout(2.seconds)) # No message should be pushed
## Step 4: We can still receive messages for other subscribed pubsub topics and content topics
# When
pushHandlerFuture = newFuture[(string, WakuMessage)]() # Clear previous future
await wakuFilter.handleMessage(DefaultPubsubTopic, msg3)
require await pushHandlerFuture.withTimeout(3.seconds)
# Then
let (pushedMsgPubsubTopic3, pushedMsg3) = pushHandlerFuture.read()
check:
pushedMsgPubsubTopic3 == DefaultPubsubTopic
pushedMsg3 == msg3

View File

@ -4,7 +4,8 @@ import
std/[options,sets,strutils,tables],
testutils/unittests,
chronos,
chronicles
chronicles,
libp2p/peerstore
import
../../../waku/v2/node/peer_manager,
../../../waku/v2/protocol/waku_filter_v2,
@ -13,14 +14,11 @@ import
../testlib/common,
../testlib/waku2
proc newTestWakuFilter(switch: Switch): Future[WakuFilter] {.async.} =
proc newTestWakuFilter(switch: Switch): WakuFilter =
let
peerManager = PeerManager.new(switch)
proto = WakuFilter.new(peerManager)
await proto.start()
switch.mount(proto)
return proto
proc generateRequestId(rng: ref HmacDrbgContext): string =
@ -51,7 +49,7 @@ suite "Waku Filter - handling subscribe requests":
# Given
let
switch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(switch)
wakuFilter = newTestWakuFilter(switch)
peerId = PeerId.random().get()
filterSubscribeRequest = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
@ -89,7 +87,7 @@ suite "Waku Filter - handling subscribe requests":
# Given
let
switch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(switch)
wakuFilter = newTestWakuFilter(switch)
peerId = PeerId.random().get()
nonDefaultContentTopic = ContentTopic("/waku/2/non-default-waku/proto")
filterSubscribeRequest = createRequest(
@ -127,7 +125,7 @@ suite "Waku Filter - handling subscribe requests":
# Given
let
switch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(switch)
wakuFilter = newTestWakuFilter(switch)
peerId = PeerId.random().get()
nonDefaultContentTopic = ContentTopic("/waku/2/non-default-waku/proto")
filterSubscribeRequest1 = createRequest(
@ -203,7 +201,7 @@ suite "Waku Filter - handling subscribe requests":
# Given
let
switch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(switch)
wakuFilter = newTestWakuFilter(switch)
peerId = PeerId.random().get()
pingRequest = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBER_PING
@ -237,3 +235,64 @@ suite "Waku Filter - handling subscribe requests":
response3.statusCode == 200
response3.statusDesc.get() == "OK"
suite "Waku Filter - subscription maintenance":
asyncTest "simple maintenance":
# Given
let
switch = newStandardSwitch()
wakuFilter = newTestWakuFilter(switch)
peerId1 = PeerId.random().get()
peerId2 = PeerId.random().get()
peerId3 = PeerId.random().get()
filterSubscribeRequest = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic]
)
# When
switch.peerStore[ProtoBook][peerId1] = @[WakuFilterPushCodec]
switch.peerStore[ProtoBook][peerId2] = @[WakuFilterPushCodec]
switch.peerStore[ProtoBook][peerId3] = @[WakuFilterPushCodec]
require wakuFilter.handleSubscribeRequest(peerId1, filterSubscribeRequest).isOk()
require wakuFilter.handleSubscribeRequest(peerId2, filterSubscribeRequest).isOk()
require wakuFilter.handleSubscribeRequest(peerId3, filterSubscribeRequest).isOk()
# Then
check:
wakuFilter.subscriptions.len == 3
wakuFilter.subscriptions.hasKey(peerId1)
wakuFilter.subscriptions.hasKey(peerId2)
wakuFilter.subscriptions.hasKey(peerId3)
# When
# Maintenance loop should leave all peers in peer store intact
wakuFilter.maintainSubscriptions()
# Then
check:
wakuFilter.subscriptions.len == 3
wakuFilter.subscriptions.hasKey(peerId1)
wakuFilter.subscriptions.hasKey(peerId2)
wakuFilter.subscriptions.hasKey(peerId3)
# When
# Remove peerId1 and peerId3 from peer store
switch.peerStore.del(peerId1)
switch.peerStore.del(peerId3)
wakuFilter.maintainSubscriptions()
# Then
check:
wakuFilter.subscriptions.len == 1
wakuFilter.subscriptions.hasKey(peerId2)
# When
# Remove peerId2 from peer store
switch.peerStore.del(peerId2)
wakuFilter.maintainSubscriptions()
# Then
check:
wakuFilter.subscriptions.len == 0

View File

@ -3,7 +3,7 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}
import
import
std/[options, tables, sequtils],
stew/results,
chronicles,
@ -79,10 +79,10 @@ type WakuFilterClient* = ref object of LPProtocol
proc handleMessagePush(wf: WakuFilterClient, peerId: PeerId, requestId: string, rpc: MessagePush) =
for msg in rpc.messages:
let
let
pubsubTopic = Defaultstring # TODO: Extend the filter push rpc to provide the pubsub topic. This is a limitation
contentTopic = msg.contentTopic
contentTopic = msg.contentTopic
wf.subManager.notifySubscriptionHandler(pubsubTopic, contentTopic, msg)
@ -104,10 +104,10 @@ proc initProtocolHandler(wf: WakuFilterClient) =
return
waku_filter_messages.inc(labelValues = ["MessagePush"])
let
peerId = conn.peerId
requestId = rpc.requestId
requestId = rpc.requestId
push = rpc.push.get()
info "received filter message push", peerId=conn.peerId, requestId=requestId
@ -118,8 +118,8 @@ proc initProtocolHandler(wf: WakuFilterClient) =
proc new*(T: type WakuFilterClient,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext): T =
rng: ref rand.HmacDrbgContext): T =
let wf = WakuFilterClient(
peerManager: peerManager,
rng: rng,
@ -138,9 +138,9 @@ proc sendFilterRpc(wf: WakuFilterClient, rpc: FilterRPC, peer: PeerId|RemotePeer
await connection.writeLP(rpc.encode().buffer)
return ok()
proc sendFilterRequestRpc(wf: WakuFilterClient,
pubsubTopic: PubsubTopic,
contentTopics: seq[ContentTopic],
proc sendFilterRequestRpc(wf: WakuFilterClient,
pubsubTopic: PubsubTopic,
contentTopics: seq[ContentTopic],
subscribe: bool,
peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async.} =
@ -150,8 +150,8 @@ proc sendFilterRequestRpc(wf: WakuFilterClient,
let rpc = FilterRpc(
requestId: requestId,
request: some(FilterRequest(
subscribe: subscribe,
pubSubTopic: pubsubTopic,
subscribe: subscribe,
pubSubTopic: pubsubTopic,
contentFilters: contentFilters
))
)
@ -160,15 +160,15 @@ proc sendFilterRequestRpc(wf: WakuFilterClient,
if sendRes.isErr():
waku_filter_errors.inc(labelValues = [sendRes.error])
return err(sendRes.error)
return ok()
proc subscribe*(wf: WakuFilterClient,
pubsubTopic: PubsubTopic,
contentTopic: ContentTopic|seq[ContentTopic],
proc subscribe*(wf: WakuFilterClient,
pubsubTopic: PubsubTopic,
contentTopic: ContentTopic|seq[ContentTopic],
handler: FilterPushHandler,
peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async.} =
peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async.} =
var topics: seq[ContentTopic]
when contentTopic is seq[ContentTopic]:
topics = contentTopic
@ -184,8 +184,8 @@ proc subscribe*(wf: WakuFilterClient,
return ok()
proc unsubscribe*(wf: WakuFilterClient,
pubsubTopic: PubsubTopic,
proc unsubscribe*(wf: WakuFilterClient,
pubsubTopic: PubsubTopic,
contentTopic: ContentTopic|seq[ContentTopic],
peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async.} =
var topics: seq[ContentTopic]
@ -207,4 +207,4 @@ proc clearSubscriptions*(wf: WakuFilterClient) =
wf.subManager.clear()
proc getSubscriptionsCount*(wf: WakuFilterClient): int =
wf.subManager.getSubscriptionsCount()
wf.subManager.getSubscriptionsCount()

View File

@ -0,0 +1,141 @@
## Waku Filter client for subscribing and receiving filtered messages
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/options,
chronicles,
chronos,
libp2p/protocols/protocol,
bearssl/rand
import
../../node/peer_manager,
../waku_message,
./common,
./protocol_metrics,
./rpc_codec,
./rpc
logScope:
topics = "waku filter client"
type
MessagePushHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.}
WakuFilterClient* = ref object of LPProtocol
rng: ref HmacDrbgContext
messagePushHandler: MessagePushHandler
peerManager: PeerManager
func generateRequestId(rng: ref HmacDrbgContext): string =
var bytes: array[10, byte]
hmacDrbgGenerate(rng[], bytes)
return toHex(bytes)
proc sendSubscribeRequest(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, filterSubscribeRequest: FilterSubscribeRequest): Future[FilterSubscribeResult] {.async.} =
trace "Sending filter subscribe request", servicePeer, filterSubscribeRequest
let connOpt = await wfc.peerManager.dialPeer(servicePeer, WakuFilterSubscribeCodec)
if connOpt.isNone():
trace "Failed to dial filter service peer", servicePeer
waku_filter_errors.inc(labelValues = [dialFailure])
return err(FilterSubscribeError.peerDialFailure($servicePeer))
let connection = connOpt.get()
# TODO: this can raise an exception
await connection.writeLP(filterSubscribeRequest.encode().buffer)
let respBuf = await connection.readLp(MaxSubscribeResponseSize)
let respDecodeRes = FilterSubscribeResponse.decode(respBuf)
if respDecodeRes.isErr():
trace "Failed to decode filter subscribe response", servicePeer
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
return err(FilterSubscribeError.badResponse(decodeRpcFailure))
let response = respDecodeRes.get()
if response.requestId != filterSubscribeRequest.requestId:
trace "Filter subscribe response requestId mismatch", servicePeer, response
waku_filter_errors.inc(labelValues = [requestIdMismatch])
return err(FilterSubscribeError.badResponse(requestIdMismatch))
if response.statusCode != 200:
trace "Filter subscribe error response", servicePeer, response
waku_filter_errors.inc(labelValues = [errorResponse])
let cause = if response.statusDesc.isSome(): response.statusDesc.get()
else: "filter subscribe error"
return err(FilterSubscribeError.parse(response.statusCode, cause=cause))
return ok()
proc ping*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo): Future[FilterSubscribeResult] {.async.} =
let requestId = generateRequestId(wfc.rng)
let filterSubscribeRequest = FilterSubscribeRequest.ping(requestId)
return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
proc subscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]): Future[FilterSubscribeResult] {.async.} =
let requestId = generateRequestId(wfc.rng)
let filterSubscribeRequest = FilterSubscribeRequest.subscribe(
requestId = requestId,
pubsubTopic = pubsubTopic,
contentTopics = contentTopics
)
return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
proc unsubscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]): Future[FilterSubscribeResult] {.async.} =
let requestId = generateRequestId(wfc.rng)
let filterSubscribeRequest = FilterSubscribeRequest.unsubscribe(
requestId = requestId,
pubsubTopic = pubsubTopic,
contentTopics = contentTopics
)
return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
proc unsubscribeAll*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo): Future[FilterSubscribeResult] {.async.} =
let requestId = generateRequestId(wfc.rng)
let filterSubscribeRequest = FilterSubscribeRequest.unsubscribeAll(
requestId = requestId
)
return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
proc initProtocolHandler(wfc: WakuFilterClient) =
proc handler(conn: Connection, proto: string) {.async.} =
let buf = await conn.readLp(MaxPushSize)
let decodeRes = MessagePush.decode(buf)
if decodeRes.isErr():
error "Failed to decode message push", peerId=conn.peerId
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
return
let messagePush = decodeRes.value #TODO: toAPI() split here
trace "Received message push", peerId=conn.peerId, messagePush
wfc.messagePushHandler(messagePush.pubsubTopic, messagePush.wakuMessage)
# Protocol specifies no response for now
return
wfc.handler = handler
wfc.codec = WakuFilterPushCodec
proc new*(T: type WakuFilterClient,
rng: ref HmacDrbgContext,
messagePushHandler: MessagePushHandler,
peerManager: PeerManager): T =
let wfc = WakuFilterClient(
rng: rng,
messagePushHandler: messagePushHandler,
peerManager: peerManager
)
wfc.initProtocolHandler()
wfc

View File

@ -13,17 +13,33 @@ const
type
FilterSubscribeErrorKind* {.pure.} = enum
UNKNOWN = uint32(000)
PEER_DIAL_FAILURE = uint32(200) # TODO shouldn't this be an error code, e.g. 504 Gateway Timeout?
BAD_RESPONSE = uint32(300)
BAD_REQUEST = uint32(400)
NOT_FOUND = uint32(404)
SERVICE_UNAVAILABLE = uint32(503)
FilterSubscribeError* = object
kind*: FilterSubscribeErrorKind
cause*: string
case kind*: FilterSubscribeErrorKind
of PEER_DIAL_FAILURE:
address*: string
of BAD_RESPONSE, BAD_REQUEST, NOT_FOUND, SERVICE_UNAVAILABLE:
cause*: string
else:
discard
FilterSubscribeResult* = Result[void, FilterSubscribeError]
# Convenience functions
proc peerDialFailure*(T: type FilterSubscribeError, address: string): FilterSubscribeError =
FilterSubscribeError(
kind: FilterSubscribeErrorKind.PEER_DIAL_FAILURE,
address: address)
proc badResponse*(T: type FilterSubscribeError, cause = "bad response"): FilterSubscribeError =
FilterSubscribeError(
kind: FilterSubscribeErrorKind.BAD_RESPONSE,
cause: cause)
proc badRequest*(T: type FilterSubscribeError, cause = "bad request"): FilterSubscribeError =
FilterSubscribeError(
@ -40,8 +56,34 @@ proc serviceUnavailable*(T: type FilterSubscribeError, cause = "service unavaila
kind: FilterSubscribeErrorKind.SERVICE_UNAVAILABLE,
cause: cause)
proc parse*(T: type FilterSubscribeErrorKind, kind: uint32): T =
case kind:
of 000, 200, 300, 400, 404, 503:
FilterSubscribeErrorKind(kind)
else:
FilterSubscribeErrorKind.UNKNOWN
proc parse*(T: type FilterSubscribeError, kind: uint32, cause = "", address = ""): T =
let kind = FilterSubscribeErrorKind.parse(kind)
case kind:
of PEER_DIAL_FAILURE:
FilterSubscribeError(
kind: kind,
address: address)
of BAD_RESPONSE, BAD_REQUEST, NOT_FOUND, SERVICE_UNAVAILABLE:
FilterSubscribeError(
kind: kind,
cause: cause)
else:
FilterSubscribeError(
kind: kind)
proc `$`*(err: FilterSubscribeError): string =
case err.kind:
of FilterSubscribeErrorKind.PEER_DIAL_FAILURE:
"PEER_DIAL_FAILURE: " & err.address
of FilterSubscribeErrorKind.BAD_RESPONSE:
"BAD_RESPONSE: " & err.cause
of FilterSubscribeErrorKind.BAD_REQUEST:
"BAD_REQUEST: " & err.cause
of FilterSubscribeErrorKind.NOT_FOUND:

View File

@ -17,21 +17,15 @@ import
./common,
./protocol_metrics,
./rpc_codec,
./rpc
./rpc,
./subscriptions
logScope:
topics = "waku filter"
const
MaxSubscriptions* = 1000 # TODO make configurable
MaxCriteriaPerSubscription = 1000
type
FilterCriterion* = (PubsubTopic, ContentTopic) # a single filter criterion is fully defined by a pubsub topic and content topic
FilterCriteria* = HashSet[FilterCriterion] # a sequence of filter criteria
WakuFilter* = ref object of LPProtocol
subscriptions*: Table[PeerID, FilterCriteria] # a mapping of peer ids to a sequence of filter criteria
subscriptions*: FilterSubscriptions # a mapping of peer ids to a sequence of filter criteria
peerManager: PeerManager
proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
@ -59,7 +53,7 @@ proc subscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic],
peerSubscription.incl(filterCriteria)
wf.subscriptions[peerId] = peerSubscription
else:
if wf.subscriptions.len() >= MaxSubscriptions:
if wf.subscriptions.len() >= MaxTotalSubscriptions:
return err(FilterSubscribeError.serviceUnavailable("node has reached maximum number of subscriptions"))
debug "creating new subscription", peerId=peerId
wf.subscriptions[peerId] = filterCriteria
@ -125,8 +119,61 @@ proc handleSubscribeRequest*(wf: WakuFilter, peerId: PeerId, request: FilterSubs
else:
return FilterSubscribeResponse.ok(request.requestId)
proc handleMessage*(wf: WakuFilter, message: WakuMessage) =
raiseAssert "Unimplemented"
proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
trace "pushing message to subscribed peer", peer=peer
if not wf.peerManager.peerStore.hasPeer(peer, WakuFilterPushCodec):
# Check that peer has not been removed from peer store
trace "no addresses for peer", peer=peer
return
let conn = await wf.peerManager.dialPeer(peer, WakuFilterPushCodec)
if conn.isNone():
## We do not remove this peer, but allow the underlying peer manager
## to do so if it is deemed necessary
trace "no connection to peer", peer=peer
return
await conn.get().writeLp(buffer)
proc pushToPeers(wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush) {.async.} =
trace "pushing message to subscribed peers", peers=peers, messagePush=messagePush
let bufferToPublish = messagePush.encode().buffer
var pushFuts: seq[Future[void]]
for peerId in peers:
let pushFut = wf.pushToPeer(peerId, bufferToPublish)
pushFuts.add(pushFut)
await allFutures(pushFuts)
proc maintainSubscriptions*(wf: WakuFilter) =
trace "maintaining subscriptions"
var peersToRemove: seq[PeerId]
for peerId, peerSubscription in wf.subscriptions.pairs():
## TODO: currently we only maintain by syncing with peer store. We could
## consider other metrics, such as subscription age, activity, etc.
if not wf.peerManager.peerStore.hasPeer(peerId, WakuFilterPushCodec):
debug "peer has been removed from peer store, removing subscription", peerId=peerId
peersToRemove.add(peerId)
wf.subscriptions.removePeers(peersToRemove)
proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessage) {.async.} =
trace "handling message", pubsubTopic=pubsubTopic, message=message
let subscribedPeers = wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic)
if subscribedPeers.len() == 0:
trace "no subscribed peers found", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic
return
let messagePush = MessagePush(
pubsubTopic: pubsubTopic,
wakuMessage: message)
await wf.pushToPeers(subscribedPeers, messagePush)
proc initProtocolHandler(wf: WakuFilter) =
@ -157,3 +204,24 @@ proc new*(T: type WakuFilter,
)
wf.initProtocolHandler()
wf
const MaintainSubscriptionsInterval* = 1.minutes
proc startMaintainingSubscriptions*(wf: WakuFilter, interval: Duration) =
trace "starting to maintain subscriptions"
var maintainSubs: proc(udata: pointer) {.gcsafe, raises: [Defect].}
maintainSubs = proc(udata: pointer) {.gcsafe.} =
maintainSubscriptions(wf)
discard setTimer(Moment.fromNow(interval), maintainSubs)
discard setTimer(Moment.fromNow(interval), maintainSubs)
method start*(wf: WakuFilter) {.async.} =
debug "starting filter protocol"
wf.startMaintainingSubscriptions(MaintainSubscriptionsInterval)
await procCall LPProtocol(wf).start()
method stop*(wf: WakuFilter) {.async.} =
debug "stopping filter protocol"
await procCall LPProtocol(wf).stop()

View File

@ -12,4 +12,7 @@ declarePublicGauge waku_filter_requests, "number of filter subscribe requests re
# Error types (metric label values)
const
dialFailure* = "dial_failure"
decodeRpcFailure* = "decode_rpc_failure"
requestIdMismatch* = "request_id_mismatch"
errorResponse* = "error_response"

View File

@ -32,10 +32,38 @@ type
MessagePush* = object
# Message pushed from service node to client
wakuMessage*: WakuMessage
pubsubTopic*: Option[string]
pubsubTopic*: string
# Convenience functions
proc ping*(T: type FilterSubscribeRequest, requestId: string): T =
FilterSubscribeRequest(
requestId: requestId,
filterSubscribeType: SUBSCRIBER_PING
)
proc subscribe*(T: type FilterSubscribeRequest, requestId: string, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]): T =
FilterSubscribeRequest(
requestId: requestId,
filterSubscribeType: SUBSCRIBE,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopics
)
proc unsubscribe*(T: type FilterSubscribeRequest, requestId: string, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]): T =
FilterSubscribeRequest(
requestId: requestId,
filterSubscribeType: UNSUBSCRIBE,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopics
)
proc unsubscribeAll*(T: type FilterSubscribeRequest, requestId: string): T =
FilterSubscribeRequest(
requestId: requestId,
filterSubscribeType: UNSUBSCRIBE_ALL
)
proc ok*(T: type FilterSubscribeResponse, requestId: string, desc = "OK"): T =
FilterSubscribeResponse(
requestId: requestId,

View File

@ -12,6 +12,7 @@ import
const
MaxSubscribeSize* = 10 * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
MaxSubscribeResponseSize* = 64*1024 # Responses are small. 64kB safety buffer.
MaxPushSize* = 10 * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
proc encode*(rpc: FilterSubscribeRequest): ProtoBuffer =
@ -69,9 +70,33 @@ proc decode*(T: type FilterSubscribeResponse, buffer: seq[byte]): ProtobufResult
if not ?pb.getField(2, rpc.statusCode):
return err(ProtobufError.missingRequiredField("status_code"))
if not ?pb.getField(3, rpc.statusDesc):
var statusDesc: string
if not ?pb.getField(3, statusDesc):
rpc.statusDesc = none(string)
else:
rpc.statusDesc = some(rpc.statusDesc.get())
rpc.statusDesc = some(statusDesc)
ok(rpc)
proc encode*(rpc: MessagePush): ProtoBuffer =
var pb = initProtoBuffer()
pb.write3(1, rpc.wakuMessage.encode())
pb.write3(2, rpc.pubsubTopic)
pb
proc decode*(T: type MessagePush, buffer: seq[byte]): ProtobufResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = MessagePush()
var message: seq[byte]
if not ?pb.getField(1, message):
return err(ProtobufError.missingRequiredField("message"))
else:
rpc.wakuMessage = ?WakuMessage.decode(message)
if not ?pb.getField(2, rpc.pubsubTopic):
return err(ProtobufError.missingRequiredField("pubsub_topic"))
ok(rpc)

View File

@ -0,0 +1,45 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[sets,tables],
chronicles,
libp2p/peerid
import
../waku_message
logScope:
topics = "waku filter subscriptions"
const
MaxTotalSubscriptions* = 1000 # TODO make configurable
MaxCriteriaPerSubscription* = 1000
type
FilterCriterion* = (PubsubTopic, ContentTopic) # a single filter criterion is fully defined by a pubsub topic and content topic
FilterCriteria* = HashSet[FilterCriterion] # a sequence of filter criteria
FilterSubscriptions* = Table[PeerID, FilterCriteria] # a mapping of peer ids to a sequence of filter criteria
proc findSubscribedPeers*(subscriptions: FilterSubscriptions, pubsubTopic: PubsubTopic, contentTopic: ContentTopic): seq[PeerID] =
## Find all peers subscribed to a given topic and content topic
let filterCriterion = (pubsubTopic, contentTopic)
var subscribedPeers: seq[PeerID]
# TODO: for large maps, this can be optimized using a reverse index
for (peerId, criteria) in subscriptions.pairs():
if filterCriterion in criteria:
subscribedPeers.add(peerId)
subscribedPeers
proc removePeer*(subscriptions: var FilterSubscriptions, peerId: PeerID) =
## Remove all subscriptions for a given peer
subscriptions.del(peerId)
proc removePeers*(subscriptions: var FilterSubscriptions, peerIds: seq[PeerID]) =
## Remove all subscriptions for a given list of peers
for peerId in peerIds:
subscriptions.removePeer(peerId)