deploy: eaef9cfe316f1f6edfc3778cb1798e138fc85ed4

This commit is contained in:
s1fr0 2022-02-17 15:26:49 +00:00
parent dad4a2ce12
commit e99e57443f
23 changed files with 225 additions and 141 deletions

View File

@ -13,7 +13,9 @@ The full list of changes is below.
### Changes
- ...
- A new type `Timestamp` for all timestamps is introduced (currently an alias for int64).
- All timestamps now have nanosecond resolution.
### Fixes
- ...

View File

@ -25,6 +25,7 @@ import
../../waku/v2/protocol/waku_swap/waku_swap,
../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/utils/peers,
../../waku/v2/utils/time,
../test_helpers
template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0]
@ -102,7 +103,7 @@ procSuite "Waku v2 JSON-RPC API":
response == true
# Publish a message on the default topic
response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
check:
# @TODO poll topic to verify message has been published
@ -260,7 +261,7 @@ procSuite "Waku v2 JSON-RPC API":
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
let response = await client.get_waku_v2_store_v1_messages(some(defaultTopic), some(@[HistoryContentFilter(contentTopic: defaultContentTopic)]), some(0.float64), some(9.float64), some(StorePagingOptions()))
let response = await client.get_waku_v2_store_v1_messages(some(defaultTopic), some(@[HistoryContentFilter(contentTopic: defaultContentTopic)]), some(Timestamp(0)), some(Timestamp(9)), some(StorePagingOptions()))
check:
response.messages.len() == 8
response.pagingOptions.isSome()
@ -573,7 +574,7 @@ procSuite "Waku v2 JSON-RPC API":
pubSubTopic = "polling"
contentTopic = defaultContentTopic
payload = @[byte 9]
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(epochTime()))
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime())))
topicCache = newTable[string, seq[WakuMessage]]()
await node1.start()
@ -664,7 +665,7 @@ procSuite "Waku v2 JSON-RPC API":
pubSubTopic = "polling"
contentTopic = defaultContentTopic
payload = @[byte 9]
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(epochTime()))
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime())))
topicCache = newTable[string, seq[WakuMessage]]()
await node1.start()

View File

@ -6,6 +6,7 @@ import
../../waku/v2/node/storage/message/waku_message_store,
../../waku/v2/node/storage/sqlite,
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/utils/time,
./utils
suite "Message Store":
@ -16,9 +17,9 @@ suite "Message Store":
topic = ContentTopic("/waku/2/default-content/proto")
pubsubTopic = "/waku/2/default-waku/proto"
t1 = epochTime()
t2 = epochTime()
t3 = high(float64)
t1 = getNanosecondTime(epochTime())
t2 = getNanosecondTime(epochTime())
t3 = getNanosecondTime(high(float64))
var msgs = @[
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0), timestamp: t1),
WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1), timestamp: t2),
@ -45,7 +46,7 @@ suite "Message Store":
var msgFlag, psTopicFlag = true
var responseCount = 0
proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
responseCount += 1
# Note: cannot use `check` within `{.raises: [Defect].}` block:
@ -136,7 +137,7 @@ suite "Message Store":
for i in 1..capacity:
let
msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: i.float)
msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
index = computeIndex(msg)
output = store.put(index, msg, pubsubTopic)
@ -145,9 +146,9 @@ suite "Message Store":
var
responseCount = 0
lastMessageTimestamp = 0.float
lastMessageTimestamp = Timestamp(0)
proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
responseCount += 1
lastMessageTimestamp = msg.timestamp
@ -157,7 +158,7 @@ suite "Message Store":
check:
resMax.isOk
responseCount == capacity # We retrieved all items
lastMessageTimestamp == capacity.float # Returned rows were ordered correctly
lastMessageTimestamp == Timestamp(capacity) # Returned rows were ordered correctly
# Now test getAll with a limit smaller than total stored items
responseCount = 0 # Reset response count
@ -167,7 +168,7 @@ suite "Message Store":
check:
resLimit.isOk
responseCount == capacity - 2 # We retrieved limited number of items
lastMessageTimestamp == capacity.float # We retrieved the youngest items in the store, in order
lastMessageTimestamp == Timestamp(capacity) # We retrieved the youngest items in the store, in order
# Test zero limit
responseCount = 0 # Reset response count
@ -177,4 +178,4 @@ suite "Message Store":
check:
resZero.isOk
responseCount == 0 # No items retrieved
lastMessageTimestamp == 0.float # No items retrieved
lastMessageTimestamp == Timestamp(0) # No items retrieved

View File

@ -5,7 +5,8 @@ import
chronos,
stew/byteutils,
libp2p/crypto/crypto,
../../waku/v2/utils/pagination
../../waku/v2/utils/pagination,
../../waku/v2/utils/time
procSuite "Pagination utils":
@ -24,26 +25,26 @@ procSuite "Pagination utils":
## Test vars
let
smallIndex1 = Index(digest: hashFromStr("1234"),
receiverTime: 0.00,
senderTime: 1000.00)
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000))
smallIndex2 = Index(digest: hashFromStr("1234567"), # digest is less significant than senderTime
receiverTime: 0.00,
senderTime: 1000.00)
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000))
largeIndex1 = Index(digest: hashFromStr("1234"),
receiverTime: 0.00,
senderTime: 9000.00) # only senderTime differ from smallIndex1
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(9000)) # only senderTime differ from smallIndex1
largeIndex2 = Index(digest: hashFromStr("12345"), # only digest differs from smallIndex1
receiverTime: 0.00,
senderTime: 1000.00)
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000))
eqIndex1 = Index(digest: hashFromStr("0003"),
receiverTime: 0.00,
senderTime: 54321.00)
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(54321))
eqIndex2 = Index(digest: hashFromStr("0003"),
receiverTime: 0.00,
senderTime: 54321.00)
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(54321))
eqIndex3 = Index(digest: hashFromStr("0003"),
receiverTime: 9999.00, # receiverTime difference should have no effect on comparisons
senderTime: 54321.00)
receiverTime: getNanosecondTime(9999), # receiverTime difference should have no effect on comparisons
senderTime: getNanosecondTime(54321))
## Test suite

View File

@ -4,6 +4,7 @@ import
testutils/unittests, nimcrypto/sha2,
libp2p/protobuf/minprotobuf,
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/utils/time,
../test_helpers
@ -17,8 +18,8 @@ proc createSampleStoreQueue(s: int): StoreQueueRef =
for i in 0..<s:
discard testStoreQueue.add(IndexedWakuMessage(msg: WakuMessage(payload: @[byte i]),
index: Index(receiverTime: float64(i),
senderTime: float64(i),
index: Index(receiverTime: Timestamp(i),
senderTime: Timestamp(i),
digest: MDigest[256](data: data)) ))
return testStoreQueue
@ -273,7 +274,7 @@ suite "time-window history query":
let
version = 0'u32
payload = @[byte 0, 1, 2]
timestamp = float64(10)
timestamp = Timestamp(10)
msg = WakuMessage(payload: payload, version: version, timestamp: timestamp)
pb = msg.encode()
@ -307,4 +308,4 @@ suite "time-window history query":
let
timestampDecoded = msgDecoded.value.timestamp
check:
timestampDecoded == float64(0)
timestampDecoded == Timestamp(0)

View File

@ -12,6 +12,7 @@ import
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/node/storage/message/waku_message_store,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/utils/time,
../test_helpers, ./utils
procSuite "Waku Store":
@ -525,7 +526,7 @@ procSuite "Waku Store":
let
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic))
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic), HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: pagingInfo, startTime: float64(10), endTime: float64(11))
query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic), HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: pagingInfo, startTime: Timestamp(10), endTime: Timestamp(11))
pb = query.encode()
decodedQuery = HistoryQuery.init(pb.buffer)
@ -575,25 +576,25 @@ procSuite "Waku Store":
key2 = PrivateKey.random(ECDSA, rng[]).get()
# peer2 = PeerInfo.new(key2)
var
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: float(0)),
WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: float(1)),
WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: float(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3)),
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4)),
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(5)),
WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: float(6)),
WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: float(7)),
WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: float(8)),
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: float(9))]
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: Timestamp(0)),
WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: Timestamp(1)),
WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: Timestamp(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: Timestamp(3)),
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: Timestamp(4)),
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: Timestamp(5)),
WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: Timestamp(6)),
WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: Timestamp(7)),
WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: Timestamp(8)),
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: Timestamp(9))]
msgList2 = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: float(0)),
WakuMessage(payload: @[byte 11],contentTopic: ContentTopic("1"), timestamp: float(1)),
WakuMessage(payload: @[byte 12],contentTopic: ContentTopic("2"), timestamp: float(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3)),
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4)),
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(5)),
WakuMessage(payload: @[byte 13],contentTopic: ContentTopic("2"), timestamp: float(6)),
WakuMessage(payload: @[byte 14],contentTopic: ContentTopic("1"), timestamp: float(7))]
msgList2 = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: Timestamp(0)),
WakuMessage(payload: @[byte 11],contentTopic: ContentTopic("1"), timestamp: Timestamp(1)),
WakuMessage(payload: @[byte 12],contentTopic: ContentTopic("2"), timestamp: Timestamp(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: Timestamp(3)),
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: Timestamp(4)),
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: Timestamp(5)),
WakuMessage(payload: @[byte 13],contentTopic: ContentTopic("2"), timestamp: Timestamp(6)),
WakuMessage(payload: @[byte 14],contentTopic: ContentTopic("1"), timestamp: Timestamp(7))]
#--------------------
# setup default test store
@ -641,11 +642,11 @@ procSuite "Waku Store":
proc handler(response: HistoryResponse) {.gcsafe, closure.} =
check:
response.messages.len() == 2
response.messages.anyIt(it.timestamp == float(3))
response.messages.anyIt(it.timestamp == float(5))
response.messages.anyIt(it.timestamp == Timestamp(3))
response.messages.anyIt(it.timestamp == Timestamp(5))
completionFut.complete(true)
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(2), endTime: float(5))
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: Timestamp(2), endTime: Timestamp(5))
await proto.query(rpc, handler)
check:
@ -661,7 +662,7 @@ procSuite "Waku Store":
response.messages.len() == 0
completionFut.complete(true)
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(2), endTime: float(2))
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: Timestamp(2), endTime: Timestamp(2))
await proto.query(rpc, handler)
check:
@ -678,7 +679,7 @@ procSuite "Waku Store":
completionFut.complete(true)
# time window is invalid since start time > end time
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(5), endTime: float(2))
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: Timestamp(5), endTime: Timestamp(2))
await proto.query(rpc, handler)
check:
@ -709,7 +710,7 @@ procSuite "Waku Store":
response.messages.len() == 4
completionFut.complete(true)
let rpc = HistoryQuery(startTime: float(2), endTime: float(5))
let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5))
let successResult = await proto.queryFrom(rpc, handler, listenSwitch.peerInfo.toRemotePeerInfo())
check:
@ -719,7 +720,7 @@ procSuite "Waku Store":
asyncTest "queryFromWithPaging with empty pagingInfo":
let rpc = HistoryQuery(startTime: float(2), endTime: float(5))
let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5))
let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo())
@ -729,7 +730,7 @@ procSuite "Waku Store":
asyncTest "queryFromWithPaging with pagination":
var pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: 1)
let rpc = HistoryQuery(startTime: float(2), endTime: float(5), pagingInfo: pinfo)
let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5), pagingInfo: pinfo)
let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo())
@ -790,14 +791,14 @@ procSuite "Waku Store":
let store = WakuStore.init(PeerManager.new(newStandardSwitch()), crypto.newRng(), capacity = capacity)
for i in 1..capacity:
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte i], contentTopic: contentTopic, timestamp: i.float64))
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte i], contentTopic: contentTopic, timestamp: Timestamp(i)))
await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically
check:
store.messages.len == capacity # Store is at capacity
# Test that capacity holds
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte (capacity + 1)], contentTopic: contentTopic, timestamp: (capacity + 1).float64))
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte (capacity + 1)], contentTopic: contentTopic, timestamp: Timestamp(capacity + 1)))
check:
store.messages.len == capacity # Store is still at capacity

View File

@ -3,7 +3,8 @@
import
std/sequtils,
testutils/unittests,
../../waku/v2/protocol/waku_store/waku_store_types
../../waku/v2/protocol/waku_store/waku_store_types,
../../waku/v2/utils/time
procSuite "Sorted store queue":
@ -12,8 +13,8 @@ procSuite "Sorted store queue":
## Use i to generate an IndexedWakuMessage
var data {.noinit.}: array[32, byte]
for x in data.mitems: x = i.byte
return IndexedWakuMessage(msg: WakuMessage(payload: @[byte i], timestamp: float64(i)),
index: Index(receiverTime: float64(i), senderTime: float64(i), digest: MDigest[256](data: data)))
return IndexedWakuMessage(msg: WakuMessage(payload: @[byte i], timestamp: Timestamp(i)),
index: Index(receiverTime: Timestamp(i), senderTime: Timestamp(i), digest: MDigest[256](data: data)))
# Test variables
let
@ -62,8 +63,8 @@ procSuite "Sorted store queue":
let first = testStoreQueue.first()
check:
first.isOk()
first.get().msg.timestamp == 1.0
first.get().msg.timestamp == Timestamp(1)
# Error condition
let emptyQ = StoreQueueRef.new(capacity)
check:
@ -73,7 +74,7 @@ procSuite "Sorted store queue":
let last = testStoreQueue.last()
check:
last.isOk()
last.get().msg.timestamp == 5.0
last.get().msg.timestamp == Timestamp(5)
# Error condition
let emptyQ = StoreQueueRef.new(capacity)
@ -91,7 +92,7 @@ procSuite "Sorted store queue":
# First page
pInfo.pageSize == 3
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == 3.0
pInfo.cursor.senderTime == Timestamp(3)
err == HistoryResponseError.NONE
res.mapIt(it.timestamp.int) == @[1,2,3]
@ -103,7 +104,7 @@ procSuite "Sorted store queue":
# Second page
pInfo.pageSize == 2
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == 5.0
pInfo.cursor.senderTime == Timestamp(5)
err == HistoryResponseError.NONE
res.mapIt(it.timestamp.int) == @[4,5]
@ -114,7 +115,7 @@ procSuite "Sorted store queue":
# Empty last page
pInfo.pageSize == 0
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == 5.0
pInfo.cursor.senderTime == Timestamp(5)
err == HistoryResponseError.NONE
res.len == 0
@ -129,7 +130,7 @@ procSuite "Sorted store queue":
# First page
pInfo.pageSize == 3
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == 3.0
pInfo.cursor.senderTime == Timestamp(3)
err == HistoryResponseError.NONE
res.mapIt(it.timestamp.int) == @[3,4,5]
@ -141,7 +142,7 @@ procSuite "Sorted store queue":
# Second page
pInfo.pageSize == 2
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == 1.0
pInfo.cursor.senderTime == Timestamp(1)
err == HistoryResponseError.NONE
res.mapIt(it.timestamp.int) == @[1,2]
@ -152,7 +153,7 @@ procSuite "Sorted store queue":
# Empty last page
pInfo.pageSize == 0
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == 1.0
pInfo.cursor.senderTime == Timestamp(1)
err == HistoryResponseError.NONE
res.len == 0
@ -170,7 +171,7 @@ procSuite "Sorted store queue":
# First page
pInfo.pageSize == 2
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == 4.0
pInfo.cursor.senderTime == Timestamp(4)
err == HistoryResponseError.NONE
res.mapIt(it.timestamp.int) == @[2,4]
@ -181,7 +182,7 @@ procSuite "Sorted store queue":
# Empty next page
pInfo.pageSize == 0
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == 4.0
pInfo.cursor.senderTime == Timestamp(4)
err == HistoryResponseError.NONE
res.len == 0
@ -195,7 +196,7 @@ procSuite "Sorted store queue":
# First page
pInfo.pageSize == 2
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == 3.0
pInfo.cursor.senderTime == Timestamp(3)
err == HistoryResponseError.NONE
res.mapIt(it.timestamp.int) == @[3,5]
@ -206,7 +207,7 @@ procSuite "Sorted store queue":
# Next page
pInfo.pageSize == 1
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == 1.0
pInfo.cursor.senderTime == Timestamp(1)
err == HistoryResponseError.NONE
res.mapIt(it.timestamp.int) == @[1]
@ -217,7 +218,7 @@ procSuite "Sorted store queue":
# Empty last page
pInfo.pageSize == 0
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == 1.0
pInfo.cursor.senderTime == Timestamp(1)
err == HistoryResponseError.NONE
res.len == 0
@ -228,14 +229,14 @@ procSuite "Sorted store queue":
var (res, pInfo, err) = testStoreQueue.getPage(predicate,
PagingInfo(pageSize: 3,
cursor: Index(receiverTime: float64(3), senderTime: float64(3), digest: MDigest[256]()),
cursor: Index(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]()),
direction: PagingDirection.BACKWARD))
check:
# Empty response with error
pInfo.pageSize == 0
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == 3.0
pInfo.cursor.senderTime == Timestamp(3)
err == HistoryResponseError.INVALID_CURSOR
res.len == 0
@ -243,14 +244,14 @@ procSuite "Sorted store queue":
(res, pInfo, err) = testStoreQueue.getPage(predicate,
PagingInfo(pageSize: 3,
cursor: Index(receiverTime: float64(3), senderTime: float64(3), digest: MDigest[256]()),
cursor: Index(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]()),
direction: PagingDirection.FORWARD))
check:
# Empty response with error
pInfo.pageSize == 0
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == 3.0
pInfo.cursor.senderTime == Timestamp(3)
err == HistoryResponseError.INVALID_CURSOR
res.len == 0
@ -271,7 +272,7 @@ procSuite "Sorted store queue":
# Empty response
pInfo.pageSize == 0
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == 0.0
pInfo.cursor.senderTime == Timestamp(0)
err == HistoryResponseError.NONE
res.len == 0
@ -285,7 +286,7 @@ procSuite "Sorted store queue":
# Empty response
pInfo.pageSize == 0
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == 0.0
pInfo.cursor.senderTime == Timestamp(0)
err == HistoryResponseError.NONE
res.len == 0

View File

@ -22,6 +22,7 @@ import
../../waku/v2/protocol/waku_lightpush/waku_lightpush,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/utils/peers,
../../waku/v2/utils/time,
../../waku/v2/node/wakunode2,
../test_helpers
@ -1156,7 +1157,7 @@ procSuite "WakuNode":
# count the total number of retrieved messages from the database
var responseCount = 0
proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) =
proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) =
responseCount += 1
# retrieve all the messages in the db
let res = store.getAll(data)

View File

@ -13,6 +13,7 @@ import
# Waku v2 imports
libp2p/crypto/crypto,
../v2/utils/namespacing,
../v2/utils/time,
../v2/node/wakunode2,
# Common cli config
./config_bridge
@ -91,7 +92,7 @@ func toWakuMessage(env: Envelope): WakuMessage =
# Translate a Waku v1 envelope to a Waku v2 message
WakuMessage(payload: env.data,
contentTopic: toV2ContentTopic(env.topic),
timestamp: float64(env.expiry - env.ttl),
timestamp: (getNanosecondTime(env.expiry) - getNanosecondTime(env.ttl)),
version: 1)
proc toWakuV2(bridge: WakuBridge, env: Envelope) {.async.} =

View File

@ -16,7 +16,7 @@ proc delete_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool
# Store API
proc get_waku_v2_store_v1_messages(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[float64], endTime: Option[float64], pagingOptions: Option[StorePagingOptions]): StoreResponse
proc get_waku_v2_store_v1_messages(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]): StoreResponse
# Filter API

View File

@ -4,7 +4,8 @@ import
std/[options,tables],
eth/keys,
../../protocol/waku_message,
../../utils/pagination
../../utils/pagination,
../../utils/time
type
StoreResponse* = object
@ -21,7 +22,7 @@ type
payload*: seq[byte]
contentTopic*: Option[ContentTopic]
# sender generated timestamp
timestamp*: Option[float64]
timestamp*: Option[Timestamp]
WakuPeer* = object
multiaddr*: string

View File

@ -6,6 +6,7 @@ import
../../../v1/node/rpc/hexstrings,
../../protocol/waku_store/waku_store_types,
../../protocol/waku_message,
../../utils/time,
../waku_payload,
./jsonrpc_types
@ -41,12 +42,12 @@ proc toStoreResponse*(historyResponse: HistoryResponse): StoreResponse =
proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage =
const defaultCT = ContentTopic("/waku/2/default-content/proto")
var t: float64
var t: Timestamp
if relayMessage.timestamp.isSome:
t = relayMessage.timestamp.get
else:
# incoming WakuRelayMessages with no timestamp will get 0 timestamp
t = float64(0)
t = Timestamp(0)
WakuMessage(payload: relayMessage.payload,
contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT,
version: version,
@ -60,12 +61,12 @@ proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref Br
dst: pubKey,
symkey: symkey)
var t: float64
var t: Timestamp
if relayMessage.timestamp.isSome:
t = relayMessage.timestamp.get
else:
# incoming WakuRelayMessages with no timestamp will get 0 timestamp
t = float64(0)
t = Timestamp(0)
WakuMessage(payload: payload.encode(version, rng[]).get(),
contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT,

View File

@ -5,6 +5,7 @@ import
chronicles,
json_rpc/rpcserver,
../wakunode2,
../../utils/time,
./jsonrpc_types, ./jsonrpc_utils
export jsonrpc_types
@ -17,7 +18,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
## Store API version 1 definitions
rpcsrv.rpc("get_waku_v2_store_v1_messages") do(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[float64], endTime: Option[float64], pagingOptions: Option[StorePagingOptions]) -> StoreResponse:
rpcsrv.rpc("get_waku_v2_store_v1_messages") do(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]) -> StoreResponse:
## Returns history for a list of content topics with optional paging
debug "get_waku_v2_store_v1_messages"
@ -29,8 +30,8 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
let historyQuery = HistoryQuery(pubsubTopic: if pubsubTopicOption.isSome: pubsubTopicOption.get() else: "",
contentFilters: if contentFiltersOption.isSome: contentFiltersOption.get() else: @[],
startTime: if startTime.isSome: startTime.get() else: 0.float64,
endTime: if endTime.isSome: endTime.get() else: 0.float64,
startTime: if startTime.isSome: startTime.get() else: Timestamp(0),
endTime: if endTime.isSome: endTime.get() else: Timestamp(0),
pagingInfo: if pagingOptions.isSome: pagingOptions.get.toPagingInfo() else: PagingInfo())
await node.query(historyQuery, queryFuncHandler)

View File

@ -12,6 +12,7 @@ import
../protocol/waku_filter/waku_filter_types,
../protocol/waku_store/waku_store_types,
../protocol/waku_message,
../utils/time,
./wakunode2,
./waku_payload,
./jsonrpc/[jsonrpc_types,jsonrpc_utils]
@ -63,11 +64,11 @@ os.sleep(2000)
for i in 0..<topicAmount:
os.sleep(50)
# TODO: This would then publish on a subtopic here
var res3 = waitFor nodes[0].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(0).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
res3 = waitFor nodes[1].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(1).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
res3 = waitFor nodes[2].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(2).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
res3 = waitFor nodes[3].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(3).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
res3 = waitFor nodes[4].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(4).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
var res3 = waitFor nodes[0].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(0).buffer, contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
res3 = waitFor nodes[1].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(1).buffer, contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
res3 = waitFor nodes[2].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(2).buffer, contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
res3 = waitFor nodes[3].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(3).buffer, contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
res3 = waitFor nodes[4].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(4).buffer, contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
# Scenario xx2 - 14 full nodes, two edge nodes
# Assume one full topic

View File

@ -4,6 +4,7 @@ import
std/options,
stew/results,
../../../protocol/waku_message,
../../../utils/time,
../../../utils/pagination
## This module defines a message store interface. Implementations of
@ -11,7 +12,7 @@ import
## retrieve historical messages
type
DataProc* = proc(receiverTimestamp: float64, msg: WakuMessage, pubsubTopic: string) {.closure, raises: [Defect].}
DataProc* = proc(receiverTimestamp: Timestamp, msg: WakuMessage, pubsubTopic: string) {.closure, raises: [Defect].}
MessageStoreResult*[T] = Result[T, string]

View File

@ -7,7 +7,8 @@ import
./message_store,
../sqlite,
../../../protocol/waku_message,
../../../utils/pagination
../../../utils/pagination,
../../../utils/time
export sqlite
@ -31,12 +32,12 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T]
let prepare = db.prepareStmt("""
CREATE TABLE IF NOT EXISTS """ & TABLE_TITLE & """ (
id BLOB PRIMARY KEY,
receiverTimestamp REAL NOT NULL,
receiverTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL,
contentTopic BLOB NOT NULL,
pubsubTopic BLOB NOT NULL,
payload BLOB,
version INTEGER NOT NULL,
senderTimestamp REAL NOT NULL
senderTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL
) WITHOUT ROWID;
""", NoParams, void)
@ -61,7 +62,7 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop
##
let prepare = db.database.prepareStmt(
"INSERT INTO " & TABLE_TITLE & " (id, receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp) VALUES (?, ?, ?, ?, ?, ?, ?);",
(seq[byte], float64, seq[byte], seq[byte], seq[byte], int64, float64),
(seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp),
void
)
@ -91,7 +92,7 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc, limit = non
proc msg(s: ptr sqlite3_stmt) =
gotMessages = true
let
receiverTimestamp = sqlite3_column_double(s, 0)
receiverTimestamp = column_timestamp(s, 0)
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 1))
topicLength = sqlite3_column_bytes(s,1)
@ -107,12 +108,12 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc, limit = non
version = sqlite3_column_int64(s, 4)
senderTimestamp = sqlite3_column_double(s, 5)
senderTimestamp = column_timestamp(s, 5)
# TODO retrieve the version number
onData(receiverTimestamp.float64,
WakuMessage(contentTopic: contentTopic, payload: payload , version: uint32(version), timestamp: senderTimestamp.float64),
onData(Timestamp(receiverTimestamp),
WakuMessage(contentTopic: contentTopic, payload: payload , version: uint32(version), timestamp: Timestamp(senderTimestamp)),
pubsubTopic)
var selectQuery = "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " &

View File

@ -0,0 +1,30 @@
CREATE TABLE IF NOT EXISTS Message_backup (
id BLOB PRIMARY KEY,
receiverTimestamp REAL NOT NULL,
contentTopic BLOB NOT NULL,
pubsubTopic BLOB NOT NULL,
payload BLOB,
version INTEGER NOT NULL,
senderTimestamp REAL NOT NULL
) WITHOUT ROWID;
INSERT INTO Message_backup SELECT id, receiverTimestamp, contentTopic, pubsubTopic, payload, version, senderTimestamp FROM Message;
DROP TABLE Message;
CREATE TABLE IF NOT EXISTS Message(
id BLOB PRIMARY KEY,
receiverTimestamp INTEGER NOT NULL,
contentTopic BLOB NOT NULL,
pubsubTopic BLOB NOT NULL,
payload BLOB,
version INTEGER NOT NULL,
senderTimestamp INTEGER NOT NULL
) WITHOUT ROWID;
INSERT INTO Message (id, receiverTimestamp, contentTopic, pubsubTopic, payload, version, senderTimestamp)
SELECT id, FLOOR(receiverTimestamp*1000000000), contentTopic, pubsubTopic, payload, version, FLOOR(senderTimestamp*1000000000)
FROM Message_backup;
DROP TABLE Message_backup;

View File

@ -9,7 +9,9 @@
{.push raises: [Defect].}
import
libp2p/protobuf/minprotobuf
libp2p/protobuf/minprotobuf,
libp2p/varint,
../utils/time
when defined(rln):
import waku_rln_relay/waku_rln_relay_types
@ -24,7 +26,7 @@ type
contentTopic*: ContentTopic
version*: uint32
# sender generated timestamp
timestamp*: float64
timestamp*: Timestamp
# the proof field indicates that the message is not a spam
# this field will be used in the rln-relay protocol
# XXX Experimental, this is part of https://rfc.vac.dev/spec/17/ spec and not yet part of WakuMessage spec
@ -43,7 +45,10 @@ proc init*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] =
discard ? pb.getField(2, msg.contentTopic)
discard ? pb.getField(3, msg.version)
discard ? pb.getField(4, msg.timestamp)
var timestamp: zint64
discard ? pb.getField(4, timestamp)
msg.timestamp = Timestamp(timestamp)
# XXX Experimental, this is part of https://rfc.vac.dev/spec/17/ spec and not yet part of WakuMessage spec
when defined(rln):
var proofBytes: seq[byte]
@ -60,7 +65,7 @@ proc encode*(message: WakuMessage): ProtoBuffer =
result.write(1, message.payload)
result.write(2, message.contentTopic)
result.write(3, message.version)
result.write(4, message.timestamp)
result.write(4, zint64(message.timestamp))
when defined(rln):
result.write(21, message.proof.encode())
else:

View File

@ -430,7 +430,6 @@ proc compare*(e1, e2: Epoch): int64 =
epoch2 = fromEpoch(e2)
return int64(epoch1) - int64(epoch2)
proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage, timeOption: Option[float64] = none(float64)): MessageValidationResult =
## validate the supplied `msg` based on the waku-rln-relay routing protocol i.e.,
## the `msg`'s epoch is within MAX_EPOCH_GAP of the current epoch

View File

@ -16,12 +16,14 @@ import
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
libp2p/stream/connection,
libp2p/varint,
metrics,
stew/[results, byteutils],
# internal imports
../../node/storage/message/message_store,
../../node/peer_manager/peer_manager,
../../utils/requests,
../../utils/time,
../waku_swap/waku_swap,
./waku_store_types
@ -54,7 +56,7 @@ const
# TODO Move serialization function to separate file, too noisy
# TODO Move pagination to separate file, self-contained logic
proc computeIndex*(msg: WakuMessage, receivedTime = getTime().toUnixFloat()): Index =
proc computeIndex*(msg: WakuMessage, receivedTime = getNanosecondTime(getTime().toUnixFloat())): Index =
## Takes a WakuMessage with received timestamp and returns its Index.
## Received timestamp will default to system time if not provided.
var ctx: sha256
@ -64,7 +66,7 @@ proc computeIndex*(msg: WakuMessage, receivedTime = getTime().toUnixFloat()): In
let digest = ctx.finish() # computes the hash
ctx.clear()
let receiverTime = receivedTime.round(3) # Ensure timestamp has (only) millisecond resolution
let receiverTime = receivedTime
var index = Index(digest:digest, receiverTime: receiverTime, senderTime: msg.timestamp)
return index
@ -77,8 +79,8 @@ proc encode*(index: Index): ProtoBuffer =
# encodes index
output.write(1, index.digest.data)
output.write(2, index.receiverTime)
output.write(3, index.senderTime)
output.write(2, zint64(index.receiverTime))
output.write(3, zint64(index.senderTime))
return output
@ -110,14 +112,14 @@ proc init*(T: type Index, buffer: seq[byte]): ProtoResult[T] =
index.digest.data[count] = b
# read the timestamp
var receiverTime: float64
var receiverTime: zint64
discard ? pb.getField(2, receiverTime)
index.receiverTime = receiverTime
index.receiverTime = Timestamp(receiverTime)
# read the timestamp
var senderTime: float64
var senderTime: zint64
discard ? pb.getField(3, senderTime)
index.senderTime = senderTime
index.senderTime = Timestamp(senderTime)
return ok(index)
@ -167,8 +169,13 @@ proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
msg.pagingInfo = ? PagingInfo.init(pagingInfoBuffer)
discard ? pb.getField(5, msg.startTime)
discard ? pb.getField(6, msg.endTime)
var startTime: zint64
discard ? pb.getField(5, startTime)
msg.startTime = Timestamp(startTime)
var endTime: zint64
discard ? pb.getField(6, endTime)
msg.endTime = Timestamp(endTime)
return ok(msg)
@ -226,8 +233,8 @@ proc encode*(query: HistoryQuery): ProtoBuffer =
output.write(4, query.pagingInfo.encode())
output.write(5, query.startTime)
output.write(6, query.endTime)
output.write(5, zint64(query.startTime))
output.write(6, zint64(query.endTime))
return output
@ -262,10 +269,10 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
else: none(seq[ContentTopic])
qPubSubTopic = if (query.pubsubTopic != ""): some(query.pubsubTopic)
else: none(string)
qStartTime = if query.startTime != float64(0): some(query.startTime)
else: none(float64)
qEndTime = if query.endTime != float64(0): some(query.endTime)
else: none(float64)
qStartTime = if query.startTime != Timestamp(0): some(query.startTime)
else: none(Timestamp)
qEndTime = if query.endTime != Timestamp(0): some(query.endTime)
else: none(Timestamp)
## Compose filter predicate for message from query criteria
proc matchesQuery(indMsg: IndexedWakuMessage): bool =
@ -335,7 +342,7 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) =
if ws.store.isNil:
return
proc onData(receiverTime: float64, msg: WakuMessage, pubsubTopic: string) =
proc onData(receiverTime: Timestamp, msg: WakuMessage, pubsubTopic: string) =
# TODO index should not be recalculated
discard ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(receiverTime), pubsubTopic: pubsubTopic))
@ -524,16 +531,17 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
var currentTime = epochTime()
var currentTime = getNanosecondTime(epochTime())
debug "resume", currentEpochTime=currentTime
let lastSeenItem = ws.messages.last()
var lastSeenTime = if lastSeenItem.isOk(): lastSeenItem.get().msg.timestamp
else: float64(0)
else: Timestamp(0)
# adjust the time window with an offset of 20 seconds
let offset: float64 = 200000
let offset: Timestamp = getNanosecondTime(20)
currentTime = currentTime + offset
lastSeenTime = max(lastSeenTime - offset, 0)
debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime

View File

@ -12,6 +12,7 @@ import
# internal imports
../../node/storage/message/message_store,
../../utils/pagination,
../../utils/time,
../../node/peer_manager/peer_manager,
../waku_swap/waku_swap_types,
../waku_message
@ -67,8 +68,8 @@ type
contentFilters*: seq[HistoryContentFilter]
pubsubTopic*: string
pagingInfo*: PagingInfo # used for pagination
startTime*: float64 # used for time-window query
endTime*: float64 # used for time-window query
startTime*: Timestamp # used for time-window query
endTime*: Timestamp # used for time-window query
HistoryResponseError* {.pure.} = enum
## HistoryResponseError contains error message to inform the querying node about the state of its request

View File

@ -5,6 +5,7 @@
{.push raises: [Defect].}
import
./time,
nimcrypto/hash,
stew/byteutils
@ -14,8 +15,8 @@ type
Index* = object
## This type contains the description of an Index used in the pagination of WakuMessages
digest*: MDigest[256]
receiverTime*: float64
senderTime*: float64 # the time at which the message is generated
receiverTime*: Timestamp
senderTime*: Timestamp # the time at which the message is generated
proc `==`*(x, y: Index): bool =
## receiverTime plays no role in index comparison

24
waku/v2/utils/time.nim Normal file
View File

@ -0,0 +1,24 @@
## Contains types and utilities for timestamps.
{.push raises: [Defect].}
import sqlite3_abi
type Timestamp* = int64
const TIMESTAMP_TABLE_TYPE* = "INTEGER"
proc getNanosecondTime*[T](timeInSeconds: T): Timestamp =
var ns = Timestamp(timeInSeconds*100000000)
return ns
proc getMicrosecondTime*[T](timeInSeconds: T): Timestamp =
var us = Timestamp(timeInSeconds*1000000)
return us
proc getMillisecondTime*[T](timeInSeconds: T): Timestamp =
var ms = Timestamp(timeInSeconds*1000)
return ms
proc column_timestamp*(a1: ptr sqlite3_stmt, iCol: cint): int64 =
return sqlite3_column_int64(a1, iCol)