diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 40a75654a..31b971ea2 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -23,7 +23,7 @@ import ../../waku/v2/protocol/waku_relay, ../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_swap/waku_swap, - ../../waku/v2/protocol/waku_filter/waku_filter, + ../../waku/v2/protocol/waku_filter, ../../waku/v2/utils/peers, ../../waku/v2/utils/time diff --git a/tests/v2/test_waku_bridge.nim b/tests/v2/test_waku_bridge.nim index b84053e5b..edf4e37d7 100644 --- a/tests/v2/test_waku_bridge.nim +++ b/tests/v2/test_waku_bridge.nim @@ -17,7 +17,7 @@ import ../../waku/v1/protocol/waku_protocol, ../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_store, - ../../waku/v2/protocol/waku_filter/waku_filter, + ../../waku/v2/protocol/waku_filter, ../../waku/v2/node/[wakunode2, waku_payload], ../../waku/v2/utils/peers, ../test_helpers diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index 62d9ec177..5df24da65 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -2,286 +2,316 @@ import std/[options, tables, sets], - testutils/unittests, chronos, chronicles, + testutils/unittests, + chronos, + chronicles, libp2p/switch, - libp2p/protobuf/minprotobuf, - libp2p/stream/[bufferstream, connection], libp2p/crypto/crypto, - libp2p/multistream, + libp2p/multistream +import ../../waku/v2/node/peer_manager/peer_manager, - ../../waku/v2/protocol/waku_filter/waku_filter, - ../test_helpers, ./utils + ../../waku/v2/protocol/waku_message, + ../../waku/v2/protocol/waku_filter, + ../test_helpers, + ./utils + +const + DefaultPubsubTopic = "/waku/2/default-waku/proto" + DefaultContentTopic = ContentTopic("/waku/2/default-content/proto") + +const dummyHandler = proc(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = discard + +proc newTestSwitch(key=none(PrivateKey), address=none(MultiAddress)): Switch = + let peerKey = key.get(PrivateKey.random(ECDSA, rng[]).get()) + let peerAddr = address.get(MultiAddress.init("/ip4/127.0.0.1/tcp/0").get()) + return newStandardSwitch(some(peerKey), addrs=peerAddr) + + +# TODO: Extend test coverage procSuite "Waku Filter": - asyncTest "handle filter": - const defaultTopic = "/waku/2/default-waku/proto" + asyncTest "should forward messages to client after subscribed": + ## Setup + let rng = crypto.newRng() + let + clientSwitch = newTestSwitch() + serverSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + ## Given + # Server + let + serverPeerManager = PeerManager.new(serverSwitch) + serverProto = WakuFilter.init(serverPeerManager, rng, dummyHandler) + serverSwitch.mount(serverProto) + + # Client + let handlerFuture = newFuture[(string, MessagePush)]() + proc handler(requestId: string, push: MessagePush) {.async, gcsafe, closure.} = + handlerFuture.complete((requestId, push)) let - key = PrivateKey.random(ECDSA, rng[]).get() - peer = PeerInfo.new(key) - contentTopic = ContentTopic("/waku/2/default-content/proto") - post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic) + clientPeerManager = PeerManager.new(clientSwitch) + clientProto = WakuFilter.init(clientPeerManager, rng, handler) + clientSwitch.mount(clientProto) - var dialSwitch = newStandardSwitch() - await dialSwitch.start() + clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) - var listenSwitch = newStandardSwitch(some(key)) - await listenSwitch.start() + ## When + let resSubscription = await clientProto.subscribe(DefaultPubsubTopic, @[DefaultContentTopic]) + require resSubscription.isOk() - var responseRequestIdFuture = newFuture[string]() - proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = - check: - msg.messages.len() == 1 - msg.messages[0] == post - responseRequestIdFuture.complete(requestId) + await sleepAsync(5.milliseconds) - let - proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) - rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true) + let message = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic) + await serverProto.handleMessage(DefaultPubsubTopic, message) - dialSwitch.mount(proto) - proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) - - proc emptyHandle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = - discard - - let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle) - - listenSwitch.mount(proto2) - - let id = (await proto.subscribe(rpc)).get() - - await sleepAsync(2.seconds) - - await proto2.handleMessage(defaultTopic, post) + ## Then + let subscriptionRequestId = resSubscription.get() + let (requestId, push) = await handlerFuture check: - (await responseRequestIdFuture) == id - - asyncTest "Can subscribe and unsubscribe from content filter": - const defaultTopic = "/waku/2/default-waku/proto" + requestId == subscriptionRequestId + push.messages == @[message] + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + asyncTest "should not forward messages to client after unsuscribed": + ## Setup + let rng = crypto.newRng() + let + clientSwitch = newTestSwitch() + serverSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + ## Given + # Server + let + serverPeerManager = PeerManager.new(serverSwitch) + serverProto = WakuFilter.init(serverPeerManager, rng, dummyHandler) + serverSwitch.mount(serverProto) + + # Client + var handlerFuture = newFuture[void]() + proc handler(requestId: string, push: MessagePush) {.async, gcsafe, closure.} = + handlerFuture.complete() let - key = PrivateKey.random(ECDSA, rng[]).get() - peer = PeerInfo.new(key) - contentTopic = ContentTopic("/waku/2/default-content/proto") - post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic) + clientPeerManager = PeerManager.new(clientSwitch) + clientProto = WakuFilter.init(clientPeerManager, rng, handler) + clientSwitch.mount(clientProto) - var dialSwitch = newStandardSwitch() - await dialSwitch.start() + clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) - var listenSwitch = newStandardSwitch(some(key)) - await listenSwitch.start() + ## Given + let message = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic) - var responseCompletionFuture = newFuture[bool]() - proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = - check: - msg.messages.len() == 1 - msg.messages[0] == post - responseCompletionFuture.complete(true) + let resSubscription = await clientProto.subscribe(DefaultPubsubTopic, @[DefaultContentTopic]) + require resSubscription.isOk() - let - proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) - rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true) + await sleepAsync(5.milliseconds) - dialSwitch.mount(proto) - proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) + await serverProto.handleMessage(DefaultPubsubTopic, message) + let handlerWasCalledAfterSubscription = await handlerFuture.withTimeout(1.seconds) + require handlerWasCalledAfterSubscription - proc emptyHandle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = - discard - - let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle) - - listenSwitch.mount(proto2) - - let id = (await proto.subscribe(rpc)).get() - - await sleepAsync(2.seconds) - - await proto2.handleMessage(defaultTopic, post) - - check: - # Check that subscription works as expected - (await responseCompletionFuture.withTimeout(3.seconds)) == true - # Reset to test unsubscribe - responseCompletionFuture = newFuture[bool]() + handlerFuture = newFuture[void]() - let - rpcU = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: false) + let resUnsubscription = await clientProto.unsubscribe(DefaultPubsubTopic, @[DefaultContentTopic]) + require resUnsubscription.isOk() - await proto.unsubscribe(rpcU) + await sleepAsync(5.milliseconds) - await sleepAsync(2.seconds) - - await proto2.handleMessage(defaultTopic, post) + await serverProto.handleMessage(DefaultPubsubTopic, message) + ## Then + let handlerWasCalledAfterUnsubscription = await handlerFuture.withTimeout(1.seconds) check: - # Check that unsubscribe works as expected - (await responseCompletionFuture.withTimeout(5.seconds)) == false + not handlerWasCalledAfterUnsubscription + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) - asyncTest "handle filter subscribe failures": - const defaultTopic = "/waku/2/default-waku/proto" + asyncTest "subscription should fail if no filter peer is provided": + ## Setup + let clientSwitch = newTestSwitch() + await clientSwitch.start() - let - contentTopic = ContentTopic("/waku/2/default-content/proto") + ## Given + let clientProto = WakuFilter.init(PeerManager.new(clientSwitch), crypto.newRng(), dummyHandler) + clientSwitch.mount(clientProto) - var dialSwitch = newStandardSwitch() - await dialSwitch.start() - - var responseRequestIdFuture = newFuture[string]() - proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = - discard - - let - proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) - rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true) - - dialSwitch.mount(proto) - - let idOpt = (await proto.subscribe(rpc)) + ## When + let resSubscription = await clientProto.subscribe(DefaultPubsubTopic, @[DefaultContentTopic]) + ## Then check: - idOpt.isNone + resSubscription.isErr() + resSubscription.error() == "peer_not_found_failure" - asyncTest "Handle failed clients": - const defaultTopic = "/waku/2/default-waku/proto" + asyncTest "peer subscription should be dropped if connection fails for second time after the timeout has elapsed": + ## Setup + let rng = crypto.newRng() + let + clientSwitch = newTestSwitch() + serverSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + ## Given + # Server + let + serverPeerManager = PeerManager.new(serverSwitch) + serverProto = WakuFilter.init(serverPeerManager, rng, dummyHandler, timeout=1.seconds) + serverSwitch.mount(serverProto) + + # Client + var handlerFuture = newFuture[void]() + proc handler(requestId: string, push: MessagePush) {.async, gcsafe, closure.} = + handlerFuture.complete() let - key = PrivateKey.random(ECDSA, rng[]).get() - peer = PeerInfo.new(key) - contentTopic = ContentTopic("/waku/2/default-content/proto") - post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic) + clientPeerManager = PeerManager.new(clientSwitch) + clientProto = WakuFilter.init(clientPeerManager, rng, handler) + clientSwitch.mount(clientProto) - var dialSwitch = newStandardSwitch() - await dialSwitch.start() + clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) - var listenSwitch = newStandardSwitch(some(key)) - await listenSwitch.start() + ## When + let resSubscription = await clientProto.subscribe(DefaultPubsubTopic, @[DefaultContentTopic]) + check resSubscription.isOk() - var responseCompletionFuture = newFuture[bool]() - proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = - check: - msg.messages.len() == 1 - msg.messages[0] == post - responseCompletionFuture.complete(true) + await sleepAsync(5.milliseconds) - let - proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) - rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true) + let message = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic) - dialSwitch.mount(proto) - proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) + await serverProto.handleMessage(DefaultPubsubTopic, message) + let handlerShouldHaveBeenCalled = await handlerFuture.withTimeout(1.seconds) + require handlerShouldHaveBeenCalled - proc emptyHandle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = - discard + # Stop client node to test timeout unsubscription + await clientSwitch.stop() - let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle, 1.seconds) - - listenSwitch.mount(proto2) - - let id = (await proto.subscribe(rpc)).get() - - await sleepAsync(2.seconds) - - await proto2.handleMessage(defaultTopic, post) - - check: - # Check that subscription works as expected - (await responseCompletionFuture.withTimeout(3.seconds)) == true + await sleepAsync(5.milliseconds) - # Stop switch to test unsubscribe - discard dialSwitch.stop() - - await sleepAsync(2.seconds) + # First failure should not remove the subscription + await serverProto.handleMessage(DefaultPubsubTopic, message) + let + subscriptionsBeforeTimeout = serverProto.subscriptions.len() + failedPeersBeforeTimeout = serverProto.failedPeers.len() - #First failure should not remove the subscription - await proto2.handleMessage(defaultTopic, post) - - await sleepAsync(2000.millis) - check: - proto2.subscribers.len() == 1 - - #Second failure should remove the subscription - await proto2.handleMessage(defaultTopic, post) - - check: - proto2.subscribers.len() == 0 - - asyncTest "Handles failed clients coming back up": - const defaultTopic = "/waku/2/default-waku/proto" - - let - dialKey = PrivateKey.random(ECDSA, rng[]).get() - listenKey = PrivateKey.random(ECDSA, rng[]).get() - contentTopic = ContentTopic("/waku/2/default-content/proto") - post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic) - - var dialSwitch = newStandardSwitch(privKey = some(dialKey), addrs = MultiAddress.init("/ip4/127.0.0.1/tcp/65000").tryGet()) - await dialSwitch.start() - - var listenSwitch = newStandardSwitch(some(listenKey)) - await listenSwitch.start() - - var responseCompletionFuture = newFuture[bool]() - proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = - check: - msg.messages.len() == 1 - msg.messages[0] == post - responseCompletionFuture.complete(true) - - let - proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) - rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true) - - dialSwitch.mount(proto) - proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) - - proc emptyHandle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = - discard - - let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle, 2.seconds) - - listenSwitch.mount(proto2) - - let id = (await proto.subscribe(rpc)).get() - - await sleepAsync(2.seconds) - - await proto2.handleMessage(defaultTopic, post) - - check: - # Check that subscription works as expected - (await responseCompletionFuture.withTimeout(3.seconds)) == true - - responseCompletionFuture = newFuture[bool]() - - # Stop switch to test unsubscribe - await dialSwitch.stop() - + # Wait for peer connection failure timeout to elapse await sleepAsync(1.seconds) - #First failure should add to failure list - await proto2.handleMessage(defaultTopic, post) - - check: - proto2.failedPeers.len() == 1 - - # Start switch with same key as before - var dialSwitch2 = newStandardSwitch(some(dialKey), addrs = MultiAddress.init("/ip4/127.0.0.1/tcp/65000").tryGet()) - await dialSwitch2.start() - dialSwitch2.mount(proto) - #Second failure should remove the subscription - await proto2.handleMessage(defaultTopic, post) + await serverProto.handleMessage(DefaultPubsubTopic, message) + let + subscriptionsAfterTimeout = serverProto.subscriptions.len() + failedPeersAfterTimeout = serverProto.failedPeers.len() + ## Then check: - # Check that subscription works as expected - (await responseCompletionFuture.withTimeout(3.seconds)) == true + subscriptionsBeforeTimeout == 1 + failedPeersBeforeTimeout == 1 + subscriptionsAfterTimeout == 0 + failedPeersAfterTimeout == 0 - check: - proto2.failedPeers.len() == 0 + ## Cleanup + await serverSwitch.stop() + + asyncTest "peer subscription should not be dropped if connection recovers before timeout elapses": + ## Setup + let + clientKey = PrivateKey.random(ECDSA, rng[]).get() + clientAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/65000").get() - await dialSwitch2.stop() - await listenSwitch.stop() + let rng = crypto.newRng() + let + clientSwitch = newTestSwitch(some(clientKey), some(clientAddress)) + serverSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + ## Given + # Server + let + serverPeerManager = PeerManager.new(serverSwitch) + serverProto = WakuFilter.init(serverPeerManager, rng, dummyHandler, timeout=2.seconds) + serverSwitch.mount(serverProto) + + # Client + var handlerFuture = newFuture[void]() + proc handler(requestId: string, push: MessagePush) {.async, gcsafe, closure.} = + handlerFuture.complete() + + let + clientPeerManager = PeerManager.new(clientSwitch) + clientProto = WakuFilter.init(clientPeerManager, rng, handler) + clientSwitch.mount(clientProto) + + clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + + ## When + let message = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic) + + let resSubscription = await clientProto.subscribe(DefaultPubsubTopic, @[DefaultContentTopic]) + check resSubscription.isOk() + + await sleepAsync(5.milliseconds) + + await serverProto.handleMessage(DefaultPubsubTopic, message) + handlerFuture = newFuture[void]() + + let + subscriptionsBeforeFailure = serverProto.subscriptions.len() + failedPeersBeforeFailure = serverProto.failedPeers.len() + + # Stop switch to test unsubscribe + await clientSwitch.stop() + + await sleepAsync(5.milliseconds) + + # First failure should add to failure list + await serverProto.handleMessage(DefaultPubsubTopic, message) + handlerFuture = newFuture[void]() + + let + subscriptionsAfterFailure = serverProto.subscriptions.len() + failedPeersAfterFailure = serverProto.failedPeers.len() + + await sleepAsync(250.milliseconds) + + # Start switch with same key as before + var clientSwitch2 = newTestSwitch(some(clientKey), some(clientAddress)) + await clientSwitch2.start() + clientSwitch2.mount(clientProto) + + # If push succeeds after failure, the peer should removed from failed peers list + await serverProto.handleMessage(DefaultPubsubTopic, message) + let handlerShouldHaveBeenCalled = await handlerFuture.withTimeout(1.seconds) + + let + subscriptionsAfterSuccessfulConnection = serverProto.subscriptions.len() + failedPeersAfterSuccessfulConnection = serverProto.failedPeers.len() + + ## Then + check: + handlerShouldHaveBeenCalled + + check: + subscriptionsBeforeFailure == 1 + subscriptionsAfterFailure == 1 + subscriptionsAfterSuccessfulConnection == 1 + + check: + failedPeersBeforeFailure == 0 + failedPeersAfterFailure == 1 + failedPeersAfterSuccessfulConnection == 0 + + ## Cleanup + await allFutures(clientSwitch2.stop(), serverSwitch.stop()) diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 6293d0fea..b02ca8b1c 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -490,7 +490,7 @@ procSuite "Waku Store - fault tolerant store": ] for msg in msgList: - await proto.handleMessage(DEFAULT_PUBSUB_TOPIC, msg) + await proto.handleMessage(DefaultPubsubTopic, msg) let (listenSwitch2, dialSwitch2, proto2) = await newTestWakuStore() let msgList2 = @[ @@ -505,7 +505,7 @@ procSuite "Waku Store - fault tolerant store": ] for msg in msgList2: - await proto2.handleMessage(DEFAULT_PUBSUB_TOPIC, msg) + await proto2.handleMessage(DefaultPubsubTopic, msg) asyncTest "handle temporal history query with a valid time window": diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 2fb306f99..65c430e3a 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -18,7 +18,7 @@ import ../../waku/v2/node/storage/message/waku_store_queue, ../../waku/v2/protocol/[waku_relay, waku_message], ../../waku/v2/protocol/waku_store, - ../../waku/v2/protocol/waku_filter/waku_filter, + ../../waku/v2/protocol/waku_filter, ../../waku/v2/protocol/waku_lightpush, ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/utils/peers, diff --git a/waku/v2/node/quicksim2.nim b/waku/v2/node/quicksim2.nim index 165f58b42..ab103bcd1 100644 --- a/waku/v2/node/quicksim2.nim +++ b/waku/v2/node/quicksim2.nim @@ -6,7 +6,7 @@ import json_rpc/[rpcclient, rpcserver], libp2p/protobuf/minprotobuf import - ../protocol/waku_filter/waku_filter_types, + ../protocol/waku_filter, ../protocol/waku_store, ../protocol/waku_message, ../utils/time, diff --git a/waku/v2/node/scripts/rpc_info.nim b/waku/v2/node/scripts/rpc_info.nim index 068405b3c..8de0ccae6 100644 --- a/waku/v2/node/scripts/rpc_info.nim +++ b/waku/v2/node/scripts/rpc_info.nim @@ -8,7 +8,7 @@ import ../wakunode2, ../waku_payload, ../jsonrpc/jsonrpc_types, - ../../protocol/waku_filter/waku_filter_types, + ../../protocol/waku_filter, ../../protocol/waku_store, ../../../v1/node/rpc/hexstrings diff --git a/waku/v2/node/scripts/rpc_publish.nim b/waku/v2/node/scripts/rpc_publish.nim index ba85a1ef4..a8e75eb2e 100644 --- a/waku/v2/node/scripts/rpc_publish.nim +++ b/waku/v2/node/scripts/rpc_publish.nim @@ -8,7 +8,7 @@ import ../wakunode2, ../waku_payload, ../jsonrpc/jsonrpc_types, - ../../protocol/waku_filter/waku_filter_types, + ../../protocol/waku_filter, ../../protocol/waku_store, ../../../v1/node/rpc/hexstrings diff --git a/waku/v2/node/scripts/rpc_query.nim b/waku/v2/node/scripts/rpc_query.nim index 3ff76357d..dc62a31d6 100644 --- a/waku/v2/node/scripts/rpc_query.nim +++ b/waku/v2/node/scripts/rpc_query.nim @@ -8,7 +8,7 @@ import ../wakunode2, ../waku_payload, ../jsonrpc/jsonrpc_types, - ../../protocol/waku_filter/waku_filter_types, + ../../protocol/waku_filter, ../../protocol/waku_store, ../../../v1/node/rpc/hexstrings diff --git a/waku/v2/node/scripts/rpc_subscribe.nim b/waku/v2/node/scripts/rpc_subscribe.nim index e15ed22bd..50b6b4788 100644 --- a/waku/v2/node/scripts/rpc_subscribe.nim +++ b/waku/v2/node/scripts/rpc_subscribe.nim @@ -7,7 +7,7 @@ import ../wakunode2, ../waku_payload, ../jsonrpc/jsonrpc_types, - ../../protocol/waku_filter/waku_filter_types, + ../../protocol/waku_filter, ../../protocol/waku_store, ../../../v1/node/rpc/hexstrings diff --git a/waku/v2/node/scripts/rpc_subscribe_filter.nim b/waku/v2/node/scripts/rpc_subscribe_filter.nim index 70094d08f..c3ed55cad 100644 --- a/waku/v2/node/scripts/rpc_subscribe_filter.nim +++ b/waku/v2/node/scripts/rpc_subscribe_filter.nim @@ -8,7 +8,7 @@ import ../wakunode2, ../waku_payload, ../jsonrpc/jsonrpc_types, - ../../protocol/waku_filter/waku_filter_types, + ../../protocol/waku_filter, ../../protocol/waku_store, ../../../v1/node/rpc/hexstrings diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 5e635539d..57737f96c 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -19,7 +19,7 @@ import ../protocol/[waku_relay, waku_message], ../protocol/waku_store, ../protocol/waku_swap/waku_swap, - ../protocol/waku_filter/waku_filter, + ../protocol/waku_filter, ../protocol/waku_lightpush, ../protocol/waku_rln_relay/[waku_rln_relay_types], ../utils/[peers, requests, wakuswitch, wakuenr], @@ -67,30 +67,6 @@ proc protocolMatcher(codec: string): Matcher = return match -proc removeContentFilters(filters: var Filters, contentFilters: seq[ContentFilter]) {.gcsafe.} = - # Flatten all unsubscribe topics into single seq - let unsubscribeTopics = contentFilters.mapIt(it.contentTopic) - - debug "unsubscribing", unsubscribeTopics=unsubscribeTopics - - var rIdToRemove: seq[string] = @[] - for rId, f in filters.mpairs: - # Iterate filter entries to remove matching content topics - - # make sure we delete the content filter - # if no more topics are left - f.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopic notin unsubscribeTopics) - - if f.contentFilters.len == 0: - rIdToRemove.add(rId) - - # make sure we delete the filter entry - # if no more content filters left - for rId in rIdToRemove: - filters.del(rId) - - debug "filters modified", filters=filters - proc updateSwitchPeerInfo(node: WakuNode) = ## TODO: remove this when supported upstream ## @@ -217,7 +193,7 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey, switch: switch, rng: rng, enr: enr, - filters: initTable[string, Filter](), + filters: Filters.init(), announcedAddresses: announcedAddresses ) @@ -279,18 +255,20 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa var id = generateRequestId(node.rng) if node.wakuFilter.isNil == false: - let idOpt = await node.wakuFilter.subscribe(request) + let + pubsubTopic = request.pubsubTopic + contentTopics = request.contentFilters.mapIt(it.contentTopic) + let resSubscription = await node.wakuFilter.subscribe(pubsubTopic, contentTopics) - if idOpt.isSome(): - # Subscribed successfully. - id = idOpt.get() + if resSubscription.isOk(): + id = resSubscription.get() else: # Failed to subscribe error "remote subscription to filter failed", filter = request waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) # Register handler for filter, whether remote subscription succeeded or not - node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler, pubSubTopic: request.pubSubTopic) + node.filters.addContentFilters(id, request.pubSubTopic, request.contentFilters, handler) waku_node_filters.set(node.filters.len.int64) proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) = @@ -333,7 +311,10 @@ proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} = info "unsubscribe content", filter=request - await node.wakuFilter.unsubscribe(request) + let + pubsubTopic = request.pubsubTopic + contentTopics = request.contentFilters.mapIt(it.contentTopic) + discard await node.wakuFilter.unsubscribe(pubsubTopic, contentTopics) node.filters.removeContentFilters(request.contentFilters) waku_node_filters.set(node.filters.len.int64) @@ -420,10 +401,9 @@ proc info*(node: WakuNode): WakuInfo = let wakuInfo = WakuInfo(listenAddresses: listenStr, enrUri: enrUri) return wakuInfo -proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.raises: [Defect, KeyError, LPError]} = +proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.raises: [Defect, LPError]} = info "mounting filter" - proc filterHandler(requestId: string, msg: MessagePush) - {.async, gcsafe, raises: [Defect, KeyError].} = + proc filterHandler(requestId: string, msg: MessagePush) {.async, gcsafe.} = info "push received" for message in msg.messages: diff --git a/waku/v2/node/wakunode2_types.nim b/waku/v2/node/wakunode2_types.nim index 26a1a0945..7e3314a01 100644 --- a/waku/v2/node/wakunode2_types.nim +++ b/waku/v2/node/wakunode2_types.nim @@ -5,7 +5,7 @@ import ../protocol/waku_relay, ../protocol/waku_store, ../protocol/waku_swap/waku_swap, - ../protocol/waku_filter/waku_filter, + ../protocol/waku_filter, ../protocol/waku_lightpush, ../protocol/waku_rln_relay/waku_rln_relay_types, ./peer_manager/peer_manager, diff --git a/waku/v2/protocol/waku_filter.nim b/waku/v2/protocol/waku_filter.nim new file mode 100644 index 000000000..a4f6d728e --- /dev/null +++ b/waku/v2/protocol/waku_filter.nim @@ -0,0 +1,13 @@ +{.push raises: [Defect].} + +import + ./waku_filter/rpc, + ./waku_filter/rpc_codec, + ./waku_filter/protocol, + ./waku_filter/client + +export + rpc, + rpc_codec, + protocol, + client diff --git a/waku/v2/protocol/waku_filter/client.nim b/waku/v2/protocol/waku_filter/client.nim new file mode 100644 index 000000000..33580d9b5 --- /dev/null +++ b/waku/v2/protocol/waku_filter/client.nim @@ -0,0 +1,69 @@ +{.push raises: [Defect].} + +import + std/[tables, sequtils], + chronicles +import + ../waku_message, + ./rpc + +type + ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure, raises: [Defect].} + + Filter* = object + pubSubTopic*: string + contentFilters*: seq[ContentFilter] + handler*: ContentFilterHandler + + Filters* = Table[string, Filter] + + +proc init*(T: type Filters): T = + initTable[string, Filter]() + +proc addContentFilters*(filters: var Filters, requestId: string, pubsubTopic: string, contentFilters: seq[ContentFilter], handler: ContentFilterHandler) {.gcsafe.}= + filters[requestId] = Filter( + pubSubTopic: pubsubTopic, + contentFilters: contentFilters, + handler: handler + ) + +proc removeContentFilters*(filters: var Filters, contentFilters: seq[ContentFilter]) {.gcsafe.} = + # Flatten all unsubscribe topics into single seq + let unsubscribeTopics = contentFilters.mapIt(it.contentTopic) + + debug "unsubscribing", unsubscribeTopics=unsubscribeTopics + + var rIdToRemove: seq[string] = @[] + for rId, f in filters.mpairs: + # Iterate filter entries to remove matching content topics + + # make sure we delete the content filter + # if no more topics are left + f.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopic notin unsubscribeTopics) + + if f.contentFilters.len == 0: + rIdToRemove.add(rId) + + # make sure we delete the filter entry + # if no more content filters left + for rId in rIdToRemove: + filters.del(rId) + + debug "filters modified", filters=filters + +proc notify*(filters: Filters, msg: WakuMessage, requestId: string) = + for key, filter in filters.pairs: + # We do this because the key for the filter is set to the requestId received from the filter protocol. + # This means we do not need to check the content filter explicitly as all MessagePushs already contain + # the requestId of the coresponding filter. + if requestId != "" and requestId == key: + filter.handler(msg) + continue + + # TODO: In case of no topics we should either trigger here for all messages, + # or we should not allow such filter to exist in the first place. + for contentFilter in filter.contentFilters: + if msg.contentTopic == contentFilter.contentTopic: + filter.handler(msg) + break diff --git a/waku/v2/protocol/waku_filter/protocol.nim b/waku/v2/protocol/waku_filter/protocol.nim new file mode 100644 index 000000000..d1d4d40a6 --- /dev/null +++ b/waku/v2/protocol/waku_filter/protocol.nim @@ -0,0 +1,287 @@ +import + std/[options, sets, tables, sequtils], + stew/results, + chronicles, + chronos, + metrics, + bearssl, + libp2p/protocols/protocol, + libp2p/crypto/crypto +import + ../waku_message, + ../../node/peer_manager/peer_manager, + ../../utils/requests, + ./rpc, + ./rpc_codec + + +declarePublicGauge waku_filter_peers, "number of filter peers" +declarePublicGauge waku_filter_subscribers, "number of light node filter subscribers" +declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"] +declarePublicGauge waku_filter_messages, "number of filter messages received", ["type"] + +logScope: + topics = "wakufilter" + + +const + # We add a 64kB safety buffer for protocol overhead. + # 10x-multiplier also for safety: currently we never + # push more than 1 message at a time. + MaxRpcSize* = 10 * MaxWakuMessageSize + 64 * 1024 + + WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1" + WakuFilterTimeout: Duration = 2.hours + + +# Error types (metric label values) +const + dialFailure = "dial_failure" + decodeRpcFailure = "decode_rpc_failure" + peerNotFoundFailure = "peer_not_found_failure" + + +type Subscription = object + requestId: string + peer: PeerID + pubsubTopic: string + contentTopics: HashSet[ContentTopic] + + +proc addSubscription(subscriptions: var seq[Subscription], peer: PeerID, requestId: string, pubsubTopic: string, contentTopics: seq[ContentTopic]) = + let subscription = Subscription( + requestId: requestId, + peer: peer, + pubsubTopic: pubsubTopic, + contentTopics: toHashSet(contentTopics) + ) + subscriptions.add(subscription) + +proc removeSubscription(subscriptions: var seq[Subscription], peer: PeerId, unsubscribeTopics: seq[ContentTopic]) = + for sub in subscriptions.mitems: + if sub.peer != peer: + continue + + sub.contentTopics.excl(toHashSet(unsubscribeTopics)) + + # Delete the subscriber if no more content filters left + subscriptions.keepItIf(it.contentTopics.len > 0) + + +type + MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.} + + WakuFilterResult*[T] = Result[T, string] + + WakuFilter* = ref object of LPProtocol + rng*: ref BrHmacDrbgContext + peerManager*: PeerManager + pushHandler*: MessagePushHandler + subscriptions*: seq[Subscription] + failedPeers*: Table[string, chronos.Moment] + timeout*: chronos.Duration + +proc init(wf: WakuFilter) = + + proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = + let message = await conn.readLp(MaxRpcSize.int) + + let res = FilterRPC.init(message) + if res.isErr(): + waku_filter_errors.inc(labelValues = [decodeRpcFailure]) + return + + trace "filter message received" + + let rpc = res.get() + + ## Filter request + # We are receiving a subscription/unsubscription request + if rpc.request != FilterRequest(): + waku_filter_messages.inc(labelValues = ["FilterRequest"]) + + let + requestId = rpc.requestId + subscribe = rpc.request.subscribe + pubsubTopic = rpc.request.pubsubTopic + contentTopics = rpc.request.contentFilters.mapIt(it.contentTopic) + + if subscribe: + info "added filter subscritpiton", peerId=conn.peerId, pubsubTopic=pubsubTopic, contentTopics=contentTopics + wf.subscriptions.addSubscription(conn.peerId, requestId, pubsubTopic, contentTopics) + else: + info "removed filter subscritpiton", peerId=conn.peerId, contentTopics=contentTopics + wf.subscriptions.removeSubscription(conn.peerId, contentTopics) + + waku_filter_subscribers.set(wf.subscriptions.len.int64) + + + ## Push message + # We are receiving a messages from the peer that we subscribed to + if rpc.push != MessagePush(): + waku_filter_messages.inc(labelValues = ["MessagePush"]) + + let + requestId = rpc.requestId + push = rpc.push + + info "received filter message push", peerId=conn.peerId + await wf.pushHandler(requestId, push) + + + wf.handler = handle + wf.codec = WakuFilterCodec + +proc init*(T: type WakuFilter, + peerManager: PeerManager, + rng: ref BrHmacDrbgContext, + handler: MessagePushHandler, + timeout: Duration = WakuFilterTimeout): T = + let wf = WakuFilter(rng: rng, + peerManager: peerManager, + pushHandler: handler, + timeout: timeout) + wf.init() + return wf + + +proc setPeer*(wf: WakuFilter, peer: RemotePeerInfo) = + wf.peerManager.addPeer(peer, WakuFilterCodec) + waku_filter_peers.inc() + + +proc sendFilterRpcToPeer(wf: WakuFilter, rpc: FilterRPC, peer: PeerId): Future[WakuFilterResult[void]] {.async, gcsafe.}= + let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) + if connOpt.isNone(): + return err(dialFailure) + + let connection = connOpt.get() + + await connection.writeLP(rpc.encode().buffer) + + return ok() + +proc sendFilterRpcToRemotePeer(wf: WakuFilter, rpc: FilterRPC, peer: RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}= + let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) + if connOpt.isNone(): + return err(dialFailure) + + let connection = connOpt.get() + + await connection.writeLP(rpc.encode().buffer) + + return ok() + + +### Send message to subscriptors +proc removePeerFromFailedPeersTable(wf: WakuFilter, subs: seq[Subscription]) = + ## Clear the failed peer table if subscriber was able to connect + for sub in subs: + wf.failedPeers.del($sub) + +proc handleClientError(wf: WakuFilter, subs: seq[Subscription]) {.raises: [Defect, KeyError].} = + ## If we have already failed to send message to this peer, + ## check for elapsed time and if it's been too long, remove the peer. + for sub in subs: + let subKey: string = $(sub) + + if not wf.failedPeers.hasKey(subKey): + # add the peer to the failed peers table. + wf.failedPeers[subKey] = Moment.now() + return + + let elapsedTime = Moment.now() - wf.failedPeers[subKey] + if elapsedTime > wf.timeout: + wf.failedPeers.del(subKey) + + let index = wf.subscriptions.find(sub) + wf.subscriptions.delete(index) + + +proc handleMessage*(wf: WakuFilter, pubsubTopic: string, msg: WakuMessage) {.async.} = + if wf.subscriptions.len <= 0: + return + + var failedSubscriptions: seq[Subscription] + var connectedSubscriptions: seq[Subscription] + + for sub in wf.subscriptions: + # TODO: Review when pubsubTopic can be empty and if it is a valid case + if sub.pubSubTopic != "" and sub.pubSubTopic != pubsubTopic: + continue + + if msg.contentTopic notin sub.contentTopics: + continue + + let rpc = FilterRPC( + requestId: sub.requestId, + push: MessagePush(messages: @[msg]) + ) + + let res = await wf.sendFilterRpcToPeer(rpc, sub.peer) + if res.isErr(): + waku_filter_errors.inc(labelValues = [res.error()]) + failedSubscriptions.add(sub) + continue + + connectedSubscriptions.add(sub) + + wf.removePeerFromFailedPeersTable(connectedSubscriptions) + + wf.handleClientError(failedSubscriptions) + + +### Send subscription/unsubscription + +proc subscribe(wf: WakuFilter, pubsubTopic: string, contentTopics: seq[ContentTopic], peer: RemotePeerInfo): Future[WakuFilterResult[string]] {.async, gcsafe.} = + let id = generateRequestId(wf.rng) + let rpc = FilterRPC( + requestId: id, + request: FilterRequest( + subscribe: true, + pubSubTopic: pubsubTopic, + contentFilters: contentTopics.mapIt(ContentFilter(contentTopic: it)) + ) + ) + + let res = await wf.sendFilterRpcToRemotePeer(rpc, peer) + if res.isErr(): + waku_filter_errors.inc(labelValues = [res.error()]) + return err(res.error()) + + return ok(id) + +proc subscribe*(wf: WakuFilter, pubsubTopic: string, contentTopics: seq[ContentTopic]): Future[WakuFilterResult[string]] {.async, gcsafe.} = + let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec) + if peerOpt.isNone(): + waku_filter_errors.inc(labelValues = [peerNotFoundFailure]) + return err(peerNotFoundFailure) + + return await wf.subscribe(pubsubTopic, contentTopics, peerOpt.get()) + + +proc unsubscribe(wf: WakuFilter, pubsubTopic: string, contentTopics: seq[ContentTopic], peer: RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.} = + let id = generateRequestId(wf.rng) + let rpc = FilterRPC( + requestId: id, + request: FilterRequest( + subscribe: false, + pubSubTopic: pubsubTopic, + contentFilters: contentTopics.mapIt(ContentFilter(contentTopic: it)) + ) + ) + + let res = await wf.sendFilterRpcToRemotePeer(rpc, peer) + if res.isErr(): + waku_filter_errors.inc(labelValues = [res.error()]) + return err(res.error()) + + return ok() + +proc unsubscribe*(wf: WakuFilter, pubsubTopic: string, contentTopics: seq[ContentTopic]): Future[WakuFilterResult[void]] {.async, gcsafe.} = + let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec) + if peerOpt.isNone(): + waku_filter_errors.inc(labelValues = [peerNotFoundFailure]) + return err(peerNotFoundFailure) + + return await wf.unsubscribe(pubsubTopic, contentTopics, peerOpt.get()) \ No newline at end of file diff --git a/waku/v2/protocol/waku_filter/rpc.nim b/waku/v2/protocol/waku_filter/rpc.nim new file mode 100644 index 000000000..daf7f8933 --- /dev/null +++ b/waku/v2/protocol/waku_filter/rpc.nim @@ -0,0 +1,18 @@ +import ../waku_message + +type + ContentFilter* = object + contentTopic*: ContentTopic + + FilterRequest* = object + contentFilters*: seq[ContentFilter] + pubSubTopic*: string + subscribe*: bool + + MessagePush* = object + messages*: seq[WakuMessage] + + FilterRPC* = object + requestId*: string + request*: FilterRequest + push*: MessagePush diff --git a/waku/v2/protocol/waku_filter/rpc_codec.nim b/waku/v2/protocol/waku_filter/rpc_codec.nim new file mode 100644 index 000000000..5fed99c89 --- /dev/null +++ b/waku/v2/protocol/waku_filter/rpc_codec.nim @@ -0,0 +1,109 @@ +{.push raises: [Defect].} + +import + libp2p/protobuf/minprotobuf, + libp2p/varint +import + ../waku_message, + ../../utils/protobuf, + ./rpc + + +proc encode*(filter: ContentFilter): ProtoBuffer = + var output = initProtoBuffer() + output.write3(1, filter.contentTopic) + output.finish3() + + return output + +proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] = + let pb = initProtoBuffer(buffer) + + var contentTopic: ContentTopic + discard ?pb.getField(1, contentTopic) + + return ok(ContentFilter(contentTopic: contentTopic)) + + +proc encode*(rpc: FilterRequest): ProtoBuffer = + var output = initProtoBuffer() + output.write3(1, uint64(rpc.subscribe)) + output.write3(2, rpc.pubSubTopic) + + for filter in rpc.contentFilters: + output.write3(3, filter.encode()) + + output.finish3() + + return output + +proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] = + let pb = initProtoBuffer(buffer) + + var rpc = FilterRequest(contentFilters: @[], pubSubTopic: "") + + var subflag: uint64 + if ?pb.getField(1, subflag): + rpc.subscribe = bool(subflag) + + var pubSubTopic: string + discard ?pb.getField(2, pubSubTopic) + rpc.pubSubTopic = pubSubTopic + + var buffs: seq[seq[byte]] + discard ?pb.getRepeatedField(3, buffs) + for buf in buffs: + rpc.contentFilters.add(?ContentFilter.init(buf)) + + return ok(rpc) + + +proc encode*(push: MessagePush): ProtoBuffer = + var output = initProtoBuffer() + for push in push.messages: + output.write3(1, push.encode()) + output.finish3() + + return output + +proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] = + let pb = initProtoBuffer(buffer) + + var push = MessagePush() + + var messages: seq[seq[byte]] + discard ?pb.getRepeatedField(1, messages) + + for buf in messages: + push.messages.add(?WakuMessage.init(buf)) + + return ok(push) + + +proc encode*(rpc: FilterRPC): ProtoBuffer = + var output = initProtoBuffer() + output.write3(1, rpc.requestId) + output.write3(2, rpc.request.encode()) + output.write3(3, rpc.push.encode()) + output.finish3() + + return output + +proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] = + let pb = initProtoBuffer(buffer) + + var rpc = FilterRPC() + + var requestId: string + discard ?pb.getField(1, requestId) + rpc.requestId = requestId + + var requestBuffer: seq[byte] + discard ?pb.getField(2, requestBuffer) + rpc.request = ?FilterRequest.init(requestBuffer) + + var pushBuffer: seq[byte] + discard ?pb.getField(3, pushBuffer) + rpc.push = ?MessagePush.init(pushBuffer) + + return ok(rpc) diff --git a/waku/v2/protocol/waku_filter/waku_filter.nim b/waku/v2/protocol/waku_filter/waku_filter.nim deleted file mode 100644 index fc2b3a383..000000000 --- a/waku/v2/protocol/waku_filter/waku_filter.nim +++ /dev/null @@ -1,308 +0,0 @@ -{.push raises: [Defect].} - -import - std/[tables, sequtils, options], - bearssl, - chronos, chronicles, metrics, stew/results, - libp2p/protocols/pubsub/pubsubpeer, - libp2p/protocols/pubsub/floodsub, - libp2p/protocols/pubsub/gossipsub, - libp2p/protocols/protocol, - libp2p/protobuf/minprotobuf, - libp2p/stream/connection, - libp2p/crypto/crypto, - waku_filter_types, - ../../utils/requests, - ../../utils/protobuf, - ../../node/peer_manager/peer_manager - -# NOTE This is just a start, the design of this protocol isn't done yet. It -# should be direct payload exchange (a la req-resp), not be coupled with the -# relay protocol. - -export waku_filter_types - -declarePublicGauge waku_filter_peers, "number of filter peers" -declarePublicGauge waku_filter_subscribers, "number of light node filter subscribers" -declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"] -declarePublicGauge waku_filter_messages, "number of filter messages received", ["type"] - -logScope: - topics = "wakufilter" - -const - WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1" - WakuFilterTimeout: Duration = 1.days - -# Error types (metric label values) -const - dialFailure = "dial_failure" - decodeRpcFailure = "decode_rpc_failure" - -proc notify*(filters: Filters, msg: WakuMessage, requestId: string = "") {.raises: [Defect, KeyError]} = - for key in filters.keys: - let filter = filters[key] - # We do this because the key for the filter is set to the requestId received from the filter protocol. - # This means we do not need to check the content filter explicitly as all MessagePushs already contain - # the requestId of the coresponding filter. - if requestId != "" and requestId == key: - filter.handler(msg) - continue - - # TODO: In case of no topics we should either trigger here for all messages, - # or we should not allow such filter to exist in the first place. - for contentFilter in filter.contentFilters: - if msg.contentTopic == contentFilter.contentTopic: - filter.handler(msg) - break - -proc unsubscribeFilters(subscribers: var seq[Subscriber], request: FilterRequest, peerId: PeerID) = - # Flatten all unsubscribe topics into single seq - let unsubscribeTopics = request.contentFilters.mapIt(it.contentTopic) - debug "unsubscribing", peerId=peerId, unsubscribeTopics=unsubscribeTopics - - for subscriber in subscribers.mitems: - if subscriber.peer != peerId: continue - - # make sure we delete the content filter - # if no more topics are left - subscriber.filter.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopic notin unsubscribeTopics) - - # make sure we delete the subscriber - # if no more content filters left - subscribers.keepIf(proc (s: auto): bool = s.filter.contentFilters.len > 0) - - debug "subscribers modified", subscribers=subscribers - # @TODO: metrics? - -proc encode*(filter: ContentFilter): ProtoBuffer = - var output = initProtoBuffer() - - output.write3(1, filter.contentTopic) - - output.finish3() - - return output - -proc encode*(rpc: FilterRequest): ProtoBuffer = - var output = initProtoBuffer() - - output.write3(1, uint64(rpc.subscribe)) - - output.write3(2, rpc.pubSubTopic) - - for filter in rpc.contentFilters: - output.write3(3, filter.encode()) - - output.finish3() - - return output - -proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] = - let pb = initProtoBuffer(buffer) - - var contentTopic: ContentTopic - discard ? pb.getField(1, contentTopic) - - return ok(ContentFilter(contentTopic: contentTopic)) - -proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] = - var rpc = FilterRequest(contentFilters: @[], pubSubTopic: "") - let pb = initProtoBuffer(buffer) - - var subflag: uint64 - if ? pb.getField(1, subflag): - rpc.subscribe = bool(subflag) - - discard ? pb.getField(2, rpc.pubSubTopic) - - var buffs: seq[seq[byte]] - discard ? pb.getRepeatedField(3, buffs) - - for buf in buffs: - rpc.contentFilters.add(? ContentFilter.init(buf)) - - return ok(rpc) - -proc encode*(push: MessagePush): ProtoBuffer = - var output = initProtoBuffer() - - for push in push.messages: - output.write3(1, push.encode()) - - output.finish3() - - return output - -proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] = - var push = MessagePush() - let pb = initProtoBuffer(buffer) - - var messages: seq[seq[byte]] - discard ? pb.getRepeatedField(1, messages) - - for buf in messages: - push.messages.add(? WakuMessage.init(buf)) - - return ok(push) - -proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] = - var rpc = FilterRPC() - let pb = initProtoBuffer(buffer) - - discard ? pb.getField(1, rpc.requestId) - - var requestBuffer: seq[byte] - discard ? pb.getField(2, requestBuffer) - - rpc.request = ? FilterRequest.init(requestBuffer) - - var pushBuffer: seq[byte] - discard ? pb.getField(3, pushBuffer) - - rpc.push = ? MessagePush.init(pushBuffer) - - return ok(rpc) - -proc encode*(rpc: FilterRPC): ProtoBuffer = - var output = initProtoBuffer() - - output.write3(1, rpc.requestId) - output.write3(2, rpc.request.encode()) - output.write3(3, rpc.push.encode()) - - output.finish3() - - return output - -method init*(wf: WakuFilter) = - proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = - var message = await conn.readLp(MaxRpcSize.int) - var res = FilterRPC.init(message) - if res.isErr: - error "failed to decode rpc" - waku_filter_errors.inc(labelValues = [decodeRpcFailure]) - return - - info "filter message received" - - let value = res.value - if value.push != MessagePush(): - waku_filter_messages.inc(labelValues = ["MessagePush"]) - await wf.pushHandler(value.requestId, value.push) - if value.request != FilterRequest(): - waku_filter_messages.inc(labelValues = ["FilterRequest"]) - if value.request.subscribe: - wf.subscribers.add(Subscriber(peer: conn.peerId, requestId: value.requestId, filter: value.request)) - else: - wf.subscribers.unsubscribeFilters(value.request, conn.peerId) - - waku_filter_subscribers.set(wf.subscribers.len.int64) - - wf.handler = handle - wf.codec = WakuFilterCodec - -proc init*(T: type WakuFilter, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: MessagePushHandler,timeout: Duration = WakuFilterTimeout): T = - let rng = crypto.newRng() - var wf = WakuFilter(rng: rng, - peerManager: peerManager, - pushHandler: handler, - timeout: timeout) - wf.init() - return wf - -proc setPeer*(wf: WakuFilter, peer: RemotePeerInfo) = - wf.peerManager.addPeer(peer, WakuFilterCodec) - waku_filter_peers.inc() - -#clear the failed peer table if subscriber was able to connect. -proc handleClientSuccess(wf: WakuFilter, subscribers: seq[Subscriber]){.raises: [Defect, KeyError].} = - for subscriber in subscribers: - var subKey: string = $(subscriber) - if wf.failedPeers.hasKey(subKey): - wf.failedPeers.del(subKey) - -# If we have already failed to send message to this peer, -# check for elapsed time and if it's been too long, remove the peer. -proc handleClientError(wf: WakuFilter, subscribers: seq[Subscriber]){.raises: [Defect, KeyError].} = - for subscriber in subscribers: - var subKey: string = $(subscriber) - if wf.failedPeers.hasKey(subKey): - var elapsedTime = Moment.now() - wf.failedPeers[subKey] - if(elapsedTime > wf.timeout): - trace "Remove peer if timeout has reached for", peer=subscriber - var index = wf.subscribers.find(subscriber) - wf.subscribers.delete(index) - wf.failedPeers.del(subKey) - else: - # add the peer to the failed peers table. - wf.failedPeers[subKey] = Moment.now() - return - -proc handleMessage*(wf: WakuFilter, topic: string, msg: WakuMessage) {.async.} = - # Handle WakuMessage according to filter protocol - trace "handle message in WakuFilter", topic=topic, msg=msg - var handleMessageFailed = false - var failedSubscriber: seq[Subscriber] - var connectedSubscribers: seq[Subscriber] - for subscriber in wf.subscribers: - if subscriber.filter.pubSubTopic != "" and subscriber.filter.pubSubTopic != topic: - trace "Subscriber's filter pubsubTopic does not match message topic", filter=subscriber.filter.pubSubTopic, topic=topic - continue - - for filter in subscriber.filter.contentFilters: - if msg.contentTopic == filter.contentTopic: - trace "Found matching contentTopic", filter=filter, msg=msg - let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg])) - let connOpt = await wf.peerManager.dialPeer(subscriber.peer, WakuFilterCodec) - if connOpt.isSome: - await connOpt.get().writeLP(push.encode().buffer) - connectedSubscribers.add(subscriber) - else: - # @TODO more sophisticated error handling here - handleMessageFailed = true - failedSubscriber.add(subscriber) - error "failed to push messages to remote peer" - waku_filter_errors.inc(labelValues = [dialFailure]) - break - handleClientSuccess(wf, connectedSubscribers) - if handleMessageFailed: - handleClientError(wf, failedSubscriber) - -proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]] {.async, gcsafe.} = - let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec) - - if peerOpt.isSome: - let peer = peerOpt.get() - - let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) - - if connOpt.isSome: - # This is the only successful path to subscription - let id = generateRequestId(wf.rng) - await connOpt.get().writeLP(FilterRPC(requestId: id, request: request).encode().buffer) - return some(id) - else: - # @TODO more sophisticated error handling here - error "failed to connect to remote peer" - waku_filter_errors.inc(labelValues = [dialFailure]) - return none(string) - -proc unsubscribe*(wf: WakuFilter, request: FilterRequest) {.async, gcsafe.} = - # @TODO: NO REAL REASON TO GENERATE REQUEST ID FOR UNSUBSCRIBE OTHER THAN CREATING SANE-LOOKING RPC. - let - id = generateRequestId(wf.rng) - peerOpt = wf.peerManager.selectPeer(WakuFilterCodec) - - if peerOpt.isSome: - # @TODO: if there are more than one WakuFilter peer, WakuFilter should unsubscribe from all peers - let peer = peerOpt.get() - - let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) - - if connOpt.isSome: - await connOpt.get().writeLP(FilterRPC(requestId: id, request: request).encode().buffer) - else: - # @TODO more sophisticated error handling here - error "failed to connect to remote peer" - waku_filter_errors.inc(labelValues = [dialFailure]) diff --git a/waku/v2/protocol/waku_filter/waku_filter_types.nim b/waku/v2/protocol/waku_filter/waku_filter_types.nim deleted file mode 100644 index f4d45b197..000000000 --- a/waku/v2/protocol/waku_filter/waku_filter_types.nim +++ /dev/null @@ -1,59 +0,0 @@ -import - std/[tables], - chronos, - bearssl, - libp2p/protocols/protocol, - ../../node/peer_manager/peer_manager, - ../waku_message - -export waku_message - -const - # We add a 64kB safety buffer for protocol overhead. - # 10x-multiplier also for safety: currently we never - # push more than 1 message at a time. - MaxRpcSize* = 10 * MaxWakuMessageSize + 64*1024 - -type - PubSubTopic* = string - - ContentFilter* = object - contentTopic*: ContentTopic - - ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure, raises: [Defect].} - - Filter* = object - contentFilters*: seq[ContentFilter] - pubSubTopic*: PubSubTopic - handler*: ContentFilterHandler - - # @TODO MAYBE MORE INFO? - Filters* = Table[string, Filter] - - FilterRequest* = object - contentFilters*: seq[ContentFilter] - pubSubTopic*: PubSubTopic - subscribe*: bool - - MessagePush* = object - messages*: seq[WakuMessage] - - FilterRPC* = object - requestId*: string - request*: FilterRequest - push*: MessagePush - - Subscriber* = object - peer*: PeerID - requestId*: string - filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN? - - MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.} - - WakuFilter* = ref object of LPProtocol - rng*: ref BrHmacDrbgContext - peerManager*: PeerManager - subscribers*: seq[Subscriber] - pushHandler*: MessagePushHandler - failedPeers*: Table[string, chronos.Moment] - timeout*: chronos.Duration