mirror of https://github.com/waku-org/nwaku.git
adds protobuf methods for pagination (#248)
This commit is contained in:
parent
4b21bb6b9b
commit
0cb2e125b6
|
@ -57,3 +57,106 @@ procSuite "Waku Store":
|
|||
|
||||
check:
|
||||
(await completionFut.withTimeout(5.seconds)) == true
|
||||
|
||||
test "Index Protobuf encoder/decoder test":
|
||||
let
|
||||
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)))
|
||||
pb = index.encode()
|
||||
decodedIndex = Index.init(pb.buffer)
|
||||
|
||||
check:
|
||||
# the fields of decodedIndex must be the same as the original index
|
||||
decodedIndex.isErr == false
|
||||
decodedIndex.value == index
|
||||
|
||||
let
|
||||
emptyIndex = Index()
|
||||
epb = emptyIndex.encode()
|
||||
decodedEmptyIndex = Index.init(epb.buffer)
|
||||
|
||||
check:
|
||||
# check the correctness of init and encode for an empty Index
|
||||
decodedEmptyIndex.isErr == false
|
||||
decodedEmptyIndex.value == emptyIndex
|
||||
|
||||
|
||||
test "PagingDirection Protobuf encod/init test":
|
||||
let
|
||||
pagingDirection = PagingDirection.BACKWARD
|
||||
pb = pagingDirection.encode()
|
||||
decodedPagingDirection = PagingDirection.init(pb.buffer)
|
||||
|
||||
check:
|
||||
# the decodedPagingDirection must be the same as the original pagingDirection
|
||||
decodedPagingDirection.isErr == false
|
||||
decodedPagingDirection.value == pagingDirection
|
||||
|
||||
test "PagingInfo Protobuf encod/init test":
|
||||
let
|
||||
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)))
|
||||
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
|
||||
pb = pagingInfo.encode()
|
||||
decodedPagingInfo = PagingInfo.init(pb.buffer)
|
||||
|
||||
check:
|
||||
# the fields of decodedPagingInfo must be the same as the original pagingInfo
|
||||
decodedPagingInfo.isErr == false
|
||||
decodedPagingInfo.value == pagingInfo
|
||||
|
||||
let
|
||||
emptyPagingInfo = PagingInfo()
|
||||
epb = emptyPagingInfo.encode()
|
||||
decodedEmptyPagingInfo = PagingInfo.init(epb.buffer)
|
||||
|
||||
check:
|
||||
# check the correctness of init and encode for an empty PagingInfo
|
||||
decodedEmptyPagingInfo.isErr == false
|
||||
decodedEmptyPagingInfo.value == emptyPagingInfo
|
||||
|
||||
test "HistoryQuery Protobuf encod/init test":
|
||||
let
|
||||
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)))
|
||||
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
|
||||
query=HistoryQuery(topics: @[ContentTopic(1)], pagingInfo: pagingInfo)
|
||||
pb = query.encode()
|
||||
decodedQuery = HistoryQuery.init(pb.buffer)
|
||||
|
||||
check:
|
||||
# the fields of decoded query decodedQuery must be the same as the original query query
|
||||
decodedQuery.isErr == false
|
||||
decodedQuery.value == query
|
||||
|
||||
let
|
||||
emptyQuery=HistoryQuery()
|
||||
epb = emptyQuery.encode()
|
||||
decodedEmptyQuery = HistoryQuery.init(epb.buffer)
|
||||
|
||||
check:
|
||||
# check the correctness of init and encode for an empty HistoryQuery
|
||||
decodedEmptyQuery.isErr == false
|
||||
decodedEmptyQuery.value == emptyQuery
|
||||
|
||||
test "HistoryResponse Protobuf encod/init test":
|
||||
let
|
||||
wm = WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1))
|
||||
index = computeIndex(wm)
|
||||
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
|
||||
res = HistoryResponse(messages: @[wm], pagingInfo:pagingInfo)
|
||||
pb = res.encode()
|
||||
decodedRes = HistoryResponse.init(pb.buffer)
|
||||
|
||||
check:
|
||||
# the fields of decoded response decodedRes must be the same as the original response res
|
||||
decodedRes.isErr == false
|
||||
decodedRes.value == res
|
||||
|
||||
let
|
||||
emptyRes=HistoryResponse()
|
||||
epb = emptyRes.encode()
|
||||
decodedEmptyRes = HistoryResponse.init(epb.buffer)
|
||||
|
||||
check:
|
||||
# check the correctness of init and encode for an empty HistoryResponse
|
||||
decodedEmptyRes.isErr == false
|
||||
decodedEmptyRes.value == emptyRes
|
||||
|
||||
|
|
|
@ -25,7 +25,8 @@ type
|
|||
payload*: seq[byte]
|
||||
contentTopic*: ContentTopic
|
||||
|
||||
MessageNotificationHandler* = proc(topic: string, msg: WakuMessage): Future[void] {.gcsafe, closure.}
|
||||
MessageNotificationHandler* = proc(topic: string, msg: WakuMessage): Future[
|
||||
void] {.gcsafe, closure.}
|
||||
|
||||
MessageNotificationSubscriptions* = TableRef[string, MessageNotificationSubscription]
|
||||
|
||||
|
@ -35,20 +36,27 @@ type
|
|||
|
||||
QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
|
||||
|
||||
|
||||
Index* = object
|
||||
## This type contains the description of an index used in the pagination of waku messages
|
||||
## This type contains the description of an Index used in the pagination of WakuMessages
|
||||
digest*: MDigest[256]
|
||||
receivedTime*: float
|
||||
receivedTime*: float64
|
||||
|
||||
IndexedWakuMessage* = object
|
||||
## This type is used to encapsulate a WakuMessage and its Index
|
||||
msg*: WakuMessage
|
||||
index*: Index
|
||||
|
||||
PagingDirection* {.pure.} = enum
|
||||
## PagingDirection determines the direction of pagination
|
||||
BACKWARD = uint32(0)
|
||||
FORWARD = uint32(1)
|
||||
|
||||
PagingInfo* = object
|
||||
## This type holds the information needed for the pagination
|
||||
pageSize*: int
|
||||
pageSize*: uint64
|
||||
cursor*: Index
|
||||
direction*: bool
|
||||
direction*: PagingDirection
|
||||
|
||||
HistoryQuery* = object
|
||||
topics*: seq[ContentTopic]
|
||||
|
@ -182,7 +190,7 @@ proc computeIndex*(msg: WakuMessage): Index =
|
|||
if msg.contentTopic != 0: # checks for non-empty contentTopic
|
||||
ctx.update(msg.contentTopic.toBytes()) # converts the topic to bytes
|
||||
ctx.update(msg.payload)
|
||||
let digest = ctx.finish() # computes the hash
|
||||
let digest = ctx.finish() # computes the hash
|
||||
ctx.clear()
|
||||
|
||||
result.digest = digest
|
||||
|
|
|
@ -16,6 +16,90 @@ logScope:
|
|||
const
|
||||
WakuStoreCodec* = "/vac/waku/store/2.0.0-beta1"
|
||||
|
||||
|
||||
proc encode*(index: Index): ProtoBuffer =
|
||||
## encodes an Index object into a ProtoBuffer
|
||||
## returns the resultant ProtoBuffer
|
||||
|
||||
# intiate a ProtoBuffer
|
||||
result = initProtoBuffer()
|
||||
|
||||
# encodes index
|
||||
result.write(1, index.digest.data)
|
||||
result.write(2, index.receivedTime)
|
||||
|
||||
proc encode*(pd: PagingDirection): ProtoBuffer =
|
||||
## encodes a PagingDirection into a ProtoBuffer
|
||||
## returns the resultant ProtoBuffer
|
||||
|
||||
# intiate a ProtoBuffer
|
||||
result = initProtoBuffer()
|
||||
|
||||
# encodes pd
|
||||
result.write(1, uint32(ord(pd)))
|
||||
|
||||
proc encode*(pinfo: PagingInfo): ProtoBuffer =
|
||||
## encodes a PagingInfo object into a ProtoBuffer
|
||||
## returns the resultant ProtoBuffer
|
||||
|
||||
# intiate a ProtoBuffer
|
||||
result = initProtoBuffer()
|
||||
|
||||
# encodes pinfo
|
||||
result.write(1, pinfo.pageSize)
|
||||
result.write(2, pinfo.cursor.encode())
|
||||
result.write(3, pinfo.direction.encode())
|
||||
|
||||
proc init*(T: type Index, buffer: seq[byte]): ProtoResult[T] =
|
||||
## creates and returns an Index object out of buffer
|
||||
var index = Index()
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
var data: seq[byte]
|
||||
discard ? pb.getField(1, data)
|
||||
|
||||
# create digest from data
|
||||
index.digest = MDigest[256]()
|
||||
for count, b in data:
|
||||
index.digest.data[count] = b
|
||||
|
||||
# read the receivedTime
|
||||
var receivedTime: float64
|
||||
discard ? pb.getField(2, receivedTime)
|
||||
index.receivedTime = receivedTime
|
||||
|
||||
ok(index)
|
||||
|
||||
proc init*(T: type PagingDirection, buffer: seq[byte]): ProtoResult[T] =
|
||||
## creates and returns a PagingDirection object out of buffer
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
var dir: uint32
|
||||
discard ? pb.getField(1, dir)
|
||||
var direction = PagingDirection(dir)
|
||||
|
||||
ok(direction)
|
||||
|
||||
proc init*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] =
|
||||
## creates and returns a PagingInfo object out of buffer
|
||||
var pagingInfo = PagingInfo()
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
var pageSize: uint32
|
||||
discard ? pb.getField(1, pageSize)
|
||||
pagingInfo.pageSize = pageSize
|
||||
|
||||
|
||||
var cursorBuffer: seq[byte]
|
||||
discard ? pb.getField(2, cursorBuffer)
|
||||
pagingInfo.cursor = ? Index.init(cursorBuffer)
|
||||
|
||||
var directionBuffer: seq[byte]
|
||||
discard ? pb.getField(3, directionBuffer)
|
||||
pagingInfo.direction = ? PagingDirection.init(directionBuffer)
|
||||
|
||||
ok(pagingInfo)
|
||||
|
||||
proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
|
||||
var msg = HistoryQuery()
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
@ -25,6 +109,12 @@ proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
|
|||
discard ? pb.getRepeatedField(1, topics)
|
||||
|
||||
msg.topics = topics
|
||||
|
||||
var pagingInfoBuffer: seq[byte]
|
||||
discard ? pb.getField(2, pagingInfoBuffer)
|
||||
|
||||
msg.pagingInfo = ? PagingInfo.init(pagingInfoBuffer)
|
||||
|
||||
ok(msg)
|
||||
|
||||
proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] =
|
||||
|
@ -35,7 +125,11 @@ proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] =
|
|||
discard ? pb.getRepeatedField(1, messages)
|
||||
|
||||
for buf in messages:
|
||||
msg.messages.add(? WakuMessage.init(buf))
|
||||
msg.messages.add( ? WakuMessage.init(buf))
|
||||
|
||||
var pagingInfoBuffer: seq[byte]
|
||||
discard ? pb.getField(2,pagingInfoBuffer)
|
||||
msg.pagingInfo= ? PagingInfo.init(pagingInfoBuffer)
|
||||
|
||||
ok(msg)
|
||||
|
||||
|
@ -62,6 +156,8 @@ proc encode*(query: HistoryQuery): ProtoBuffer =
|
|||
|
||||
for topic in query.topics:
|
||||
result.write(1, topic)
|
||||
|
||||
result.write(2, query.pagingInfo.encode())
|
||||
|
||||
proc encode*(response: HistoryResponse): ProtoBuffer =
|
||||
result = initProtoBuffer()
|
||||
|
@ -69,6 +165,8 @@ proc encode*(response: HistoryResponse): ProtoBuffer =
|
|||
for msg in response.messages:
|
||||
result.write(1, msg.encode())
|
||||
|
||||
result.write(2, response.pagingInfo.encode())
|
||||
|
||||
proc encode*(rpc: HistoryRPC): ProtoBuffer =
|
||||
result = initProtoBuffer()
|
||||
|
||||
|
|
Loading…
Reference in New Issue