diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 000000000..0b359458a Binary files /dev/null and b/.DS_Store differ diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 95c5bee1f..f8419de28 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -57,3 +57,106 @@ procSuite "Waku Store": check: (await completionFut.withTimeout(5.seconds)) == true + + test "Index Protobuf encoder/decoder test": + let + index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1))) + pb = index.encode() + decodedIndex = Index.init(pb.buffer) + + check: + # the fields of decodedIndex must be the same as the original index + decodedIndex.isErr == false + decodedIndex.value == index + + let + emptyIndex = Index() + epb = emptyIndex.encode() + decodedEmptyIndex = Index.init(epb.buffer) + + check: + # check the correctness of init and encode for an empty Index + decodedEmptyIndex.isErr == false + decodedEmptyIndex.value == emptyIndex + + + test "PagingDirection Protobuf encod/init test": + let + pagingDirection = PagingDirection.BACKWARD + pb = pagingDirection.encode() + decodedPagingDirection = PagingDirection.init(pb.buffer) + + check: + # the decodedPagingDirection must be the same as the original pagingDirection + decodedPagingDirection.isErr == false + decodedPagingDirection.value == pagingDirection + + test "PagingInfo Protobuf encod/init test": + let + index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1))) + pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD) + pb = pagingInfo.encode() + decodedPagingInfo = PagingInfo.init(pb.buffer) + + check: + # the fields of decodedPagingInfo must be the same as the original pagingInfo + decodedPagingInfo.isErr == false + decodedPagingInfo.value == pagingInfo + + let + emptyPagingInfo = PagingInfo() + epb = emptyPagingInfo.encode() + decodedEmptyPagingInfo = PagingInfo.init(epb.buffer) + + check: + # check the correctness of init and encode for an empty PagingInfo + decodedEmptyPagingInfo.isErr == false + decodedEmptyPagingInfo.value == emptyPagingInfo + + test "HistoryQuery Protobuf encod/init test": + let + index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1))) + pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD) + query=HistoryQuery(topics: @[ContentTopic(1)], pagingInfo: pagingInfo) + pb = query.encode() + decodedQuery = HistoryQuery.init(pb.buffer) + + check: + # the fields of decoded query decodedQuery must be the same as the original query query + decodedQuery.isErr == false + decodedQuery.value == query + + let + emptyQuery=HistoryQuery() + epb = emptyQuery.encode() + decodedEmptyQuery = HistoryQuery.init(epb.buffer) + + check: + # check the correctness of init and encode for an empty HistoryQuery + decodedEmptyQuery.isErr == false + decodedEmptyQuery.value == emptyQuery + + test "HistoryResponse Protobuf encod/init test": + let + wm = WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)) + index = computeIndex(wm) + pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD) + res = HistoryResponse(messages: @[wm], pagingInfo:pagingInfo) + pb = res.encode() + decodedRes = HistoryResponse.init(pb.buffer) + + check: + # the fields of decoded response decodedRes must be the same as the original response res + decodedRes.isErr == false + decodedRes.value == res + + let + emptyRes=HistoryResponse() + epb = emptyRes.encode() + decodedEmptyRes = HistoryResponse.init(epb.buffer) + + check: + # check the correctness of init and encode for an empty HistoryResponse + decodedEmptyRes.isErr == false + decodedEmptyRes.value == emptyRes + diff --git a/waku/node/v2/waku_types.nim b/waku/node/v2/waku_types.nim index ea181aae0..55f68a675 100644 --- a/waku/node/v2/waku_types.nim +++ b/waku/node/v2/waku_types.nim @@ -25,7 +25,8 @@ type payload*: seq[byte] contentTopic*: ContentTopic - MessageNotificationHandler* = proc(topic: string, msg: WakuMessage): Future[void] {.gcsafe, closure.} + MessageNotificationHandler* = proc(topic: string, msg: WakuMessage): Future[ + void] {.gcsafe, closure.} MessageNotificationSubscriptions* = TableRef[string, MessageNotificationSubscription] @@ -35,20 +36,27 @@ type QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.} + Index* = object - ## This type contains the description of an index used in the pagination of waku messages + ## This type contains the description of an Index used in the pagination of WakuMessages digest*: MDigest[256] - receivedTime*: float + receivedTime*: float64 IndexedWakuMessage* = object + ## This type is used to encapsulate a WakuMessage and its Index msg*: WakuMessage index*: Index + PagingDirection* {.pure.} = enum + ## PagingDirection determines the direction of pagination + BACKWARD = uint32(0) + FORWARD = uint32(1) + PagingInfo* = object ## This type holds the information needed for the pagination - pageSize*: int + pageSize*: uint64 cursor*: Index - direction*: bool + direction*: PagingDirection HistoryQuery* = object topics*: seq[ContentTopic] @@ -182,7 +190,7 @@ proc computeIndex*(msg: WakuMessage): Index = if msg.contentTopic != 0: # checks for non-empty contentTopic ctx.update(msg.contentTopic.toBytes()) # converts the topic to bytes ctx.update(msg.payload) - let digest = ctx.finish() # computes the hash + let digest = ctx.finish() # computes the hash ctx.clear() result.digest = digest diff --git a/waku/protocol/v2/waku_store.nim b/waku/protocol/v2/waku_store.nim index f5fa4b9e5..a46a5ce9f 100644 --- a/waku/protocol/v2/waku_store.nim +++ b/waku/protocol/v2/waku_store.nim @@ -16,6 +16,90 @@ logScope: const WakuStoreCodec* = "/vac/waku/store/2.0.0-beta1" + +proc encode*(index: Index): ProtoBuffer = + ## encodes an Index object into a ProtoBuffer + ## returns the resultant ProtoBuffer + + # intiate a ProtoBuffer + result = initProtoBuffer() + + # encodes index + result.write(1, index.digest.data) + result.write(2, index.receivedTime) + +proc encode*(pd: PagingDirection): ProtoBuffer = + ## encodes a PagingDirection into a ProtoBuffer + ## returns the resultant ProtoBuffer + + # intiate a ProtoBuffer + result = initProtoBuffer() + + # encodes pd + result.write(1, uint32(ord(pd))) + +proc encode*(pinfo: PagingInfo): ProtoBuffer = + ## encodes a PagingInfo object into a ProtoBuffer + ## returns the resultant ProtoBuffer + + # intiate a ProtoBuffer + result = initProtoBuffer() + + # encodes pinfo + result.write(1, pinfo.pageSize) + result.write(2, pinfo.cursor.encode()) + result.write(3, pinfo.direction.encode()) + +proc init*(T: type Index, buffer: seq[byte]): ProtoResult[T] = + ## creates and returns an Index object out of buffer + var index = Index() + let pb = initProtoBuffer(buffer) + + var data: seq[byte] + discard ? pb.getField(1, data) + + # create digest from data + index.digest = MDigest[256]() + for count, b in data: + index.digest.data[count] = b + + # read the receivedTime + var receivedTime: float64 + discard ? pb.getField(2, receivedTime) + index.receivedTime = receivedTime + + ok(index) + +proc init*(T: type PagingDirection, buffer: seq[byte]): ProtoResult[T] = + ## creates and returns a PagingDirection object out of buffer + let pb = initProtoBuffer(buffer) + + var dir: uint32 + discard ? pb.getField(1, dir) + var direction = PagingDirection(dir) + + ok(direction) + +proc init*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] = + ## creates and returns a PagingInfo object out of buffer + var pagingInfo = PagingInfo() + let pb = initProtoBuffer(buffer) + + var pageSize: uint32 + discard ? pb.getField(1, pageSize) + pagingInfo.pageSize = pageSize + + + var cursorBuffer: seq[byte] + discard ? pb.getField(2, cursorBuffer) + pagingInfo.cursor = ? Index.init(cursorBuffer) + + var directionBuffer: seq[byte] + discard ? pb.getField(3, directionBuffer) + pagingInfo.direction = ? PagingDirection.init(directionBuffer) + + ok(pagingInfo) + proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] = var msg = HistoryQuery() let pb = initProtoBuffer(buffer) @@ -25,6 +109,12 @@ proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] = discard ? pb.getRepeatedField(1, topics) msg.topics = topics + + var pagingInfoBuffer: seq[byte] + discard ? pb.getField(2, pagingInfoBuffer) + + msg.pagingInfo = ? PagingInfo.init(pagingInfoBuffer) + ok(msg) proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] = @@ -35,7 +125,11 @@ proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] = discard ? pb.getRepeatedField(1, messages) for buf in messages: - msg.messages.add(? WakuMessage.init(buf)) + msg.messages.add( ? WakuMessage.init(buf)) + + var pagingInfoBuffer: seq[byte] + discard ? pb.getField(2,pagingInfoBuffer) + msg.pagingInfo= ? PagingInfo.init(pagingInfoBuffer) ok(msg) @@ -62,6 +156,8 @@ proc encode*(query: HistoryQuery): ProtoBuffer = for topic in query.topics: result.write(1, topic) + + result.write(2, query.pagingInfo.encode()) proc encode*(response: HistoryResponse): ProtoBuffer = result = initProtoBuffer() @@ -69,6 +165,8 @@ proc encode*(response: HistoryResponse): ProtoBuffer = for msg in response.messages: result.write(1, msg.encode()) + result.write(2, response.pagingInfo.encode()) + proc encode*(rpc: HistoryRPC): ProtoBuffer = result = initProtoBuffer()