From 29706734a8a9672472e5d0cdcdfe7112159bc075 Mon Sep 17 00:00:00 2001 From: Dean Eigenmann <7621705+decanus@users.noreply.github.com> Date: Thu, 24 Sep 2020 14:06:41 +0200 Subject: [PATCH] sync/filter-spec-alpha6 (#184) --- waku/node/v2/waku_types.nim | 7 +++++++ waku/protocol/v2/waku_filter.nim | 8 ++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/waku/node/v2/waku_types.nim b/waku/node/v2/waku_types.nim index a140fc5ba..2597223fb 100644 --- a/waku/node/v2/waku_types.nim +++ b/waku/node/v2/waku_types.nim @@ -5,6 +5,7 @@ import std/tables, chronos, + random, libp2p/[switch, peerinfo, multiaddress, crypto/crypto], libp2p/protobuf/minprotobuf, libp2p/protocols/protocol, @@ -70,11 +71,13 @@ type messages*: seq[WakuMessage] FilterRPC* = object + requestId*: string request*: FilterRequest push*: MessagePush Subscriber* = object peer*: PeerInfo + requestId*: string filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN? MessagePushHandler* = proc(msg: MessagePush): Future[void] {.gcsafe, closure.} @@ -124,3 +127,7 @@ proc notify*(filters: Filters, msg: WakuMessage) = if filter.contentFilter.topics.len > 0: if msg.contentTopic in filter.contentFilter.topics: filter.handler(msg.payload) + +proc generateRequestId*(): string = + for _ in .. 10: + add(result, char(rand(int('A') .. int('z')))) \ No newline at end of file diff --git a/waku/protocol/v2/waku_filter.nim b/waku/protocol/v2/waku_filter.nim index 76a0a8ed2..2292b33af 100644 --- a/waku/protocol/v2/waku_filter.nim +++ b/waku/protocol/v2/waku_filter.nim @@ -19,7 +19,7 @@ logScope: topics = "wakufilter" const - WakuFilterCodec* = "/vac/waku/filter/2.0.0-alpha5" + WakuFilterCodec* = "/vac/waku/filter/2.0.0-alpha6" proc encode*(filter: ContentFilter): ProtoBuffer = result = initProtoBuffer() @@ -109,7 +109,7 @@ method init*(wf: WakuFilter) = if value.push != MessagePush(): await wf.pushHandler(value.push) if value.request != FilterRequest(): - wf.subscribers.add(Subscriber(peer: conn.peerInfo, filter: value.request)) + wf.subscribers.add(Subscriber(peer: conn.peerInfo, requestId: value.requestId, filter: value.request)) wf.handler = handle wf.codec = WakuFilterCodec @@ -130,7 +130,7 @@ proc subscription*(proto: WakuFilter): MessageNotificationSubscription = for filter in subscriber.filter.contentFilter: if msg.contentTopic in filter.topics: - let push = FilterRPC(push: MessagePush(messages: @[msg])) + let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg])) let conn = await proto.switch.dial(subscriber.peer.peerId, subscriber.peer.addrs, WakuFilterCodec) await conn.writeLP(push.encode().buffer) break @@ -139,4 +139,4 @@ proc subscription*(proto: WakuFilter): MessageNotificationSubscription = proc subscribe*(wf: WakuFilter, peer: PeerInfo, request: FilterRequest) {.async, gcsafe.} = let conn = await wf.switch.dial(peer.peerId, peer.addrs, WakuFilterCodec) - await conn.writeLP(FilterRPC(request: request).encode().buffer) + await conn.writeLP(FilterRPC(requestId: generateRequestId(), request: request).encode().buffer)