History response pagination using sender-generated timestamp (#657)

* enables pagination based on sender timestamp

* uncomments a test

* bumps up version number

* updates migration script

* clean up

* unpdates changelog

* undo removal of receiver timestamp

* updates message_storage

* uses epochTime()

* minor

* removes a comment

* removes receiver timestamp deletion migration script

* fixes formatting issues

* fixes a bad field name

* fixes field issue

* adjusts spacing
This commit is contained in:
Sanaz Taheri Boshrooyeh 2021-07-07 16:56:20 -07:00 committed by GitHub
parent 0c0205f436
commit cf7b8faf27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 32 additions and 24 deletions

View File

@ -60,9 +60,9 @@ suite "Message Store":
if msg.timestamp == t3: t3Flag = true
# check correct retrieval of receiver timestamps
if receiverTimestamp == indexes[0].receivedTime: rt1Flag = true
if receiverTimestamp == indexes[1].receivedTime: rt2Flag = true
if receiverTimestamp == indexes[2].receivedTime: rt3Flag = true
if receiverTimestamp == indexes[0].receiverTime: rt1Flag = true
if receiverTimestamp == indexes[1].receiverTime: rt2Flag = true
if receiverTimestamp == indexes[2].receiverTime: rt3Flag = true
let res = store.getAll(data)

View File

@ -12,18 +12,19 @@ proc createSampleList(s: int): seq[IndexedWakuMessage] =
var data {.noinit.}: array[32, byte]
for x in data.mitems: x = 1
for i in 0..<s:
result.add(IndexedWakuMessage(msg: WakuMessage(payload: @[byte i]), index: Index(receivedTime: float64(i), digest: MDigest[256](data: data)) ))
result.add(IndexedWakuMessage(msg: WakuMessage(payload: @[byte i]), index: Index(receiverTime: float64(i), senderTime: float64(i), digest: MDigest[256](data: data)) ))
procSuite "pagination":
test "Index computation test":
let
wm = WakuMessage(payload: @[byte 1, 2, 3])
wm = WakuMessage(payload: @[byte 1, 2, 3], timestamp: 2)
index = wm.computeIndex()
check:
# the fields of the index should be non-empty
len(index.digest.data) != 0
len(index.digest.data) == 32 # sha2 output length in bytes
index.receivedTime != 0 # the timestamp should be a non-zero value
index.receiverTime != 0 # the receiver timestamp should be a non-zero value
index.senderTime == 2
let
wm1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("/waku/2/default-content/proto"))
@ -44,9 +45,9 @@ procSuite "pagination":
for x in data3.mitems: x = 3
let
index1 = Index(receivedTime: 1, digest: MDigest[256](data: data1))
index2 = Index(receivedTime: 1, digest: MDigest[256](data: data2))
index3 = Index(receivedTime: 2, digest: MDigest[256](data: data3))
index1 = Index(receiverTime: 2, senderTime: 1, digest: MDigest[256](data: data1))
index2 = Index(receiverTime: 2, senderTime: 1, digest: MDigest[256](data: data2))
index3 = Index(receiverTime: 1, senderTime: 2, digest: MDigest[256](data: data3))
iwm1 = IndexedWakuMessage(index: index1)
iwm2 = IndexedWakuMessage(index: index2)
iwm3 = IndexedWakuMessage(index: index3)

View File

@ -786,8 +786,8 @@ procSuite "WakuNode":
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60002))
contentTopic = ContentTopic("/waku/2/default-content/proto")
msg1 = WakuMessage(payload: "hello world1".toBytes(), contentTopic: contentTopic)
msg2 = WakuMessage(payload: "hello world2".toBytes(), contentTopic: contentTopic)
msg1 = WakuMessage(payload: "hello world1".toBytes(), contentTopic: contentTopic, timestamp: 1)
msg2 = WakuMessage(payload: "hello world2".toBytes(), contentTopic: contentTopic, timestamp: 2)
# setup sqlite database for node1
let

View File

@ -8,7 +8,7 @@ import
## retrieve historical messages
type
DataProc* = proc(timestamp: float64, msg: WakuMessage, pubsubTopic: string) {.closure.}
DataProc* = proc(receiverTimestamp: float64, msg: WakuMessage, pubsubTopic: string) {.closure.}
MessageStoreResult*[T] = Result[T, string]

View File

@ -72,7 +72,7 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop
if prepare.isErr:
return err("failed to prepare")
let res = prepare.value.exec((@(cursor.digest.data), cursor.receivedTime, message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes(), int64(message.version), message.timestamp))
let res = prepare.value.exec((@(cursor.digest.data), cursor.receiverTime, message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes(), int64(message.version), message.timestamp))
if res.isErr:
return err("failed")
@ -128,4 +128,4 @@ proc close*(db: WakuMessageStore) =
## Closes the database.
db.database.close()

View File

@ -47,8 +47,8 @@ proc computeIndex*(msg: WakuMessage): Index =
ctx.update(msg.payload)
let digest = ctx.finish() # computes the hash
ctx.clear()
result.digest = digest
result.receivedTime = epochTime() # gets the unix timestamp
var index = Index(digest:digest, receiverTime: epochTime(), senderTime: msg.timestamp)
return index
proc encode*(index: Index): ProtoBuffer =
## encodes an Index object into a ProtoBuffer
@ -59,7 +59,8 @@ proc encode*(index: Index): ProtoBuffer =
# encodes index
result.write(1, index.digest.data)
result.write(2, index.receivedTime)
result.write(2, index.receiverTime)
result.write(3, index.senderTime)
proc encode*(pinfo: PagingInfo): ProtoBuffer =
## encodes a PagingInfo object into a ProtoBuffer
@ -86,10 +87,15 @@ proc init*(T: type Index, buffer: seq[byte]): ProtoResult[T] =
for count, b in data:
index.digest.data[count] = b
# read the receivedTime
var receivedTime: float64
discard ? pb.getField(2, receivedTime)
index.receivedTime = receivedTime
# read the timestamp
var receiverTime: float64
discard ? pb.getField(2, receiverTime)
index.receiverTime = receiverTime
# read the timestamp
var senderTime: float64
discard ? pb.getField(3, senderTime)
index.senderTime = senderTime
ok(index)
@ -218,7 +224,7 @@ proc indexComparison* (x, y: Index): int =
## returns -1 if x < y
## returns 1 if x > y
let
timecmp = system.cmp(x.receivedTime, y.receivedTime)
timecmp = system.cmp(x.senderTime, y.senderTime)
digestcm = system.cmp(x.digest.data, y.digest.data)
if timecmp != 0: # timestamp has a higher priority for comparison
return timecmp
@ -377,7 +383,7 @@ proc init*(ws: WakuStore) {.raises: [Defect, Exception]} =
if ws.store.isNil:
return
proc onData(timestamp: float64, msg: WakuMessage, pubsubTopic: string) =
proc onData(receiverTime: float64, msg: WakuMessage, pubsubTopic: string) =
# TODO index should not be recalculated
ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(), pubsubTopic: pubsubTopic))

View File

@ -8,4 +8,5 @@ type
Index* = object
## This type contains the description of an Index used in the pagination of WakuMessages
digest*: MDigest[256]
receivedTime*: float64
receiverTime*: float64
senderTime*: float64 # the time at which the message is generated