mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-07 00:13:06 +00:00
adds protobuf methods for pagination (#248)
This commit is contained in:
parent
8d73c8c74c
commit
94a28d9fb7
@ -57,3 +57,106 @@ procSuite "Waku Store":
|
|||||||
|
|
||||||
check:
|
check:
|
||||||
(await completionFut.withTimeout(5.seconds)) == true
|
(await completionFut.withTimeout(5.seconds)) == true
|
||||||
|
|
||||||
|
test "Index Protobuf encoder/decoder test":
|
||||||
|
let
|
||||||
|
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)))
|
||||||
|
pb = index.encode()
|
||||||
|
decodedIndex = Index.init(pb.buffer)
|
||||||
|
|
||||||
|
check:
|
||||||
|
# the fields of decodedIndex must be the same as the original index
|
||||||
|
decodedIndex.isErr == false
|
||||||
|
decodedIndex.value == index
|
||||||
|
|
||||||
|
let
|
||||||
|
emptyIndex = Index()
|
||||||
|
epb = emptyIndex.encode()
|
||||||
|
decodedEmptyIndex = Index.init(epb.buffer)
|
||||||
|
|
||||||
|
check:
|
||||||
|
# check the correctness of init and encode for an empty Index
|
||||||
|
decodedEmptyIndex.isErr == false
|
||||||
|
decodedEmptyIndex.value == emptyIndex
|
||||||
|
|
||||||
|
|
||||||
|
test "PagingDirection Protobuf encod/init test":
|
||||||
|
let
|
||||||
|
pagingDirection = PagingDirection.BACKWARD
|
||||||
|
pb = pagingDirection.encode()
|
||||||
|
decodedPagingDirection = PagingDirection.init(pb.buffer)
|
||||||
|
|
||||||
|
check:
|
||||||
|
# the decodedPagingDirection must be the same as the original pagingDirection
|
||||||
|
decodedPagingDirection.isErr == false
|
||||||
|
decodedPagingDirection.value == pagingDirection
|
||||||
|
|
||||||
|
test "PagingInfo Protobuf encod/init test":
|
||||||
|
let
|
||||||
|
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)))
|
||||||
|
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
|
||||||
|
pb = pagingInfo.encode()
|
||||||
|
decodedPagingInfo = PagingInfo.init(pb.buffer)
|
||||||
|
|
||||||
|
check:
|
||||||
|
# the fields of decodedPagingInfo must be the same as the original pagingInfo
|
||||||
|
decodedPagingInfo.isErr == false
|
||||||
|
decodedPagingInfo.value == pagingInfo
|
||||||
|
|
||||||
|
let
|
||||||
|
emptyPagingInfo = PagingInfo()
|
||||||
|
epb = emptyPagingInfo.encode()
|
||||||
|
decodedEmptyPagingInfo = PagingInfo.init(epb.buffer)
|
||||||
|
|
||||||
|
check:
|
||||||
|
# check the correctness of init and encode for an empty PagingInfo
|
||||||
|
decodedEmptyPagingInfo.isErr == false
|
||||||
|
decodedEmptyPagingInfo.value == emptyPagingInfo
|
||||||
|
|
||||||
|
test "HistoryQuery Protobuf encod/init test":
|
||||||
|
let
|
||||||
|
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)))
|
||||||
|
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
|
||||||
|
query=HistoryQuery(topics: @[ContentTopic(1)], pagingInfo: pagingInfo)
|
||||||
|
pb = query.encode()
|
||||||
|
decodedQuery = HistoryQuery.init(pb.buffer)
|
||||||
|
|
||||||
|
check:
|
||||||
|
# the fields of decoded query decodedQuery must be the same as the original query query
|
||||||
|
decodedQuery.isErr == false
|
||||||
|
decodedQuery.value == query
|
||||||
|
|
||||||
|
let
|
||||||
|
emptyQuery=HistoryQuery()
|
||||||
|
epb = emptyQuery.encode()
|
||||||
|
decodedEmptyQuery = HistoryQuery.init(epb.buffer)
|
||||||
|
|
||||||
|
check:
|
||||||
|
# check the correctness of init and encode for an empty HistoryQuery
|
||||||
|
decodedEmptyQuery.isErr == false
|
||||||
|
decodedEmptyQuery.value == emptyQuery
|
||||||
|
|
||||||
|
test "HistoryResponse Protobuf encod/init test":
|
||||||
|
let
|
||||||
|
wm = WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1))
|
||||||
|
index = computeIndex(wm)
|
||||||
|
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
|
||||||
|
res = HistoryResponse(messages: @[wm], pagingInfo:pagingInfo)
|
||||||
|
pb = res.encode()
|
||||||
|
decodedRes = HistoryResponse.init(pb.buffer)
|
||||||
|
|
||||||
|
check:
|
||||||
|
# the fields of decoded response decodedRes must be the same as the original response res
|
||||||
|
decodedRes.isErr == false
|
||||||
|
decodedRes.value == res
|
||||||
|
|
||||||
|
let
|
||||||
|
emptyRes=HistoryResponse()
|
||||||
|
epb = emptyRes.encode()
|
||||||
|
decodedEmptyRes = HistoryResponse.init(epb.buffer)
|
||||||
|
|
||||||
|
check:
|
||||||
|
# check the correctness of init and encode for an empty HistoryResponse
|
||||||
|
decodedEmptyRes.isErr == false
|
||||||
|
decodedEmptyRes.value == emptyRes
|
||||||
|
|
||||||
|
|||||||
@ -25,7 +25,8 @@ type
|
|||||||
payload*: seq[byte]
|
payload*: seq[byte]
|
||||||
contentTopic*: ContentTopic
|
contentTopic*: ContentTopic
|
||||||
|
|
||||||
MessageNotificationHandler* = proc(topic: string, msg: WakuMessage): Future[void] {.gcsafe, closure.}
|
MessageNotificationHandler* = proc(topic: string, msg: WakuMessage): Future[
|
||||||
|
void] {.gcsafe, closure.}
|
||||||
|
|
||||||
MessageNotificationSubscriptions* = TableRef[string, MessageNotificationSubscription]
|
MessageNotificationSubscriptions* = TableRef[string, MessageNotificationSubscription]
|
||||||
|
|
||||||
@ -35,20 +36,27 @@ type
|
|||||||
|
|
||||||
QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
|
QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
|
||||||
|
|
||||||
|
|
||||||
Index* = object
|
Index* = object
|
||||||
## This type contains the description of an index used in the pagination of waku messages
|
## This type contains the description of an Index used in the pagination of WakuMessages
|
||||||
digest*: MDigest[256]
|
digest*: MDigest[256]
|
||||||
receivedTime*: float
|
receivedTime*: float64
|
||||||
|
|
||||||
IndexedWakuMessage* = object
|
IndexedWakuMessage* = object
|
||||||
|
## This type is used to encapsulate a WakuMessage and its Index
|
||||||
msg*: WakuMessage
|
msg*: WakuMessage
|
||||||
index*: Index
|
index*: Index
|
||||||
|
|
||||||
|
PagingDirection* {.pure.} = enum
|
||||||
|
## PagingDirection determines the direction of pagination
|
||||||
|
BACKWARD = uint32(0)
|
||||||
|
FORWARD = uint32(1)
|
||||||
|
|
||||||
PagingInfo* = object
|
PagingInfo* = object
|
||||||
## This type holds the information needed for the pagination
|
## This type holds the information needed for the pagination
|
||||||
pageSize*: int
|
pageSize*: uint64
|
||||||
cursor*: Index
|
cursor*: Index
|
||||||
direction*: bool
|
direction*: PagingDirection
|
||||||
|
|
||||||
HistoryQuery* = object
|
HistoryQuery* = object
|
||||||
topics*: seq[ContentTopic]
|
topics*: seq[ContentTopic]
|
||||||
@ -182,7 +190,7 @@ proc computeIndex*(msg: WakuMessage): Index =
|
|||||||
if msg.contentTopic != 0: # checks for non-empty contentTopic
|
if msg.contentTopic != 0: # checks for non-empty contentTopic
|
||||||
ctx.update(msg.contentTopic.toBytes()) # converts the topic to bytes
|
ctx.update(msg.contentTopic.toBytes()) # converts the topic to bytes
|
||||||
ctx.update(msg.payload)
|
ctx.update(msg.payload)
|
||||||
let digest = ctx.finish() # computes the hash
|
let digest = ctx.finish() # computes the hash
|
||||||
ctx.clear()
|
ctx.clear()
|
||||||
|
|
||||||
result.digest = digest
|
result.digest = digest
|
||||||
|
|||||||
@ -16,6 +16,90 @@ logScope:
|
|||||||
const
|
const
|
||||||
WakuStoreCodec* = "/vac/waku/store/2.0.0-beta1"
|
WakuStoreCodec* = "/vac/waku/store/2.0.0-beta1"
|
||||||
|
|
||||||
|
|
||||||
|
proc encode*(index: Index): ProtoBuffer =
|
||||||
|
## encodes an Index object into a ProtoBuffer
|
||||||
|
## returns the resultant ProtoBuffer
|
||||||
|
|
||||||
|
# intiate a ProtoBuffer
|
||||||
|
result = initProtoBuffer()
|
||||||
|
|
||||||
|
# encodes index
|
||||||
|
result.write(1, index.digest.data)
|
||||||
|
result.write(2, index.receivedTime)
|
||||||
|
|
||||||
|
proc encode*(pd: PagingDirection): ProtoBuffer =
|
||||||
|
## encodes a PagingDirection into a ProtoBuffer
|
||||||
|
## returns the resultant ProtoBuffer
|
||||||
|
|
||||||
|
# intiate a ProtoBuffer
|
||||||
|
result = initProtoBuffer()
|
||||||
|
|
||||||
|
# encodes pd
|
||||||
|
result.write(1, uint32(ord(pd)))
|
||||||
|
|
||||||
|
proc encode*(pinfo: PagingInfo): ProtoBuffer =
|
||||||
|
## encodes a PagingInfo object into a ProtoBuffer
|
||||||
|
## returns the resultant ProtoBuffer
|
||||||
|
|
||||||
|
# intiate a ProtoBuffer
|
||||||
|
result = initProtoBuffer()
|
||||||
|
|
||||||
|
# encodes pinfo
|
||||||
|
result.write(1, pinfo.pageSize)
|
||||||
|
result.write(2, pinfo.cursor.encode())
|
||||||
|
result.write(3, pinfo.direction.encode())
|
||||||
|
|
||||||
|
proc init*(T: type Index, buffer: seq[byte]): ProtoResult[T] =
|
||||||
|
## creates and returns an Index object out of buffer
|
||||||
|
var index = Index()
|
||||||
|
let pb = initProtoBuffer(buffer)
|
||||||
|
|
||||||
|
var data: seq[byte]
|
||||||
|
discard ? pb.getField(1, data)
|
||||||
|
|
||||||
|
# create digest from data
|
||||||
|
index.digest = MDigest[256]()
|
||||||
|
for count, b in data:
|
||||||
|
index.digest.data[count] = b
|
||||||
|
|
||||||
|
# read the receivedTime
|
||||||
|
var receivedTime: float64
|
||||||
|
discard ? pb.getField(2, receivedTime)
|
||||||
|
index.receivedTime = receivedTime
|
||||||
|
|
||||||
|
ok(index)
|
||||||
|
|
||||||
|
proc init*(T: type PagingDirection, buffer: seq[byte]): ProtoResult[T] =
|
||||||
|
## creates and returns a PagingDirection object out of buffer
|
||||||
|
let pb = initProtoBuffer(buffer)
|
||||||
|
|
||||||
|
var dir: uint32
|
||||||
|
discard ? pb.getField(1, dir)
|
||||||
|
var direction = PagingDirection(dir)
|
||||||
|
|
||||||
|
ok(direction)
|
||||||
|
|
||||||
|
proc init*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] =
|
||||||
|
## creates and returns a PagingInfo object out of buffer
|
||||||
|
var pagingInfo = PagingInfo()
|
||||||
|
let pb = initProtoBuffer(buffer)
|
||||||
|
|
||||||
|
var pageSize: uint32
|
||||||
|
discard ? pb.getField(1, pageSize)
|
||||||
|
pagingInfo.pageSize = pageSize
|
||||||
|
|
||||||
|
|
||||||
|
var cursorBuffer: seq[byte]
|
||||||
|
discard ? pb.getField(2, cursorBuffer)
|
||||||
|
pagingInfo.cursor = ? Index.init(cursorBuffer)
|
||||||
|
|
||||||
|
var directionBuffer: seq[byte]
|
||||||
|
discard ? pb.getField(3, directionBuffer)
|
||||||
|
pagingInfo.direction = ? PagingDirection.init(directionBuffer)
|
||||||
|
|
||||||
|
ok(pagingInfo)
|
||||||
|
|
||||||
proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
|
proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
|
||||||
var msg = HistoryQuery()
|
var msg = HistoryQuery()
|
||||||
let pb = initProtoBuffer(buffer)
|
let pb = initProtoBuffer(buffer)
|
||||||
@ -25,6 +109,12 @@ proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
|
|||||||
discard ? pb.getRepeatedField(1, topics)
|
discard ? pb.getRepeatedField(1, topics)
|
||||||
|
|
||||||
msg.topics = topics
|
msg.topics = topics
|
||||||
|
|
||||||
|
var pagingInfoBuffer: seq[byte]
|
||||||
|
discard ? pb.getField(2, pagingInfoBuffer)
|
||||||
|
|
||||||
|
msg.pagingInfo = ? PagingInfo.init(pagingInfoBuffer)
|
||||||
|
|
||||||
ok(msg)
|
ok(msg)
|
||||||
|
|
||||||
proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] =
|
proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] =
|
||||||
@ -35,7 +125,11 @@ proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] =
|
|||||||
discard ? pb.getRepeatedField(1, messages)
|
discard ? pb.getRepeatedField(1, messages)
|
||||||
|
|
||||||
for buf in messages:
|
for buf in messages:
|
||||||
msg.messages.add(? WakuMessage.init(buf))
|
msg.messages.add( ? WakuMessage.init(buf))
|
||||||
|
|
||||||
|
var pagingInfoBuffer: seq[byte]
|
||||||
|
discard ? pb.getField(2,pagingInfoBuffer)
|
||||||
|
msg.pagingInfo= ? PagingInfo.init(pagingInfoBuffer)
|
||||||
|
|
||||||
ok(msg)
|
ok(msg)
|
||||||
|
|
||||||
@ -62,6 +156,8 @@ proc encode*(query: HistoryQuery): ProtoBuffer =
|
|||||||
|
|
||||||
for topic in query.topics:
|
for topic in query.topics:
|
||||||
result.write(1, topic)
|
result.write(1, topic)
|
||||||
|
|
||||||
|
result.write(2, query.pagingInfo.encode())
|
||||||
|
|
||||||
proc encode*(response: HistoryResponse): ProtoBuffer =
|
proc encode*(response: HistoryResponse): ProtoBuffer =
|
||||||
result = initProtoBuffer()
|
result = initProtoBuffer()
|
||||||
@ -69,6 +165,8 @@ proc encode*(response: HistoryResponse): ProtoBuffer =
|
|||||||
for msg in response.messages:
|
for msg in response.messages:
|
||||||
result.write(1, msg.encode())
|
result.write(1, msg.encode())
|
||||||
|
|
||||||
|
result.write(2, response.pagingInfo.encode())
|
||||||
|
|
||||||
proc encode*(rpc: HistoryRPC): ProtoBuffer =
|
proc encode*(rpc: HistoryRPC): ProtoBuffer =
|
||||||
result = initProtoBuffer()
|
result = initProtoBuffer()
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user