sync/filter-spec-alpha6 (#184)

This commit is contained in:
Dean Eigenmann 2020-09-24 14:06:41 +02:00 committed by GitHub
parent 1f68e63185
commit 29706734a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 11 additions and 4 deletions

View File

@ -5,6 +5,7 @@
import
std/tables,
chronos,
random,
libp2p/[switch, peerinfo, multiaddress, crypto/crypto],
libp2p/protobuf/minprotobuf,
libp2p/protocols/protocol,
@ -70,11 +71,13 @@ type
messages*: seq[WakuMessage]
FilterRPC* = object
requestId*: string
request*: FilterRequest
push*: MessagePush
Subscriber* = object
peer*: PeerInfo
requestId*: string
filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN?
MessagePushHandler* = proc(msg: MessagePush): Future[void] {.gcsafe, closure.}
@ -124,3 +127,7 @@ proc notify*(filters: Filters, msg: WakuMessage) =
if filter.contentFilter.topics.len > 0:
if msg.contentTopic in filter.contentFilter.topics:
filter.handler(msg.payload)
proc generateRequestId*(): string =
for _ in .. 10:
add(result, char(rand(int('A') .. int('z'))))

View File

@ -19,7 +19,7 @@ logScope:
topics = "wakufilter"
const
WakuFilterCodec* = "/vac/waku/filter/2.0.0-alpha5"
WakuFilterCodec* = "/vac/waku/filter/2.0.0-alpha6"
proc encode*(filter: ContentFilter): ProtoBuffer =
result = initProtoBuffer()
@ -109,7 +109,7 @@ method init*(wf: WakuFilter) =
if value.push != MessagePush():
await wf.pushHandler(value.push)
if value.request != FilterRequest():
wf.subscribers.add(Subscriber(peer: conn.peerInfo, filter: value.request))
wf.subscribers.add(Subscriber(peer: conn.peerInfo, requestId: value.requestId, filter: value.request))
wf.handler = handle
wf.codec = WakuFilterCodec
@ -130,7 +130,7 @@ proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
for filter in subscriber.filter.contentFilter:
if msg.contentTopic in filter.topics:
let push = FilterRPC(push: MessagePush(messages: @[msg]))
let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg]))
let conn = await proto.switch.dial(subscriber.peer.peerId, subscriber.peer.addrs, WakuFilterCodec)
await conn.writeLP(push.encode().buffer)
break
@ -139,4 +139,4 @@ proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
proc subscribe*(wf: WakuFilter, peer: PeerInfo, request: FilterRequest) {.async, gcsafe.} =
let conn = await wf.switch.dial(peer.peerId, peer.addrs, WakuFilterCodec)
await conn.writeLP(FilterRPC(request: request).encode().buffer)
await conn.writeLP(FilterRPC(requestId: generateRequestId(), request: request).encode().buffer)