Refactor timestamps type from float64 to int64 (milliseconds resolution)

This commit is contained in:
s1fr0 2022-02-03 18:11:45 +01:00
parent b42e5d1261
commit a3a0a09cca
19 changed files with 125 additions and 114 deletions

View File

@ -102,7 +102,7 @@ procSuite "Waku v2 JSON-RPC API":
response == true
# Publish a message on the default topic
response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic), timestamp: some(int64(epochTime()))))
check:
# @TODO poll topic to verify message has been published
@ -260,7 +260,7 @@ procSuite "Waku v2 JSON-RPC API":
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
let response = await client.get_waku_v2_store_v1_messages(some(defaultTopic), some(@[HistoryContentFilter(contentTopic: defaultContentTopic)]), some(0.float64), some(9.float64), some(StorePagingOptions()))
let response = await client.get_waku_v2_store_v1_messages(some(defaultTopic), some(@[HistoryContentFilter(contentTopic: defaultContentTopic)]), some(0.int64), some(9.int64), some(StorePagingOptions()))
check:
response.messages.len() == 8
response.pagingOptions.isSome()
@ -573,7 +573,7 @@ procSuite "Waku v2 JSON-RPC API":
pubSubTopic = "polling"
contentTopic = defaultContentTopic
payload = @[byte 9]
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(epochTime()))
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(int64(epochTime())))
topicCache = newTable[string, seq[WakuMessage]]()
await node1.start()
@ -664,7 +664,7 @@ procSuite "Waku v2 JSON-RPC API":
pubSubTopic = "polling"
contentTopic = defaultContentTopic
payload = @[byte 9]
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(epochTime()))
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(int64(epochTime())))
topicCache = newTable[string, seq[WakuMessage]]()
await node1.start()

View File

@ -16,9 +16,9 @@ suite "Message Store":
topic = ContentTopic("/waku/2/default-content/proto")
pubsubTopic = "/waku/2/default-waku/proto"
t1 = epochTime()
t2 = epochTime()
t3 = high(float64)
t1 = int64(epochTime())
t2 = int64(epochTime())
t3 = int64(high(float64))
var msgs = @[
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0), timestamp: t1),
WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1), timestamp: t2),
@ -45,7 +45,7 @@ suite "Message Store":
var msgFlag, psTopicFlag = true
var responseCount = 0
proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
proc data(receiverTimestamp: int64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
responseCount += 1
# Note: cannot use `check` within `{.raises: [Defect].}` block:
@ -136,7 +136,7 @@ suite "Message Store":
for i in 1..capacity:
let
msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: i.float)
msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: i.int64)
index = computeIndex(msg)
output = store.put(index, msg, pubsubTopic)
@ -145,9 +145,9 @@ suite "Message Store":
var
responseCount = 0
lastMessageTimestamp = 0.float
lastMessageTimestamp = 0.int64
proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
proc data(receiverTimestamp: int64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
responseCount += 1
lastMessageTimestamp = msg.timestamp
@ -157,7 +157,7 @@ suite "Message Store":
check:
resMax.isOk
responseCount == capacity # We retrieved all items
lastMessageTimestamp == capacity.float # Returned rows were ordered correctly
lastMessageTimestamp == capacity.int64 # Returned rows were ordered correctly
# Now test getAll with a limit smaller than total stored items
responseCount = 0 # Reset response count
@ -167,7 +167,7 @@ suite "Message Store":
check:
resLimit.isOk
responseCount == capacity - 2 # We retrieved limited number of items
lastMessageTimestamp == capacity.float # We retrieved the youngest items in the store, in order
lastMessageTimestamp == capacity.int64 # We retrieved the youngest items in the store, in order
# Test zero limit
responseCount = 0 # Reset response count
@ -177,4 +177,4 @@ suite "Message Store":
check:
resZero.isOk
responseCount == 0 # No items retrieved
lastMessageTimestamp == 0.float # No items retrieved
lastMessageTimestamp == 0.int64 # No items retrieved

View File

@ -12,7 +12,7 @@ proc createSampleList(s: int): seq[IndexedWakuMessage] =
var data {.noinit.}: array[32, byte]
for x in data.mitems: x = 1
for i in 0..<s:
result.add(IndexedWakuMessage(msg: WakuMessage(payload: @[byte i]), index: Index(receiverTime: float64(i), senderTime: float64(i), digest: MDigest[256](data: data)) ))
result.add(IndexedWakuMessage(msg: WakuMessage(payload: @[byte i]), index: Index(receiverTime: int64(i), senderTime: int64(i), digest: MDigest[256](data: data)) ))
procSuite "pagination":
test "Index computation test":
@ -305,7 +305,7 @@ suite "time-window history query":
let
version = 0'u32
payload = @[byte 0, 1, 2]
timestamp = float64(10)
timestamp = int64(10)
msg = WakuMessage(payload: payload, version: version, timestamp: timestamp)
pb = msg.encode()
@ -338,4 +338,4 @@ suite "time-window history query":
let
timestampDecoded = msgDecoded.value.timestamp
check:
timestampDecoded == float64(0)
timestampDecoded == int64(0)

View File

@ -525,7 +525,7 @@ procSuite "Waku Store":
let
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic))
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic), HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: pagingInfo, startTime: float64(10), endTime: float64(11))
query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic), HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: pagingInfo, startTime: int64(10), endTime: int64(11))
pb = query.encode()
decodedQuery = HistoryQuery.init(pb.buffer)
@ -575,25 +575,25 @@ procSuite "Waku Store":
key2 = PrivateKey.random(ECDSA, rng[]).get()
# peer2 = PeerInfo.new(key2)
var
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: float(0)),
WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: float(1)),
WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: float(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3)),
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4)),
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(5)),
WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: float(6)),
WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: float(7)),
WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: float(8)),
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: float(9))]
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: int64(0)),
WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: int64(1)),
WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: int64(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: int64(3)),
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: int64(4)),
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: int64(5)),
WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: int64(6)),
WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: int64(7)),
WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: int64(8)),
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: int64(9))]
msgList2 = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: float(0)),
WakuMessage(payload: @[byte 11],contentTopic: ContentTopic("1"), timestamp: float(1)),
WakuMessage(payload: @[byte 12],contentTopic: ContentTopic("2"), timestamp: float(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3)),
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4)),
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(5)),
WakuMessage(payload: @[byte 13],contentTopic: ContentTopic("2"), timestamp: float(6)),
WakuMessage(payload: @[byte 14],contentTopic: ContentTopic("1"), timestamp: float(7))]
msgList2 = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: int64(0)),
WakuMessage(payload: @[byte 11],contentTopic: ContentTopic("1"), timestamp: int64(1)),
WakuMessage(payload: @[byte 12],contentTopic: ContentTopic("2"), timestamp: int64(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: int64(3)),
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: int64(4)),
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: int64(5)),
WakuMessage(payload: @[byte 13],contentTopic: ContentTopic("2"), timestamp: int64(6)),
WakuMessage(payload: @[byte 14],contentTopic: ContentTopic("1"), timestamp: int64(7))]
#--------------------
# setup default test store
@ -643,11 +643,11 @@ procSuite "Waku Store":
proc handler(response: HistoryResponse) {.gcsafe, closure.} =
check:
response.messages.len() == 2
response.messages.anyIt(it.timestamp == float(3))
response.messages.anyIt(it.timestamp == float(5))
response.messages.anyIt(it.timestamp == int64(3))
response.messages.anyIt(it.timestamp == int64(5))
completionFut.complete(true)
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(2), endTime: float(5))
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: int64(2), endTime: int64(5))
await proto.query(rpc, handler)
check:
@ -663,7 +663,7 @@ procSuite "Waku Store":
response.messages.len() == 0
completionFut.complete(true)
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(2), endTime: float(2))
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: int64(2), endTime: int64(2))
await proto.query(rpc, handler)
check:
@ -680,7 +680,7 @@ procSuite "Waku Store":
completionFut.complete(true)
# time window is invalid since start time > end time
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(5), endTime: float(2))
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: int64(5), endTime: int64(2))
await proto.query(rpc, handler)
check:
@ -689,18 +689,18 @@ procSuite "Waku Store":
test "find last seen message":
var
msgList = @[IndexedWakuMessage(msg: WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: float(1))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: float(2))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(9))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: float(6))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: float(7))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: float(8))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: float(5)))]
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: int64(1))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: int64(2))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: int64(3))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: int64(4))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: int64(9))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: int64(6))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: int64(7))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: int64(8))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: int64(5)))]
check:
findLastSeen(msgList) == float(9)
findLastSeen(msgList) == int64(9)
asyncTest "resume message history":
# starts a new node
@ -727,7 +727,7 @@ procSuite "Waku Store":
response.messages.len() == 4
completionFut.complete(true)
let rpc = HistoryQuery(startTime: float(2), endTime: float(5))
let rpc = HistoryQuery(startTime: int64(2), endTime: int64(5))
let successResult = await proto.queryFrom(rpc, handler, listenSwitch.peerInfo.toRemotePeerInfo())
check:
@ -737,7 +737,7 @@ procSuite "Waku Store":
asyncTest "queryFromWithPaging with empty pagingInfo":
let rpc = HistoryQuery(startTime: float(2), endTime: float(5))
let rpc = HistoryQuery(startTime: int64(2), endTime: int64(5))
let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo())
@ -747,7 +747,7 @@ procSuite "Waku Store":
asyncTest "queryFromWithPaging with pagination":
var pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: 1)
let rpc = HistoryQuery(startTime: float(2), endTime: float(5), pagingInfo: pinfo)
let rpc = HistoryQuery(startTime: int64(2), endTime: int64(5), pagingInfo: pinfo)
let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo())

View File

@ -1099,7 +1099,7 @@ procSuite "WakuNode":
# count the total number of retrieved messages from the database
var responseCount = 0
proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) =
proc data(receiverTimestamp: int64, msg: WakuMessage, psTopic: string) =
responseCount += 1
# retrieve all the messages in the db
let res = store.getAll(data)

View File

@ -91,7 +91,7 @@ func toWakuMessage(env: Envelope): WakuMessage =
# Translate a Waku v1 envelope to a Waku v2 message
WakuMessage(payload: env.data,
contentTopic: toV2ContentTopic(env.topic),
timestamp: float64(env.expiry - env.ttl),
timestamp: int64(env.expiry - env.ttl),
version: 1)
proc toWakuV2(bridge: WakuBridge, env: Envelope) {.async.} =

View File

@ -16,7 +16,7 @@ proc delete_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool
# Store API
proc get_waku_v2_store_v1_messages(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[float64], endTime: Option[float64], pagingOptions: Option[StorePagingOptions]): StoreResponse
proc get_waku_v2_store_v1_messages(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[int64], endTime: Option[int64], pagingOptions: Option[StorePagingOptions]): StoreResponse
# Filter API

View File

@ -21,7 +21,7 @@ type
payload*: seq[byte]
contentTopic*: Option[ContentTopic]
# sender generated timestamp
timestamp*: Option[float64]
timestamp*: Option[int64]
WakuPeer* = object
multiaddr*: string

View File

@ -41,12 +41,12 @@ proc toStoreResponse*(historyResponse: HistoryResponse): StoreResponse =
proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage =
const defaultCT = ContentTopic("/waku/2/default-content/proto")
var t: float64
var t: int64
if relayMessage.timestamp.isSome:
t = relayMessage.timestamp.get
else:
# incoming WakuRelayMessages with no timestamp will get 0 timestamp
t = float64(0)
t = int64(0)
WakuMessage(payload: relayMessage.payload,
contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT,
version: version,
@ -60,12 +60,12 @@ proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref Br
dst: pubKey,
symkey: symkey)
var t: float64
var t: int64
if relayMessage.timestamp.isSome:
t = relayMessage.timestamp.get
else:
# incoming WakuRelayMessages with no timestamp will get 0 timestamp
t = float64(0)
t = int64(0)
WakuMessage(payload: payload.encode(version, rng[]).get(),
contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT,

View File

@ -17,7 +17,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
## Store API version 1 definitions
rpcsrv.rpc("get_waku_v2_store_v1_messages") do(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[float64], endTime: Option[float64], pagingOptions: Option[StorePagingOptions]) -> StoreResponse:
rpcsrv.rpc("get_waku_v2_store_v1_messages") do(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[int64], endTime: Option[int64], pagingOptions: Option[StorePagingOptions]) -> StoreResponse:
## Returns history for a list of content topics with optional paging
debug "get_waku_v2_store_v1_messages"
@ -29,8 +29,8 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
let historyQuery = HistoryQuery(pubsubTopic: if pubsubTopicOption.isSome: pubsubTopicOption.get() else: "",
contentFilters: if contentFiltersOption.isSome: contentFiltersOption.get() else: @[],
startTime: if startTime.isSome: startTime.get() else: 0.float64,
endTime: if endTime.isSome: endTime.get() else: 0.float64,
startTime: if startTime.isSome: startTime.get() else: 0.int64,
endTime: if endTime.isSome: endTime.get() else: 0.int64,
pagingInfo: if pagingOptions.isSome: pagingOptions.get.toPagingInfo() else: PagingInfo())
await node.query(historyQuery, queryFuncHandler)

View File

@ -63,11 +63,11 @@ os.sleep(2000)
for i in 0..<topicAmount:
os.sleep(50)
# TODO: This would then publish on a subtopic here
var res3 = waitFor nodes[0].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(0).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
res3 = waitFor nodes[1].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(1).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
res3 = waitFor nodes[2].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(2).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
res3 = waitFor nodes[3].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(3).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
res3 = waitFor nodes[4].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(4).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
var res3 = waitFor nodes[0].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(0).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime()*1000)))
res3 = waitFor nodes[1].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(1).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime()*1000)))
res3 = waitFor nodes[2].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(2).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime()*1000)))
res3 = waitFor nodes[3].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(3).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime()*1000)))
res3 = waitFor nodes[4].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(4).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime()*1000)))
# Scenario xx2 - 14 full nodes, two edge nodes
# Assume one full topic

View File

@ -11,7 +11,7 @@ import
## retrieve historical messages
type
DataProc* = proc(receiverTimestamp: float64, msg: WakuMessage, pubsubTopic: string) {.closure, raises: [Defect].}
DataProc* = proc(receiverTimestamp: int64, msg: WakuMessage, pubsubTopic: string) {.closure, raises: [Defect].}
MessageStoreResult*[T] = Result[T, string]

View File

@ -31,12 +31,12 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T]
let prepare = db.prepareStmt("""
CREATE TABLE IF NOT EXISTS """ & TABLE_TITLE & """ (
id BLOB PRIMARY KEY,
receiverTimestamp REAL NOT NULL,
receiverTimestamp INTEGER NOT NULL,
contentTopic BLOB NOT NULL,
pubsubTopic BLOB NOT NULL,
payload BLOB,
version INTEGER NOT NULL,
senderTimestamp REAL NOT NULL
senderTimestamp INTEGER NOT NULL
) WITHOUT ROWID;
""", NoParams, void)
@ -61,7 +61,7 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop
##
let prepare = db.database.prepareStmt(
"INSERT INTO " & TABLE_TITLE & " (id, receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp) VALUES (?, ?, ?, ?, ?, ?, ?);",
(seq[byte], float64, seq[byte], seq[byte], seq[byte], int64, float64),
(seq[byte], int64, seq[byte], seq[byte], seq[byte], int64, int64),
void
)
@ -111,8 +111,8 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc, limit = non
# TODO retrieve the version number
onData(receiverTimestamp.float64,
WakuMessage(contentTopic: contentTopic, payload: payload , version: uint32(version), timestamp: senderTimestamp.float64),
onData(receiverTimestamp.int64,
WakuMessage(contentTopic: contentTopic, payload: payload , version: uint32(version), timestamp: senderTimestamp.int64),
pubsubTopic)
var selectQuery = "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " &

View File

@ -13,12 +13,12 @@ DROP TABLE Message;
CREATE TABLE IF NOT EXISTS Message(
id BLOB PRIMARY KEY,
receiverTimestamp REAL NOT NULL,
receiverTimestamp INTEGER NOT NULL,
contentTopic BLOB NOT NULL,
pubsubTopic BLOB NOT NULL,
payload BLOB,
version INTEGER NOT NULL,
senderTimestamp REAL NOT NULL
senderTimestamp INTEGER NOT NULL
) WITHOUT ROWID;

View File

@ -10,6 +10,7 @@
import
libp2p/protobuf/minprotobuf
import libp2p/varint
when defined(rln):
import waku_rln_relay/waku_rln_relay_types
@ -24,7 +25,7 @@ type
contentTopic*: ContentTopic
version*: uint32
# sender generated timestamp
timestamp*: float64
timestamp*: int64
# the proof field indicates that the message is not a spam
# this field will be used in the rln-relay protocol
# XXX Experimental, this is part of https://rfc.vac.dev/spec/17/ spec and not yet part of WakuMessage spec
@ -43,7 +44,10 @@ proc init*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] =
discard ? pb.getField(2, msg.contentTopic)
discard ? pb.getField(3, msg.version)
discard ? pb.getField(4, msg.timestamp)
var timestamp: zint64
discard ? pb.getField(4, timestamp)
msg.timestamp = int64(timestamp)
# XXX Experimental, this is part of https://rfc.vac.dev/spec/17/ spec and not yet part of WakuMessage spec
when defined(rln):
var proofBytes: seq[byte]
@ -60,7 +64,7 @@ proc encode*(message: WakuMessage): ProtoBuffer =
result.write(1, message.payload)
result.write(2, message.contentTopic)
result.write(3, message.version)
result.write(4, message.timestamp)
result.write(4, zint64(message.timestamp))
when defined(rln):
result.write(21, message.proof.encode())
else:

View File

@ -406,10 +406,11 @@ proc fromEpoch*(epoch: Epoch): uint64 =
let t = fromBytesLE(uint64, array[32,byte](epoch))
return t
proc calcEpoch*(t: float64): Epoch =
## gets time `t` as `flaot64` with subseconds resolution in the fractional part
proc calcEpoch*(t: int64): Epoch =
## gets time `t` as `int64` with milliseconds resolution
## and returns its corresponding rln `Epoch` value
let e = uint64(t/EPOCH_UNIT_SECONDS)
## TODO: check the uint64 cast doesn't induce overflows
let e = uint64(t/(EPOCH_UNIT_SECONDS*1000))
return toEpoch(e)
proc getCurrentEpoch*(): Epoch =
@ -427,12 +428,12 @@ proc compare*(e1, e2: Epoch): int64 =
return int64(epoch1) - int64(epoch2)
proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage, timeOption: Option[float64] = none(float64)): MessageValidationResult =
proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage, timeOption: Option[int64] = none(int64)): MessageValidationResult =
## validate the supplied `msg` based on the waku-rln-relay routing protocol i.e.,
## the `msg`'s epoch is within MAX_EPOCH_GAP of the current epoch
## the `msg` has valid rate limit proof
## the `msg` does not violate the rate limit
## `timeOption` indicates Unix epoch time (fractional part holds sub-seconds)
## `timeOption` indicates Unix epoch time (milliseconds resolution)
## if `timeOption` is supplied, then the current epoch is calculated based on that
@ -473,10 +474,10 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage, timeOption: Optio
return MessageValidationResult.Valid
proc appendRLNProof*(rlnPeer: WakuRLNRelay, msg: var WakuMessage, senderEpochTime: float64): bool =
proc appendRLNProof*(rlnPeer: WakuRLNRelay, msg: var WakuMessage, senderEpochTime: int64): bool =
## returns true if it can create and append a `RateLimitProof` to the supplied `msg`
## returns false otherwise
## `senderEpochTime` indicates the number of seconds passed since Unix epoch. The fractional part holds sub-seconds.
## `senderEpochTime` indicates the number of milliseconds passed since Unix epoch.
## The `epoch` field of `RateLimitProof` is derived from the provided `senderEpochTime` (using `calcEpoch()`)
let

View File

@ -16,6 +16,7 @@ import
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
libp2p/stream/connection,
libp2p/varint,
metrics,
stew/[results, byteutils],
# internal imports
@ -54,7 +55,7 @@ const
# TODO Move serialization function to separate file, too noisy
# TODO Move pagination to separate file, self-contained logic
proc computeIndex*(msg: WakuMessage, receivedTime = getTime().toUnixFloat()): Index =
proc computeIndex*(msg: WakuMessage, receivedTime = int64(getTime().toUnixFloat()*1000)): Index =
## Takes a WakuMessage with received timestamp and returns its Index.
## Received timestamp will default to system time if not provided.
var ctx: sha256
@ -64,7 +65,7 @@ proc computeIndex*(msg: WakuMessage, receivedTime = getTime().toUnixFloat()): In
let digest = ctx.finish() # computes the hash
ctx.clear()
let receiverTime = receivedTime.round(3) # Ensure timestamp has (only) millisecond resolution
let receiverTime = receivedTime
var index = Index(digest:digest, receiverTime: receiverTime, senderTime: msg.timestamp)
return index
@ -77,8 +78,8 @@ proc encode*(index: Index): ProtoBuffer =
# encodes index
output.write(1, index.digest.data)
output.write(2, index.receiverTime)
output.write(3, index.senderTime)
output.write(2, zint64(index.receiverTime))
output.write(3, zint64(index.senderTime))
return output
@ -110,14 +111,14 @@ proc init*(T: type Index, buffer: seq[byte]): ProtoResult[T] =
index.digest.data[count] = b
# read the timestamp
var receiverTime: float64
var receiverTime: zint64
discard ? pb.getField(2, receiverTime)
index.receiverTime = receiverTime
index.receiverTime = int64(receiverTime)
# read the timestamp
var senderTime: float64
var senderTime: zint64
discard ? pb.getField(3, senderTime)
index.senderTime = senderTime
index.senderTime = int64(senderTime)
return ok(index)
@ -167,8 +168,13 @@ proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
msg.pagingInfo = ? PagingInfo.init(pagingInfoBuffer)
discard ? pb.getField(5, msg.startTime)
discard ? pb.getField(6, msg.endTime)
var startTime: zint64
discard ? pb.getField(5, startTime)
msg.startTime = int64(startTime)
var endTime: zint64
discard ? pb.getField(6, endTime)
msg.endTime = int64(endTime)
return ok(msg)
@ -226,8 +232,8 @@ proc encode*(query: HistoryQuery): ProtoBuffer =
output.write(4, query.pagingInfo.encode())
output.write(5, query.startTime)
output.write(6, query.endTime)
output.write(5, zint64(query.startTime))
output.write(6, zint64(query.endTime))
return output
@ -382,10 +388,10 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
else: none(seq[ContentTopic])
qPubSubTopic = if (query.pubsubTopic != ""): some(query.pubsubTopic)
else: none(string)
qStartTime = if query.startTime != float64(0): some(query.startTime)
else: none(float64)
qEndTime = if query.endTime != float64(0): some(query.endTime)
else: none(float64)
qStartTime = if query.startTime != int64(0): some(query.startTime)
else: none(int64)
qEndTime = if query.endTime != int64(0): some(query.endTime)
else: none(int64)
## Compose filter predicate for message from query criteria
proc matchesQuery(indMsg: IndexedWakuMessage): bool =
@ -466,7 +472,7 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) =
if ws.store.isNil:
return
proc onData(receiverTime: float64, msg: WakuMessage, pubsubTopic: string) =
proc onData(receiverTime: int64, msg: WakuMessage, pubsubTopic: string) =
# TODO index should not be recalculated
ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(receiverTime), pubsubTopic: pubsubTopic))
@ -637,8 +643,8 @@ proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerI
debug "failed to resolve the query"
return err("failed to resolve the query")
proc findLastSeen*(list: seq[IndexedWakuMessage]): float =
var lastSeenTime = float64(0)
proc findLastSeen*(list: seq[IndexedWakuMessage]): int64 =
var lastSeenTime = int64(0)
for iwmsg in list.items :
if iwmsg.msg.timestamp>lastSeenTime:
lastSeenTime = iwmsg.msg.timestamp
@ -663,12 +669,12 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
var currentTime = epochTime()
var lastSeenTime: float = findLastSeen(ws.messages.allItems())
var currentTime = int64(epochTime()*1000)
var lastSeenTime: int64 = findLastSeen(ws.messages.allItems())
debug "resume", currentEpochTime=currentTime
# adjust the time window with an offset of 20 seconds
let offset: float64 = 200000
let offset: int64 = 200000
currentTime = currentTime + offset
lastSeenTime = max(lastSeenTime - offset, 0)
debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime

View File

@ -65,8 +65,8 @@ type
contentFilters*: seq[HistoryContentFilter]
pubsubTopic*: string
pagingInfo*: PagingInfo # used for pagination
startTime*: float64 # used for time-window query
endTime*: float64 # used for time-window query
startTime*: int64 # used for time-window query
endTime*: int64 # used for time-window query
HistoryResponseError* {.pure.} = enum
## HistoryResponseError contains error message to inform the querying node about the state of its request

View File

@ -10,5 +10,5 @@ type
Index* = object
## This type contains the description of an Index used in the pagination of WakuMessages
digest*: MDigest[256]
receiverTime*: float64
senderTime*: float64 # the time at which the message is generated
receiverTime*: int64
senderTime*: int64 # the time at which the message is generated