fix(filter): waku filter rpc codec support optional fields

This commit is contained in:
Lorenzo Delgado 2022-11-18 18:37:08 +01:00 committed by GitHub
parent 1d72ee31b0
commit af4fb5f5a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 74 additions and 40 deletions

View File

@ -19,6 +19,8 @@ proc write3*(proto: var ProtoBuffer, field: int, value: auto) =
when value is Option: when value is Option:
if value.isSome(): if value.isSome():
proto.write(field, value.get()) proto.write(field, value.get())
elif value is bool:
proto.write(field, zint(value))
else: else:
proto.write(field, value) proto.write(field, value)

View File

@ -98,7 +98,7 @@ proc initProtocolHandler(wf: WakuFilterClient) =
let rpc = decodeReqRes.get() let rpc = decodeReqRes.get()
trace "filter message received" trace "filter message received"
if rpc.push == MessagePush(): if rpc.push.isNone():
waku_filter_errors.inc(labelValues = [emptyMessagePushFailure]) waku_filter_errors.inc(labelValues = [emptyMessagePushFailure])
# TODO: Manage the empty push message error. Perform any action? # TODO: Manage the empty push message error. Perform any action?
return return
@ -108,7 +108,7 @@ proc initProtocolHandler(wf: WakuFilterClient) =
let let
peerId = conn.peerId peerId = conn.peerId
requestId = rpc.requestId requestId = rpc.requestId
push = rpc.push push = rpc.push.get()
info "received filter message push", peerId=conn.peerId, requestId=requestId info "received filter message push", peerId=conn.peerId, requestId=requestId
wf.handleMessagePush(peerId, requestId, push) wf.handleMessagePush(peerId, requestId, push)
@ -149,11 +149,11 @@ proc sendFilterRequestRpc(wf: WakuFilterClient,
let rpc = FilterRpc( let rpc = FilterRpc(
requestId: requestId, requestId: requestId,
request: FilterRequest( request: some(FilterRequest(
subscribe: subscribe, subscribe: subscribe,
pubSubTopic: pubsubTopic, pubSubTopic: pubsubTopic,
contentFilters: contentFilters contentFilters: contentFilters
) ))
) )
let sendRes = await wf.sendFilterRpc(rpc, peer) let sendRes = await wf.sendFilterRpc(rpc, peer)

View File

@ -72,9 +72,9 @@ type
proc handleFilterRequest(wf: WakuFilter, peerId: PeerId, rpc: FilterRPC) = proc handleFilterRequest(wf: WakuFilter, peerId: PeerId, rpc: FilterRPC) =
let let
requestId = rpc.requestId requestId = rpc.requestId
subscribe = rpc.request.subscribe subscribe = rpc.request.get().subscribe
pubsubTopic = rpc.request.pubsubTopic pubsubTopic = rpc.request.get().pubsubTopic
contentTopics = rpc.request.contentFilters.mapIt(it.contentTopic) contentTopics = rpc.request.get().contentFilters.mapIt(it.contentTopic)
if subscribe: if subscribe:
info "added filter subscritpiton", peerId=peerId, pubsubTopic=pubsubTopic, contentTopics=contentTopics info "added filter subscritpiton", peerId=peerId, pubsubTopic=pubsubTopic, contentTopics=contentTopics
@ -101,7 +101,7 @@ proc initProtocolHandler(wf: WakuFilter) =
## Filter request ## Filter request
# Subscription/unsubscription request # Subscription/unsubscription request
if rpc.request == FilterRequest(): if rpc.request.isNone():
waku_filter_errors.inc(labelValues = [emptyFilterRequestFailure]) waku_filter_errors.inc(labelValues = [emptyFilterRequestFailure])
# TODO: Manage the empty filter request message error. Perform any action? # TODO: Manage the empty filter request message error. Perform any action?
return return
@ -185,7 +185,7 @@ proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, msg: WakuMessage)
let rpc = FilterRPC( let rpc = FilterRPC(
requestId: sub.requestId, requestId: sub.requestId,
push: MessagePush(messages: @[msg]) push: some(MessagePush(messages: @[msg]))
) )
let res = await wf.sendFilterRpc(rpc, sub.peer) let res = await wf.sendFilterRpc(rpc, sub.peer)

View File

@ -1,12 +1,21 @@
import ../waku_message when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/options
import
../waku_message
type type
ContentFilter* = object ContentFilter* = object
contentTopic*: ContentTopic contentTopic*: string
FilterRequest* = object FilterRequest* = object
contentFilters*: seq[ContentFilter] contentFilters*: seq[ContentFilter]
pubSubTopic*: string pubsubTopic*: string
subscribe*: bool subscribe*: bool
MessagePush* = object MessagePush* = object
@ -14,5 +23,5 @@ type
FilterRPC* = object FilterRPC* = object
requestId*: string requestId*: string
request*: FilterRequest request*: Option[FilterRequest]
push*: MessagePush push*: Option[MessagePush]

View File

@ -3,6 +3,8 @@ when (NimMajor, NimMinor) < (1, 4):
else: else:
{.push raises: [].} {.push raises: [].}
import
std/options
import import
../../../common/protobuf, ../../../common/protobuf,
../waku_message, ../waku_message,
@ -24,17 +26,21 @@ proc encode*(filter: ContentFilter): ProtoBuffer =
proc decode*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] = proc decode*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer) let pb = initProtoBuffer(buffer)
var rpc = ContentFilter()
var contentTopic: ContentTopic var contentTopic: string
discard ?pb.getField(1, contentTopic) if not ?pb.getField(1, contentTopic):
return err(ProtoError.RequiredFieldMissing)
else:
rpc.contentTopic = contentTopic
ok(ContentFilter(contentTopic: contentTopic)) ok(rpc)
proc encode*(rpc: FilterRequest): ProtoBuffer = proc encode*(rpc: FilterRequest): ProtoBuffer =
var pb = initProtoBuffer() var pb = initProtoBuffer()
pb.write3(1, uint64(rpc.subscribe)) pb.write3(1, rpc.subscribe)
pb.write3(2, rpc.pubSubTopic) pb.write3(2, rpc.pubSubTopic)
for filter in rpc.contentFilters: for filter in rpc.contentFilters:
@ -46,20 +52,27 @@ proc encode*(rpc: FilterRequest): ProtoBuffer =
proc decode*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] = proc decode*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer) let pb = initProtoBuffer(buffer)
var rpc = FilterRequest(contentFilters: @[], pubSubTopic: "") var rpc = FilterRequest()
var subflag: uint64 var subflag: uint64
if ?pb.getField(1, subflag): if not ?pb.getField(1, subflag):
return err(ProtoError.RequiredFieldMissing)
else:
rpc.subscribe = bool(subflag) rpc.subscribe = bool(subflag)
var pubSubTopic: PubsubTopic var pubsubTopic: string
discard ?pb.getField(2, pubSubTopic) if not ?pb.getField(2, pubsubTopic):
rpc.pubSubTopic = pubSubTopic return err(ProtoError.RequiredFieldMissing)
else:
rpc.pubsubTopic = pubsubTopic
var buffs: seq[seq[byte]] var buffs: seq[seq[byte]]
discard ?pb.getRepeatedField(3, buffs) if not ?pb.getRepeatedField(3, buffs):
for buf in buffs: return err(ProtoError.RequiredFieldMissing)
rpc.contentFilters.add(?ContentFilter.decode(buf)) else:
for buf in buffs:
let filter = ?ContentFilter.decode(buf)
rpc.contentFilters.add(filter)
ok(rpc) ok(rpc)
@ -76,23 +89,25 @@ proc encode*(push: MessagePush): ProtoBuffer =
proc decode*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] = proc decode*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer) let pb = initProtoBuffer(buffer)
var push = MessagePush() var rpc = MessagePush()
var messages: seq[seq[byte]] var messages: seq[seq[byte]]
discard ?pb.getRepeatedField(1, messages) if not ?pb.getRepeatedField(1, messages):
return err(ProtoError.RequiredFieldMissing)
else:
for buf in messages:
let msg = ?WakuMessage.decode(buf)
rpc.messages.add(msg)
for buf in messages: ok(rpc)
push.messages.add(?WakuMessage.decode(buf))
ok(push)
proc encode*(rpc: FilterRPC): ProtoBuffer = proc encode*(rpc: FilterRPC): ProtoBuffer =
var pb = initProtoBuffer() var pb = initProtoBuffer()
pb.write3(1, rpc.requestId) pb.write3(1, rpc.requestId)
pb.write3(2, rpc.request.encode()) pb.write3(2, rpc.request.map(encode))
pb.write3(3, rpc.push.encode()) pb.write3(3, rpc.push.map(encode))
pb.finish3() pb.finish3()
pb pb
@ -102,15 +117,23 @@ proc decode*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] =
var rpc = FilterRPC() var rpc = FilterRPC()
var requestId: string var requestId: string
discard ?pb.getField(1, requestId) if not ?pb.getField(1, requestId):
rpc.requestId = requestId return err(ProtoError.RequiredFieldMissing)
else:
rpc.requestId = requestId
var requestBuffer: seq[byte] var requestBuffer: seq[byte]
discard ?pb.getField(2, requestBuffer) if not ?pb.getField(2, requestBuffer):
rpc.request = ?FilterRequest.decode(requestBuffer) rpc.request = none(FilterRequest)
else:
let request = ?FilterRequest.decode(requestBuffer)
rpc.request = some(request)
var pushBuffer: seq[byte] var pushBuffer: seq[byte]
discard ?pb.getField(3, pushBuffer) if not ?pb.getField(3, pushBuffer):
rpc.push = ?MessagePush.decode(pushBuffer) rpc.push = none(MessagePush)
else:
let push = ?MessagePush.decode(pushBuffer)
rpc.push = some(push)
ok(rpc) ok(rpc)