nwaku/waku/waku_store/rpc_codec.nim

211 lines
4.9 KiB
Nim
Raw Normal View History

# Enable strict exception tracking for every routine declared below.
# Compilers older than 1.4 get `Defect` listed explicitly in the allowed
# set; newer ones use the empty list.
when (NimMajor, NimMinor) >= (1, 4):
  {.push raises: [].}
else:
  {.push raises: [Defect].}
2024-04-25 13:09:52 +00:00
import std/options, stew/arrayops, nimcrypto/hash
import ../common/[protobuf, paging], ../waku_core, ./common
# Default maximum RPC size exported for users of this codec.
# NOTE(review): presumably a negative value means "no size limit" — confirm
# against the call sites that consume this constant.
const DefaultMaxRpcSize* = -1
2024-04-25 13:09:52 +00:00
### Request ###
2024-04-25 13:09:52 +00:00
proc encode*(req: StoreQueryRequest): ProtoBuffer =
  ## Serialises a `StoreQueryRequest` into a finished protobuf buffer.
  ##
  ## Field numbers written: 1 request id, 2 include-data flag, 10 pubsub
  ## topic, 11 content topics (repeated), 12 start time, 13 end time,
  ## 20 message hashes (repeated), 51 pagination cursor, 52 pagination
  ## direction (as `uint32`), 53 pagination limit.

  # Both time bounds are wrapped as `zint64` before being written.
  let asZint64 = proc(time: int64): zint64 =
    zint64(time)

  result = initProtoBuffer()

  result.write3(1, req.requestId)
  result.write3(2, req.includeData)

  result.write3(10, req.pubsubTopic)
  for topic in req.contentTopics:
    result.write3(11, topic)

  result.write3(12, req.startTime.map(asZint64))
  result.write3(13, req.endTime.map(asZint64))

  for messageHash in req.messageHashes:
    result.write3(20, messageHash)

  result.write3(51, req.paginationCursor)
  result.write3(52, uint32(req.paginationForward))
  result.write3(53, req.paginationLimit)

  result.finish3()
2024-04-25 13:09:52 +00:00
proc decode*(
    T: type StoreQueryRequest, buffer: seq[byte]
): ProtobufResult[StoreQueryRequest] =
  ## Decodes a `StoreQueryRequest` from its protobuf wire representation.
  ##
  ## Field 1 (`request_id`) is the only required field; when it is absent
  ## a `missingRequiredField` error is returned. Every other field is
  ## optional and falls back to `false`, `none`, an empty sequence or
  ## `PagingDirection.default()` when missing. Errors reported by the
  ## field getters are propagated to the caller through `?`.
  var req = StoreQueryRequest()
  let pb = initProtoBuffer(buffer)

  if not ?pb.getField(1, req.requestId):
    return err(ProtobufError.missingRequiredField("request_id"))

  var inclData: uint
  if not ?pb.getField(2, inclData):
    req.includeData = false
  else:
    # Only the exact value 1 is interpreted as "include data".
    req.includeData = inclData == 1

  var pubsubTopic: string
  if not ?pb.getField(10, pubsubTopic):
    req.pubsubTopic = none(string)
  else:
    req.pubsubTopic = some(pubsubTopic)

  var topics: seq[string]
  if not ?pb.getRepeatedField(11, topics):
    req.contentTopics = @[]
  else:
    req.contentTopics = topics

  var start: zint64
  if not ?pb.getField(12, start):
    req.startTime = none(Timestamp)
  else:
    req.startTime = some(Timestamp(int64(start)))

  var endTime: zint64
  if not ?pb.getField(13, endTime):
    req.endTime = none(Timestamp)
  else:
    req.endTime = some(Timestamp(int64(endTime)))

  # Named so that it does not shadow the `buffer` parameter.
  var hashBuffers: seq[seq[byte]]
  if not ?pb.getRepeatedField(20, hashBuffers):
    req.messageHashes = @[]
  else:
    req.messageHashes = newSeqOfCap[WakuMessageHash](hashBuffers.len)
    for buf in hashBuffers:
      var hash: WakuMessageHash
      # NOTE(review): the value returned by copyFrom is discarded, so the
      # length of the received bytes is never compared with the hash
      # size — confirm this leniency is intended.
      discard copyFrom[byte](hash, buf)
      req.messageHashes.add(hash)

  var cursor: seq[byte]
  if not ?pb.getField(51, cursor):
    req.paginationCursor = none(WakuMessageHash)
  else:
    var hash: WakuMessageHash
    # NOTE(review): same unchecked-length copy as for the hashes above.
    discard copyFrom[byte](hash, cursor)
    req.paginationCursor = some(hash)

  var paging: uint32
  if not ?pb.getField(52, paging):
    req.paginationForward = PagingDirection.default()
  else:
    # The integer comes straight from the peer. Converting a value that
    # is not a declared ordinal to an enum raises a RangeDefect, which
    # cannot be reported through ProtobufResult, so clamp it to the
    # highest declared ordinal before converting.
    # NOTE(review): assumes PagingDirection ordinals start at 0 and are
    # contiguous — confirm against its declaration.
    req.paginationForward =
      PagingDirection(min(paging, uint32(high(PagingDirection))))

  var limit: uint64
  if not ?pb.getField(53, limit):
    req.paginationLimit = none(uint64)
  else:
    req.paginationLimit = some(limit)

  return ok(req)
2024-04-25 13:09:52 +00:00
### Response ###
2024-04-25 13:09:52 +00:00
proc encode*(keyValue: WakuMessageKeyValue): ProtoBuffer =
  ## Serialises one hash/message pair: the message hash goes into field 1
  ## and the encoded message into field 2.
  result = initProtoBuffer()

  result.write3(1, keyValue.messageHash)
  result.write3(2, encode(keyValue.message))

  result.finish3()
2024-04-25 13:09:52 +00:00
proc encode*(res: StoreQueryResponse): ProtoBuffer =
  ## Serialises a `StoreQueryResponse` into a finished protobuf buffer.
  ##
  ## Field numbers written: 1 request id, 10 status code, 11 status
  ## description, 20 messages (repeated, each encoded on its own),
  ## 51 pagination cursor.
  result = initProtoBuffer()

  result.write3(1, res.requestId)

  result.write3(10, res.statusCode)
  result.write3(11, res.statusDesc)

  for entry in res.messages:
    result.write3(20, encode(entry))

  result.write3(51, res.paginationCursor)

  result.finish3()
2024-04-25 13:09:52 +00:00
proc decode*(
    T: type WakuMessageKeyValue, buffer: seq[byte]
): ProtobufResult[WakuMessageKeyValue] =
  ## Decodes one hash/message pair from `buffer`.
  ##
  ## Both field 1 (`message_hash`) and field 2 (`message`) are required;
  ## a `missingRequiredField` error is returned when either is absent.
  ## Failures from the field getters and from the nested message decoder
  ## are propagated through `?`.
  let pb = initProtoBuffer(buffer)
  var entry = WakuMessageKeyValue()

  var hashBytes: seq[byte]
  if not ?pb.getField(1, hashBytes):
    return err(ProtobufError.missingRequiredField("message_hash"))
  # `entry` was just constructed, so its hash field is still
  # zero-initialised when the received bytes are copied into it.
  discard copyFrom[byte](entry.messageHash, hashBytes)

  var messagePb: ProtoBuffer
  if not ?pb.getField(2, messagePb):
    return err(ProtobufError.missingRequiredField("message"))
  entry.message = ?WakuMessage.decode(messagePb.buffer)

  ok(entry)
proc decode*(
    T: type StoreQueryResponse, buffer: seq[byte]
): ProtobufResult[StoreQueryResponse] =
  ## Decodes a `StoreQueryResponse` from `buffer`.
  ##
  ## Fields 1 (`request_id`), 10 (`status_code`) and 11 (`status_desc`)
  ## are required; a `missingRequiredField` error is returned for the
  ## first one that is absent. Messages (field 20) and the pagination
  ## cursor (field 51) are optional. Failures from the field getters and
  ## from decoding an individual message are propagated through `?`.
  let pb = initProtoBuffer(buffer)
  var response = StoreQueryResponse()

  if not ?pb.getField(1, response.requestId):
    return err(ProtobufError.missingRequiredField("request_id"))

  var statusCode: uint32
  if not ?pb.getField(10, statusCode):
    return err(ProtobufError.missingRequiredField("status_code"))
  response.statusCode = statusCode

  var statusDesc: string
  if not ?pb.getField(11, statusDesc):
    return err(ProtobufError.missingRequiredField("status_desc"))
  response.statusDesc = statusDesc

  # Each repeated entry is the raw encoding of one hash/message pair.
  var rawMessages: seq[seq[byte]]
  if ?pb.getRepeatedField(20, rawMessages):
    response.messages = newSeqOfCap[WakuMessageKeyValue](rawMessages.len)
    for raw in rawMessages:
      response.messages.add(?WakuMessageKeyValue.decode(raw))
  else:
    response.messages = @[]

  var cursorBytes: seq[byte]
  if ?pb.getField(51, cursorBytes):
    var cursorHash: WakuMessageHash
    discard copyFrom[byte](cursorHash, cursorBytes)
    response.paginationCursor = some(cursorHash)
  else:
    response.paginationCursor = none(WakuMessageHash)

  ok(response)