Refactoring timestamps (#842)

* Refactor timestamps type from float64 to int64 (milliseconds resolution)

* Revert epochs to float64

* Update 00002_addSenderTimeStamp.up.sql

* Update quicksim2.nim

* Add files via upload

* Delete 00003_convertTimestampsToInts.up.sql

* Add files via upload

* Rename 00003_convertTimestampsToInts.up.sql to 00003_addTimestampsToInts.up.sql

* Delete 00003_addTimestampsToInts.up.sql

* Rln-relay integration into chat2 (#835)

* adds ProofMetadata

* adds EPOCH_INTERVAL

* adds messageLog field

* adds updateLog, toEpoch, fromEpoch, getEpoch, compareTo

* adds unit test for toEpoch and fromEpoch

* adds unit test for Epoch comparison

* adds result codes for updateLog

* adds unit test for update log

* renames epoch related consts

* modifies updateLog with new return type and new logic of spam detection

* adds unit test for the modified updateLog

* changes max epoch gap type size

* splits updateLog into two procs isSpam and updateLog

* updates unittests

* fixes a bug, returns false when the message is not spam

* renames messageLog to nullifierLog

* renames isSpam to hasDuplicate

* updates the rln validator, adds comments

* adds appendRLNProof proc plus some code beautification

* unit test for validate message

* adds unhappy test to validateMessage unit test

* renames EPOCH_UNIT_SECONDS

* renames MAX_CLOCK_GAP_SECONDS

* WIP: integration test

* fixes compile errors

* sets a real epoch value

* updates on old unittests

* adds comments to the rln relay tests

* adds more comments

* makes rln import conditional

* adds todos

* adds more todos

* adds rln-relay mount process into chat2

* further todos

* logs contentTopic

* introduces rln relay configs

* changes default pubsub topic

* adds contentTopic config

* imports rln relay dependencies

* consolidates imports

* removes module identifier from ContentTopic

* adds contentTopic field

* adds contentTopic argument to mountRlnRelay calls

* appends rln proof to chat2 messages

* changes the default chat2 contentTopic

* adds missing content topic fields

* fixes a bug

* adds a new logic about empty content topics

* appends proof only when rln flag is active

* removes unnecessary todos

* fixes an indentation issue

* adds log messages

* verifies the proof against the concatenation of msg payload and content topic

* a bug fix

* removes duplicate epoch time calculation

* updates log level to trace

* updates default rln-relay content topic

* adds support for empty content topics

* updates changelog

* changelog updates

* removes a commented code block

* updates addRLNRelayValidator string doc

* Squashed commit of the following:

commit bc36c99ab2
Merge: dc2b2946 5a77d6e2
Author: G <28568419+s1fr0@users.noreply.github.com>
Date:   Sat Feb 5 01:10:06 2022 +0100

    Merge branch 'master' into int64-timestamps-ns

commit dc2b294667
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Sat Feb 5 00:24:45 2022 +0100

    Fix

commit f97b95a036
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Sat Feb 5 00:13:18 2022 +0100

    Missing import

commit 060c4f8d64
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Sat Feb 5 00:10:36 2022 +0100

    Fixed typo

commit 08ca99b6f6
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Fri Feb 4 23:59:20 2022 +0100

    Time util file

commit 2b5c360746
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Fri Feb 4 23:33:20 2022 +0100

    Moved time utility functions to utils/time

commit fdaf121f08
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Fri Feb 4 23:10:25 2022 +0100

    Fix comment

commit c7e06ab4e7
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Fri Feb 4 23:04:13 2022 +0100

    Restore previous migration script

commit 80282db1d7
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Fri Feb 4 22:54:15 2022 +0100

    Typo

commit b9d67f89b0
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Fri Feb 4 22:49:29 2022 +0100

    Added utilities to get int64 nanosecond, microsecond, millisecond time resolution from float

commit 0130d496e6
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Fri Feb 4 22:36:35 2022 +0100

    Switched to nanoseconds support.

* Update CHANGELOG.md

* Create 00003_convertTimestampsToInt64.up.sql

Migration script

* Moved migration script to right location

* Update waku_rln_relay_utils.nim

* Update waku_rln_relay_utils.nim

* Addressed reviewers' comments

* Update default fleet metrics dashboard (#844)

* Fix

* No need for float

* Aligning master to changes in PR

* Further fixes

Co-authored-by: Sanaz Taheri Boshrooyeh <35961250+staheri14@users.noreply.github.com>
Co-authored-by: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com>
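The rln-relay commits above describe bucketing message timestamps into epochs (`toEpoch`, `getEpoch`) and rejecting messages whose epoch deviates too far from the receiver's clock (`MAX_CLOCK_GAP_SECONDS`). A minimal sketch of that idea, using the constant names from the commit messages but with illustrative values and bodies that are an assumption, not the actual implementation:

```nim
const
  EPOCH_UNIT_SECONDS = 10'i64  # illustrative epoch length; the real value lives in the rln-relay module
  MAX_EPOCH_GAP = 2'i64        # illustrative clock-drift tolerance derived from MAX_CLOCK_GAP_SECONDS

proc getEpoch(timeInSeconds: int64): int64 =
  # Map a unix time (seconds) onto its epoch bucket
  timeInSeconds div EPOCH_UNIT_SECONDS

proc withinClockGap(msgEpoch, localEpoch: int64): bool =
  # A message is acceptable only if its epoch is close enough to the local epoch
  abs(msgEpoch - localEpoch) <= MAX_EPOCH_GAP
```

Messages passing this gap check would then be checked against the nullifier log (`hasDuplicate`) for spam, per the commit list above.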
Author: G <28568419+s1fr0@users.noreply.github.com>
Date: 2022-02-17 16:00:15 +01:00 (committed by GitHub, GPG Key ID: 4AEE18F83AFDEB23)
Parent: bb3e59454e
Commit: 21cac6d491
23 changed files with 225 additions and 141 deletions


@@ -13,7 +13,9 @@ The full list of changes is below.

 ### Changes
-- ...
+- A new type `Timestamp` for all timestamps is introduced (currently an alias for int64).
+- All timestamps now have nanosecond resolution.

 ### Fixes
 - ...
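The `Timestamp` alias from the changelog entry and the conversion helpers used throughout the diffs below (`getNanosecondTime` on `epochTime()` values) likely reduce to a few lines. A hedged sketch of what the `waku/v2/utils/time` module could contain — the proc names come from the diffs, but the bodies here are an assumption:

```nim
# Sketch only: the real module lives at waku/v2/utils/time.
type Timestamp* = int64  # per the changelog: currently an alias for int64

proc getNanosecondTime*(timeInSeconds: float64): Timestamp =
  # Convert seconds (e.g. from std/times.epochTime()) to int64 nanoseconds
  Timestamp(timeInSeconds * 1_000_000_000)

proc getMicrosecondTime*(timeInSeconds: float64): Timestamp =
  Timestamp(timeInSeconds * 1_000_000)

proc getMillisecondTime*(timeInSeconds: float64): Timestamp =
  Timestamp(timeInSeconds * 1_000)
```

This matches the usage in the test diffs, e.g. `timestamp: some(getNanosecondTime(epochTime()))`.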


@@ -25,6 +25,7 @@ import
   ../../waku/v2/protocol/waku_swap/waku_swap,
   ../../waku/v2/protocol/waku_filter/waku_filter,
   ../../waku/v2/utils/peers,
+  ../../waku/v2/utils/time,
   ../test_helpers

 template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0]

@@ -102,7 +103,7 @@ procSuite "Waku v2 JSON-RPC API":
       response == true

     # Publish a message on the default topic
-    response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
+    response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))

     check:
       # @TODO poll topic to verify message has been published

@@ -260,7 +261,7 @@ procSuite "Waku v2 JSON-RPC API":
     let client = newRpcHttpClient()
     await client.connect("127.0.0.1", rpcPort, false)

-    let response = await client.get_waku_v2_store_v1_messages(some(defaultTopic), some(@[HistoryContentFilter(contentTopic: defaultContentTopic)]), some(0.float64), some(9.float64), some(StorePagingOptions()))
+    let response = await client.get_waku_v2_store_v1_messages(some(defaultTopic), some(@[HistoryContentFilter(contentTopic: defaultContentTopic)]), some(Timestamp(0)), some(Timestamp(9)), some(StorePagingOptions()))
     check:
       response.messages.len() == 8
       response.pagingOptions.isSome()

@@ -573,7 +574,7 @@ procSuite "Waku v2 JSON-RPC API":
       pubSubTopic = "polling"
       contentTopic = defaultContentTopic
       payload = @[byte 9]
-      message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(epochTime()))
+      message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime())))
       topicCache = newTable[string, seq[WakuMessage]]()

     await node1.start()

@@ -664,7 +665,7 @@ procSuite "Waku v2 JSON-RPC API":
       pubSubTopic = "polling"
       contentTopic = defaultContentTopic
       payload = @[byte 9]
-      message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(epochTime()))
+      message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime())))
       topicCache = newTable[string, seq[WakuMessage]]()

     await node1.start()


@@ -6,6 +6,7 @@ import
   ../../waku/v2/node/storage/message/waku_message_store,
   ../../waku/v2/node/storage/sqlite,
   ../../waku/v2/protocol/waku_store/waku_store,
+  ../../waku/v2/utils/time,
   ./utils

 suite "Message Store":

@@ -16,9 +17,9 @@ suite "Message Store":
     topic = ContentTopic("/waku/2/default-content/proto")
     pubsubTopic = "/waku/2/default-waku/proto"
-    t1 = epochTime()
-    t2 = epochTime()
-    t3 = high(float64)
+    t1 = getNanosecondTime(epochTime())
+    t2 = getNanosecondTime(epochTime())
+    t3 = getNanosecondTime(high(float64))

   var msgs = @[
     WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0), timestamp: t1),
     WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1), timestamp: t2),

@@ -45,7 +46,7 @@ suite "Message Store":
     var msgFlag, psTopicFlag = true
     var responseCount = 0
-    proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
+    proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
       responseCount += 1

       # Note: cannot use `check` within `{.raises: [Defect].}` block:

@@ -136,7 +137,7 @@ suite "Message Store":
     for i in 1..capacity:
       let
-        msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: i.float)
+        msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
         index = computeIndex(msg)
         output = store.put(index, msg, pubsubTopic)

@@ -145,9 +146,9 @@ suite "Message Store":
     var
       responseCount = 0
-      lastMessageTimestamp = 0.float
-    proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
+      lastMessageTimestamp = Timestamp(0)
+    proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
       responseCount += 1
       lastMessageTimestamp = msg.timestamp

@@ -157,7 +158,7 @@ suite "Message Store":
     check:
       resMax.isOk
       responseCount == capacity # We retrieved all items
-      lastMessageTimestamp == capacity.float # Returned rows were ordered correctly
+      lastMessageTimestamp == Timestamp(capacity) # Returned rows were ordered correctly

     # Now test getAll with a limit smaller than total stored items
     responseCount = 0 # Reset response count

@@ -167,7 +168,7 @@ suite "Message Store":
     check:
       resLimit.isOk
       responseCount == capacity - 2 # We retrieved limited number of items
-      lastMessageTimestamp == capacity.float # We retrieved the youngest items in the store, in order
+      lastMessageTimestamp == Timestamp(capacity) # We retrieved the youngest items in the store, in order

     # Test zero limit
     responseCount = 0 # Reset response count

@@ -177,4 +178,4 @@ suite "Message Store":
     check:
       resZero.isOk
       responseCount == 0 # No items retrieved
-      lastMessageTimestamp == 0.float # No items retrieved
+      lastMessageTimestamp == Timestamp(0) # No items retrieved


@@ -5,7 +5,8 @@ import
   chronos,
   stew/byteutils,
   libp2p/crypto/crypto,
-  ../../waku/v2/utils/pagination
+  ../../waku/v2/utils/pagination,
+  ../../waku/v2/utils/time

 procSuite "Pagination utils":

@@ -24,26 +25,26 @@ procSuite "Pagination utils":
   ## Test vars
   let
     smallIndex1 = Index(digest: hashFromStr("1234"),
-                        receiverTime: 0.00,
-                        senderTime: 1000.00)
+                        receiverTime: getNanosecondTime(0),
+                        senderTime: getNanosecondTime(1000))
     smallIndex2 = Index(digest: hashFromStr("1234567"), # digest is less significant than senderTime
-                        receiverTime: 0.00,
-                        senderTime: 1000.00)
+                        receiverTime: getNanosecondTime(0),
+                        senderTime: getNanosecondTime(1000))
     largeIndex1 = Index(digest: hashFromStr("1234"),
-                        receiverTime: 0.00,
-                        senderTime: 9000.00) # only senderTime differ from smallIndex1
+                        receiverTime: getNanosecondTime(0),
+                        senderTime: getNanosecondTime(9000)) # only senderTime differ from smallIndex1
     largeIndex2 = Index(digest: hashFromStr("12345"), # only digest differs from smallIndex1
-                        receiverTime: 0.00,
-                        senderTime: 1000.00)
+                        receiverTime: getNanosecondTime(0),
+                        senderTime: getNanosecondTime(1000))
     eqIndex1 = Index(digest: hashFromStr("0003"),
-                     receiverTime: 0.00,
-                     senderTime: 54321.00)
+                     receiverTime: getNanosecondTime(0),
+                     senderTime: getNanosecondTime(54321))
     eqIndex2 = Index(digest: hashFromStr("0003"),
-                     receiverTime: 0.00,
-                     senderTime: 54321.00)
+                     receiverTime: getNanosecondTime(0),
+                     senderTime: getNanosecondTime(54321))
     eqIndex3 = Index(digest: hashFromStr("0003"),
-                     receiverTime: 9999.00, # receiverTime difference should have no effect on comparisons
-                     senderTime: 54321.00)
+                     receiverTime: getNanosecondTime(9999), # receiverTime difference should have no effect on comparisons
+                     senderTime: getNanosecondTime(54321))

   ## Test suite
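The comments in the test vectors above (digest is less significant than senderTime; receiverTime differences should have no effect on comparisons) imply an ordering over `Index` values. A hypothetical sketch of such a comparator — the type layout and proc below are illustrative, not the actual `utils/pagination` code:

```nim
type
  Timestamp = int64
  Index = object
    digest: array[32, byte]
    receiverTime: Timestamp
    senderTime: Timestamp

proc compare(x, y: Index): int =
  # senderTime is the most significant field in the ordering
  if x.senderTime != y.senderTime:
    return cmp(x.senderTime, y.senderTime)
  # the digest breaks ties; receiverTime is deliberately ignored
  cmp(x.digest, y.digest)
```

Under this ordering, `smallIndex1 < largeIndex1` (senderTime differs) and `eqIndex2 == eqIndex3` compare equal despite their different receiverTime, matching the test expectations.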


@@ -4,6 +4,7 @@ import
   testutils/unittests, nimcrypto/sha2,
   libp2p/protobuf/minprotobuf,
   ../../waku/v2/protocol/waku_store/waku_store,
+  ../../waku/v2/utils/time,
   ../test_helpers

@@ -17,8 +18,8 @@ proc createSampleStoreQueue(s: int): StoreQueueRef =
   for i in 0..<s:
     discard testStoreQueue.add(IndexedWakuMessage(msg: WakuMessage(payload: @[byte i]),
-                                                  index: Index(receiverTime: float64(i),
-                                                               senderTime: float64(i),
+                                                  index: Index(receiverTime: Timestamp(i),
+                                                               senderTime: Timestamp(i),
                                                                digest: MDigest[256](data: data)) ))
   return testStoreQueue

@@ -273,7 +274,7 @@ suite "time-window history query":
   let
     version = 0'u32
     payload = @[byte 0, 1, 2]
-    timestamp = float64(10)
+    timestamp = Timestamp(10)
     msg = WakuMessage(payload: payload, version: version, timestamp: timestamp)
     pb = msg.encode()

@@ -307,4 +308,4 @@ suite "time-window history query":
   let
     timestampDecoded = msgDecoded.value.timestamp
   check:
-    timestampDecoded == float64(0)
+    timestampDecoded == Timestamp(0)


@@ -12,6 +12,7 @@ import
   ../../waku/v2/protocol/waku_store/waku_store,
   ../../waku/v2/node/storage/message/waku_message_store,
   ../../waku/v2/node/peer_manager/peer_manager,
+  ../../waku/v2/utils/time,
   ../test_helpers, ./utils

 procSuite "Waku Store":

@@ -525,7 +526,7 @@ procSuite "Waku Store":
     let
       index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic))
       pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
-      query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic), HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: pagingInfo, startTime: float64(10), endTime: float64(11))
+      query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic), HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: pagingInfo, startTime: Timestamp(10), endTime: Timestamp(11))
       pb = query.encode()
       decodedQuery = HistoryQuery.init(pb.buffer)

@@ -575,25 +576,25 @@ procSuite "Waku Store":
       key2 = PrivateKey.random(ECDSA, rng[]).get()
       # peer2 = PeerInfo.new(key2)
     var
-      msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: float(0)),
-                  WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: float(1)),
-                  WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: float(2)),
-                  WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3)),
-                  WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4)),
-                  WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(5)),
-                  WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: float(6)),
-                  WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: float(7)),
-                  WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: float(8)),
-                  WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: float(9))]
-      msgList2 = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: float(0)),
-                   WakuMessage(payload: @[byte 11],contentTopic: ContentTopic("1"), timestamp: float(1)),
-                   WakuMessage(payload: @[byte 12],contentTopic: ContentTopic("2"), timestamp: float(2)),
-                   WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3)),
-                   WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4)),
-                   WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(5)),
-                   WakuMessage(payload: @[byte 13],contentTopic: ContentTopic("2"), timestamp: float(6)),
-                   WakuMessage(payload: @[byte 14],contentTopic: ContentTopic("1"), timestamp: float(7))]
+      msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: Timestamp(0)),
+                  WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: Timestamp(1)),
+                  WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: Timestamp(2)),
+                  WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: Timestamp(3)),
+                  WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: Timestamp(4)),
+                  WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: Timestamp(5)),
+                  WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: Timestamp(6)),
+                  WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: Timestamp(7)),
+                  WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: Timestamp(8)),
+                  WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: Timestamp(9))]
+      msgList2 = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: Timestamp(0)),
+                   WakuMessage(payload: @[byte 11],contentTopic: ContentTopic("1"), timestamp: Timestamp(1)),
+                   WakuMessage(payload: @[byte 12],contentTopic: ContentTopic("2"), timestamp: Timestamp(2)),
+                   WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: Timestamp(3)),
+                   WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: Timestamp(4)),
+                   WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: Timestamp(5)),
+                   WakuMessage(payload: @[byte 13],contentTopic: ContentTopic("2"), timestamp: Timestamp(6)),
+                   WakuMessage(payload: @[byte 14],contentTopic: ContentTopic("1"), timestamp: Timestamp(7))]

     #--------------------
     # setup default test store

@@ -641,11 +642,11 @@ procSuite "Waku Store":
     proc handler(response: HistoryResponse) {.gcsafe, closure.} =
       check:
         response.messages.len() == 2
-        response.messages.anyIt(it.timestamp == float(3))
-        response.messages.anyIt(it.timestamp == float(5))
+        response.messages.anyIt(it.timestamp == Timestamp(3))
+        response.messages.anyIt(it.timestamp == Timestamp(5))
       completionFut.complete(true)

-    let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(2), endTime: float(5))
+    let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: Timestamp(2), endTime: Timestamp(5))
     await proto.query(rpc, handler)

     check:

@@ -661,7 +662,7 @@ procSuite "Waku Store":
         response.messages.len() == 0
       completionFut.complete(true)

-    let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(2), endTime: float(2))
+    let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: Timestamp(2), endTime: Timestamp(2))
     await proto.query(rpc, handler)

     check:

@@ -678,7 +679,7 @@ procSuite "Waku Store":
       completionFut.complete(true)

     # time window is invalid since start time > end time
-    let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(5), endTime: float(2))
+    let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: Timestamp(5), endTime: Timestamp(2))
     await proto.query(rpc, handler)

     check:

@@ -709,7 +710,7 @@ procSuite "Waku Store":
         response.messages.len() == 4
       completionFut.complete(true)

-    let rpc = HistoryQuery(startTime: float(2), endTime: float(5))
+    let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5))
     let successResult = await proto.queryFrom(rpc, handler, listenSwitch.peerInfo.toRemotePeerInfo())

     check:

@@ -719,7 +720,7 @@ procSuite "Waku Store":
   asyncTest "queryFromWithPaging with empty pagingInfo":
-    let rpc = HistoryQuery(startTime: float(2), endTime: float(5))
+    let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5))
     let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo())

@@ -729,7 +730,7 @@ procSuite "Waku Store":
   asyncTest "queryFromWithPaging with pagination":
     var pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: 1)
-    let rpc = HistoryQuery(startTime: float(2), endTime: float(5), pagingInfo: pinfo)
+    let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5), pagingInfo: pinfo)
     let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo())

@@ -790,14 +791,14 @@ procSuite "Waku Store":
     let store = WakuStore.init(PeerManager.new(newStandardSwitch()), crypto.newRng(), capacity = capacity)

     for i in 1..capacity:
-      await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte i], contentTopic: contentTopic, timestamp: i.float64))
+      await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte i], contentTopic: contentTopic, timestamp: Timestamp(i)))
       await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically

     check:
       store.messages.len == capacity # Store is at capacity

     # Test that capacity holds
-    await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte (capacity + 1)], contentTopic: contentTopic, timestamp: (capacity + 1).float64))
+    await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte (capacity + 1)], contentTopic: contentTopic, timestamp: Timestamp(capacity + 1)))

     check:
       store.messages.len == capacity # Store is still at capacity


@@ -3,7 +3,8 @@
 import
   std/sequtils,
   testutils/unittests,
-  ../../waku/v2/protocol/waku_store/waku_store_types
+  ../../waku/v2/protocol/waku_store/waku_store_types,
+  ../../waku/v2/utils/time

 procSuite "Sorted store queue":

@@ -12,8 +13,8 @@ procSuite "Sorted store queue":
     ## Use i to generate an IndexedWakuMessage
     var data {.noinit.}: array[32, byte]
     for x in data.mitems: x = i.byte
-    return IndexedWakuMessage(msg: WakuMessage(payload: @[byte i], timestamp: float64(i)),
-                              index: Index(receiverTime: float64(i), senderTime: float64(i), digest: MDigest[256](data: data)))
+    return IndexedWakuMessage(msg: WakuMessage(payload: @[byte i], timestamp: Timestamp(i)),
+                              index: Index(receiverTime: Timestamp(i), senderTime: Timestamp(i), digest: MDigest[256](data: data)))

   # Test variables
   let

@@ -62,7 +63,7 @@ procSuite "Sorted store queue":
     let first = testStoreQueue.first()
     check:
       first.isOk()
-      first.get().msg.timestamp == 1.0
+      first.get().msg.timestamp == Timestamp(1)

     # Error condition
     let emptyQ = StoreQueueRef.new(capacity)

@@ -73,7 +74,7 @@ procSuite "Sorted store queue":
     let last = testStoreQueue.last()
     check:
       last.isOk()
-      last.get().msg.timestamp == 5.0
+      last.get().msg.timestamp == Timestamp(5)

     # Error condition
     let emptyQ = StoreQueueRef.new(capacity)

@@ -91,7 +92,7 @@ procSuite "Sorted store queue":
       # First page
       pInfo.pageSize == 3
      pInfo.direction == PagingDirection.FORWARD
-      pInfo.cursor.senderTime == 3.0
+      pInfo.cursor.senderTime == Timestamp(3)
       err == HistoryResponseError.NONE
       res.mapIt(it.timestamp.int) == @[1,2,3]

@@ -103,7 +104,7 @@ procSuite "Sorted store queue":
      # Second page
      pInfo.pageSize == 2
      pInfo.direction == PagingDirection.FORWARD
-      pInfo.cursor.senderTime == 5.0
+      pInfo.cursor.senderTime == Timestamp(5)
      err == HistoryResponseError.NONE
      res.mapIt(it.timestamp.int) == @[4,5]

@@ -114,7 +115,7 @@ procSuite "Sorted store queue":
      # Empty last page
      pInfo.pageSize == 0
      pInfo.direction == PagingDirection.FORWARD
-      pInfo.cursor.senderTime == 5.0
+      pInfo.cursor.senderTime == Timestamp(5)
      err == HistoryResponseError.NONE
      res.len == 0

@@ -129,7 +130,7 @@ procSuite "Sorted store queue":
      # First page
      pInfo.pageSize == 3
      pInfo.direction == PagingDirection.BACKWARD
-      pInfo.cursor.senderTime == 3.0
+      pInfo.cursor.senderTime == Timestamp(3)
      err == HistoryResponseError.NONE
      res.mapIt(it.timestamp.int) == @[3,4,5]

@@ -141,7 +142,7 @@ procSuite "Sorted store queue":
      # Second page
      pInfo.pageSize == 2
      pInfo.direction == PagingDirection.BACKWARD
-      pInfo.cursor.senderTime == 1.0
+      pInfo.cursor.senderTime == Timestamp(1)
      err == HistoryResponseError.NONE
      res.mapIt(it.timestamp.int) == @[1,2]

@@ -152,7 +153,7 @@ procSuite "Sorted store queue":
      # Empty last page
      pInfo.pageSize == 0
      pInfo.direction == PagingDirection.BACKWARD
-      pInfo.cursor.senderTime == 1.0
+      pInfo.cursor.senderTime == Timestamp(1)
      err == HistoryResponseError.NONE
      res.len == 0

@@ -170,7 +171,7 @@ procSuite "Sorted store queue":
      # First page
      pInfo.pageSize == 2
      pInfo.direction == PagingDirection.FORWARD
-      pInfo.cursor.senderTime == 4.0
+      pInfo.cursor.senderTime == Timestamp(4)
      err == HistoryResponseError.NONE
      res.mapIt(it.timestamp.int) == @[2,4]

@@ -181,7 +182,7 @@ procSuite "Sorted store queue":
      # Empty next page
      pInfo.pageSize == 0
      pInfo.direction == PagingDirection.FORWARD
-      pInfo.cursor.senderTime == 4.0
+      pInfo.cursor.senderTime == Timestamp(4)
      err == HistoryResponseError.NONE
      res.len == 0

@@ -195,7 +196,7 @@ procSuite "Sorted store queue":
      # First page
      pInfo.pageSize == 2
      pInfo.direction == PagingDirection.BACKWARD
-      pInfo.cursor.senderTime == 3.0
+      pInfo.cursor.senderTime == Timestamp(3)
      err == HistoryResponseError.NONE
      res.mapIt(it.timestamp.int) == @[3,5]

@@ -206,7 +207,7 @@ procSuite "Sorted store queue":
      # Next page
      pInfo.pageSize == 1
      pInfo.direction == PagingDirection.BACKWARD
-      pInfo.cursor.senderTime == 1.0
+      pInfo.cursor.senderTime == Timestamp(1)
      err == HistoryResponseError.NONE
      res.mapIt(it.timestamp.int) == @[1]

@@ -217,7 +218,7 @@ procSuite "Sorted store queue":
      # Empty last page
      pInfo.pageSize == 0
      pInfo.direction == PagingDirection.BACKWARD
-      pInfo.cursor.senderTime == 1.0
+      pInfo.cursor.senderTime == Timestamp(1)
      err == HistoryResponseError.NONE
      res.len == 0

@@ -228,14 +229,14 @@ procSuite "Sorted store queue":
     var (res, pInfo, err) = testStoreQueue.getPage(predicate,
                                                    PagingInfo(pageSize: 3,
-                                                              cursor: Index(receiverTime: float64(3), senderTime: float64(3), digest: MDigest[256]()),
+                                                              cursor: Index(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]()),
                                                               direction: PagingDirection.BACKWARD))
     check:
       # Empty response with error
       pInfo.pageSize == 0
       pInfo.direction == PagingDirection.BACKWARD
-      pInfo.cursor.senderTime == 3.0
+      pInfo.cursor.senderTime == Timestamp(3)
       err == HistoryResponseError.INVALID_CURSOR
       res.len == 0

@@ -243,14 +244,14 @@ procSuite "Sorted store queue":
     (res, pInfo, err) = testStoreQueue.getPage(predicate,
                                                PagingInfo(pageSize: 3,
-                                                          cursor: Index(receiverTime: float64(3), senderTime: float64(3), digest: MDigest[256]()),
+                                                          cursor: Index(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]()),
                                                           direction: PagingDirection.FORWARD))
     check:
       # Empty response with error
       pInfo.pageSize == 0
       pInfo.direction == PagingDirection.FORWARD
-      pInfo.cursor.senderTime == 3.0
+      pInfo.cursor.senderTime == Timestamp(3)
       err == HistoryResponseError.INVALID_CURSOR
       res.len == 0

@@ -271,7 +272,7 @@ procSuite "Sorted store queue":
       # Empty response
       pInfo.pageSize == 0
       pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == 0.0 pInfo.cursor.senderTime == Timestamp(0)
err == HistoryResponseError.NONE err == HistoryResponseError.NONE
res.len == 0 res.len == 0
@ -285,7 +286,7 @@ procSuite "Sorted store queue":
# Empty response # Empty response
pInfo.pageSize == 0 pInfo.pageSize == 0
pInfo.direction == PagingDirection.FORWARD pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == 0.0 pInfo.cursor.senderTime == Timestamp(0)
err == HistoryResponseError.NONE err == HistoryResponseError.NONE
res.len == 0 res.len == 0
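The cursor-based paging behaviour these tests exercise can be sketched as a toy model (Python, illustrative only; `get_page` here is a hypothetical helper, not the nim-waku implementation):

```python
def get_page(messages, cursor, page_size, forward=True):
    """Toy cursor-based pagination over messages sorted by timestamp.

    Returns the next page after `cursor` plus the new cursor; an empty
    page keeps the old cursor, matching the tests above where an empty
    next/last page still reports the previous senderTime."""
    ordered = sorted(messages) if forward else sorted(messages, reverse=True)
    # A cursor of None means "start from the edge of the store"
    start = 0 if cursor is None else ordered.index(cursor) + 1
    page = ordered[start:start + page_size]
    new_cursor = page[-1] if page else cursor
    return page, new_cursor
```

Note how paging off the end returns an empty page with an unchanged cursor, which is the "Empty next page" / "Empty last page" case asserted above.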

View File

@@ -22,6 +22,7 @@ import
 ../../waku/v2/protocol/waku_lightpush/waku_lightpush,
 ../../waku/v2/node/peer_manager/peer_manager,
 ../../waku/v2/utils/peers,
+../../waku/v2/utils/time,
 ../../waku/v2/node/wakunode2,
 ../test_helpers
@@ -1156,7 +1157,7 @@ procSuite "WakuNode":
 # count the total number of retrieved messages from the database
 var responseCount = 0
-proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) =
+proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) =
 responseCount += 1
 # retrieve all the messages in the db
 let res = store.getAll(data)

View File

@@ -13,6 +13,7 @@ import
 # Waku v2 imports
 libp2p/crypto/crypto,
 ../v2/utils/namespacing,
+../v2/utils/time,
 ../v2/node/wakunode2,
 # Common cli config
 ./config_bridge
@@ -91,7 +92,7 @@ func toWakuMessage(env: Envelope): WakuMessage =
 # Translate a Waku v1 envelope to a Waku v2 message
 WakuMessage(payload: env.data,
 contentTopic: toV2ContentTopic(env.topic),
-timestamp: float64(env.expiry - env.ttl),
+timestamp: (getNanosecondTime(env.expiry) - getNanosecondTime(env.ttl)),
 version: 1)
 proc toWakuV2(bridge: WakuBridge, env: Envelope) {.async.} =

View File

@@ -16,7 +16,7 @@ proc delete_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool
 # Store API
-proc get_waku_v2_store_v1_messages(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[float64], endTime: Option[float64], pagingOptions: Option[StorePagingOptions]): StoreResponse
+proc get_waku_v2_store_v1_messages(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]): StoreResponse
 # Filter API

View File

@@ -4,7 +4,8 @@ import
 std/[options,tables],
 eth/keys,
 ../../protocol/waku_message,
-../../utils/pagination
+../../utils/pagination,
+../../utils/time
 type
 StoreResponse* = object
@@ -21,7 +22,7 @@ type
 payload*: seq[byte]
 contentTopic*: Option[ContentTopic]
 # sender generated timestamp
-timestamp*: Option[float64]
+timestamp*: Option[Timestamp]
 WakuPeer* = object
 multiaddr*: string

View File

@@ -6,6 +6,7 @@ import
 ../../../v1/node/rpc/hexstrings,
 ../../protocol/waku_store/waku_store_types,
 ../../protocol/waku_message,
+../../utils/time,
 ../waku_payload,
 ./jsonrpc_types
@@ -41,12 +42,12 @@ proc toStoreResponse*(historyResponse: HistoryResponse): StoreResponse =
 proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage =
 const defaultCT = ContentTopic("/waku/2/default-content/proto")
-var t: float64
+var t: Timestamp
 if relayMessage.timestamp.isSome:
 t = relayMessage.timestamp.get
 else:
 # incoming WakuRelayMessages with no timestamp will get 0 timestamp
-t = float64(0)
+t = Timestamp(0)
 WakuMessage(payload: relayMessage.payload,
 contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT,
 version: version,
@@ -60,12 +61,12 @@ proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref Br
 dst: pubKey,
 symkey: symkey)
-var t: float64
+var t: Timestamp
 if relayMessage.timestamp.isSome:
 t = relayMessage.timestamp.get
 else:
 # incoming WakuRelayMessages with no timestamp will get 0 timestamp
-t = float64(0)
+t = Timestamp(0)
 WakuMessage(payload: payload.encode(version, rng[]).get(),
 contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT,

View File

@@ -5,6 +5,7 @@ import
 chronicles,
 json_rpc/rpcserver,
 ../wakunode2,
+../../utils/time,
 ./jsonrpc_types, ./jsonrpc_utils
 export jsonrpc_types
@@ -17,7 +18,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
 ## Store API version 1 definitions
-rpcsrv.rpc("get_waku_v2_store_v1_messages") do(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[float64], endTime: Option[float64], pagingOptions: Option[StorePagingOptions]) -> StoreResponse:
+rpcsrv.rpc("get_waku_v2_store_v1_messages") do(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]) -> StoreResponse:
 ## Returns history for a list of content topics with optional paging
 debug "get_waku_v2_store_v1_messages"
@@ -29,8 +30,8 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
 let historyQuery = HistoryQuery(pubsubTopic: if pubsubTopicOption.isSome: pubsubTopicOption.get() else: "",
 contentFilters: if contentFiltersOption.isSome: contentFiltersOption.get() else: @[],
-startTime: if startTime.isSome: startTime.get() else: 0.float64,
-endTime: if endTime.isSome: endTime.get() else: 0.float64,
+startTime: if startTime.isSome: startTime.get() else: Timestamp(0),
+endTime: if endTime.isSome: endTime.get() else: Timestamp(0),
 pagingInfo: if pagingOptions.isSome: pagingOptions.get.toPagingInfo() else: PagingInfo())
 await node.query(historyQuery, queryFuncHandler)

View File

@@ -12,6 +12,7 @@ import
 ../protocol/waku_filter/waku_filter_types,
 ../protocol/waku_store/waku_store_types,
 ../protocol/waku_message,
+../utils/time,
 ./wakunode2,
 ./waku_payload,
 ./jsonrpc/[jsonrpc_types,jsonrpc_utils]
@@ -63,11 +64,11 @@ os.sleep(2000)
 for i in 0..<topicAmount:
 os.sleep(50)
 # TODO: This would then publish on a subtopic here
-var res3 = waitFor nodes[0].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(0).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
-res3 = waitFor nodes[1].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(1).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
-res3 = waitFor nodes[2].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(2).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
-res3 = waitFor nodes[3].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(3).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
-res3 = waitFor nodes[4].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(4).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
+var res3 = waitFor nodes[0].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(0).buffer, contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
+res3 = waitFor nodes[1].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(1).buffer, contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
+res3 = waitFor nodes[2].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(2).buffer, contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
+res3 = waitFor nodes[3].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(3).buffer, contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
+res3 = waitFor nodes[4].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(4).buffer, contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
 # Scenario xx2 - 14 full nodes, two edge nodes
 # Assume one full topic

View File

@@ -4,6 +4,7 @@ import
 std/options,
 stew/results,
 ../../../protocol/waku_message,
+../../../utils/time,
 ../../../utils/pagination
 ## This module defines a message store interface. Implementations of
@@ -11,7 +12,7 @@ import
 ## retrieve historical messages
 type
-DataProc* = proc(receiverTimestamp: float64, msg: WakuMessage, pubsubTopic: string) {.closure, raises: [Defect].}
+DataProc* = proc(receiverTimestamp: Timestamp, msg: WakuMessage, pubsubTopic: string) {.closure, raises: [Defect].}
 MessageStoreResult*[T] = Result[T, string]

View File

@@ -7,7 +7,8 @@ import
 ./message_store,
 ../sqlite,
 ../../../protocol/waku_message,
-../../../utils/pagination
+../../../utils/pagination,
+../../../utils/time
 export sqlite
@@ -31,12 +32,12 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T]
 let prepare = db.prepareStmt("""
 CREATE TABLE IF NOT EXISTS """ & TABLE_TITLE & """ (
 id BLOB PRIMARY KEY,
-receiverTimestamp REAL NOT NULL,
+receiverTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL,
 contentTopic BLOB NOT NULL,
 pubsubTopic BLOB NOT NULL,
 payload BLOB,
 version INTEGER NOT NULL,
-senderTimestamp REAL NOT NULL
+senderTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL
 ) WITHOUT ROWID;
 """, NoParams, void)
@@ -61,7 +62,7 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop
 ##
 let prepare = db.database.prepareStmt(
 "INSERT INTO " & TABLE_TITLE & " (id, receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp) VALUES (?, ?, ?, ?, ?, ?, ?);",
-(seq[byte], float64, seq[byte], seq[byte], seq[byte], int64, float64),
+(seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp),
 void
 )
@@ -91,7 +92,7 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc, limit = non
 proc msg(s: ptr sqlite3_stmt) =
 gotMessages = true
 let
-receiverTimestamp = sqlite3_column_double(s, 0)
+receiverTimestamp = column_timestamp(s, 0)
 topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 1))
 topicLength = sqlite3_column_bytes(s,1)
@@ -107,12 +108,12 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc, limit = non
 version = sqlite3_column_int64(s, 4)
-senderTimestamp = sqlite3_column_double(s, 5)
+senderTimestamp = column_timestamp(s, 5)
 # TODO retrieve the version number
-onData(receiverTimestamp.float64,
-WakuMessage(contentTopic: contentTopic, payload: payload , version: uint32(version), timestamp: senderTimestamp.float64),
+onData(Timestamp(receiverTimestamp),
+WakuMessage(contentTopic: contentTopic, payload: payload , version: uint32(version), timestamp: Timestamp(senderTimestamp)),
 pubsubTopic)
 var selectQuery = "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " &

View File

@@ -0,0 +1,30 @@
CREATE TABLE IF NOT EXISTS Message_backup (
id BLOB PRIMARY KEY,
receiverTimestamp REAL NOT NULL,
contentTopic BLOB NOT NULL,
pubsubTopic BLOB NOT NULL,
payload BLOB,
version INTEGER NOT NULL,
senderTimestamp REAL NOT NULL
) WITHOUT ROWID;
INSERT INTO Message_backup SELECT id, receiverTimestamp, contentTopic, pubsubTopic, payload, version, senderTimestamp FROM Message;
DROP TABLE Message;
CREATE TABLE IF NOT EXISTS Message(
id BLOB PRIMARY KEY,
receiverTimestamp INTEGER NOT NULL,
contentTopic BLOB NOT NULL,
pubsubTopic BLOB NOT NULL,
payload BLOB,
version INTEGER NOT NULL,
senderTimestamp INTEGER NOT NULL
) WITHOUT ROWID;
INSERT INTO Message (id, receiverTimestamp, contentTopic, pubsubTopic, payload, version, senderTimestamp)
SELECT id, FLOOR(receiverTimestamp*1000000000), contentTopic, pubsubTopic, payload, version, FLOOR(senderTimestamp*1000000000)
FROM Message_backup;
DROP TABLE Message_backup;
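The arithmetic this migration applies — second-resolution REAL timestamps scaled to integer nanoseconds — can be sketched as (Python, illustrative only, not part of the commit):

```python
import math

def to_nanoseconds(seconds: float) -> int:
    # Mirrors FLOOR(timestamp * 1000000000) in the migration above:
    # a float64 Unix time in seconds becomes an INTEGER nanosecond count.
    return math.floor(seconds * 1_000_000_000)
```

The same conversion is applied to both the `receiverTimestamp` and `senderTimestamp` columns when rows are copied back from `Message_backup`.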

View File

@@ -9,7 +9,9 @@
 {.push raises: [Defect].}
 import
-libp2p/protobuf/minprotobuf
+libp2p/protobuf/minprotobuf,
+libp2p/varint,
+../utils/time
 when defined(rln):
 import waku_rln_relay/waku_rln_relay_types
@@ -24,7 +26,7 @@ type
 contentTopic*: ContentTopic
 version*: uint32
 # sender generated timestamp
-timestamp*: float64
+timestamp*: Timestamp
 # the proof field indicates that the message is not a spam
 # this field will be used in the rln-relay protocol
 # XXX Experimental, this is part of https://rfc.vac.dev/spec/17/ spec and not yet part of WakuMessage spec
@@ -43,7 +45,10 @@ proc init*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] =
 discard ? pb.getField(2, msg.contentTopic)
 discard ? pb.getField(3, msg.version)
-discard ? pb.getField(4, msg.timestamp)
+var timestamp: zint64
+discard ? pb.getField(4, timestamp)
+msg.timestamp = Timestamp(timestamp)
 # XXX Experimental, this is part of https://rfc.vac.dev/spec/17/ spec and not yet part of WakuMessage spec
 when defined(rln):
 var proofBytes: seq[byte]
@@ -60,7 +65,7 @@ proc encode*(message: WakuMessage): ProtoBuffer =
 result.write(1, message.payload)
 result.write(2, message.contentTopic)
 result.write(3, message.version)
-result.write(4, message.timestamp)
+result.write(4, zint64(message.timestamp))
 when defined(rln):
 result.write(21, message.proof.encode())
 else:
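The timestamp field is now written as `zint64`, i.e. a zigzag-encoded signed varint (protobuf `sint64`). Assuming the standard protobuf zigzag mapping, the encoding can be sketched as:

```python
def zigzag_encode(n: int) -> int:
    # Standard protobuf zigzag for 64-bit signed ints:
    # 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
    # so small-magnitude negatives stay small on the wire.
    return (n << 1) ^ (n >> 63)

def zigzag_decode(z: int) -> int:
    # Inverse mapping: recover the signed value from the wire form.
    return (z >> 1) ^ -(z & 1)
```

This keeps the wire size of a varint proportional to the magnitude of the timestamp rather than blowing up for negative values, which a plain two's-complement varint would.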

View File

@@ -430,7 +430,6 @@ proc compare*(e1, e2: Epoch): int64 =
 epoch2 = fromEpoch(e2)
 return int64(epoch1) - int64(epoch2)
 proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage, timeOption: Option[float64] = none(float64)): MessageValidationResult =
 ## validate the supplied `msg` based on the waku-rln-relay routing protocol i.e.,
 ## the `msg`'s epoch is within MAX_EPOCH_GAP of the current epoch

View File

@@ -16,12 +16,14 @@ import
 libp2p/protocols/protocol,
 libp2p/protobuf/minprotobuf,
 libp2p/stream/connection,
+libp2p/varint,
 metrics,
 stew/[results, byteutils],
 # internal imports
 ../../node/storage/message/message_store,
 ../../node/peer_manager/peer_manager,
 ../../utils/requests,
+../../utils/time,
 ../waku_swap/waku_swap,
 ./waku_store_types
@@ -54,7 +56,7 @@ const
 # TODO Move serialization function to separate file, too noisy
 # TODO Move pagination to separate file, self-contained logic
-proc computeIndex*(msg: WakuMessage, receivedTime = getTime().toUnixFloat()): Index =
+proc computeIndex*(msg: WakuMessage, receivedTime = getNanosecondTime(getTime().toUnixFloat())): Index =
 ## Takes a WakuMessage with received timestamp and returns its Index.
 ## Received timestamp will default to system time if not provided.
 var ctx: sha256
@@ -64,7 +66,7 @@ proc computeIndex*(msg: WakuMessage, receivedTime = getTime().toUnixFloat()): In
 let digest = ctx.finish() # computes the hash
 ctx.clear()
-let receiverTime = receivedTime.round(3) # Ensure timestamp has (only) millisecond resolution
+let receiverTime = receivedTime
 var index = Index(digest:digest, receiverTime: receiverTime, senderTime: msg.timestamp)
 return index
@@ -77,8 +79,8 @@ proc encode*(index: Index): ProtoBuffer =
 # encodes index
 output.write(1, index.digest.data)
-output.write(2, index.receiverTime)
-output.write(3, index.senderTime)
+output.write(2, zint64(index.receiverTime))
+output.write(3, zint64(index.senderTime))
 return output
@@ -110,14 +112,14 @@ proc init*(T: type Index, buffer: seq[byte]): ProtoResult[T] =
 index.digest.data[count] = b
 # read the timestamp
-var receiverTime: float64
+var receiverTime: zint64
 discard ? pb.getField(2, receiverTime)
-index.receiverTime = receiverTime
+index.receiverTime = Timestamp(receiverTime)
 # read the timestamp
-var senderTime: float64
+var senderTime: zint64
 discard ? pb.getField(3, senderTime)
-index.senderTime = senderTime
+index.senderTime = Timestamp(senderTime)
 return ok(index)
@@ -167,8 +169,13 @@ proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
 msg.pagingInfo = ? PagingInfo.init(pagingInfoBuffer)
-discard ? pb.getField(5, msg.startTime)
-discard ? pb.getField(6, msg.endTime)
+var startTime: zint64
+discard ? pb.getField(5, startTime)
+msg.startTime = Timestamp(startTime)
+var endTime: zint64
+discard ? pb.getField(6, endTime)
+msg.endTime = Timestamp(endTime)
 return ok(msg)
@@ -226,8 +233,8 @@ proc encode*(query: HistoryQuery): ProtoBuffer =
 output.write(4, query.pagingInfo.encode())
-output.write(5, query.startTime)
-output.write(6, query.endTime)
+output.write(5, zint64(query.startTime))
+output.write(6, zint64(query.endTime))
 return output
@@ -262,10 +269,10 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
 else: none(seq[ContentTopic])
 qPubSubTopic = if (query.pubsubTopic != ""): some(query.pubsubTopic)
 else: none(string)
-qStartTime = if query.startTime != float64(0): some(query.startTime)
-else: none(float64)
-qEndTime = if query.endTime != float64(0): some(query.endTime)
-else: none(float64)
+qStartTime = if query.startTime != Timestamp(0): some(query.startTime)
+else: none(Timestamp)
+qEndTime = if query.endTime != Timestamp(0): some(query.endTime)
+else: none(Timestamp)
 ## Compose filter predicate for message from query criteria
 proc matchesQuery(indMsg: IndexedWakuMessage): bool =
@@ -335,7 +342,7 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) =
 if ws.store.isNil:
 return
-proc onData(receiverTime: float64, msg: WakuMessage, pubsubTopic: string) =
+proc onData(receiverTime: Timestamp, msg: WakuMessage, pubsubTopic: string) =
 # TODO index should not be recalculated
 discard ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(receiverTime), pubsubTopic: pubsubTopic))
@@ -524,16 +531,17 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
 ## The history gets fetched successfully if the dialed peer has been online during the queried time window.
 ## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
-var currentTime = epochTime()
+var currentTime = getNanosecondTime(epochTime())
 debug "resume", currentEpochTime=currentTime
 let lastSeenItem = ws.messages.last()
 var lastSeenTime = if lastSeenItem.isOk(): lastSeenItem.get().msg.timestamp
-else: float64(0)
+else: Timestamp(0)
 # adjust the time window with an offset of 20 seconds
-let offset: float64 = 200000
+let offset: Timestamp = getNanosecondTime(20)
 currentTime = currentTime + offset
 lastSeenTime = max(lastSeenTime - offset, 0)
 debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime
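The widened resume window can be sketched numerically (Python; `resume_window` is a hypothetical helper mirroring the offset arithmetic in the resume proc above):

```python
NANOSECONDS_PER_SECOND = 1_000_000_000

def resume_window(current_time_ns: int, last_seen_ns: int,
                  offset_seconds: int = 20):
    # Widen the [lastSeenTime, currentTime] window by 20 s on each side,
    # clamping the lower bound at 0, as the resume proc does. The extra
    # margin tolerates clock skew between peers at the window edges.
    offset = offset_seconds * NANOSECONDS_PER_SECOND
    return (max(last_seen_ns - offset, 0), current_time_ns + offset)
```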

View File

@@ -12,6 +12,7 @@ import
 # internal imports
 ../../node/storage/message/message_store,
 ../../utils/pagination,
+../../utils/time,
 ../../node/peer_manager/peer_manager,
 ../waku_swap/waku_swap_types,
 ../waku_message
@@ -67,8 +68,8 @@ type
 contentFilters*: seq[HistoryContentFilter]
 pubsubTopic*: string
 pagingInfo*: PagingInfo # used for pagination
-startTime*: float64 # used for time-window query
-endTime*: float64 # used for time-window query
+startTime*: Timestamp # used for time-window query
+endTime*: Timestamp # used for time-window query
 HistoryResponseError* {.pure.} = enum
 ## HistoryResponseError contains error message to inform the querying node about the state of its request

View File

@@ -5,6 +5,7 @@
 {.push raises: [Defect].}
 import
+./time,
 nimcrypto/hash,
 stew/byteutils
@@ -14,8 +15,8 @@ type
 Index* = object
 ## This type contains the description of an Index used in the pagination of WakuMessages
 digest*: MDigest[256]
-receiverTime*: float64
-senderTime*: float64 # the time at which the message is generated
+receiverTime*: Timestamp
+senderTime*: Timestamp # the time at which the message is generated
 proc `==`*(x, y: Index): bool =
 ## receiverTime plays no role in index comparison

waku/v2/utils/time.nim (new file, 24 lines)

View File

@@ -0,0 +1,24 @@
## Contains types and utilities for timestamps.
{.push raises: [Defect].}

import sqlite3_abi

type Timestamp* = int64

const TIMESTAMP_TABLE_TYPE* = "INTEGER"

proc getNanosecondTime*[T](timeInSeconds: T): Timestamp =
  var ns = Timestamp(timeInSeconds*1000000000)
  return ns

proc getMicrosecondTime*[T](timeInSeconds: T): Timestamp =
  var us = Timestamp(timeInSeconds*1000000)
  return us

proc getMillisecondTime*[T](timeInSeconds: T): Timestamp =
  var ms = Timestamp(timeInSeconds*1000)
  return ms

proc column_timestamp*(a1: ptr sqlite3_stmt, iCol: cint): int64 =
  return sqlite3_column_int64(a1, iCol)
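Why int64 nanoseconds rather than float64 seconds: a present-day Unix time expressed in nanoseconds (~1.6e18) exceeds float64's exact-integer range (2^53), so the old representation could not hold nanosecond precision without rounding. A quick check (Python, illustrative; the timestamp value is made up):

```python
def get_nanosecond_time(seconds: float) -> int:
    # Python sketch of getNanosecondTime above: seconds -> int64 nanoseconds
    return int(seconds * 1_000_000_000)

# A hypothetical nanosecond-resolution Unix timestamp (odd, so it cannot
# be a multiple of the float64 ulp at this magnitude, which is 256)
t_ns = 1_634_890_112_123_456_789
assert t_ns > 2 ** 53            # beyond float64's exact integer range
assert int(float(t_ns)) != t_ns  # forced through float64, it rounds
```

Storing the count as `int64` (SQLite `INTEGER`, read back with `sqlite3_column_int64`) keeps the full resolution end to end.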