Adds timestamp to waku relay message (#681)

* adds timestamp to WakuRelayMessage

* converts float to float64

* generates timestamp using epochTime()

* tides up imports and generates timestamps using epochTime()

* updates changelog

* adds timestamp to the post_waku_v2_relay_v1_message query parameters
This commit is contained in:
Sanaz Taheri Boshrooyeh 2021-07-21 11:05:10 -07:00 committed by GitHub
parent b87984af46
commit 71a532a414
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 46 additions and 20 deletions

View File

@ -13,7 +13,7 @@ This release contains the following:
- Support for multiple protocol IDs - now matches any protocol that adds postfix to stable ID.
- Updates the order of fields of `HistoryResponse` protobuf message.
The filed numbers of the `HistoryResponse` are shifted up by one to match up the [13/WAKU2-STORE](https://rfc.vac.dev/spec/13/) specs.
- Adds optional `timestamp` to `WakuRelayMessage`.
#### General refactoring
#### Docs
- Adds the database migration tutorial.

View File

@ -109,7 +109,8 @@ To publish a message to a pubsub topic, call the [`post_waku_v2_relay_v1_message
"params": [
"my_topic_1",
{
"payload": "0x1a2b3c4d5e6f"
"payload": "0x1a2b3c4d5e6f",
"timestamp": 1626813243.916377
}
]
}

View File

@ -1,7 +1,7 @@
{.used.}
import
std/[options, sets, tables, os, strutils, sequtils],
std/[options, sets, tables, os, strutils, sequtils, times],
testutils/unittests, stew/shims/net as stewNet,
json_rpc/[rpcserver, rpcclient],
eth/[keys, rlp], eth/common/eth_types,
@ -99,7 +99,7 @@ procSuite "Waku v2 JSON-RPC API":
response == true
# Publish a message on the default topic
response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic)))
response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
check:
# @TODO poll topic to verify message has been published
@ -560,7 +560,7 @@ procSuite "Waku v2 JSON-RPC API":
pubSubTopic = "polling"
contentTopic = defaultContentTopic
payload = @[byte 9]
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic))
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(epochTime()))
topicCache = newTable[string, seq[WakuMessage]]()
await node1.start()
@ -650,7 +650,7 @@ procSuite "Waku v2 JSON-RPC API":
pubSubTopic = "polling"
contentTopic = defaultContentTopic
payload = @[byte 9]
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic))
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(epochTime()))
topicCache = newTable[string, seq[WakuMessage]]()
await node1.start()

View File

@ -18,6 +18,8 @@ type
WakuRelayMessage* = object
payload*: seq[byte]
contentTopic*: Option[ContentTopic]
# sender generated timestamp
timestamp*: Option[float64]
WakuPeer* = object
multiaddr*: string

View File

@ -39,9 +39,16 @@ proc toStoreResponse*(historyResponse: HistoryResponse): StoreResponse =
proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage =
const defaultCT = ContentTopic("/waku/2/default-content/proto")
var t: float64
if relayMessage.timestamp.isSome:
t = relayMessage.timestamp.get
else:
# incoming WakuRelayMessages with no timestamp will get 0 timestamp
t = float64(0)
WakuMessage(payload: relayMessage.payload,
contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT,
version: version)
version: version,
timestamp: t)
proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref BrHmacDrbgContext, symkey: Option[SymKey], pubKey: Option[keys.PublicKey]): WakuMessage =
# @TODO global definition for default content topic
@ -51,9 +58,17 @@ proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref Br
dst: pubKey,
symkey: symkey)
var t: float64
if relayMessage.timestamp.isSome:
t = relayMessage.timestamp.get
else:
# incoming WakuRelayMessages with no timestamp will get 0 timestamp
t = float64(0)
WakuMessage(payload: payload.encode(version, rng[]).get(),
contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT,
version: version)
version: version,
timestamp: t)
proc toWakuRelayMessage*(message: WakuMessage, symkey: Option[SymKey], privateKey: Option[keys.PrivateKey]): WakuRelayMessage =
# @TODO global definition for default content topic
@ -65,5 +80,6 @@ proc toWakuRelayMessage*(message: WakuMessage, symkey: Option[SymKey], privateKe
decoded = decodePayload(message, keyInfo)
WakuRelayMessage(payload: decoded.get().payload,
contentTopic: some(message.contentTopic))
contentTopic: some(message.contentTopic),
timestamp: some(message.timestamp))

View File

@ -1,14 +1,21 @@
# Group by std, external then internal imports
import
os, strutils, chronicles, json_rpc/[rpcclient, rpcserver],
# std imports
std/ [os, strutils, times, options], #options as what # TODO: Huh? Redefinition?
# external imports
chronicles,
eth/common as eth_common,
eth/keys,
json_rpc/[rpcclient, rpcserver],
libp2p/protobuf/minprotobuf,
eth/common as eth_common, eth/keys,
# internal imports
../protocol/waku_filter/waku_filter_types,
../protocol/waku_store/waku_store_types,
../protocol/waku_message,
./wakunode2, ./waku_payload,
./jsonrpc/[jsonrpc_types,jsonrpc_utils],
std/options
#options as what # TODO: Huh? Redefinition?
./wakunode2,
./waku_payload,
./jsonrpc/[jsonrpc_types,jsonrpc_utils]
from strutils import rsplit
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
@ -56,11 +63,11 @@ os.sleep(2000)
for i in 0..<topicAmount:
os.sleep(50)
# TODO: This would then publish on a subtopic here
var res3 = waitFor nodes[0].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(0).buffer, contentTopic: some(defaultContentTopic)))
res3 = waitFor nodes[1].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(1).buffer, contentTopic: some(defaultContentTopic)))
res3 = waitFor nodes[2].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(2).buffer, contentTopic: some(defaultContentTopic)))
res3 = waitFor nodes[3].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(3).buffer, contentTopic: some(defaultContentTopic)))
res3 = waitFor nodes[4].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(4).buffer, contentTopic: some(defaultContentTopic)))
var res3 = waitFor nodes[0].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(0).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
res3 = waitFor nodes[1].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(1).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
res3 = waitFor nodes[2].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(2).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
res3 = waitFor nodes[3].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(3).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
res3 = waitFor nodes[4].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(4).buffer, contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
# Scenario xx2 - 14 full nodes, two edge nodes
# Assume one full topic