fix(store): waku store rpc codec support optional fields

This commit is contained in:
Lorenzo Delgado 2022-11-17 20:40:08 +01:00 committed by GitHub
parent 39bf289f43
commit f89e6869cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 212 additions and 154 deletions

View File

@ -1,7 +1,7 @@
{.used.} {.used.}
import import
std/[options, times], std/options,
testutils/unittests, testutils/unittests,
chronos chronos
import import
@ -50,7 +50,7 @@ procSuite "Waku Store - RPC codec":
## Given ## Given
let let
index = PagingIndexRPC.compute(fakeWakuMessage(), receivedTime=ts(), pubsubTopic=DefaultPubsubTopic) index = PagingIndexRPC.compute(fakeWakuMessage(), receivedTime=ts(), pubsubTopic=DefaultPubsubTopic)
pagingInfo = PagingInfoRPC(pageSize: 1, cursor: index, direction: PagingDirectionRPC.FORWARD) pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirectionRPC.FORWARD))
## When ## When
let pb = pagingInfo.encode() let pb = pagingInfo.encode()
@ -61,7 +61,7 @@ procSuite "Waku Store - RPC codec":
decodedPagingInfo.isOk() decodedPagingInfo.isOk()
check: check:
# the fields of decodedPagingInfo must be the same as the original pagingInfo # The fields of decodedPagingInfo must be the same as the original pagingInfo
decodedPagingInfo.value == pagingInfo decodedPagingInfo.value == pagingInfo
decodedPagingInfo.value.direction == pagingInfo.direction decodedPagingInfo.value.direction == pagingInfo.direction
@ -85,8 +85,13 @@ procSuite "Waku Store - RPC codec":
## Given ## Given
let let
index = PagingIndexRPC.compute(fakeWakuMessage(), receivedTime=ts(), pubsubTopic=DefaultPubsubTopic) index = PagingIndexRPC.compute(fakeWakuMessage(), receivedTime=ts(), pubsubTopic=DefaultPubsubTopic)
pagingInfo = PagingInfoRPC(pageSize: 1, cursor: index, direction: PagingDirectionRPC.BACKWARD) pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirectionRPC.BACKWARD))
query = HistoryQueryRPC(contentFilters: @[HistoryContentFilterRPC(contentTopic: DefaultContentTopic), HistoryContentFilterRPC(contentTopic: DefaultContentTopic)], pagingInfo: pagingInfo, startTime: Timestamp(10), endTime: Timestamp(11)) query = HistoryQueryRPC(
contentFilters: @[HistoryContentFilterRPC(contentTopic: DefaultContentTopic), HistoryContentFilterRPC(contentTopic: DefaultContentTopic)],
pagingInfo: some(pagingInfo),
startTime: some(Timestamp(10)),
endTime: some(Timestamp(11))
)
## When ## When
let pb = query.encode() let pb = query.encode()
@ -121,8 +126,8 @@ procSuite "Waku Store - RPC codec":
let let
message = fakeWakuMessage() message = fakeWakuMessage()
index = PagingIndexRPC.compute(message, receivedTime=ts(), pubsubTopic=DefaultPubsubTopic) index = PagingIndexRPC.compute(message, receivedTime=ts(), pubsubTopic=DefaultPubsubTopic)
pagingInfo = PagingInfoRPC(pageSize: 1, cursor: index, direction: PagingDirectionRPC.BACKWARD) pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirectionRPC.BACKWARD))
res = HistoryResponseRPC(messages: @[message], pagingInfo:pagingInfo, error: HistoryResponseErrorRPC.INVALID_CURSOR) res = HistoryResponseRPC(messages: @[message], pagingInfo: some(pagingInfo), error: HistoryResponseErrorRPC.INVALID_CURSOR)
## When ## When
let pb = res.encode() let pb = res.encode()

View File

@ -6,6 +6,7 @@ else:
{.push raises: [].} {.push raises: [].}
import import
std/options,
libp2p/protobuf/minprotobuf, libp2p/protobuf/minprotobuf,
libp2p/varint libp2p/varint
@ -15,7 +16,10 @@ export
proc write3*(proto: var ProtoBuffer, field: int, value: auto) = proc write3*(proto: var ProtoBuffer, field: int, value: auto) =
if default(type(value)) != value: when value is Option:
if value.isSome():
proto.write(field, value.get())
else:
proto.write(field, value) proto.write(field, value)
proc finish3*(proto: var ProtoBuffer) = proc finish3*(proto: var ProtoBuffer) =

View File

@ -31,14 +31,20 @@ proc `%`*(value: WakuMessage): JsonNode =
## we need to convert between these and the types for the Nim API ## we need to convert between these and the types for the Nim API
proc toPagingInfo*(pagingOptions: StorePagingOptions): PagingInfoRPC = proc toPagingInfo*(pagingOptions: StorePagingOptions): PagingInfoRPC =
PagingInfoRPC(pageSize: pagingOptions.pageSize, PagingInfoRPC(
cursor: if pagingOptions.cursor.isSome: pagingOptions.cursor.get else: PagingIndexRPC(), pageSize: some(pagingOptions.pageSize),
direction: if pagingOptions.forward: PagingDirectionRPC.FORWARD else: PagingDirectionRPC.BACKWARD) cursor: pagingOptions.cursor,
direction: if pagingOptions.forward: some(PagingDirectionRPC.FORWARD)
else: some(PagingDirectionRPC.BACKWARD)
)
proc toPagingOptions*(pagingInfo: PagingInfoRPC): StorePagingOptions = proc toPagingOptions*(pagingInfo: PagingInfoRPC): StorePagingOptions =
StorePagingOptions(pageSize: pagingInfo.pageSize, StorePagingOptions(
cursor: some(pagingInfo.cursor), pageSize: pagingInfo.pageSize.get(0'u64),
forward: if pagingInfo.direction == PagingDirectionRPC.FORWARD: true else: false) cursor: pagingInfo.cursor,
forward: if pagingInfo.direction.isNone(): true
else: pagingInfo.direction.get() == PagingDirectionRPC.FORWARD
)
proc toJsonRPCStoreResponse*(response: HistoryResponse): StoreResponse = proc toJsonRPCStoreResponse*(response: HistoryResponse): StoreResponse =
StoreResponse( StoreResponse(

View File

@ -53,7 +53,7 @@ proc sendHistoryQueryRPC(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeer
let connection = connOpt.get() let connection = connOpt.get()
let reqRpc = HistoryRPC(requestId: generateRequestId(w.rng), query: req.toRPC()) let reqRpc = HistoryRPC(requestId: generateRequestId(w.rng), query: some(req.toRPC()))
await connection.writeLP(reqRpc.encode().buffer) await connection.writeLP(reqRpc.encode().buffer)
@ -69,11 +69,11 @@ proc sendHistoryQueryRPC(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeer
# Disabled ,for now, since the default response is a possible case (no messages, pagesize = 0, error = NONE(0)) # Disabled ,for now, since the default response is a possible case (no messages, pagesize = 0, error = NONE(0))
# TODO: Rework the RPC protocol to differentiate the default value from an empty value (e.g., status = 200 (OK)) # TODO: Rework the RPC protocol to differentiate the default value from an empty value (e.g., status = 200 (OK))
# and rework the protobuf parsing to return Option[T] when empty values are received # and rework the protobuf parsing to return Option[T] when empty values are received
# if respRpc.response == default(HistoryResponseRPC): if respRpc.response.isNone():
# waku_store_errors.inc(labelValues = [emptyRpcResponseFailure]) waku_store_errors.inc(labelValues = [emptyRpcResponseFailure])
# return err(HistoryError(kind: HistoryErrorKind.BAD_RESPONSE, cause: emptyRpcResponseFailure)) return err(HistoryError(kind: HistoryErrorKind.BAD_RESPONSE, cause: emptyRpcResponseFailure))
let resp = respRpc.response let resp = respRpc.response.get()
return resp.toAPI() return resp.toAPI()

View File

@ -222,7 +222,7 @@ proc initProtocolHandler*(ws: WakuStore) =
let reqRpc = decodeRes.value let reqRpc = decodeRes.value
if reqRpc.query == default(HistoryQueryRPC): if reqRpc.query.isNone():
error "empty query rpc", peerId=conn.peerId, requestId=reqRpc.requestId error "empty query rpc", peerId=conn.peerId, requestId=reqRpc.requestId
waku_store_errors.inc(labelValues = [emptyRpcQueryFailure]) waku_store_errors.inc(labelValues = [emptyRpcQueryFailure])
# TODO: Return (BAD_REQUEST, cause: "empty query") # TODO: Return (BAD_REQUEST, cause: "empty query")
@ -239,19 +239,20 @@ proc initProtocolHandler*(ws: WakuStore) =
error "history query failed", peerId=conn.peerId, requestId=reqRpc.requestId, error= $respErr error "history query failed", peerId=conn.peerId, requestId=reqRpc.requestId, error= $respErr
let resp = HistoryResponseRPC(error: respErr.toRPC()) let resp = HistoryResponseRPC(error: respErr.toRPC())
let rpc = HistoryRPC(requestId: reqRpc.requestId, response: resp) let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(resp))
await conn.writeLp(rpc.encode().buffer) await conn.writeLp(rpc.encode().buffer)
return return
let query = reqRpc.query.toApi() let query = reqRpc.query.get().toAPI()
let respRes = ws.findMessages(query) let respRes = ws.findMessages(query)
if respRes.isErr(): if respRes.isErr():
error "history query failed", peerId=conn.peerId, requestId=reqRpc.requestId, error=respRes.error error "history query failed", peerId=conn.peerId, requestId=reqRpc.requestId, error=respRes.error
let resp = respRes.toRPC() let resp = respRes.toRPC()
let rpc = HistoryRPC(requestId: reqRpc.requestId, response: resp) let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(resp))
await conn.writeLp(rpc.encode().buffer) await conn.writeLp(rpc.encode().buffer)
return return
@ -270,7 +271,7 @@ proc initProtocolHandler*(ws: WakuStore) =
info "sending history response", peerId=conn.peerId, requestId=reqRpc.requestId, messages=resp.messages.len info "sending history response", peerId=conn.peerId, requestId=reqRpc.requestId, messages=resp.messages.len
let rpc = HistoryRPC(requestId: reqRpc.requestId, response: resp) let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(resp))
await conn.writeLp(rpc.encode().buffer) await conn.writeLp(rpc.encode().buffer)
ws.handler = handler ws.handler = handler

View File

@ -49,9 +49,9 @@ type
PagingInfoRPC* = object PagingInfoRPC* = object
## This type holds the information needed for the pagination ## This type holds the information needed for the pagination
pageSize*: uint64 pageSize*: Option[uint64]
cursor*: PagingIndexRPC cursor*: Option[PagingIndexRPC]
direction*: PagingDirectionRPC direction*: Option[PagingDirectionRPC]
type type
@ -60,10 +60,10 @@ type
HistoryQueryRPC* = object HistoryQueryRPC* = object
contentFilters*: seq[HistoryContentFilterRPC] contentFilters*: seq[HistoryContentFilterRPC]
pubsubTopic*: PubsubTopic pubsubTopic*: Option[PubsubTopic]
pagingInfo*: PagingInfoRPC # used for pagination pagingInfo*: Option[PagingInfoRPC]
startTime*: Timestamp # used for time-window query startTime*: Option[int64]
endTime*: Timestamp # used for time-window query endTime*: Option[int64]
HistoryResponseErrorRPC* {.pure.} = enum HistoryResponseErrorRPC* {.pure.} = enum
## HistoryResponseErrorRPC contains error message to inform the querying node about ## HistoryResponseErrorRPC contains error message to inform the querying node about
@ -74,13 +74,13 @@ type
HistoryResponseRPC* = object HistoryResponseRPC* = object
messages*: seq[WakuMessage] messages*: seq[WakuMessage]
pagingInfo*: PagingInfoRPC # used for pagination pagingInfo*: Option[PagingInfoRPC]
error*: HistoryResponseErrorRPC error*: HistoryResponseErrorRPC
HistoryRPC* = object HistoryRPC* = object
requestId*: string requestId*: string
query*: HistoryQueryRPC query*: Option[HistoryQueryRPC]
response*: HistoryResponseRPC response*: Option[HistoryResponseRPC]
proc parse*(T: type HistoryResponseErrorRPC, kind: uint32): T = proc parse*(T: type HistoryResponseErrorRPC, kind: uint32): T =
@ -112,55 +112,53 @@ proc toAPI*(rpc: PagingIndexRPC): HistoryCursor =
proc toRPC*(query: HistoryQuery): HistoryQueryRPC = proc toRPC*(query: HistoryQuery): HistoryQueryRPC =
let var rpc = HistoryQueryRPC()
contentFilters = query.contentTopics.mapIt(HistoryContentFilterRPC(contentTopic: it))
pubsubTopic = query.pubsubTopic.get(default(string)) rpc.contentFilters = query.contentTopics.mapIt(HistoryContentFilterRPC(contentTopic: it))
pageSize = query.pageSize rpc.pubsubTopic = query.pubsubTopic
cursor = query.cursor.get(default(HistoryCursor)).toRPC() rpc.pagingInfo = block:
if query.cursor.isNone() and
query.pageSize == default(type query.pageSize) and
query.ascending == default(type query.ascending):
none(PagingInfoRPC)
else:
let
pageSize = some(query.pageSize)
cursor = query.cursor.map(toRPC)
direction = if query.ascending: some(PagingDirectionRPC.FORWARD)
else: some(PagingDirectionRPC.BACKWARD)
some(PagingInfoRPC(
pageSize: pageSize,
cursor: cursor,
direction: direction
))
direction = if query.ascending: PagingDirectionRPC.FORWARD rpc.startTime = query.startTime
else: PagingDirectionRPC.BACKWARD rpc.endTime = query.endTime
startTime = query.startTime.get(default(Timestamp)) rpc
endTime = query.endTime.get(default(Timestamp))
HistoryQueryRPC(
contentFilters: contentFilters,
pubsubTopic: pubsubTopic,
pagingInfo: PagingInfoRPC(
pageSize: pageSize,
cursor: cursor,
direction: direction
),
startTime: startTime,
endTime: endTime
)
proc toAPI*(rpc: HistoryQueryRPC): HistoryQuery = proc toAPI*(rpc: HistoryQueryRPC): HistoryQuery =
let let
pubsubTopic = if rpc.pubsubTopic == default(string): none(PubsubTopic) pubsubTopic = rpc.pubsubTopic
else: some(rpc.pubsubTopic)
contentTopics = rpc.contentFilters.mapIt(it.contentTopic) contentTopics = rpc.contentFilters.mapIt(it.contentTopic)
cursor = if rpc.pagingInfo == default(PagingInfoRPC) or rpc.pagingInfo.cursor == default(PagingIndexRPC): none(HistoryCursor) cursor = if rpc.pagingInfo.isNone() or rpc.pagingInfo.get().cursor.isNone(): none(HistoryCursor)
else: some(rpc.pagingInfo.cursor.toAPI()) else: rpc.pagingInfo.get().cursor.map(toAPI)
startTime = if rpc.startTime == default(Timestamp): none(Timestamp) startTime = rpc.startTime
else: some(rpc.startTime)
endTime = if rpc.endTime == default(Timestamp): none(Timestamp) endTime = rpc.endTime
else: some(rpc.endTime)
pageSize = if rpc.pagingInfo == default(PagingInfoRPC): 0.uint64 pageSize = if rpc.pagingInfo.isNone() or rpc.pagingInfo.get().pageSize.isNone(): 0'u64
else: rpc.pagingInfo.pageSize else: rpc.pagingInfo.get().pageSize.get()
ascending = if rpc.pagingInfo == default(PagingInfoRPC): true ascending = if rpc.pagingInfo.isNone() or rpc.pagingInfo.get().direction.isNone(): true
else: rpc.pagingInfo.direction == PagingDirectionRPC.FORWARD else: rpc.pagingInfo.get().direction.get() == PagingDirectionRPC.FORWARD
HistoryQuery( HistoryQuery(
pubsubTopic: pubsubTopic, pubsubTopic: pubsubTopic,
@ -208,18 +206,18 @@ proc toRPC*(res: HistoryResult): HistoryResponseRPC =
pagingInfo = block: pagingInfo = block:
if resp.cursor.isNone(): if resp.cursor.isNone():
default(PagingInfoRPC) none(PagingInfoRPC)
else: else:
let let
pageSize = resp.pageSize pageSize = some(resp.pageSize)
cursor = resp.cursor.get(default(HistoryCursor)).toRPC() cursor = resp.cursor.map(toRPC)
direction = if resp.ascending: PagingDirectionRPC.FORWARD direction = if resp.ascending: some(PagingDirectionRPC.FORWARD)
else: PagingDirectionRPC.BACKWARD else: some(PagingDirectionRPC.BACKWARD)
PagingInfoRPC( some(PagingInfoRPC(
pageSize: pageSize, pageSize: pageSize,
cursor: cursor, cursor: cursor,
direction: direction direction: direction
) ))
error = HistoryResponseErrorRPC.NONE error = HistoryResponseErrorRPC.NONE
@ -236,12 +234,14 @@ proc toAPI*(rpc: HistoryResponseRPC): HistoryResult =
let let
messages = rpc.messages messages = rpc.messages
pageSize = rpc.pagingInfo.pageSize pageSize = if rpc.pagingInfo.isNone(): 0'u64
else: rpc.pagingInfo.get().pageSize.get(0'u64)
ascending = rpc.pagingInfo == default(PagingInfoRPC) or rpc.pagingInfo.direction == PagingDirectionRPC.FORWARD ascending = if rpc.pagingInfo.isNone(): true
else: rpc.pagingInfo.get().direction.get(PagingDirectionRPC.FORWARD) == PagingDirectionRPC.FORWARD
cursor = if rpc.pagingInfo == default(PagingInfoRPC) or rpc.pagingInfo.cursor == default(PagingIndexRPC): none(HistoryCursor) cursor = if rpc.pagingInfo.isNone(): none(HistoryCursor)
else: some(rpc.pagingInfo.cursor.toAPI()) else: rpc.pagingInfo.get().cursor.map(toAPI)
ok(HistoryResponse( ok(HistoryResponse(
messages: messages, messages: messages,

View File

@ -4,10 +4,10 @@ else:
{.push raises: [].} {.push raises: [].}
import import
std/options,
nimcrypto/hash nimcrypto/hash
import import
../../../common/protobuf, ../../../common/protobuf,
../../utils/time,
../waku_message, ../waku_message,
./common, ./common,
./rpc ./rpc
@ -33,165 +33,199 @@ proc encode*(index: PagingIndexRPC): ProtoBuffer =
proc decode*(T: type PagingIndexRPC, buffer: seq[byte]): ProtoResult[T] = proc decode*(T: type PagingIndexRPC, buffer: seq[byte]): ProtoResult[T] =
## creates and returns an Index object out of buffer ## creates and returns an Index object out of buffer
var index = PagingIndexRPC() var rpc = PagingIndexRPC()
let pb = initProtoBuffer(buffer) let pb = initProtoBuffer(buffer)
var data: seq[byte] var data: seq[byte]
discard ?pb.getField(1, data) if not ?pb.getField(1, data):
return err(ProtoError.RequiredFieldMissing)
else:
var digest = MessageDigest()
for count, b in data:
digest.data[count] = b
# create digest from data rpc.digest = digest
index.digest = MessageDigest()
for count, b in data:
index.digest.data[count] = b
# read the timestamp
var receiverTime: zint64 var receiverTime: zint64
discard ?pb.getField(2, receiverTime) if not ?pb.getField(2, receiverTime):
index.receiverTime = Timestamp(receiverTime) return err(ProtoError.RequiredFieldMissing)
else:
rpc.receiverTime = int64(receiverTime)
# read the timestamp
var senderTime: zint64 var senderTime: zint64
discard ?pb.getField(3, senderTime) if not ?pb.getField(3, senderTime):
index.senderTime = Timestamp(senderTime) return err(ProtoError.RequiredFieldMissing)
else:
rpc.senderTime = int64(senderTime)
# read the pubsubTopic var pubsubTopic: string
discard ?pb.getField(4, index.pubsubTopic) if not ?pb.getField(4, pubsubTopic):
return err(ProtoError.RequiredFieldMissing)
else:
rpc.pubsubTopic = pubsubTopic
ok(index) ok(rpc)
proc encode*(pinfo: PagingInfoRPC): ProtoBuffer = proc encode*(rpc: PagingInfoRPC): ProtoBuffer =
## Encodes a PagingInfo object into a ProtoBuffer ## Encodes a PagingInfo object into a ProtoBuffer
## returns the resultant ProtoBuffer ## returns the resultant ProtoBuffer
var pb = initProtoBuffer() var pb = initProtoBuffer()
pb.write3(1, pinfo.pageSize) pb.write3(1, rpc.pageSize.map(proc(size: uint64): zint64 = zint64(size)))
pb.write3(2, pinfo.cursor.encode()) pb.write3(2, rpc.cursor.map(encode))
pb.write3(3, uint32(ord(pinfo.direction))) pb.write3(3, rpc.direction.map(proc(d: PagingDirectionRPC): uint32 = uint32(ord(d))))
pb.finish3() pb.finish3()
pb pb
proc decode*(T: type PagingInfoRPC, buffer: seq[byte]): ProtoResult[T] = proc decode*(T: type PagingInfoRPC, buffer: seq[byte]): ProtoResult[T] =
## creates and returns a PagingInfo object out of buffer ## creates and returns a PagingInfo object out of buffer
var pagingInfo = PagingInfoRPC() var rpc = PagingInfoRPC()
let pb = initProtoBuffer(buffer) let pb = initProtoBuffer(buffer)
var pageSize: uint64 var pageSize: zint64
discard ?pb.getField(1, pageSize) if not ?pb.getField(1, pageSize):
pagingInfo.pageSize = pageSize rpc.pageSize = none(uint64)
else:
rpc.pageSize = some(uint64(pageSize))
var cursorBuffer: seq[byte] var cursorBuffer: seq[byte]
discard ?pb.getField(2, cursorBuffer) if not ?pb.getField(2, cursorBuffer):
pagingInfo.cursor = ?PagingIndexRPC.decode(cursorBuffer) rpc.cursor = none(PagingIndexRPC)
else:
let cursor = ?PagingIndexRPC.decode(cursorBuffer)
rpc.cursor = some(cursor)
var direction: uint32 var direction: uint32
discard ?pb.getField(3, direction) if not ?pb.getField(3, direction):
pagingInfo.direction = PagingDirectionRPC(direction) rpc.direction = none(PagingDirectionRPC)
else:
rpc.direction = some(PagingDirectionRPC(direction))
ok(pagingInfo) ok(rpc)
## Wire protocol ## Wire protocol
proc encode*(filter: HistoryContentFilterRPC): ProtoBuffer = proc encode*(rpc: HistoryContentFilterRPC): ProtoBuffer =
var pb = initProtoBuffer() var pb = initProtoBuffer()
pb.write3(1, filter.contentTopic) pb.write3(1, rpc.contentTopic)
pb.finish3() pb.finish3()
pb pb
proc decode*(T: type HistoryContentFilterRPC, buffer: seq[byte]): ProtoResult[T] = proc decode*(T: type HistoryContentFilterRPC, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer) let pb = initProtoBuffer(buffer)
var contentTopic: ContentTopic var contentTopic: ContentTopic
discard ?pb.getField(1, contentTopic) if not ?pb.getField(1, contentTopic):
return err(ProtoError.RequiredFieldMissing)
ok(HistoryContentFilterRPC(contentTopic: contentTopic)) ok(HistoryContentFilterRPC(contentTopic: contentTopic))
proc encode*(query: HistoryQueryRPC): ProtoBuffer = proc encode*(rpc: HistoryQueryRPC): ProtoBuffer =
var pb = initProtoBuffer() var pb = initProtoBuffer()
pb.write3(2, query.pubsubTopic) pb.write3(2, rpc.pubsubTopic)
for filter in query.contentFilters: for filter in rpc.contentFilters:
pb.write3(3, filter.encode()) pb.write3(3, filter.encode())
pb.write3(4, query.pagingInfo.encode()) pb.write3(4, rpc.pagingInfo.map(encode))
pb.write3(5, zint64(query.startTime)) pb.write3(5, rpc.startTime.map(proc (time: int64): zint64 = zint64(time)))
pb.write3(6, zint64(query.endTime)) pb.write3(6, rpc.endTime.map(proc (time: int64): zint64 = zint64(time)))
pb.finish3() pb.finish3()
pb pb
proc decode*(T: type HistoryQueryRPC, buffer: seq[byte]): ProtoResult[T] = proc decode*(T: type HistoryQueryRPC, buffer: seq[byte]): ProtoResult[T] =
var msg = HistoryQueryRPC() var rpc = HistoryQueryRPC()
let pb = initProtoBuffer(buffer) let pb = initProtoBuffer(buffer)
discard ?pb.getField(2, msg.pubsubTopic) var pubsubTopic: string
if not ?pb.getField(2, pubsubTopic):
rpc.pubsubTopic = none(string)
else:
rpc.pubsubTopic = some(pubsubTopic)
var buffs: seq[seq[byte]] var buffs: seq[seq[byte]]
discard ?pb.getRepeatedField(3, buffs) if not ?pb.getRepeatedField(3, buffs):
rpc.contentFilters = @[]
for pb in buffs: else:
msg.contentFilters.add(? HistoryContentFilterRPC.decode(pb)) for pb in buffs:
let filter = ?HistoryContentFilterRPC.decode(pb)
rpc.contentFilters.add(filter)
var pagingInfoBuffer: seq[byte] var pagingInfoBuffer: seq[byte]
discard ?pb.getField(4, pagingInfoBuffer) if not ?pb.getField(4, pagingInfoBuffer):
rpc.pagingInfo = none(PagingInfoRPC)
msg.pagingInfo = ?PagingInfoRPC.decode(pagingInfoBuffer) else:
let pagingInfo = ?PagingInfoRPC.decode(pagingInfoBuffer)
rpc.pagingInfo = some(pagingInfo)
var startTime: zint64 var startTime: zint64
discard ?pb.getField(5, startTime) if not ?pb.getField(5, startTime):
msg.startTime = Timestamp(startTime) rpc.startTime = none(int64)
else:
rpc.startTime = some(int64(startTime))
var endTime: zint64 var endTime: zint64
discard ?pb.getField(6, endTime) if not ?pb.getField(6, endTime):
msg.endTime = Timestamp(endTime) rpc.endTime = none(int64)
else:
rpc.endTime = some(int64(endTime))
ok(msg) ok(rpc)
proc encode*(response: HistoryResponseRPC): ProtoBuffer = proc encode*(response: HistoryResponseRPC): ProtoBuffer =
var pb = initProtoBuffer() var pb = initProtoBuffer()
for msg in response.messages: for rpc in response.messages:
pb.write3(2, msg.encode()) pb.write3(2, rpc.encode())
pb.write3(3, response.pagingInfo.encode()) pb.write3(3, response.pagingInfo.map(encode))
pb.write3(4, uint32(ord(response.error))) pb.write3(4, uint32(ord(response.error)))
pb.finish3() pb.finish3()
pb pb
proc decode*(T: type HistoryResponseRPC, buffer: seq[byte]): ProtoResult[T] = proc decode*(T: type HistoryResponseRPC, buffer: seq[byte]): ProtoResult[T] =
var msg = HistoryResponseRPC() var rpc = HistoryResponseRPC()
let pb = initProtoBuffer(buffer) let pb = initProtoBuffer(buffer)
var messages: seq[seq[byte]] var messages: seq[seq[byte]]
discard ?pb.getRepeatedField(2, messages) if ?pb.getRepeatedField(2, messages):
for pb in messages:
for pb in messages: let message = ?WakuMessage.decode(pb)
let message = ?WakuMessage.decode(pb) rpc.messages.add(message)
msg.messages.add(message) else:
rpc.messages = @[]
var pagingInfoBuffer: seq[byte] var pagingInfoBuffer: seq[byte]
discard ?pb.getField(3, pagingInfoBuffer) if ?pb.getField(3, pagingInfoBuffer):
msg.pagingInfo = ?PagingInfoRPC.decode(pagingInfoBuffer) let pagingInfo = ?PagingInfoRPC.decode(pagingInfoBuffer)
rpc.pagingInfo = some(pagingInfo)
else:
rpc.pagingInfo = none(PagingInfoRPC)
var error: uint32 var error: uint32
discard ?pb.getField(4, error) if not ?pb.getField(4, error):
msg.error = HistoryResponseErrorRPC.parse(error) return err(ProtoError.RequiredFieldMissing)
else:
rpc.error = HistoryResponseErrorRPC.parse(error)
ok(msg) ok(rpc)
proc encode*(rpc: HistoryRPC): ProtoBuffer = proc encode*(rpc: HistoryRPC): ProtoBuffer =
var pb = initProtoBuffer() var pb = initProtoBuffer()
pb.write3(1, rpc.requestId) pb.write3(1, rpc.requestId)
pb.write3(2, rpc.query.encode()) pb.write3(2, rpc.query.map(encode))
pb.write3(3, rpc.response.encode()) pb.write3(3, rpc.response.map(encode))
pb.finish3() pb.finish3()
pb pb
@ -199,14 +233,22 @@ proc encode*(rpc: HistoryRPC): ProtoBuffer =
proc decode*(T: type HistoryRPC, buffer: seq[byte]): ProtoResult[T] = proc decode*(T: type HistoryRPC, buffer: seq[byte]): ProtoResult[T] =
var rpc = HistoryRPC() var rpc = HistoryRPC()
let pb = initProtoBuffer(buffer) let pb = initProtoBuffer(buffer)
discard ?pb.getField(1, rpc.requestId)
if not ?pb.getField(1, rpc.requestId):
return err(ProtoError.RequiredFieldMissing)
var queryBuffer: seq[byte] var queryBuffer: seq[byte]
discard ?pb.getField(2, queryBuffer) if not ?pb.getField(2, queryBuffer):
rpc.query = ?HistoryQueryRPC.decode(queryBuffer) rpc.query = none(HistoryQueryRPC)
else:
let query = ?HistoryQueryRPC.decode(queryBuffer)
rpc.query = some(query)
var responseBuffer: seq[byte] var responseBuffer: seq[byte]
discard ?pb.getField(3, responseBuffer) if not ?pb.getField(3, responseBuffer):
rpc.response = ?HistoryResponseRPC.decode(responseBuffer) rpc.response = none(HistoryResponseRPC)
else:
let response = ?HistoryResponseRPC.decode(responseBuffer)
rpc.response = some(response)
ok(rpc) ok(rpc)