diff --git a/waku/common/protobuf.nim b/waku/common/protobuf.nim index 050ebef9d..54bd7f8f6 100644 --- a/waku/common/protobuf.nim +++ b/waku/common/protobuf.nim @@ -19,6 +19,8 @@ proc write3*(proto: var ProtoBuffer, field: int, value: auto) = when value is Option: if value.isSome(): proto.write(field, value.get()) + elif value is bool: + proto.write(field, zint(value)) else: proto.write(field, value) diff --git a/waku/v2/protocol/waku_filter/client.nim b/waku/v2/protocol/waku_filter/client.nim index ac708f7ec..9e8b63733 100644 --- a/waku/v2/protocol/waku_filter/client.nim +++ b/waku/v2/protocol/waku_filter/client.nim @@ -98,7 +98,7 @@ proc initProtocolHandler(wf: WakuFilterClient) = let rpc = decodeReqRes.get() trace "filter message received" - if rpc.push == MessagePush(): + if rpc.push.isNone(): waku_filter_errors.inc(labelValues = [emptyMessagePushFailure]) # TODO: Manage the empty push message error. Perform any action? return @@ -108,7 +108,7 @@ proc initProtocolHandler(wf: WakuFilterClient) = let peerId = conn.peerId requestId = rpc.requestId - push = rpc.push + push = rpc.push.get() info "received filter message push", peerId=conn.peerId, requestId=requestId wf.handleMessagePush(peerId, requestId, push) @@ -149,11 +149,11 @@ proc sendFilterRequestRpc(wf: WakuFilterClient, let rpc = FilterRpc( requestId: requestId, - request: FilterRequest( + request: some(FilterRequest( subscribe: subscribe, pubSubTopic: pubsubTopic, contentFilters: contentFilters - ) + )) ) let sendRes = await wf.sendFilterRpc(rpc, peer) diff --git a/waku/v2/protocol/waku_filter/protocol.nim b/waku/v2/protocol/waku_filter/protocol.nim index 8d215a5cd..d062b7076 100644 --- a/waku/v2/protocol/waku_filter/protocol.nim +++ b/waku/v2/protocol/waku_filter/protocol.nim @@ -72,9 +72,9 @@ type proc handleFilterRequest(wf: WakuFilter, peerId: PeerId, rpc: FilterRPC) = let requestId = rpc.requestId - subscribe = rpc.request.subscribe - pubsubTopic = rpc.request.pubsubTopic - contentTopics = rpc.request.contentFilters.mapIt(it.contentTopic) + subscribe = rpc.request.get().subscribe + pubsubTopic = rpc.request.get().pubsubTopic + contentTopics = rpc.request.get().contentFilters.mapIt(it.contentTopic) if subscribe: info "added filter subscritpiton", peerId=peerId, pubsubTopic=pubsubTopic, contentTopics=contentTopics @@ -101,7 +101,7 @@ proc initProtocolHandler(wf: WakuFilter) = ## Filter request # Subscription/unsubscription request - if rpc.request == FilterRequest(): + if rpc.request.isNone(): waku_filter_errors.inc(labelValues = [emptyFilterRequestFailure]) # TODO: Manage the empty filter request message error. Perform any action? return @@ -185,7 +185,7 @@ proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, msg: WakuMessage) let rpc = FilterRPC( requestId: sub.requestId, - push: MessagePush(messages: @[msg]) + push: some(MessagePush(messages: @[msg])) ) let res = await wf.sendFilterRpc(rpc, sub.peer) diff --git a/waku/v2/protocol/waku_filter/rpc.nim b/waku/v2/protocol/waku_filter/rpc.nim index daf7f8933..39c842421 100644 --- a/waku/v2/protocol/waku_filter/rpc.nim +++ b/waku/v2/protocol/waku_filter/rpc.nim @@ -1,12 +1,21 @@ -import ../waku_message +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/options +import + ../waku_message + type ContentFilter* = object - contentTopic*: ContentTopic + contentTopic*: string FilterRequest* = object contentFilters*: seq[ContentFilter] - pubSubTopic*: string + pubsubTopic*: string subscribe*: bool MessagePush* = object @@ -14,5 +23,5 @@ type FilterRPC* = object requestId*: string - request*: FilterRequest - push*: MessagePush + request*: Option[FilterRequest] + push*: Option[MessagePush] diff --git a/waku/v2/protocol/waku_filter/rpc_codec.nim b/waku/v2/protocol/waku_filter/rpc_codec.nim index 20f8bdac0..db128229e 100644 --- a/waku/v2/protocol/waku_filter/rpc_codec.nim +++ b/waku/v2/protocol/waku_filter/rpc_codec.nim @@ -3,6 +3,8 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} +import + std/options import ../../../common/protobuf, ../waku_message, @@ -24,17 +26,21 @@ proc encode*(filter: ContentFilter): ProtoBuffer = proc decode*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) + var rpc = ContentFilter() - var contentTopic: ContentTopic - discard ?pb.getField(1, contentTopic) + var contentTopic: string + if not ?pb.getField(1, contentTopic): + return err(ProtoError.RequiredFieldMissing) + else: + rpc.contentTopic = contentTopic - ok(ContentFilter(contentTopic: contentTopic)) + ok(rpc) proc encode*(rpc: FilterRequest): ProtoBuffer = var pb = initProtoBuffer() - pb.write3(1, uint64(rpc.subscribe)) + pb.write3(1, rpc.subscribe) pb.write3(2, rpc.pubSubTopic) for filter in rpc.contentFilters: @@ -46,20 +52,27 @@ proc encode*(rpc: FilterRequest): ProtoBuffer = proc decode*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) - var rpc = FilterRequest(contentFilters: @[], pubSubTopic: "") + var rpc = FilterRequest() var subflag: uint64 - if ?pb.getField(1, subflag): + if not ?pb.getField(1, subflag): + return err(ProtoError.RequiredFieldMissing) + else: rpc.subscribe = bool(subflag) - var pubSubTopic: PubsubTopic - discard ?pb.getField(2, pubSubTopic) - rpc.pubSubTopic = pubSubTopic + var pubsubTopic: string + if not ?pb.getField(2, pubsubTopic): + return err(ProtoError.RequiredFieldMissing) + else: + rpc.pubsubTopic = pubsubTopic var buffs: seq[seq[byte]] - discard ?pb.getRepeatedField(3, buffs) - for buf in buffs: - rpc.contentFilters.add(?ContentFilter.decode(buf)) + if not ?pb.getRepeatedField(3, buffs): + return err(ProtoError.RequiredFieldMissing) + else: + for buf in buffs: + let filter = ?ContentFilter.decode(buf) + rpc.contentFilters.add(filter) ok(rpc) @@ -76,23 +89,25 @@ proc encode*(push: MessagePush): ProtoBuffer = proc decode*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) - var push = MessagePush() + var rpc = MessagePush() var messages: seq[seq[byte]] - discard ?pb.getRepeatedField(1, messages) + if not ?pb.getRepeatedField(1, messages): + return err(ProtoError.RequiredFieldMissing) + else: + for buf in messages: + let msg = ?WakuMessage.decode(buf) + rpc.messages.add(msg) - for buf in messages: - push.messages.add(?WakuMessage.decode(buf)) - - ok(push) + ok(rpc) proc encode*(rpc: FilterRPC): ProtoBuffer = var pb = initProtoBuffer() pb.write3(1, rpc.requestId) - pb.write3(2, rpc.request.encode()) - pb.write3(3, rpc.push.encode()) + pb.write3(2, rpc.request.map(encode)) + pb.write3(3, rpc.push.map(encode)) pb.finish3() pb @@ -102,15 +117,23 @@ proc decode*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] = var rpc = FilterRPC() var requestId: string - discard ?pb.getField(1, requestId) - rpc.requestId = requestId + if not ?pb.getField(1, requestId): + return err(ProtoError.RequiredFieldMissing) + else: + rpc.requestId = requestId var requestBuffer: seq[byte] - discard ?pb.getField(2, requestBuffer) - rpc.request = ?FilterRequest.decode(requestBuffer) + if not ?pb.getField(2, requestBuffer): + rpc.request = none(FilterRequest) + else: + let request = ?FilterRequest.decode(requestBuffer) + rpc.request = some(request) var pushBuffer: seq[byte] - discard ?pb.getField(3, pushBuffer) - rpc.push = ?MessagePush.decode(pushBuffer) + if not ?pb.getField(3, pushBuffer): + rpc.push = none(MessagePush) + else: + let push = ?MessagePush.decode(pushBuffer) + rpc.push = some(push) ok(rpc)