mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-24 21:08:38 +00:00
fix/make-filter-work (#182)
* starts to actually get the filter protocol working * made it work * Update all_tests_v2.nim * Update test_waku_filter.nim
This commit is contained in:
parent
fabccdeca8
commit
e1414ac922
@ -3,6 +3,5 @@ import
|
|||||||
# TODO: enable this when it is altered into a proper waku relay test
|
# TODO: enable this when it is altered into a proper waku relay test
|
||||||
# ./v2/test_waku,
|
# ./v2/test_waku,
|
||||||
./v2/test_wakunode,
|
./v2/test_wakunode,
|
||||||
./v2/test_waku_store
|
./v2/test_waku_store,
|
||||||
# NOTE: Disabling broken filter protocol, we don't rely on it for Nangang
|
./v2/test_waku_filter
|
||||||
# ./v2/test_waku_filter
|
|
||||||
|
@ -17,53 +17,47 @@ import
|
|||||||
procSuite "Waku Filter":
|
procSuite "Waku Filter":
|
||||||
|
|
||||||
asyncTest "handle filter":
|
asyncTest "handle filter":
|
||||||
|
|
||||||
let
|
let
|
||||||
proto = WakuFilter.init()
|
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||||
subscription = proto.subscription()
|
peer = PeerInfo.init(key)
|
||||||
|
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "pew2")
|
||||||
|
|
||||||
|
var dialSwitch = newStandardSwitch()
|
||||||
|
discard await dialSwitch.start()
|
||||||
|
|
||||||
|
var listenSwitch = newStandardSwitch(some(key))
|
||||||
|
discard await listenSwitch.start()
|
||||||
|
|
||||||
|
var completionFut = newFuture[bool]()
|
||||||
|
proc handle(msg: MessagePush) {.async, gcsafe, closure.} =
|
||||||
|
check:
|
||||||
|
msg.messages.len() == 1
|
||||||
|
msg.messages[0] == post
|
||||||
|
completionFut.complete(true)
|
||||||
|
|
||||||
|
let
|
||||||
|
proto = WakuFilter.init(dialSwitch, handle)
|
||||||
|
rpc = FilterRequest(contentFilter: @[ContentFilter(topics: @["pew", "pew2"])], topic: "topic")
|
||||||
|
|
||||||
|
dialSwitch.mount(proto)
|
||||||
|
|
||||||
|
proc emptyHandle(msg: MessagePush) {.async, gcsafe, closure.} =
|
||||||
|
discard
|
||||||
|
|
||||||
|
let
|
||||||
|
proto2 = WakuFilter.init(listenSwitch, emptyHandle)
|
||||||
|
subscription = proto2.subscription()
|
||||||
|
|
||||||
var subscriptions = newTable[string, MessageNotificationSubscription]()
|
var subscriptions = newTable[string, MessageNotificationSubscription]()
|
||||||
subscriptions["test"] = subscription
|
subscriptions["test"] = subscription
|
||||||
|
listenSwitch.mount(proto2)
|
||||||
|
|
||||||
let
|
await proto.subscribe(listenSwitch.peerInfo, rpc)
|
||||||
peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
|
|
||||||
msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "pew")
|
|
||||||
msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "pew2")
|
|
||||||
|
|
||||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
|
||||||
let remoteSecKey = PrivateKey.random(ECDSA, rng[]).get()
|
|
||||||
let remotePeerInfo = PeerInfo.init(
|
|
||||||
remoteSecKey,
|
|
||||||
[ma],
|
|
||||||
["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
|
|
||||||
)
|
|
||||||
|
|
||||||
var serverFut: Future[void]
|
|
||||||
let msListen = newMultistream()
|
|
||||||
|
|
||||||
msListen.addHandler(WakuFilterCodec, proto)
|
|
||||||
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
|
|
||||||
await msListen.handle(conn)
|
|
||||||
|
|
||||||
var transport1 = TcpTransport.init()
|
|
||||||
serverFut = await transport1.listen(ma, connHandler)
|
|
||||||
|
|
||||||
let msDial = newMultistream()
|
|
||||||
let transport2: TcpTransport = TcpTransport.init()
|
|
||||||
let conn = await transport2.dial(transport1.ma)
|
|
||||||
|
|
||||||
var rpc = FilterRequest(contentFilter: @[waku_filter.ContentFilter(topics: @["pew", "pew2"])], topic: "topic")
|
|
||||||
discard await msDial.select(conn, WakuFilterCodec)
|
|
||||||
await conn.writeLP(rpc.encode().buffer)
|
|
||||||
|
|
||||||
await sleepAsync(2.seconds)
|
await sleepAsync(2.seconds)
|
||||||
|
|
||||||
subscriptions.notify("topic", msg)
|
await subscriptions.notify("topic", post)
|
||||||
subscriptions.notify("topic", msg2)
|
|
||||||
|
|
||||||
var message = await conn.readLp(64*1024)
|
|
||||||
|
|
||||||
let response = MessagePush.init(message)
|
|
||||||
let res = response.value
|
|
||||||
check:
|
check:
|
||||||
res.messages.len() == 1
|
(await completionFut.withTimeout(5.seconds)) == true
|
||||||
res.messages[0] == msg
|
|
||||||
|
@ -61,12 +61,20 @@ type
|
|||||||
MessagePush* = object
|
MessagePush* = object
|
||||||
messages*: seq[WakuMessage]
|
messages*: seq[WakuMessage]
|
||||||
|
|
||||||
|
FilterRPC* = object
|
||||||
|
request*: FilterRequest
|
||||||
|
push*: MessagePush
|
||||||
|
|
||||||
Subscriber* = object
|
Subscriber* = object
|
||||||
connection*: Connection
|
peer*: PeerInfo
|
||||||
filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN?
|
filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN?
|
||||||
|
|
||||||
|
MessagePushHandler* = proc(msg: MessagePush): Future[void] {.gcsafe, closure.}
|
||||||
|
|
||||||
WakuFilter* = ref object of LPProtocol
|
WakuFilter* = ref object of LPProtocol
|
||||||
|
switch*: Switch
|
||||||
subscribers*: seq[Subscriber]
|
subscribers*: seq[Subscriber]
|
||||||
|
pushHandler*: MessagePushHandler
|
||||||
|
|
||||||
# NOTE based on Eth2Node in NBC eth2_network.nim
|
# NOTE based on Eth2Node in NBC eth2_network.nim
|
||||||
WakuNode* = ref object of RootObj
|
WakuNode* = ref object of RootObj
|
||||||
|
@ -106,7 +106,10 @@ proc start*(node: WakuNode) {.async.} =
|
|||||||
node.switch.mount(node.wakuStore)
|
node.switch.mount(node.wakuStore)
|
||||||
node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription())
|
node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription())
|
||||||
|
|
||||||
node.wakuFilter = WakuFilter.init()
|
proc pushHandler(msg: MessagePush) {.async, gcsafe.} =
|
||||||
|
info "push received"
|
||||||
|
|
||||||
|
node.wakuFilter = WakuFilter.init(node.switch, pushHandler)
|
||||||
node.switch.mount(node.wakuFilter)
|
node.switch.mount(node.wakuFilter)
|
||||||
node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription())
|
node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription())
|
||||||
|
|
||||||
|
@ -7,6 +7,7 @@ import
|
|||||||
libp2p/protocols/protocol,
|
libp2p/protocols/protocol,
|
||||||
libp2p/protobuf/minprotobuf,
|
libp2p/protobuf/minprotobuf,
|
||||||
libp2p/stream/connection,
|
libp2p/stream/connection,
|
||||||
|
libp2p/switch,
|
||||||
./message_notifier,
|
./message_notifier,
|
||||||
./../../node/v2/waku_types
|
./../../node/v2/waku_types
|
||||||
|
|
||||||
@ -74,38 +75,68 @@ proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] =
|
|||||||
|
|
||||||
ok(push)
|
ok(push)
|
||||||
|
|
||||||
|
proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] =
|
||||||
|
var rpc = FilterRPC()
|
||||||
|
let pb = initProtoBuffer(buffer)
|
||||||
|
|
||||||
|
var requestBuffer: seq[byte]
|
||||||
|
discard ? pb.getField(1, requestBuffer)
|
||||||
|
|
||||||
|
rpc.request = ? FilterRequest.init(requestBuffer)
|
||||||
|
|
||||||
|
var pushBuffer: seq[byte]
|
||||||
|
discard ? pb.getField(2, pushBuffer)
|
||||||
|
|
||||||
|
rpc.push = ? MessagePush.init(pushBuffer)
|
||||||
|
|
||||||
|
ok(rpc)
|
||||||
|
|
||||||
|
proc encode*(rpc: FilterRPC): ProtoBuffer =
|
||||||
|
result = initProtoBuffer()
|
||||||
|
|
||||||
|
result.write(1, rpc.request.encode())
|
||||||
|
result.write(2, rpc.push.encode())
|
||||||
|
|
||||||
method init*(wf: WakuFilter) =
|
method init*(wf: WakuFilter) =
|
||||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||||
var message = await conn.readLp(64*1024)
|
var message = await conn.readLp(64*1024)
|
||||||
var res = FilterRequest.init(message)
|
var res = FilterRPC.init(message)
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
|
error "failed to decode rpc"
|
||||||
return
|
return
|
||||||
|
|
||||||
wf.subscribers.add(Subscriber(connection: conn, filter: res.value))
|
let value = res.value
|
||||||
# @TODO THIS IS A VERY ROUGH EXPERIMENT
|
if value.push != MessagePush():
|
||||||
|
await wf.pushHandler(value.push)
|
||||||
|
if value.request != FilterRequest():
|
||||||
|
wf.subscribers.add(Subscriber(peer: conn.peerInfo, filter: value.request))
|
||||||
|
|
||||||
wf.handler = handle
|
wf.handler = handle
|
||||||
wf.codec = WakuFilterCodec
|
wf.codec = WakuFilterCodec
|
||||||
|
|
||||||
proc init*(T: type WakuFilter): T =
|
proc init*(T: type WakuFilter, switch: Switch, handler: MessagePushHandler): T =
|
||||||
new result
|
new result
|
||||||
|
result.switch = switch
|
||||||
|
result.pushHandler = handler
|
||||||
result.init()
|
result.init()
|
||||||
|
|
||||||
proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
|
proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
|
||||||
## Returns a Filter for the specific protocol
|
## Returns a Filter for the specific protocol
|
||||||
## This filter can then be used to send messages to subscribers that match conditions.
|
## This filter can then be used to send messages to subscribers that match conditions.
|
||||||
proc handle(topic: string, msg: WakuMessage) {.async.} =
|
proc handle(topic: string, msg: WakuMessage) {.async.} =
|
||||||
var futures = newSeq[Future[void]]()
|
|
||||||
|
|
||||||
for subscriber in proto.subscribers:
|
for subscriber in proto.subscribers:
|
||||||
if subscriber.filter.topic != topic:
|
if subscriber.filter.topic != topic:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
for filter in subscriber.filter.contentFilter:
|
for filter in subscriber.filter.contentFilter:
|
||||||
if msg.contentTopic in filter.topics:
|
if msg.contentTopic in filter.topics:
|
||||||
futures.add(subscriber.connection.writeLp(MessagePush(messages: @[msg]).encode().buffer))
|
let push = FilterRPC(push: MessagePush(messages: @[msg]))
|
||||||
|
let conn = await proto.switch.dial(subscriber.peer.peerId, subscriber.peer.addrs, WakuFilterCodec)
|
||||||
|
await conn.writeLP(push.encode().buffer)
|
||||||
break
|
break
|
||||||
|
|
||||||
await allFutures(futures)
|
|
||||||
|
|
||||||
MessageNotificationSubscription.init(@[], handle)
|
MessageNotificationSubscription.init(@[], handle)
|
||||||
|
|
||||||
|
proc subscribe*(wf: WakuFilter, peer: PeerInfo, request: FilterRequest) {.async, gcsafe.} =
|
||||||
|
let conn = await wf.switch.dial(peer.peerId, peer.addrs, WakuFilterCodec)
|
||||||
|
await conn.writeLP(FilterRPC(request: request).encode().buffer)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user