mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-02 14:03:06 +00:00
chore: Timestamp now in publish (#3373)
* Ensure timestamp is always set in WakuMessage when publishing
This commit is contained in:
parent
2786ef6079
commit
5ae526ce4f
@ -1,6 +1,6 @@
|
||||
{.used.}
|
||||
|
||||
import std/options, chronos, libp2p/crypto/crypto
|
||||
import std/options, chronos, chronicles, libp2p/crypto/crypto
|
||||
|
||||
import
|
||||
waku/node/peer_manager,
|
||||
|
||||
@ -1,6 +1,11 @@
|
||||
{.used.}
|
||||
|
||||
import std/[options, strscans], testutils/unittests, chronos, libp2p/crypto/crypto
|
||||
import
|
||||
std/[options, strscans],
|
||||
testutils/unittests,
|
||||
chronos,
|
||||
chronicles,
|
||||
libp2p/crypto/crypto
|
||||
|
||||
import
|
||||
waku/[
|
||||
@ -307,6 +312,39 @@ suite "Waku Lightpush Client":
|
||||
# Cleanup
|
||||
await serverSwitch2.stop()
|
||||
|
||||
asyncTest "Check timestamp is not zero":
|
||||
## This test validates that, even the generated message has a timestamp of 0,
|
||||
## the node will eventually set a timestamp when publishing the message.
|
||||
let
|
||||
zeroTimestamp = 0
|
||||
meta = "TEST-META"
|
||||
message = fakeWakuMessage(
|
||||
payloads.ALPHABETIC, content_topics.CURRENT, meta, zeroTimestamp
|
||||
)
|
||||
|
||||
# When publishing a valid payload
|
||||
let publishResponse =
|
||||
await client.publish(some(pubsubTopic), message, serverRemotePeerInfo)
|
||||
|
||||
# Then the message is received by the server
|
||||
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
assertResultOk publishResponse
|
||||
check handlerFuture.finished()
|
||||
|
||||
# And the message is received with the correct topic and payload
|
||||
let (readPubsubTopic, readMessage) = handlerFuture.read()
|
||||
|
||||
check:
|
||||
pubsubTopic == readPubsubTopic
|
||||
message.payload == readMessage.payload
|
||||
message.contentTopic == readMessage.contentTopic
|
||||
message.meta == readMessage.meta
|
||||
message.timestamp != readMessage.timestamp
|
||||
message.ephemeral == readMessage.ephemeral
|
||||
message.proof == readMessage.proof
|
||||
message.version == readMessage.version
|
||||
readMessage.timestamp > 0
|
||||
|
||||
suite "Verification of PushResponse Payload":
|
||||
asyncTest "Positive Responses":
|
||||
# When sending a valid PushRequest
|
||||
|
||||
@ -90,6 +90,7 @@ suite "WakuNode - Relay":
|
||||
topic == $shard
|
||||
msg.contentTopic == contentTopic
|
||||
msg.payload == payload
|
||||
msg.timestamp > 0
|
||||
completionFut.complete(true)
|
||||
|
||||
node3.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
|
||||
@ -279,6 +280,7 @@ suite "WakuNode - Relay":
|
||||
topic == $shard
|
||||
msg.contentTopic == contentTopic
|
||||
msg.payload == payload
|
||||
msg.timestamp > 0
|
||||
completionFut.complete(true)
|
||||
|
||||
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
|
||||
@ -327,6 +329,7 @@ suite "WakuNode - Relay":
|
||||
topic == $shard
|
||||
msg.contentTopic == contentTopic
|
||||
msg.payload == payload
|
||||
msg.timestamp > 0
|
||||
completionFut.complete(true)
|
||||
|
||||
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
|
||||
@ -379,6 +382,7 @@ suite "WakuNode - Relay":
|
||||
topic == $shard
|
||||
msg.contentTopic == contentTopic
|
||||
msg.payload == payload
|
||||
msg.timestamp > 0
|
||||
completionFut.complete(true)
|
||||
|
||||
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
|
||||
@ -429,6 +433,7 @@ suite "WakuNode - Relay":
|
||||
topic == $shard
|
||||
msg.contentTopic == contentTopic
|
||||
msg.payload == payload
|
||||
msg.timestamp > 0
|
||||
completionFut.complete(true)
|
||||
|
||||
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
|
||||
@ -487,6 +492,7 @@ suite "WakuNode - Relay":
|
||||
topic == $shard
|
||||
msg.contentTopic == contentTopic
|
||||
msg.payload == payload
|
||||
msg.timestamp > 0
|
||||
completionFut.complete(true)
|
||||
|
||||
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
|
||||
|
||||
@ -451,9 +451,9 @@ procSuite "WakuNode - RLN relay":
|
||||
completionFut1.complete(true)
|
||||
if msg == wm2:
|
||||
completionFut2.complete(true)
|
||||
if msg == wm3:
|
||||
if msg.payload == wm3.payload:
|
||||
completionFut3.complete(true)
|
||||
if msg == wm4:
|
||||
if msg.payload == wm4.payload:
|
||||
completionFut4.complete(true)
|
||||
|
||||
# mount the relay handler for node3
|
||||
|
||||
@ -65,9 +65,13 @@ proc sendPushRequest(
|
||||
proc publish*(
|
||||
wl: WakuLightPushClient,
|
||||
pubSubTopic: Option[PubsubTopic] = none(PubsubTopic),
|
||||
message: WakuMessage,
|
||||
wakuMessage: WakuMessage,
|
||||
peer: PeerId | RemotePeerInfo,
|
||||
): Future[WakuLightPushResult] {.async, gcsafe.} =
|
||||
var message = wakuMessage
|
||||
if message.timestamp == 0:
|
||||
message.timestamp = getNowInNanosecondTime()
|
||||
|
||||
when peer is PeerId:
|
||||
info "publish",
|
||||
peerId = shortLog(peer),
|
||||
@ -88,11 +92,15 @@ proc publish*(
|
||||
return lightpushSuccessResult(publishedCount)
|
||||
|
||||
proc publishToAny*(
|
||||
wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage
|
||||
wl: WakuLightPushClient, pubSubTopic: PubsubTopic, wakuMessage: WakuMessage
|
||||
): Future[WakuLightPushResult] {.async, gcsafe.} =
|
||||
## This proc is similar to the publish one but in this case
|
||||
## we don't specify a particular peer and instead we get it from peer manager
|
||||
|
||||
var message = wakuMessage
|
||||
if message.timestamp == 0:
|
||||
message.timestamp = getNowInNanosecondTime()
|
||||
|
||||
info "publishToAny", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex
|
||||
|
||||
let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr:
|
||||
|
||||
@ -72,10 +72,15 @@ proc sendPushRequest(
|
||||
proc publish*(
|
||||
wl: WakuLegacyLightPushClient,
|
||||
pubSubTopic: PubsubTopic,
|
||||
message: WakuMessage,
|
||||
wakuMessage: WakuMessage,
|
||||
peer: RemotePeerInfo,
|
||||
): Future[WakuLightPushResult[string]] {.async, gcsafe.} =
|
||||
## On success, returns the msg_hash of the published message
|
||||
|
||||
var message = wakuMessage
|
||||
if message.timestamp == 0:
|
||||
message.timestamp = getNowInNanosecondTime()
|
||||
|
||||
let msg_hash_hex_str = computeMessageHash(pubsubTopic, message).to0xHex()
|
||||
let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
|
||||
?await wl.sendPushRequest(pushRequest, peer)
|
||||
|
||||
@ -533,11 +533,15 @@ proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: TopicHandler)
|
||||
procCall GossipSub(w).unsubscribe(pubsubTopic, handler)
|
||||
|
||||
proc publish*(
|
||||
w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage
|
||||
w: WakuRelay, pubsubTopic: PubsubTopic, wakuMessage: WakuMessage
|
||||
): Future[Result[int, PublishOutcome]] {.async.} =
|
||||
if pubsubTopic.isEmptyOrWhitespace():
|
||||
return err(NoTopicSpecified)
|
||||
|
||||
var message = wakuMessage
|
||||
if message.timestamp == 0:
|
||||
message.timestamp = getNowInNanosecondTime()
|
||||
|
||||
let data = message.encode().buffer
|
||||
|
||||
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user