From a3a0a09cca677d31c258ba3cf9968288e1e3593a Mon Sep 17 00:00:00 2001 From: s1fr0 <28568419+s1fr0@users.noreply.github.com> Date: Thu, 3 Feb 2022 18:11:45 +0100 Subject: [PATCH] Refactor timestamps type from float64 to int64 (milliseconds resolution) --- tests/v2/test_jsonrpc_waku.nim | 8 +- tests/v2/test_message_store.nim | 20 ++--- tests/v2/test_waku_pagination.nim | 6 +- tests/v2/test_waku_store.nim | 74 +++++++++---------- tests/v2/test_wakunode.nim | 2 +- waku/common/wakubridge.nim | 2 +- waku/v2/node/jsonrpc/jsonrpc_callsigs.nim | 2 +- waku/v2/node/jsonrpc/jsonrpc_types.nim | 2 +- waku/v2/node/jsonrpc/jsonrpc_utils.nim | 8 +- waku/v2/node/jsonrpc/store_api.nim | 6 +- waku/v2/node/quicksim2.nim | 10 +-- .../v2/node/storage/message/message_store.nim | 2 +- .../storage/message/waku_message_store.nim | 10 +-- .../message/00002_addSenderTimeStamp.up.sql | 4 +- waku/v2/protocol/waku_message.nim | 10 ++- .../waku_rln_relay/waku_rln_relay_utils.nim | 15 ++-- waku/v2/protocol/waku_store/waku_store.nim | 50 +++++++------ .../protocol/waku_store/waku_store_types.nim | 4 +- waku/v2/utils/pagination.nim | 4 +- 19 files changed, 125 insertions(+), 114 deletions(-) diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 973d4e313..a3cb320a7 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -102,7 +102,7 @@ procSuite "Waku v2 JSON-RPC API": response == true # Publish a message on the default topic - response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic), timestamp: some(epochTime()))) + response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic), timestamp: some(int64(epochTime())))) check: # @TODO poll topic to verify message has been published @@ -260,7 +260,7 @@ procSuite "Waku v2 JSON-RPC API": let client = newRpcHttpClient() await client.connect("127.0.0.1", rpcPort, false) - let response = await client.get_waku_v2_store_v1_messages(some(defaultTopic), some(@[HistoryContentFilter(contentTopic: defaultContentTopic)]), some(0.float64), some(9.float64), some(StorePagingOptions())) + let response = await client.get_waku_v2_store_v1_messages(some(defaultTopic), some(@[HistoryContentFilter(contentTopic: defaultContentTopic)]), some(0.int64), some(9.int64), some(StorePagingOptions())) check: response.messages.len() == 8 response.pagingOptions.isSome() @@ -573,7 +573,7 @@ procSuite "Waku v2 JSON-RPC API": pubSubTopic = "polling" contentTopic = defaultContentTopic payload = @[byte 9] - message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(epochTime())) + message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(int64(epochTime()))) topicCache = newTable[string, seq[WakuMessage]]() await node1.start() @@ -664,7 +664,7 @@ procSuite "Waku v2 JSON-RPC API": pubSubTopic = "polling" contentTopic = defaultContentTopic payload = @[byte 9] - message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(epochTime())) + message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(int64(epochTime()))) topicCache = newTable[string, seq[WakuMessage]]() await node1.start() diff --git a/tests/v2/test_message_store.nim b/tests/v2/test_message_store.nim index 0f898d4c0..8c500f9a4 100644 --- a/tests/v2/test_message_store.nim +++ b/tests/v2/test_message_store.nim @@ -16,9 +16,9 @@ suite "Message Store": topic = ContentTopic("/waku/2/default-content/proto") pubsubTopic = "/waku/2/default-waku/proto" - t1 = epochTime() - t2 = epochTime() - t3 = high(float64) + t1 = int64(epochTime()) + t2 = int64(epochTime()) + t3 = int64(high(float64)) var msgs = @[ WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0), timestamp: t1), WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1), timestamp: t2), @@ -45,7 +45,7 @@ suite "Message Store": var msgFlag, psTopicFlag = true var responseCount = 0 - proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} = + proc data(receiverTimestamp: int64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} = responseCount += 1 # Note: cannot use `check` within `{.raises: [Defect].}` block: @@ -136,7 +136,7 @@ suite "Message Store": for i in 1..capacity: let - msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: i.float) + msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: i.int64) index = computeIndex(msg) output = store.put(index, msg, pubsubTopic) @@ -145,9 +145,9 @@ suite "Message Store": var responseCount = 0 - lastMessageTimestamp = 0.float + lastMessageTimestamp = 0.int64 - proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} = + proc data(receiverTimestamp: int64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} = responseCount += 1 lastMessageTimestamp = msg.timestamp @@ -157,7 +157,7 @@ suite "Message Store": check: resMax.isOk responseCount == capacity # We retrieved all items - lastMessageTimestamp == capacity.float # Returned rows were ordered correctly + lastMessageTimestamp == capacity.int64 # Returned rows were ordered correctly # Now test getAll with a limit smaller than total stored items responseCount = 0 # Reset response count @@ -167,7 +167,7 @@ suite "Message Store": check: resLimit.isOk responseCount == capacity - 2 # We retrieved limited number of items - lastMessageTimestamp == capacity.float # We retrieved the youngest items in the store, in order + lastMessageTimestamp == capacity.int64 # We retrieved the youngest items in the store, in order # Test zero limit responseCount = 0 # Reset response count @@ -177,4 +177,4 @@ suite "Message Store": check: resZero.isOk responseCount == 0 # No items retrieved - lastMessageTimestamp == 0.float # No items retrieved + lastMessageTimestamp == 0.int64 # No items retrieved diff --git a/tests/v2/test_waku_pagination.nim b/tests/v2/test_waku_pagination.nim index 55c52c3df..a5e15f457 100644 --- a/tests/v2/test_waku_pagination.nim +++ b/tests/v2/test_waku_pagination.nim @@ -12,7 +12,7 @@ proc createSampleList(s: int): seq[IndexedWakuMessage] = var data {.noinit.}: array[32, byte] for x in data.mitems: x = 1 for i in 0.. end time - let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(5), endTime: float(2)) + let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: int64(5), endTime: int64(2)) await proto.query(rpc, handler) check: @@ -689,18 +689,18 @@ procSuite "Waku Store": test "find last seen message": var msgList = @[IndexedWakuMessage(msg: WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"))), - IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: float(1))), - IndexedWakuMessage(msg: WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: float(2))), - IndexedWakuMessage(msg: WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3))), - IndexedWakuMessage(msg: WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4))), - IndexedWakuMessage(msg: WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(9))), - IndexedWakuMessage(msg: WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: float(6))), - IndexedWakuMessage(msg: WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: float(7))), - IndexedWakuMessage(msg: WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: float(8))), - IndexedWakuMessage(msg: WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: float(5)))] + IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: int64(1))), + IndexedWakuMessage(msg: WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: int64(2))), + IndexedWakuMessage(msg: WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: int64(3))), + IndexedWakuMessage(msg: WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: int64(4))), + IndexedWakuMessage(msg: WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: int64(9))), + IndexedWakuMessage(msg: WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: int64(6))), + IndexedWakuMessage(msg: WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: int64(7))), + IndexedWakuMessage(msg: WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: int64(8))), + IndexedWakuMessage(msg: WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: int64(5)))] check: - findLastSeen(msgList) == float(9) + findLastSeen(msgList) == int64(9) asyncTest "resume message history": # starts a new node @@ -727,7 +727,7 @@ procSuite "Waku Store": response.messages.len() == 4 completionFut.complete(true) - let rpc = HistoryQuery(startTime: float(2), endTime: float(5)) + let rpc = HistoryQuery(startTime: int64(2), endTime: int64(5)) let successResult = await proto.queryFrom(rpc, handler, listenSwitch.peerInfo.toRemotePeerInfo()) check: @@ -737,7 +737,7 @@ procSuite "Waku Store": asyncTest "queryFromWithPaging with empty pagingInfo": - let rpc = HistoryQuery(startTime: float(2), endTime: float(5)) + let rpc = HistoryQuery(startTime: int64(2), endTime: int64(5)) let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo()) @@ -747,7 +747,7 @@ procSuite "Waku Store": asyncTest "queryFromWithPaging with pagination": var pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: 1) - let rpc = HistoryQuery(startTime: float(2), endTime: float(5), pagingInfo: pinfo) + let rpc = HistoryQuery(startTime: int64(2), endTime: int64(5), pagingInfo: pinfo) let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo()) diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index f9ed67d06..c83f8811f 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -1099,7 +1099,7 @@ procSuite "WakuNode": # count the total number of retrieved messages from the database var responseCount = 0 - proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) = + proc data(receiverTimestamp: int64, msg: WakuMessage, psTopic: string) = responseCount += 1 # retrieve all the messages in the db let res = store.getAll(data) diff --git a/waku/common/wakubridge.nim b/waku/common/wakubridge.nim index b134ce075..f97a3cb6d 100644 --- a/waku/common/wakubridge.nim +++ b/waku/common/wakubridge.nim @@ -91,7 +91,7 @@ func toWakuMessage(env: Envelope): WakuMessage = # Translate a Waku v1 envelope to a Waku v2 message WakuMessage(payload: env.data, contentTopic: toV2ContentTopic(env.topic), - timestamp: float64(env.expiry - env.ttl), + timestamp: int64(env.expiry - env.ttl), version: 1) proc toWakuV2(bridge: WakuBridge, env: Envelope) {.async.} = diff --git a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim index 240fcca3c..330317e07 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim @@ -16,7 +16,7 @@ proc delete_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool # Store API -proc get_waku_v2_store_v1_messages(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[float64], endTime: Option[float64], pagingOptions: Option[StorePagingOptions]): StoreResponse +proc get_waku_v2_store_v1_messages(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[int64], endTime: Option[int64], pagingOptions: Option[StorePagingOptions]): StoreResponse # Filter API diff --git a/waku/v2/node/jsonrpc/jsonrpc_types.nim b/waku/v2/node/jsonrpc/jsonrpc_types.nim index fa4219744..611eb2e20 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_types.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_types.nim @@ -21,7 +21,7 @@ type payload*: seq[byte] contentTopic*: Option[ContentTopic] # sender generated timestamp - timestamp*: Option[float64] + timestamp*: Option[int64] WakuPeer* = object multiaddr*: string diff --git a/waku/v2/node/jsonrpc/jsonrpc_utils.nim b/waku/v2/node/jsonrpc/jsonrpc_utils.nim index d4ddaa237..1243f0c67 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_utils.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_utils.nim @@ -41,12 +41,12 @@ proc toStoreResponse*(historyResponse: HistoryResponse): StoreResponse = proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage = const defaultCT = ContentTopic("/waku/2/default-content/proto") - var t: float64 + var t: int64 if relayMessage.timestamp.isSome: t = relayMessage.timestamp.get else: # incoming WakuRelayMessages with no timestamp will get 0 timestamp - t = float64(0) + t = int64(0) WakuMessage(payload: relayMessage.payload, contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT, version: version, @@ -60,12 +60,12 @@ proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref Br dst: pubKey, symkey: symkey) - var t: float64 + var t: int64 if relayMessage.timestamp.isSome: t = relayMessage.timestamp.get else: # incoming WakuRelayMessages with no timestamp will get 0 timestamp - t = float64(0) + t = int64(0) WakuMessage(payload: payload.encode(version, rng[]).get(), contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT, diff --git a/waku/v2/node/jsonrpc/store_api.nim b/waku/v2/node/jsonrpc/store_api.nim index 0914ce8a6..474de6484 100644 --- a/waku/v2/node/jsonrpc/store_api.nim +++ b/waku/v2/node/jsonrpc/store_api.nim @@ -17,7 +17,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = ## Store API version 1 definitions - rpcsrv.rpc("get_waku_v2_store_v1_messages") do(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[float64], endTime: Option[float64], pagingOptions: Option[StorePagingOptions]) -> StoreResponse: + rpcsrv.rpc("get_waku_v2_store_v1_messages") do(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[int64], endTime: Option[int64], pagingOptions: Option[StorePagingOptions]) -> StoreResponse: ## Returns history for a list of content topics with optional paging debug "get_waku_v2_store_v1_messages" @@ -29,8 +29,8 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = let historyQuery = HistoryQuery(pubsubTopic: if pubsubTopicOption.isSome: pubsubTopicOption.get() else: "", contentFilters: if contentFiltersOption.isSome: contentFiltersOption.get() else: @[], - startTime: if startTime.isSome: startTime.get() else: 0.float64, - endTime: if endTime.isSome: endTime.get() else: 0.float64, + startTime: if startTime.isSome: startTime.get() else: 0.int64, + endTime: if endTime.isSome: endTime.get() else: 0.int64, pagingInfo: if pagingOptions.isSome: pagingOptions.get.toPagingInfo() else: PagingInfo()) await node.query(historyQuery, queryFuncHandler) diff --git a/waku/v2/node/quicksim2.nim b/waku/v2/node/quicksim2.nim index 327388947..de1614e6d 100644 --- a/waku/v2/node/quicksim2.nim +++ b/waku/v2/node/quicksim2.nim @@ -63,11 +63,11 @@ os.sleep(2000) for i in 0..lastSeenTime: lastSeenTime = iwmsg.msg.timestamp @@ -663,12 +669,12 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem ## The history gets fetched successfully if the dialed peer has been online during the queried time window. ## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string - var currentTime = epochTime() - var lastSeenTime: float = findLastSeen(ws.messages.allItems()) + var currentTime = int64(epochTime()*1000) + var lastSeenTime: int64 = findLastSeen(ws.messages.allItems()) debug "resume", currentEpochTime=currentTime # adjust the time window with an offset of 20 seconds - let offset: float64 = 200000 + let offset: int64 = 200000 currentTime = currentTime + offset lastSeenTime = max(lastSeenTime - offset, 0) debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index 8184d1a1e..7174933da 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -65,8 +65,8 @@ type contentFilters*: seq[HistoryContentFilter] pubsubTopic*: string pagingInfo*: PagingInfo # used for pagination - startTime*: float64 # used for time-window query - endTime*: float64 # used for time-window query + startTime*: int64 # used for time-window query + endTime*: int64 # used for time-window query HistoryResponseError* {.pure.} = enum ## HistoryResponseError contains error message to inform the querying node about the state of its request diff --git a/waku/v2/utils/pagination.nim b/waku/v2/utils/pagination.nim index d2c617e7a..99087e9a2 100644 --- a/waku/v2/utils/pagination.nim +++ b/waku/v2/utils/pagination.nim @@ -10,5 +10,5 @@ type Index* = object ## This type contains the description of an Index used in the pagination of WakuMessages digest*: MDigest[256] - receiverTime*: float64 - senderTime*: float64 # the time at which the message is generated + receiverTime*: int64 + senderTime*: int64 # the time at which the message is generated