mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-14 17:04:53 +00:00
deploy: e85b5cbae1875c3c036f7cf12453f88031ffa592
This commit is contained in:
parent
ccd08c16af
commit
68aeda48c3
@ -2,10 +2,10 @@
|
||||
|
||||
import
|
||||
std/tables,
|
||||
stew/shims/net as stewNet,
|
||||
stew/shims/net as stewNet,
|
||||
testutils/unittests,
|
||||
chronos,
|
||||
chronicles,
|
||||
chronos,
|
||||
chronicles,
|
||||
libp2p/switch,
|
||||
libp2p/protobuf/minprotobuf,
|
||||
libp2p/stream/bufferstream,
|
||||
@ -15,11 +15,9 @@ import
|
||||
eth/keys
|
||||
import
|
||||
../../waku/v2/node/waku_node,
|
||||
../../waku/v2/node/message_store/queue_store,
|
||||
../../waku/v2/protocol/waku_store,
|
||||
../../waku/v2/protocol/waku_swap/waku_swap,
|
||||
../../waku/v2/utils/peers,
|
||||
../test_helpers,
|
||||
../test_helpers,
|
||||
./utils,
|
||||
./testlib/common
|
||||
|
||||
@ -50,106 +48,3 @@ procSuite "Waku SWAP Accounting":
|
||||
check:
|
||||
decodedCheque.isErr == false
|
||||
decodedCheque.get() == cheque
|
||||
|
||||
# TODO: To do this reliably we need access to contract node
|
||||
# With current logic state isn't updated because of bad cheque
|
||||
# Consider moving this test to e2e test, and/or move swap module to be on by default
|
||||
asyncTest "Update accounting state after store operations":
|
||||
## Setup
|
||||
let
|
||||
serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60102))
|
||||
clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60100))
|
||||
|
||||
await allFutures(client.start(), server.start())
|
||||
|
||||
await server.mountSwap()
|
||||
await server.mountStore(store=StoreQueueRef.new())
|
||||
await client.mountSwap()
|
||||
await client.mountStore()
|
||||
client.mountStoreClient()
|
||||
|
||||
client.wakuSwap.setPeer(server.peerInfo.toRemotePeerInfo())
|
||||
server.wakuSwap.setPeer(client.peerInfo.toRemotePeerInfo())
|
||||
|
||||
client.setStorePeer(server.peerInfo.toRemotePeerInfo())
|
||||
server.setStorePeer(client.peerInfo.toRemotePeerInfo())
|
||||
|
||||
## Given
|
||||
let message = fakeWakuMessage()
|
||||
require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()
|
||||
|
||||
let serverPeer = server.peerInfo.toRemotePeerInfo()
|
||||
let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
|
||||
|
||||
## When
|
||||
let queryRes = await client.query(req, peer=serverPeer)
|
||||
|
||||
## Then
|
||||
check queryRes.isOk()
|
||||
|
||||
let response = queryRes.get()
|
||||
check:
|
||||
response.messages == @[message]
|
||||
|
||||
check:
|
||||
client.wakuSwap.accounting[server.peerInfo.peerId] == 1
|
||||
server.wakuSwap.accounting[client.peerInfo.peerId] == -1
|
||||
|
||||
## Cleanup
|
||||
await allFutures(client.stop(), server.stop())
|
||||
|
||||
|
||||
# This test will only Be checked if in Mock mode
|
||||
# TODO: Add cheque here
|
||||
asyncTest "Update accounting state after sending cheque":
|
||||
## Setup
|
||||
let
|
||||
serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60202))
|
||||
clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60200))
|
||||
|
||||
# Define the waku swap Config for this test
|
||||
let swapConfig = SwapConfig(mode: SwapMode.Mock, paymentThreshold: 1, disconnectThreshold: -1)
|
||||
|
||||
# Start nodes and mount protocols
|
||||
await allFutures(client.start(), server.start())
|
||||
await server.mountSwap(swapConfig)
|
||||
await server.mountStore(store=StoreQueueRef.new())
|
||||
await client.mountSwap(swapConfig)
|
||||
await client.mountStore()
|
||||
client.mountStoreClient()
|
||||
|
||||
client.wakuSwap.setPeer(server.peerInfo.toRemotePeerInfo())
|
||||
server.wakuSwap.setPeer(client.peerInfo.toRemotePeerInfo())
|
||||
|
||||
client.setStorePeer(server.peerInfo.toRemotePeerInfo())
|
||||
server.setStorePeer(client.peerInfo.toRemotePeerInfo())
|
||||
|
||||
## Given
|
||||
let message = fakeWakuMessage()
|
||||
require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()
|
||||
|
||||
let serverPeer = server.peerInfo.toRemotePeerInfo()
|
||||
let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
|
||||
|
||||
## When
|
||||
# TODO: Handshakes - for now we assume implicit, e2e still works for PoC
|
||||
let res1 = await client.query(req, peer=serverPeer)
|
||||
let res2 = await client.query(req, peer=serverPeer)
|
||||
|
||||
require:
|
||||
res1.isOk()
|
||||
res2.isOk()
|
||||
|
||||
## Then
|
||||
check:
|
||||
# Accounting table updated with credit and debit, respectively
|
||||
# After sending a cheque the balance is partially adjusted
|
||||
client.wakuSwap.accounting[server.peerInfo.peerId] == 1
|
||||
server.wakuSwap.accounting[client.peerInfo.peerId] == -1
|
||||
|
||||
## Cleanup
|
||||
await allFutures(client.stop(), server.stop())
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
# libtool - Provide generalized library-building support services.
|
||||
# Generated automatically by config.status (libbacktrace) version-unused
|
||||
# Libtool was configured on host fv-az41-370:
|
||||
# Libtool was configured on host fv-az508-423:
|
||||
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
||||
#
|
||||
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
||||
|
@ -32,7 +32,7 @@ import
|
||||
../protocol/waku_lightpush,
|
||||
../protocol/waku_lightpush/client as lightpush_client,
|
||||
../protocol/waku_peer_exchange,
|
||||
../utils/peers,
|
||||
../utils/peers,
|
||||
../utils/wakuenr,
|
||||
./peer_manager/peer_manager,
|
||||
./message_store/message_retention_policy,
|
||||
@ -128,11 +128,11 @@ template wsFlag(wssEnabled: bool): MultiAddress =
|
||||
if wssEnabled: MultiAddress.init("/wss").tryGet()
|
||||
else: MultiAddress.init("/ws").tryGet()
|
||||
|
||||
proc new*(T: type WakuNode,
|
||||
proc new*(T: type WakuNode,
|
||||
nodeKey: crypto.PrivateKey,
|
||||
bindIp: ValidIpAddress,
|
||||
bindIp: ValidIpAddress,
|
||||
bindPort: Port,
|
||||
extIp = none(ValidIpAddress),
|
||||
extIp = none(ValidIpAddress),
|
||||
extPort = none(Port),
|
||||
peerStorage: PeerStorage = nil,
|
||||
maxConnections = builders.MaxConnections,
|
||||
@ -160,7 +160,7 @@ proc new*(T: type WakuNode,
|
||||
# Setup external addresses, if available
|
||||
var
|
||||
hostExtAddress, wsExtAddress = none(MultiAddress)
|
||||
|
||||
|
||||
if (dns4DomainName.isSome()):
|
||||
# Use dns4 for externally announced addresses
|
||||
hostExtAddress = some(dns4TcpEndPoint(dns4DomainName.get(), extPort.get()))
|
||||
@ -180,12 +180,12 @@ proc new*(T: type WakuNode,
|
||||
announcedAddresses.add(hostExtAddress.get())
|
||||
else:
|
||||
announcedAddresses.add(hostAddress) # We always have at least a bind address for the host
|
||||
|
||||
|
||||
if wsExtAddress.isSome():
|
||||
announcedAddresses.add(wsExtAddress.get())
|
||||
elif wsHostAddress.isSome():
|
||||
announcedAddresses.add(wsHostAddress.get())
|
||||
|
||||
|
||||
## Initialize peer
|
||||
let
|
||||
rng = crypto.newRng()
|
||||
@ -202,7 +202,7 @@ proc new*(T: type WakuNode,
|
||||
discv5UdpPort,
|
||||
wakuFlags,
|
||||
enrMultiaddrs)
|
||||
|
||||
|
||||
info "Initializing networking", addrs=announcedAddresses
|
||||
|
||||
let switch = newWakuSwitch(
|
||||
@ -210,7 +210,7 @@ proc new*(T: type WakuNode,
|
||||
hostAddress,
|
||||
wsHostAddress,
|
||||
transportFlags = {ServerFlags.ReuseAddr},
|
||||
rng = rng,
|
||||
rng = rng,
|
||||
maxConnections = maxConnections,
|
||||
wssEnabled = wssEnabled,
|
||||
secureKeyPath = secureKey,
|
||||
@ -219,7 +219,7 @@ proc new*(T: type WakuNode,
|
||||
sendSignedPeerRecord = sendSignedPeerRecord,
|
||||
agentString = agentString
|
||||
)
|
||||
|
||||
|
||||
let wakuNode = WakuNode(
|
||||
peerManager: PeerManager.new(switch, peerStorage),
|
||||
switch: switch,
|
||||
@ -231,7 +231,7 @@ proc new*(T: type WakuNode,
|
||||
return wakuNode
|
||||
|
||||
|
||||
proc peerInfo*(node: WakuNode): PeerInfo =
|
||||
proc peerInfo*(node: WakuNode): PeerInfo =
|
||||
node.switch.peerInfo
|
||||
|
||||
# TODO: Extend with more relevant info: topics, peers, memory usage, online time, etc
|
||||
@ -239,7 +239,7 @@ proc info*(node: WakuNode): WakuInfo =
|
||||
## Returns information about the Node, such as what multiaddress it can be reached at.
|
||||
|
||||
let peerInfo = node.switch.peerInfo
|
||||
|
||||
|
||||
var listenStr : seq[string]
|
||||
for address in node.announcedAddresses:
|
||||
var fulladdr = $address & "/p2p/" & $peerInfo.peerId
|
||||
@ -277,7 +277,7 @@ proc subscribe(node: WakuNode, topic: PubsubTopic, handler: Option[TopicHandler]
|
||||
# Notify mounted protocols of new message
|
||||
if not node.wakuFilter.isNil():
|
||||
await node.wakuFilter.handleMessage(topic, msg.value)
|
||||
|
||||
|
||||
if not node.wakuStore.isNil():
|
||||
node.wakuStore.handleMessage(topic, msg.value)
|
||||
|
||||
@ -308,7 +308,7 @@ proc unsubscribe*(node: WakuNode, topic: PubsubTopic, handler: TopicHandler) =
|
||||
error "Invalid API call to `unsubscribe`. WakuRelay not mounted."
|
||||
# TODO: improved error handling
|
||||
return
|
||||
|
||||
|
||||
info "unsubscribe", topic=topic
|
||||
|
||||
let wakuRelay = node.wakuRelay
|
||||
@ -316,22 +316,22 @@ proc unsubscribe*(node: WakuNode, topic: PubsubTopic, handler: TopicHandler) =
|
||||
|
||||
proc unsubscribeAll*(node: WakuNode, topic: PubsubTopic) =
|
||||
## Unsubscribes all handlers registered on a specific PubSub topic.
|
||||
|
||||
|
||||
if node.wakuRelay.isNil():
|
||||
error "Invalid API call to `unsubscribeAll`. WakuRelay not mounted."
|
||||
# TODO: improved error handling
|
||||
return
|
||||
|
||||
|
||||
info "unsubscribeAll", topic=topic
|
||||
|
||||
let wakuRelay = node.wakuRelay
|
||||
wakuRelay.unsubscribeAll(topic)
|
||||
|
||||
|
||||
proc publish*(node: WakuNode, topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
|
||||
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
|
||||
## `contentTopic` field for light node functionality. This field may be also
|
||||
## be omitted.
|
||||
|
||||
|
||||
if node.wakuRelay.isNil():
|
||||
error "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead."
|
||||
# TODO: Improve error handling
|
||||
@ -351,7 +351,7 @@ proc startRelay*(node: WakuNode) {.async.} =
|
||||
return
|
||||
|
||||
## Setup relay protocol
|
||||
|
||||
|
||||
# Subscribe to the default PubSub topics
|
||||
for topic in node.wakuRelay.defaultPubsubTopics:
|
||||
node.subscribe(topic, none(TopicHandler))
|
||||
@ -359,14 +359,14 @@ proc startRelay*(node: WakuNode) {.async.} =
|
||||
# Resume previous relay connections
|
||||
if node.peerManager.hasPeers(protocolMatcher(WakuRelayCodec)):
|
||||
info "Found previous WakuRelay peers. Reconnecting."
|
||||
|
||||
|
||||
# Reconnect to previous relay peers. This will respect a backoff period, if necessary
|
||||
let backoffPeriod = node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime)
|
||||
|
||||
await node.peerManager.reconnectPeers(WakuRelayCodec,
|
||||
protocolMatcher(WakuRelayCodec),
|
||||
backoffPeriod)
|
||||
|
||||
|
||||
# Start the WakuRelay protocol
|
||||
await node.wakuRelay.start()
|
||||
|
||||
@ -378,7 +378,7 @@ proc mountRelay*(node: WakuNode,
|
||||
peerExchangeHandler = none(RoutingRecordsHandler)) {.async, gcsafe.} =
|
||||
## The default relay topics is the union of all configured topics plus default PubsubTopic(s)
|
||||
info "mounting relay protocol"
|
||||
|
||||
|
||||
let initRes = WakuRelay.new(
|
||||
node.peerManager,
|
||||
defaultPubsubTopics = concat(@[DefaultPubsubTopic], topics),
|
||||
@ -432,14 +432,14 @@ proc mountFilterClient*(node: WakuNode) {.async, raises: [Defect, LPError].} =
|
||||
|
||||
node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterCodec))
|
||||
|
||||
proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic],
|
||||
proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic],
|
||||
handler: FilterPushHandler, peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} =
|
||||
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
|
||||
if node.wakuFilterClient.isNil():
|
||||
error "cannot register filter subscription to topic", error="waku filter client is nil"
|
||||
return
|
||||
|
||||
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
|
||||
|
||||
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
|
||||
else: peer
|
||||
|
||||
info "registering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer
|
||||
@ -466,11 +466,11 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics:
|
||||
error "cannot unregister filter subscription to content", error="waku filter client is nil"
|
||||
return
|
||||
|
||||
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
|
||||
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
|
||||
else: peer
|
||||
|
||||
|
||||
info "deregistering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer
|
||||
|
||||
|
||||
let unsubRes = await node.wakuFilterClient.unsubscribe(pubsubTopic, contentTopics, peer=remotePeer)
|
||||
if unsubRes.isOk():
|
||||
info "unsubscribed from topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics
|
||||
@ -501,12 +501,12 @@ proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Content
|
||||
if node.wakuFilterClient.isNil():
|
||||
error "cannot register filter subscription to topic", error="waku filter client is nil"
|
||||
return
|
||||
|
||||
|
||||
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
|
||||
if peerOpt.isNone():
|
||||
error "cannot register filter subscription to topic", error="no suitable remote peers"
|
||||
return
|
||||
|
||||
|
||||
await node.filterSubscribe(pubsubTopic, contentTopics, handler, peer=peerOpt.get())
|
||||
|
||||
# TODO: Move to application module (e.g., wakunode2.nim)
|
||||
@ -516,7 +516,7 @@ proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Conte
|
||||
if node.wakuFilterClient.isNil():
|
||||
error "cannot unregister filter subscription to content", error="waku filter client is nil"
|
||||
return
|
||||
|
||||
|
||||
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
|
||||
if peerOpt.isNone():
|
||||
error "cannot register filter subscription to topic", error="no suitable remote peers"
|
||||
@ -565,23 +565,19 @@ proc startMessageRetentionPolicyPeriodicTask*(node: WakuNode, interval: Duration
|
||||
|
||||
# https://github.com/nim-lang/Nim/issues/17369
|
||||
var executeRetentionPolicy: proc(udata: pointer) {.gcsafe, raises: [Defect].}
|
||||
executeRetentionPolicy = proc(udata: pointer) {.gcsafe.} =
|
||||
executeRetentionPolicy = proc(udata: pointer) {.gcsafe.} =
|
||||
executeMessageRetentionPolicy(node)
|
||||
discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)
|
||||
|
||||
|
||||
discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)
|
||||
|
||||
proc mountStore*(node: WakuNode, store: MessageStore = nil, retentionPolicy=none(MessageRetentionPolicy) ) {.async, raises: [Defect, LPError].} =
|
||||
if node.wakuSwap.isNil():
|
||||
info "mounting waku store protocol (no waku swap)"
|
||||
else:
|
||||
info "mounting waku store protocol with waku swap support"
|
||||
info "mounting waku store protocol"
|
||||
|
||||
node.wakuStore = WakuStore.new(
|
||||
node.peerManager,
|
||||
node.rng,
|
||||
store,
|
||||
wakuSwap=node.wakuSwap,
|
||||
node.peerManager,
|
||||
node.rng,
|
||||
store,
|
||||
retentionPolicy=retentionPolicy
|
||||
)
|
||||
|
||||
@ -605,12 +601,8 @@ proc query*(node: WakuNode, query: HistoryQuery, peer: RemotePeerInfo): Future[W
|
||||
let queryRes = await node.wakuStoreClient.query(query, peer)
|
||||
if queryRes.isErr():
|
||||
return err($queryRes.error)
|
||||
|
||||
|
||||
let response = queryRes.get()
|
||||
|
||||
if not node.wakuSwap.isNil():
|
||||
# Perform accounting operation
|
||||
node.wakuSwap.debit(peer.peerId, response.messages.len)
|
||||
|
||||
return ok(response)
|
||||
|
||||
@ -645,13 +637,13 @@ proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[History
|
||||
|
||||
# TODO: Move to application module (e.g., wakunode2.nim)
|
||||
proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} =
|
||||
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online
|
||||
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online
|
||||
## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages)
|
||||
## messages are stored in the the wakuStore's messages field and in the message db
|
||||
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
|
||||
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
|
||||
## an offset of 20 second is added to the time window to count for nodes asynchrony
|
||||
## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
|
||||
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
|
||||
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
|
||||
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
|
||||
if node.wakuStoreClient.isNil():
|
||||
return
|
||||
@ -660,7 +652,7 @@ proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[Re
|
||||
if retrievedMessages.isErr():
|
||||
error "failed to resume store", error=retrievedMessages.error
|
||||
return
|
||||
|
||||
|
||||
info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value
|
||||
|
||||
|
||||
@ -672,10 +664,10 @@ proc mountLightPush*(node: WakuNode) {.async.} =
|
||||
var pushHandler: PushMessageHandler
|
||||
if node.wakuRelay.isNil():
|
||||
debug "mounting lightpush without relay (nil)"
|
||||
pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
|
||||
pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
|
||||
return err("no waku relay found")
|
||||
else:
|
||||
pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
|
||||
pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
|
||||
discard await node.wakuRelay.publish(pubsubTopic, message.encode().buffer)
|
||||
return ok()
|
||||
|
||||
@ -723,7 +715,7 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMe
|
||||
if node.wakuLightpushClient.isNil():
|
||||
error "failed to publish message", error="waku lightpush client is nil"
|
||||
return
|
||||
|
||||
|
||||
let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec)
|
||||
if peerOpt.isNone():
|
||||
error "failed to publish message", error="no suitable remote peers"
|
||||
@ -777,11 +769,11 @@ proc mountLibp2pPing*(node: WakuNode) {.async, raises: [Defect, LPError].} =
|
||||
# This is necessary as `Ping.new*` does not have explicit `raises` requirement
|
||||
# @TODO: remove exception handling once explicit `raises` in ping module
|
||||
raise newException(LPError, "Failed to initialize ping protocol")
|
||||
|
||||
|
||||
if node.started:
|
||||
# Node has started already. Let's start ping too.
|
||||
await node.libp2pPing.start()
|
||||
|
||||
|
||||
node.switch.mount(node.libp2pPing)
|
||||
|
||||
proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
|
||||
@ -805,7 +797,7 @@ proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
|
||||
return
|
||||
|
||||
discard await node.libp2pPing.ping(connOpt.get()) # Ping connection
|
||||
|
||||
|
||||
await sleepAsync(keepalive)
|
||||
|
||||
proc startKeepalive*(node: WakuNode) =
|
||||
@ -832,7 +824,7 @@ proc runDiscv5Loop(node: WakuNode) {.async.} =
|
||||
if discoveredPeers.isOk():
|
||||
## Let's attempt to connect to peers we
|
||||
## have not encountered before
|
||||
|
||||
|
||||
trace "Discovered peers", count=discoveredPeers.get().len()
|
||||
|
||||
let newPeers = discoveredPeers.get().filterIt(
|
||||
@ -850,9 +842,9 @@ proc runDiscv5Loop(node: WakuNode) {.async.} =
|
||||
|
||||
proc startDiscv5*(node: WakuNode): Future[bool] {.async.} =
|
||||
## Start Discovery v5 service
|
||||
|
||||
|
||||
info "Starting discovery v5 service"
|
||||
|
||||
|
||||
if not node.wakuDiscv5.isNil():
|
||||
## First start listening on configured port
|
||||
try:
|
||||
@ -861,12 +853,12 @@ proc startDiscv5*(node: WakuNode): Future[bool] {.async.} =
|
||||
except CatchableError:
|
||||
error "Failed to start discovery service. UDP port may be already in use"
|
||||
return false
|
||||
|
||||
|
||||
## Start Discovery v5
|
||||
trace "Start discv5 service"
|
||||
node.wakuDiscv5.start()
|
||||
trace "Start discovering new peers using discv5"
|
||||
|
||||
|
||||
asyncSpawn node.runDiscv5Loop()
|
||||
|
||||
debug "Successfully started discovery v5 service"
|
||||
@ -877,10 +869,10 @@ proc startDiscv5*(node: WakuNode): Future[bool] {.async.} =
|
||||
|
||||
proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} =
|
||||
## Stop Discovery v5 service
|
||||
|
||||
|
||||
if not node.wakuDiscv5.isNil():
|
||||
info "Stopping discovery v5 service"
|
||||
|
||||
|
||||
## Stop Discovery v5 process and close listening port
|
||||
if node.wakuDiscv5.listening:
|
||||
trace "Stop listening on discv5 port"
|
||||
@ -892,17 +884,17 @@ proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} =
|
||||
proc start*(node: WakuNode) {.async.} =
|
||||
## Starts a created Waku Node and
|
||||
## all its mounted protocols.
|
||||
|
||||
|
||||
waku_version.set(1, labelValues=[git_version])
|
||||
info "Starting Waku node", version=git_version
|
||||
|
||||
|
||||
let peerInfo = node.switch.peerInfo
|
||||
info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs
|
||||
var listenStr = ""
|
||||
for address in node.announcedAddresses:
|
||||
var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]"
|
||||
var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]"
|
||||
listenStr &= fulladdr
|
||||
|
||||
|
||||
## XXX: this should be /ip4..., / stripped?
|
||||
info "Listening on", full = listenStr
|
||||
info "DNS: discoverable ENR ", enr = node.enr.toUri()
|
||||
@ -910,7 +902,7 @@ proc start*(node: WakuNode) {.async.} =
|
||||
# Perform relay-specific startup tasks TODO: this should be rethought
|
||||
if not node.wakuRelay.isNil():
|
||||
await node.startRelay()
|
||||
|
||||
|
||||
## The switch uses this mapper to update peer info addrs
|
||||
## with announced addrs after start
|
||||
let addressMapper =
|
||||
@ -922,16 +914,16 @@ proc start*(node: WakuNode) {.async.} =
|
||||
await node.switch.start()
|
||||
|
||||
node.started = true
|
||||
|
||||
|
||||
info "Node started successfully"
|
||||
|
||||
proc stop*(node: WakuNode) {.async.} =
|
||||
if not node.wakuRelay.isNil():
|
||||
await node.wakuRelay.stop()
|
||||
|
||||
|
||||
if not node.wakuDiscv5.isNil():
|
||||
discard await node.stopDiscv5()
|
||||
|
||||
await node.switch.stop()
|
||||
|
||||
node.started = false
|
||||
node.started = false
|
||||
|
@ -15,7 +15,6 @@ import
|
||||
../../utils/requests,
|
||||
../../utils/time,
|
||||
../waku_message,
|
||||
../waku_swap/waku_swap,
|
||||
./protocol_metrics,
|
||||
./common,
|
||||
./rpc,
|
||||
@ -27,7 +26,7 @@ logScope:
|
||||
topics = "waku store client"
|
||||
|
||||
|
||||
const
|
||||
const
|
||||
DefaultPageSize*: uint64 = 20 # A recommended default number of waku messages per page
|
||||
|
||||
|
||||
@ -35,12 +34,11 @@ type WakuStoreClient* = ref object
|
||||
peerManager: PeerManager
|
||||
rng: ref rand.HmacDrbgContext
|
||||
store: MessageStore
|
||||
wakuSwap: WakuSwap
|
||||
|
||||
proc new*(T: type WakuStoreClient,
|
||||
peerManager: PeerManager,
|
||||
rng: ref rand.HmacDrbgContext,
|
||||
store: MessageStore): T =
|
||||
store: MessageStore): T =
|
||||
WakuStoreClient(peerManager: peerManager, rng: rng, store: store)
|
||||
|
||||
proc sendHistoryQueryRPC(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future[HistoryResult] {.async, gcsafe.} =
|
||||
@ -49,7 +47,7 @@ proc sendHistoryQueryRPC(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeer
|
||||
if connOpt.isNone():
|
||||
waku_store_errors.inc(labelValues = [dialFailure])
|
||||
return err(HistoryError(kind: HistoryErrorKind.PEER_DIAL_FAILURE, address: $peer))
|
||||
|
||||
|
||||
let connection = connOpt.get()
|
||||
|
||||
|
||||
@ -82,7 +80,7 @@ proc query*(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future
|
||||
return await w.sendHistoryQueryRPC(req, peer)
|
||||
|
||||
proc queryAll*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
|
||||
## A thin wrapper for query. Sends the query to the given peer. when the query has a valid pagingInfo,
|
||||
## A thin wrapper for query. Sends the query to the given peer. when the query has a valid pagingInfo,
|
||||
## it retrieves the historical messages in pages.
|
||||
## Returns all the fetched messages, if error occurs, returns an error string
|
||||
|
||||
@ -93,7 +91,7 @@ proc queryAll*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo): F
|
||||
|
||||
while true:
|
||||
let queryRes = await w.query(req, peer)
|
||||
if queryRes.isErr():
|
||||
if queryRes.isErr():
|
||||
return err($queryRes.error)
|
||||
|
||||
let response = queryRes.get()
|
||||
@ -114,7 +112,7 @@ proc queryAll*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo): F
|
||||
|
||||
const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds
|
||||
|
||||
proc queryLoop(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
|
||||
proc queryLoop(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
|
||||
## Loops through the peers candidate list in order and sends the query to each
|
||||
##
|
||||
## Once all responses have been received, the retrieved messages are consolidated into one deduplicated list.
|
||||
@ -127,8 +125,8 @@ proc queryLoop(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo]
|
||||
.map(proc (fut: Future[WakuStoreResult[seq[WakuMessage]]]): seq[WakuMessage] =
|
||||
try:
|
||||
# fut.read() can raise a CatchableError
|
||||
# These futures have been awaited before using allFutures(). Call completed() just as a sanity check.
|
||||
if not fut.completed() or fut.read().isErr():
|
||||
# These futures have been awaited before using allFutures(). Call completed() just as a sanity check.
|
||||
if not fut.completed() or fut.read().isErr():
|
||||
return @[]
|
||||
|
||||
fut.read().value
|
||||
@ -140,30 +138,30 @@ proc queryLoop(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo]
|
||||
|
||||
return ok(messagesList)
|
||||
|
||||
proc resume*(w: WakuStoreClient,
|
||||
peerList = none(seq[RemotePeerInfo]),
|
||||
proc resume*(w: WakuStoreClient,
|
||||
peerList = none(seq[RemotePeerInfo]),
|
||||
pageSize = DefaultPageSize,
|
||||
pubsubTopic = DefaultPubsubTopic): Future[WakuStoreResult[uint64]] {.async, gcsafe.} =
|
||||
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
|
||||
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
|
||||
## messages are stored in the store node's messages field and in the message db
|
||||
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
|
||||
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
|
||||
## an offset of 20 second is added to the time window to count for nodes asynchrony
|
||||
## peerList indicates the list of peers to query from.
|
||||
## The history is fetched from all available peers in this list and then consolidated into one deduplicated list.
|
||||
## Such candidates should be found through a discovery method (to be developed).
|
||||
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
|
||||
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
|
||||
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
|
||||
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
|
||||
|
||||
|
||||
# If store has not been provided, don't even try
|
||||
if w.store.isNil():
|
||||
return err("store not provided (nil)")
|
||||
|
||||
# NOTE: Original implementation is based on the message's sender timestamp. At the moment
|
||||
# of writing, the sqlite store implementation returns the last message's receiver
|
||||
# of writing, the sqlite store implementation returns the last message's receiver
|
||||
# timestamp.
|
||||
# lastSeenTime = lastSeenItem.get().msg.timestamp
|
||||
let
|
||||
let
|
||||
lastSeenTime = w.store.getNewestMessageTimestamp().get(Timestamp(0))
|
||||
now = getNanosecondTime(getTime().toUnixFloat())
|
||||
|
||||
@ -175,7 +173,7 @@ proc resume*(w: WakuStoreClient,
|
||||
|
||||
let req = HistoryQuery(
|
||||
pubsubTopic: some(pubsubTopic),
|
||||
startTime: some(queryStartTime),
|
||||
startTime: some(queryStartTime),
|
||||
endTime: some(queryEndTime),
|
||||
pageSize: uint64(pageSize),
|
||||
ascending: true
|
||||
@ -188,7 +186,7 @@ proc resume*(w: WakuStoreClient,
|
||||
|
||||
else:
|
||||
debug "no candidate list is provided, selecting a random peer"
|
||||
# if no peerList is set then query from one of the peers stored in the peer manager
|
||||
# if no peerList is set then query from one of the peers stored in the peer manager
|
||||
let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
|
||||
if peerOpt.isNone():
|
||||
warn "no suitable remote peers"
|
||||
@ -198,7 +196,7 @@ proc resume*(w: WakuStoreClient,
|
||||
debug "a peer is selected from peer manager"
|
||||
res = await w.queryAll(req, peerOpt.get())
|
||||
|
||||
if res.isErr():
|
||||
if res.isErr():
|
||||
debug "failed to resume the history"
|
||||
return err("failed to resume the history")
|
||||
|
||||
|
@ -10,7 +10,7 @@ import
|
||||
std/[tables, times, sequtils, options, algorithm],
|
||||
stew/results,
|
||||
chronicles,
|
||||
chronos,
|
||||
chronos,
|
||||
bearssl/rand,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/protocol,
|
||||
@ -22,7 +22,6 @@ import
|
||||
../../node/peer_manager/peer_manager,
|
||||
../../utils/time,
|
||||
../waku_message,
|
||||
../waku_swap/waku_swap,
|
||||
./common,
|
||||
./rpc,
|
||||
./rpc_codec,
|
||||
@ -34,7 +33,7 @@ logScope:
|
||||
topics = "waku store"
|
||||
|
||||
|
||||
const
|
||||
const
|
||||
MaxMessageTimestampVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift"
|
||||
|
||||
|
||||
@ -43,7 +42,6 @@ type
|
||||
peerManager*: PeerManager
|
||||
rng*: ref rand.HmacDrbgContext
|
||||
store*: MessageStore
|
||||
wakuSwap*: WakuSwap
|
||||
retentionPolicy: Option[MessageRetentionPolicy]
|
||||
|
||||
|
||||
@ -63,7 +61,7 @@ proc executeMessageRetentionPolicy*(w: WakuStore) =
|
||||
debug "failed execution of retention policy", error=retPolicyRes.error
|
||||
|
||||
# TODO: Move to a message store wrapper
|
||||
proc reportStoredMessagesMetric*(w: WakuStore) =
|
||||
proc reportStoredMessagesMetric*(w: WakuStore) =
|
||||
if w.store.isNil():
|
||||
return
|
||||
|
||||
@ -78,7 +76,7 @@ proc isValidMessage(msg: WakuMessage): bool =
|
||||
if msg.timestamp == 0:
|
||||
return true
|
||||
|
||||
let
|
||||
let
|
||||
now = getNanosecondTime(getTime().toUnixFloat())
|
||||
lowerBound = now - MaxMessageTimestampVariance
|
||||
upperBound = now + MaxMessageTimestampVariance
|
||||
@ -94,19 +92,19 @@ proc handleMessage*(w: WakuStore, pubsubTopic: PubsubTopic, msg: WakuMessage) =
|
||||
if msg.ephemeral:
|
||||
# The message is ephemeral, should not be stored
|
||||
return
|
||||
|
||||
|
||||
if not isValidMessage(msg):
|
||||
waku_store_errors.inc(labelValues = [invalidMessage])
|
||||
return
|
||||
|
||||
|
||||
let insertStartTime = getTime().toUnixFloat()
|
||||
|
||||
|
||||
block:
|
||||
let
|
||||
msgDigest = computeDigest(msg)
|
||||
msgDigest = computeDigest(msg)
|
||||
msgReceivedTime = if msg.timestamp > 0: msg.timestamp
|
||||
else: getNanosecondTime(getTime().toUnixFloat())
|
||||
else: getNanosecondTime(getTime().toUnixFloat())
|
||||
|
||||
trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, digest=msgDigest
|
||||
|
||||
@ -123,14 +121,14 @@ proc handleMessage*(w: WakuStore, pubsubTopic: PubsubTopic, msg: WakuMessage) =
|
||||
# TODO: Move to a message store wrapper
|
||||
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
|
||||
## Query history to return a single page of messages matching the query
|
||||
|
||||
|
||||
# Extract query criteria. All query criteria are optional
|
||||
let
|
||||
qContentTopics = if query.contentTopics.len == 0: none(seq[ContentTopic])
|
||||
else: some(query.contentTopics)
|
||||
qPubSubTopic = query.pubsubTopic
|
||||
qCursor = query.cursor
|
||||
qStartTime = query.startTime
|
||||
qStartTime = query.startTime
|
||||
qEndTime = query.endTime
|
||||
qMaxPageSize = if query.pageSize <= 0: DefaultPageSize
|
||||
else: min(query.pageSize, MaxPageSize)
|
||||
@ -138,7 +136,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
|
||||
|
||||
|
||||
let queryStartTime = getTime().toUnixFloat()
|
||||
|
||||
|
||||
let queryRes = w.store.getMessagesByHistoryQuery(
|
||||
contentTopic = qContentTopics,
|
||||
pubsubTopic = qPubSubTopic,
|
||||
@ -159,15 +157,15 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
|
||||
return err(HistoryError(kind: HistoryErrorKind.UNKNOWN))
|
||||
|
||||
let rows = queryRes.get()
|
||||
|
||||
|
||||
if rows.len <= 0:
|
||||
return ok(HistoryResponse(
|
||||
messages: @[],
|
||||
messages: @[],
|
||||
pageSize: 0,
|
||||
ascending: qAscendingOrder,
|
||||
cursor: none(HistoryCursor)
|
||||
))
|
||||
|
||||
|
||||
|
||||
var messages = if rows.len <= int(qMaxPageSize): rows.mapIt(it[1])
|
||||
else: rows[0..^2].mapIt(it[1])
|
||||
@ -177,7 +175,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
|
||||
if not qAscendingOrder:
|
||||
messages.reverse()
|
||||
|
||||
|
||||
|
||||
if rows.len > int(qMaxPageSize):
|
||||
## Build last message cursor
|
||||
## The cursor is built from the last message INCLUDED in the response
|
||||
@ -190,7 +188,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
|
||||
messageDigest[i] = digest[i]
|
||||
|
||||
cursor = some(HistoryCursor(
|
||||
pubsubTopic: pubsubTopic,
|
||||
pubsubTopic: pubsubTopic,
|
||||
senderTime: message.timestamp,
|
||||
storeTime: storeTimestamp,
|
||||
digest: MessageDigest(data: messageDigest)
|
||||
@ -198,7 +196,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
|
||||
|
||||
|
||||
ok(HistoryResponse(
|
||||
messages: messages,
|
||||
messages: messages,
|
||||
pageSize: uint64(messages.len),
|
||||
ascending: qAscendingOrder,
|
||||
cursor: cursor
|
||||
@ -207,8 +205,8 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
|
||||
|
||||
## Protocol
|
||||
|
||||
proc initProtocolHandler*(ws: WakuStore) =
|
||||
|
||||
proc initProtocolHandler(ws: WakuStore) =
|
||||
|
||||
proc handler(conn: Connection, proto: string) {.async.} =
|
||||
let buf = await conn.readLp(MaxRpcSize.int)
|
||||
|
||||
@ -233,9 +231,9 @@ proc initProtocolHandler*(ws: WakuStore) =
|
||||
waku_store_queries.inc()
|
||||
|
||||
|
||||
if ws.store.isNil():
|
||||
if ws.store.isNil():
|
||||
let respErr = HistoryError(kind: HistoryErrorKind.SERVICE_UNAVAILABLE)
|
||||
|
||||
|
||||
error "history query failed", peerId=conn.peerId, requestId=reqRpc.requestId, error= $respErr
|
||||
|
||||
let resp = HistoryResponseRPC(error: respErr.toRPC())
|
||||
@ -245,7 +243,7 @@ proc initProtocolHandler*(ws: WakuStore) =
|
||||
|
||||
|
||||
let query = reqRpc.query.get().toAPI()
|
||||
|
||||
|
||||
let respRes = ws.findMessages(query)
|
||||
|
||||
if respRes.isErr():
|
||||
@ -259,16 +257,6 @@ proc initProtocolHandler*(ws: WakuStore) =
|
||||
|
||||
let resp = respRes.toRPC()
|
||||
|
||||
if not ws.wakuSwap.isNil():
|
||||
info "handle store swap", peerId=conn.peerId, requestId=reqRpc.requestId, text=ws.wakuSwap.text
|
||||
|
||||
# Perform accounting operation
|
||||
# TODO: Do accounting here, response is HistoryResponseRPC. How do we get node or swap context?
|
||||
let peerId = conn.peerId
|
||||
let messages = resp.messages
|
||||
ws.wakuSwap.credit(peerId, messages.len)
|
||||
|
||||
|
||||
info "sending history response", peerId=conn.peerId, requestId=reqRpc.requestId, messages=resp.messages.len
|
||||
|
||||
let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(resp))
|
||||
@ -277,17 +265,15 @@ proc initProtocolHandler*(ws: WakuStore) =
|
||||
ws.handler = handler
|
||||
ws.codec = WakuStoreCodec
|
||||
|
||||
proc new*(T: type WakuStore,
|
||||
peerManager: PeerManager,
|
||||
proc new*(T: type WakuStore,
|
||||
peerManager: PeerManager,
|
||||
rng: ref rand.HmacDrbgContext,
|
||||
store: MessageStore,
|
||||
wakuSwap: WakuSwap = nil,
|
||||
store: MessageStore,
|
||||
retentionPolicy=none(MessageRetentionPolicy)): T =
|
||||
let ws = WakuStore(
|
||||
rng: rng,
|
||||
peerManager: peerManager,
|
||||
store: store,
|
||||
wakuSwap: wakuSwap,
|
||||
rng: rng,
|
||||
peerManager: peerManager,
|
||||
store: store,
|
||||
retentionPolicy: retentionPolicy
|
||||
)
|
||||
ws.initProtocolHandler()
|
||||
|
Loading…
x
Reference in New Issue
Block a user