From f89e6869cbae9143d4443e6c17a276d7e7cb1678 Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Thu, 17 Nov 2022 20:40:08 +0100 Subject: [PATCH] fix(store): waku store rpc codec support optional fields --- tests/v2/test_waku_store_rpc_codec.nim | 21 ++- waku/common/protobuf.nim | 6 +- waku/v2/node/jsonrpc/jsonrpc_utils.nim | 18 ++- waku/v2/protocol/waku_store/client.nim | 10 +- waku/v2/protocol/waku_store/protocol.nim | 11 +- waku/v2/protocol/waku_store/rpc.nim | 112 ++++++------- waku/v2/protocol/waku_store/rpc_codec.nim | 188 +++++++++++++--------- 7 files changed, 212 insertions(+), 154 deletions(-) diff --git a/tests/v2/test_waku_store_rpc_codec.nim b/tests/v2/test_waku_store_rpc_codec.nim index 828cfc05f..becf4c5fa 100644 --- a/tests/v2/test_waku_store_rpc_codec.nim +++ b/tests/v2/test_waku_store_rpc_codec.nim @@ -1,7 +1,7 @@ {.used.} import - std/[options, times], + std/options, testutils/unittests, chronos import @@ -50,7 +50,7 @@ procSuite "Waku Store - RPC codec": ## Given let index = PagingIndexRPC.compute(fakeWakuMessage(), receivedTime=ts(), pubsubTopic=DefaultPubsubTopic) - pagingInfo = PagingInfoRPC(pageSize: 1, cursor: index, direction: PagingDirectionRPC.FORWARD) + pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirectionRPC.FORWARD)) ## When let pb = pagingInfo.encode() @@ -61,7 +61,7 @@ procSuite "Waku Store - RPC codec": decodedPagingInfo.isOk() check: - # the fields of decodedPagingInfo must be the same as the original pagingInfo + # The fields of decodedPagingInfo must be the same as the original pagingInfo decodedPagingInfo.value == pagingInfo decodedPagingInfo.value.direction == pagingInfo.direction @@ -85,8 +85,13 @@ procSuite "Waku Store - RPC codec": ## Given let index = PagingIndexRPC.compute(fakeWakuMessage(), receivedTime=ts(), pubsubTopic=DefaultPubsubTopic) - pagingInfo = PagingInfoRPC(pageSize: 1, cursor: index, direction: PagingDirectionRPC.BACKWARD) - query = HistoryQueryRPC(contentFilters: @[HistoryContentFilterRPC(contentTopic: DefaultContentTopic), HistoryContentFilterRPC(contentTopic: DefaultContentTopic)], pagingInfo: pagingInfo, startTime: Timestamp(10), endTime: Timestamp(11)) + pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirectionRPC.BACKWARD)) + query = HistoryQueryRPC( + contentFilters: @[HistoryContentFilterRPC(contentTopic: DefaultContentTopic), HistoryContentFilterRPC(contentTopic: DefaultContentTopic)], + pagingInfo: some(pagingInfo), + startTime: some(Timestamp(10)), + endTime: some(Timestamp(11)) + ) ## When let pb = query.encode() @@ -121,8 +126,8 @@ procSuite "Waku Store - RPC codec": let message = fakeWakuMessage() index = PagingIndexRPC.compute(message, receivedTime=ts(), pubsubTopic=DefaultPubsubTopic) - pagingInfo = PagingInfoRPC(pageSize: 1, cursor: index, direction: PagingDirectionRPC.BACKWARD) - res = HistoryResponseRPC(messages: @[message], pagingInfo:pagingInfo, error: HistoryResponseErrorRPC.INVALID_CURSOR) + pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirectionRPC.BACKWARD)) + res = HistoryResponseRPC(messages: @[message], pagingInfo: some(pagingInfo), error: HistoryResponseErrorRPC.INVALID_CURSOR) ## When let pb = res.encode() @@ -150,4 +155,4 @@ procSuite "Waku Store - RPC codec": check: # check the correctness of init and encode for an empty HistoryResponseRPC - decodedEmptyRes.value == emptyRes \ No newline at end of file + decodedEmptyRes.value == emptyRes diff --git a/waku/common/protobuf.nim b/waku/common/protobuf.nim index 291de6038..050ebef9d 100644 --- a/waku/common/protobuf.nim +++ b/waku/common/protobuf.nim @@ -6,6 +6,7 @@ else: {.push raises: [].} import + std/options, libp2p/protobuf/minprotobuf, libp2p/varint @@ -15,7 +16,10 @@ export proc write3*(proto: var ProtoBuffer, field: int, value: auto) = - if default(type(value)) != value: + when value is Option: + if value.isSome(): + proto.write(field, value.get()) + else: proto.write(field, value) proc finish3*(proto: var ProtoBuffer) = diff --git a/waku/v2/node/jsonrpc/jsonrpc_utils.nim b/waku/v2/node/jsonrpc/jsonrpc_utils.nim index 124694ed7..e4a679ec9 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_utils.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_utils.nim @@ -31,14 +31,20 @@ proc `%`*(value: WakuMessage): JsonNode = ## we need to convert between these and the types for the Nim API proc toPagingInfo*(pagingOptions: StorePagingOptions): PagingInfoRPC = - PagingInfoRPC(pageSize: pagingOptions.pageSize, - cursor: if pagingOptions.cursor.isSome: pagingOptions.cursor.get else: PagingIndexRPC(), - direction: if pagingOptions.forward: PagingDirectionRPC.FORWARD else: PagingDirectionRPC.BACKWARD) + PagingInfoRPC( + pageSize: some(pagingOptions.pageSize), + cursor: pagingOptions.cursor, + direction: if pagingOptions.forward: some(PagingDirectionRPC.FORWARD) + else: some(PagingDirectionRPC.BACKWARD) + ) proc toPagingOptions*(pagingInfo: PagingInfoRPC): StorePagingOptions = - StorePagingOptions(pageSize: pagingInfo.pageSize, - cursor: some(pagingInfo.cursor), - forward: if pagingInfo.direction == PagingDirectionRPC.FORWARD: true else: false) + StorePagingOptions( + pageSize: pagingInfo.pageSize.get(0'u64), + cursor: pagingInfo.cursor, + forward: if pagingInfo.direction.isNone(): true + else: pagingInfo.direction.get() == PagingDirectionRPC.FORWARD + ) proc toJsonRPCStoreResponse*(response: HistoryResponse): StoreResponse = StoreResponse( diff --git a/waku/v2/protocol/waku_store/client.nim b/waku/v2/protocol/waku_store/client.nim index 9741e0f34..068056a7b 100644 --- a/waku/v2/protocol/waku_store/client.nim +++ b/waku/v2/protocol/waku_store/client.nim @@ -53,7 +53,7 @@ proc sendHistoryQueryRPC(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeer let connection = connOpt.get() - let reqRpc = HistoryRPC(requestId: generateRequestId(w.rng), query: req.toRPC()) + let reqRpc = HistoryRPC(requestId: generateRequestId(w.rng), query: some(req.toRPC())) await connection.writeLP(reqRpc.encode().buffer) @@ -69,11 +69,11 @@ proc sendHistoryQueryRPC(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeer # Disabled ,for now, since the default response is a possible case (no messages, pagesize = 0, error = NONE(0)) # TODO: Rework the RPC protocol to differentiate the default value from an empty value (e.g., status = 200 (OK)) # and rework the protobuf parsing to return Option[T] when empty values are received - # if respRpc.response == default(HistoryResponseRPC): - # waku_store_errors.inc(labelValues = [emptyRpcResponseFailure]) - # return err(HistoryError(kind: HistoryErrorKind.BAD_RESPONSE, cause: emptyRpcResponseFailure)) + if respRpc.response.isNone(): + waku_store_errors.inc(labelValues = [emptyRpcResponseFailure]) + return err(HistoryError(kind: HistoryErrorKind.BAD_RESPONSE, cause: emptyRpcResponseFailure)) - let resp = respRpc.response + let resp = respRpc.response.get() return resp.toAPI() diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index cf7ded6f6..8ffdee1e9 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -222,7 +222,7 @@ proc initProtocolHandler*(ws: WakuStore) = let reqRpc = decodeRes.value - if reqRpc.query == default(HistoryQueryRPC): + if reqRpc.query.isNone(): error "empty query rpc", peerId=conn.peerId, requestId=reqRpc.requestId waku_store_errors.inc(labelValues = [emptyRpcQueryFailure]) # TODO: Return (BAD_REQUEST, cause: "empty query") @@ -239,19 +239,20 @@ proc initProtocolHandler*(ws: WakuStore) = error "history query failed", peerId=conn.peerId, requestId=reqRpc.requestId, error= $respErr let resp = HistoryResponseRPC(error: respErr.toRPC()) - let rpc = HistoryRPC(requestId: reqRpc.requestId, response: resp) + let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(resp)) await conn.writeLp(rpc.encode().buffer) return - let query = reqRpc.query.toApi() + let query = reqRpc.query.get().toAPI() + let respRes = ws.findMessages(query) if respRes.isErr(): error "history query failed", peerId=conn.peerId, requestId=reqRpc.requestId, error=respRes.error let resp = respRes.toRPC() - let rpc = HistoryRPC(requestId: reqRpc.requestId, response: resp) + let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(resp)) await conn.writeLp(rpc.encode().buffer) return @@ -270,7 +271,7 @@ proc initProtocolHandler*(ws: WakuStore) = info "sending history response", peerId=conn.peerId, requestId=reqRpc.requestId, messages=resp.messages.len - let rpc = HistoryRPC(requestId: reqRpc.requestId, response: resp) + let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(resp)) await conn.writeLp(rpc.encode().buffer) ws.handler = handler diff --git a/waku/v2/protocol/waku_store/rpc.nim b/waku/v2/protocol/waku_store/rpc.nim index 5a22fc99a..f547c53fb 100644 --- a/waku/v2/protocol/waku_store/rpc.nim +++ b/waku/v2/protocol/waku_store/rpc.nim @@ -49,9 +49,9 @@ type PagingInfoRPC* = object ## This type holds the information needed for the pagination - pageSize*: uint64 - cursor*: PagingIndexRPC - direction*: PagingDirectionRPC + pageSize*: Option[uint64] + cursor*: Option[PagingIndexRPC] + direction*: Option[PagingDirectionRPC] type @@ -60,10 +60,10 @@ type HistoryQueryRPC* = object contentFilters*: seq[HistoryContentFilterRPC] - pubsubTopic*: PubsubTopic - pagingInfo*: PagingInfoRPC # used for pagination - startTime*: Timestamp # used for time-window query - endTime*: Timestamp # used for time-window query + pubsubTopic*: Option[PubsubTopic] + pagingInfo*: Option[PagingInfoRPC] + startTime*: Option[int64] + endTime*: Option[int64] HistoryResponseErrorRPC* {.pure.} = enum ## HistoryResponseErrorRPC contains error message to inform the querying node about @@ -74,13 +74,13 @@ type HistoryResponseRPC* = object messages*: seq[WakuMessage] - pagingInfo*: PagingInfoRPC # used for pagination + pagingInfo*: Option[PagingInfoRPC] error*: HistoryResponseErrorRPC HistoryRPC* = object requestId*: string - query*: HistoryQueryRPC - response*: HistoryResponseRPC + query*: Option[HistoryQueryRPC] + response*: Option[HistoryResponseRPC] proc parse*(T: type HistoryResponseErrorRPC, kind: uint32): T = @@ -112,55 +112,53 @@ proc toAPI*(rpc: PagingIndexRPC): HistoryCursor = proc toRPC*(query: HistoryQuery): HistoryQueryRPC = - let - contentFilters = query.contentTopics.mapIt(HistoryContentFilterRPC(contentTopic: it)) + var rpc = HistoryQueryRPC() - pubsubTopic = query.pubsubTopic.get(default(string)) - - pageSize = query.pageSize + rpc.contentFilters = query.contentTopics.mapIt(HistoryContentFilterRPC(contentTopic: it)) - cursor = query.cursor.get(default(HistoryCursor)).toRPC() + rpc.pubsubTopic = query.pubsubTopic - direction = if query.ascending: PagingDirectionRPC.FORWARD - else: PagingDirectionRPC.BACKWARD + rpc.pagingInfo = block: + if query.cursor.isNone() and + query.pageSize == default(type query.pageSize) and + query.ascending == default(type query.ascending): + none(PagingInfoRPC) + else: + let + pageSize = some(query.pageSize) + cursor = query.cursor.map(toRPC) + direction = if query.ascending: some(PagingDirectionRPC.FORWARD) + else: some(PagingDirectionRPC.BACKWARD) + some(PagingInfoRPC( + pageSize: pageSize, + cursor: cursor, + direction: direction + )) + + rpc.startTime = query.startTime + rpc.endTime = query.endTime - startTime = query.startTime.get(default(Timestamp)) - - endTime = query.endTime.get(default(Timestamp)) + rpc - HistoryQueryRPC( - contentFilters: contentFilters, - pubsubTopic: pubsubTopic, - pagingInfo: PagingInfoRPC( - pageSize: pageSize, - cursor: cursor, - direction: direction - ), - startTime: startTime, - endTime: endTime - ) proc toAPI*(rpc: HistoryQueryRPC): HistoryQuery = let - pubsubTopic = if rpc.pubsubTopic == default(string): none(PubsubTopic) - else: some(rpc.pubsubTopic) + pubsubTopic = rpc.pubsubTopic contentTopics = rpc.contentFilters.mapIt(it.contentTopic) - cursor = if rpc.pagingInfo == default(PagingInfoRPC) or rpc.pagingInfo.cursor == default(PagingIndexRPC): none(HistoryCursor) - else: some(rpc.pagingInfo.cursor.toAPI()) + cursor = if rpc.pagingInfo.isNone() or rpc.pagingInfo.get().cursor.isNone(): none(HistoryCursor) + else: rpc.pagingInfo.get().cursor.map(toAPI) - startTime = if rpc.startTime == default(Timestamp): none(Timestamp) - else: some(rpc.startTime) + startTime = rpc.startTime - endTime = if rpc.endTime == default(Timestamp): none(Timestamp) - else: some(rpc.endTime) + endTime = rpc.endTime - pageSize = if rpc.pagingInfo == default(PagingInfoRPC): 0.uint64 - else: rpc.pagingInfo.pageSize + pageSize = if rpc.pagingInfo.isNone() or rpc.pagingInfo.get().pageSize.isNone(): 0'u64 + else: rpc.pagingInfo.get().pageSize.get() - ascending = if rpc.pagingInfo == default(PagingInfoRPC): true - else: rpc.pagingInfo.direction == PagingDirectionRPC.FORWARD + ascending = if rpc.pagingInfo.isNone() or rpc.pagingInfo.get().direction.isNone(): true + else: rpc.pagingInfo.get().direction.get() == PagingDirectionRPC.FORWARD HistoryQuery( pubsubTopic: pubsubTopic, @@ -182,7 +180,7 @@ proc toRPC*(err: HistoryError): HistoryResponseErrorRPC = of HistoryErrorKind.SERVICE_UNAVAILABLE: HistoryResponseErrorRPC.SERVICE_UNAVAILABLE else: - HistoryResponseErrorRPC.INVALID_CURSOR + HistoryResponseErrorRPC.INVALID_CURSOR proc toAPI*(err: HistoryResponseErrorRPC): HistoryError = # TODO: Better error mappings/move to error codes @@ -208,18 +206,18 @@ proc toRPC*(res: HistoryResult): HistoryResponseRPC = pagingInfo = block: if resp.cursor.isNone(): - default(PagingInfoRPC) + none(PagingInfoRPC) else: let - pageSize = resp.pageSize - cursor = resp.cursor.get(default(HistoryCursor)).toRPC() - direction = if resp.ascending: PagingDirectionRPC.FORWARD - else: PagingDirectionRPC.BACKWARD - PagingInfoRPC( + pageSize = some(resp.pageSize) + cursor = resp.cursor.map(toRPC) + direction = if resp.ascending: some(PagingDirectionRPC.FORWARD) + else: some(PagingDirectionRPC.BACKWARD) + some(PagingInfoRPC( pageSize: pageSize, cursor: cursor, direction: direction - ) + )) error = HistoryResponseErrorRPC.NONE @@ -236,12 +234,14 @@ proc toAPI*(rpc: HistoryResponseRPC): HistoryResult = let messages = rpc.messages - pageSize = rpc.pagingInfo.pageSize + pageSize = if rpc.pagingInfo.isNone(): 0'u64 + else: rpc.pagingInfo.get().pageSize.get(0'u64) - ascending = rpc.pagingInfo == default(PagingInfoRPC) or rpc.pagingInfo.direction == PagingDirectionRPC.FORWARD + ascending = if rpc.pagingInfo.isNone(): true + else: rpc.pagingInfo.get().direction.get(PagingDirectionRPC.FORWARD) == PagingDirectionRPC.FORWARD - cursor = if rpc.pagingInfo == default(PagingInfoRPC) or rpc.pagingInfo.cursor == default(PagingIndexRPC): none(HistoryCursor) - else: some(rpc.pagingInfo.cursor.toAPI()) + cursor = if rpc.pagingInfo.isNone(): none(HistoryCursor) + else: rpc.pagingInfo.get().cursor.map(toAPI) ok(HistoryResponse( messages: messages, diff --git a/waku/v2/protocol/waku_store/rpc_codec.nim b/waku/v2/protocol/waku_store/rpc_codec.nim index 7bfa68711..857c5b255 100644 --- a/waku/v2/protocol/waku_store/rpc_codec.nim +++ b/waku/v2/protocol/waku_store/rpc_codec.nim @@ -4,10 +4,10 @@ else: {.push raises: [].} import + std/options, nimcrypto/hash import ../../../common/protobuf, - ../../utils/time, ../waku_message, ./common, ./rpc @@ -33,165 +33,199 @@ proc encode*(index: PagingIndexRPC): ProtoBuffer = proc decode*(T: type PagingIndexRPC, buffer: seq[byte]): ProtoResult[T] = ## creates and returns an Index object out of buffer - var index = PagingIndexRPC() + var rpc = PagingIndexRPC() let pb = initProtoBuffer(buffer) var data: seq[byte] - discard ?pb.getField(1, data) + if not ?pb.getField(1, data): + return err(ProtoError.RequiredFieldMissing) + else: + var digest = MessageDigest() + for count, b in data: + digest.data[count] = b - # create digest from data - index.digest = MessageDigest() - for count, b in data: - index.digest.data[count] = b + rpc.digest = digest - # read the timestamp var receiverTime: zint64 - discard ?pb.getField(2, receiverTime) - index.receiverTime = Timestamp(receiverTime) + if not ?pb.getField(2, receiverTime): + return err(ProtoError.RequiredFieldMissing) + else: + rpc.receiverTime = int64(receiverTime) - # read the timestamp var senderTime: zint64 - discard ?pb.getField(3, senderTime) - index.senderTime = Timestamp(senderTime) + if not ?pb.getField(3, senderTime): + return err(ProtoError.RequiredFieldMissing) + else: + rpc.senderTime = int64(senderTime) - # read the pubsubTopic - discard ?pb.getField(4, index.pubsubTopic) + var pubsubTopic: string + if not ?pb.getField(4, pubsubTopic): + return err(ProtoError.RequiredFieldMissing) + else: + rpc.pubsubTopic = pubsubTopic - ok(index) + ok(rpc) -proc encode*(pinfo: PagingInfoRPC): ProtoBuffer = +proc encode*(rpc: PagingInfoRPC): ProtoBuffer = ## Encodes a PagingInfo object into a ProtoBuffer ## returns the resultant ProtoBuffer var pb = initProtoBuffer() - pb.write3(1, pinfo.pageSize) - pb.write3(2, pinfo.cursor.encode()) - pb.write3(3, uint32(ord(pinfo.direction))) + pb.write3(1, rpc.pageSize.map(proc(size: uint64): zint64 = zint64(size))) + pb.write3(2, rpc.cursor.map(encode)) + pb.write3(3, rpc.direction.map(proc(d: PagingDirectionRPC): uint32 = uint32(ord(d)))) pb.finish3() pb proc decode*(T: type PagingInfoRPC, buffer: seq[byte]): ProtoResult[T] = ## creates and returns a PagingInfo object out of buffer - var pagingInfo = PagingInfoRPC() + var rpc = PagingInfoRPC() let pb = initProtoBuffer(buffer) - var pageSize: uint64 - discard ?pb.getField(1, pageSize) - pagingInfo.pageSize = pageSize + var pageSize: zint64 + if not ?pb.getField(1, pageSize): + rpc.pageSize = none(uint64) + else: + rpc.pageSize = some(uint64(pageSize)) var cursorBuffer: seq[byte] - discard ?pb.getField(2, cursorBuffer) - pagingInfo.cursor = ?PagingIndexRPC.decode(cursorBuffer) + if not ?pb.getField(2, cursorBuffer): + rpc.cursor = none(PagingIndexRPC) + else: + let cursor = ?PagingIndexRPC.decode(cursorBuffer) + rpc.cursor = some(cursor) var direction: uint32 - discard ?pb.getField(3, direction) - pagingInfo.direction = PagingDirectionRPC(direction) + if not ?pb.getField(3, direction): + rpc.direction = none(PagingDirectionRPC) + else: + rpc.direction = some(PagingDirectionRPC(direction)) - ok(pagingInfo) + ok(rpc) ## Wire protocol -proc encode*(filter: HistoryContentFilterRPC): ProtoBuffer = +proc encode*(rpc: HistoryContentFilterRPC): ProtoBuffer = var pb = initProtoBuffer() - pb.write3(1, filter.contentTopic) + pb.write3(1, rpc.contentTopic) pb.finish3() pb proc decode*(T: type HistoryContentFilterRPC, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) + var contentTopic: ContentTopic - discard ?pb.getField(1, contentTopic) + if not ?pb.getField(1, contentTopic): + return err(ProtoError.RequiredFieldMissing) ok(HistoryContentFilterRPC(contentTopic: contentTopic)) -proc encode*(query: HistoryQueryRPC): ProtoBuffer = +proc encode*(rpc: HistoryQueryRPC): ProtoBuffer = var pb = initProtoBuffer() - pb.write3(2, query.pubsubTopic) + pb.write3(2, rpc.pubsubTopic) - for filter in query.contentFilters: + for filter in rpc.contentFilters: pb.write3(3, filter.encode()) - pb.write3(4, query.pagingInfo.encode()) - pb.write3(5, zint64(query.startTime)) - pb.write3(6, zint64(query.endTime)) + pb.write3(4, rpc.pagingInfo.map(encode)) + pb.write3(5, rpc.startTime.map(proc (time: int64): zint64 = zint64(time))) + pb.write3(6, rpc.endTime.map(proc (time: int64): zint64 = zint64(time))) pb.finish3() pb proc decode*(T: type HistoryQueryRPC, buffer: seq[byte]): ProtoResult[T] = - var msg = HistoryQueryRPC() + var rpc = HistoryQueryRPC() let pb = initProtoBuffer(buffer) - discard ?pb.getField(2, msg.pubsubTopic) + var pubsubTopic: string + if not ?pb.getField(2, pubsubTopic): + rpc.pubsubTopic = none(string) + else: + rpc.pubsubTopic = some(pubsubTopic) var buffs: seq[seq[byte]] - discard ?pb.getRepeatedField(3, buffs) - - for pb in buffs: - msg.contentFilters.add(? HistoryContentFilterRPC.decode(pb)) + if not ?pb.getRepeatedField(3, buffs): + rpc.contentFilters = @[] + else: + for pb in buffs: + let filter = ?HistoryContentFilterRPC.decode(pb) + rpc.contentFilters.add(filter) var pagingInfoBuffer: seq[byte] - discard ?pb.getField(4, pagingInfoBuffer) - - msg.pagingInfo = ?PagingInfoRPC.decode(pagingInfoBuffer) + if not ?pb.getField(4, pagingInfoBuffer): + rpc.pagingInfo = none(PagingInfoRPC) + else: + let pagingInfo = ?PagingInfoRPC.decode(pagingInfoBuffer) + rpc.pagingInfo = some(pagingInfo) var startTime: zint64 - discard ?pb.getField(5, startTime) - msg.startTime = Timestamp(startTime) + if not ?pb.getField(5, startTime): + rpc.startTime = none(int64) + else: + rpc.startTime = some(int64(startTime)) var endTime: zint64 - discard ?pb.getField(6, endTime) - msg.endTime = Timestamp(endTime) + if not ?pb.getField(6, endTime): + rpc.endTime = none(int64) + else: + rpc.endTime = some(int64(endTime)) - ok(msg) + ok(rpc) proc encode*(response: HistoryResponseRPC): ProtoBuffer = var pb = initProtoBuffer() - for msg in response.messages: - pb.write3(2, msg.encode()) + for rpc in response.messages: + pb.write3(2, rpc.encode()) - pb.write3(3, response.pagingInfo.encode()) + pb.write3(3, response.pagingInfo.map(encode)) pb.write3(4, uint32(ord(response.error))) pb.finish3() pb proc decode*(T: type HistoryResponseRPC, buffer: seq[byte]): ProtoResult[T] = - var msg = HistoryResponseRPC() + var rpc = HistoryResponseRPC() let pb = initProtoBuffer(buffer) var messages: seq[seq[byte]] - discard ?pb.getRepeatedField(2, messages) - - for pb in messages: - let message = ?WakuMessage.decode(pb) - msg.messages.add(message) + if ?pb.getRepeatedField(2, messages): + for pb in messages: + let message = ?WakuMessage.decode(pb) + rpc.messages.add(message) + else: + rpc.messages = @[] var pagingInfoBuffer: seq[byte] - discard ?pb.getField(3, pagingInfoBuffer) - msg.pagingInfo = ?PagingInfoRPC.decode(pagingInfoBuffer) + if ?pb.getField(3, pagingInfoBuffer): + let pagingInfo = ?PagingInfoRPC.decode(pagingInfoBuffer) + rpc.pagingInfo = some(pagingInfo) + else: + rpc.pagingInfo = none(PagingInfoRPC) var error: uint32 - discard ?pb.getField(4, error) - msg.error = HistoryResponseErrorRPC.parse(error) + if not ?pb.getField(4, error): + return err(ProtoError.RequiredFieldMissing) + else: + rpc.error = HistoryResponseErrorRPC.parse(error) - ok(msg) + ok(rpc) proc encode*(rpc: HistoryRPC): ProtoBuffer = var pb = initProtoBuffer() pb.write3(1, rpc.requestId) - pb.write3(2, rpc.query.encode()) - pb.write3(3, rpc.response.encode()) + pb.write3(2, rpc.query.map(encode)) + pb.write3(3, rpc.response.map(encode)) pb.finish3() pb @@ -199,14 +233,22 @@ proc encode*(rpc: HistoryRPC): ProtoBuffer = proc decode*(T: type HistoryRPC, buffer: seq[byte]): ProtoResult[T] = var rpc = HistoryRPC() let pb = initProtoBuffer(buffer) - discard ?pb.getField(1, rpc.requestId) + + if not ?pb.getField(1, rpc.requestId): + return err(ProtoError.RequiredFieldMissing) var queryBuffer: seq[byte] - discard ?pb.getField(2, queryBuffer) - rpc.query = ?HistoryQueryRPC.decode(queryBuffer) + if not ?pb.getField(2, queryBuffer): + rpc.query = none(HistoryQueryRPC) + else: + let query = ?HistoryQueryRPC.decode(queryBuffer) + rpc.query = some(query) var responseBuffer: seq[byte] - discard ?pb.getField(3, responseBuffer) - rpc.response = ?HistoryResponseRPC.decode(responseBuffer) + if not ?pb.getField(3, responseBuffer): + rpc.response = none(HistoryResponseRPC) + else: + let response = ?HistoryResponseRPC.decode(responseBuffer) + rpc.response = some(response) ok(rpc)