mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-12 15:06:38 +00:00
deploy: 2c1ad9f76091e913cda8937097573549be410b25
This commit is contained in:
parent
26b52bab7b
commit
434feca4c2
@ -242,7 +242,7 @@ proc publish(c: Chat, line: string) =
|
|||||||
c.node.wakuRlnRelay.lastEpoch = message.proof.epoch
|
c.node.wakuRlnRelay.lastEpoch = message.proof.epoch
|
||||||
if not c.node.wakuLightPush.isNil():
|
if not c.node.wakuLightPush.isNil():
|
||||||
# Attempt lightpush
|
# Attempt lightpush
|
||||||
asyncSpawn c.node.lightpush2(DefaultTopic, message)
|
asyncSpawn c.node.lightpushPublish(DefaultTopic, message)
|
||||||
else:
|
else:
|
||||||
asyncSpawn c.node.publish(DefaultTopic, message, handler)
|
asyncSpawn c.node.publish(DefaultTopic, message, handler)
|
||||||
else:
|
else:
|
||||||
@ -271,7 +271,7 @@ proc publish(c: Chat, line: string) =
|
|||||||
|
|
||||||
if not c.node.wakuLightPush.isNil():
|
if not c.node.wakuLightPush.isNil():
|
||||||
# Attempt lightpush
|
# Attempt lightpush
|
||||||
asyncSpawn c.node.lightpush2(DefaultTopic, message)
|
asyncSpawn c.node.lightpushPublish(DefaultTopic, message)
|
||||||
else:
|
else:
|
||||||
asyncSpawn c.node.publish(DefaultTopic, message)
|
asyncSpawn c.node.publish(DefaultTopic, message)
|
||||||
|
|
||||||
@ -493,7 +493,8 @@ proc processInput(rfd: AsyncFD) {.async.} =
|
|||||||
if conf.lightpushnode != "":
|
if conf.lightpushnode != "":
|
||||||
await mountLightPush(node)
|
await mountLightPush(node)
|
||||||
|
|
||||||
node.wakuLightPush.setPeer(parseRemotePeerInfo(conf.lightpushnode))
|
node.mountLightPushClient()
|
||||||
|
node.setLightPushPeer(conf.lightpushnode)
|
||||||
|
|
||||||
if conf.filternode != "":
|
if conf.filternode != "":
|
||||||
await node.mountFilter()
|
await node.mountFilter()
|
||||||
|
@ -36,6 +36,11 @@ type
|
|||||||
desc: "prints the version"
|
desc: "prints the version"
|
||||||
defaultValue: false
|
defaultValue: false
|
||||||
name: "version" }: bool
|
name: "version" }: bool
|
||||||
|
|
||||||
|
agentString* {.
|
||||||
|
defaultValue: "nwaku",
|
||||||
|
desc: "Node agent string which is used as identifier in network"
|
||||||
|
name: "agent-string" .}: string
|
||||||
|
|
||||||
nodekey* {.
|
nodekey* {.
|
||||||
desc: "P2P node private key as 64 char hex string.",
|
desc: "P2P node private key as 64 char hex string.",
|
||||||
|
@ -283,7 +283,8 @@ proc initNode(conf: WakuNodeConf,
|
|||||||
dnsResolver,
|
dnsResolver,
|
||||||
conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled
|
conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled
|
||||||
dns4DomainName,
|
dns4DomainName,
|
||||||
discv5UdpPort
|
discv5UdpPort,
|
||||||
|
some(conf.agentString)
|
||||||
)
|
)
|
||||||
except:
|
except:
|
||||||
return err("failed to create waku node instance: " & getCurrentExceptionMsg())
|
return err("failed to create waku node instance: " & getCurrentExceptionMsg())
|
||||||
@ -399,17 +400,18 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
|
|||||||
return err("failed to set node waku store peer: " & getCurrentExceptionMsg())
|
return err("failed to set node waku store peer: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
# NOTE Must be mounted after relay
|
# NOTE Must be mounted after relay
|
||||||
if (conf.lightpushnode != "") or (conf.lightpush):
|
if conf.lightpush:
|
||||||
try:
|
try:
|
||||||
await mountLightPush(node)
|
await mountLightPush(node)
|
||||||
except:
|
except:
|
||||||
return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg())
|
return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
if conf.lightpushnode != "":
|
if conf.lightpushnode != "":
|
||||||
try:
|
try:
|
||||||
setLightPushPeer(node, conf.lightpushnode)
|
mountLightPushClient(node)
|
||||||
except:
|
setLightPushPeer(node, conf.lightpushnode)
|
||||||
return err("failed to set node waku lightpush peer: " & getCurrentExceptionMsg())
|
except:
|
||||||
|
return err("failed to set node waku lightpush peer: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
# Filter setup. NOTE Must be mounted after relay
|
# Filter setup. NOTE Must be mounted after relay
|
||||||
if (conf.filternode != "") or (conf.filter):
|
if (conf.filternode != "") or (conf.filter):
|
||||||
|
@ -60,8 +60,7 @@ suite "Waku Lightpush":
|
|||||||
message = fakeWakuMessage()
|
message = fakeWakuMessage()
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let rpc = PushRequest(pubSubTopic: topic, message: message)
|
let requestRes = await client.publish(topic, message, peer=serverPeerId)
|
||||||
let requestRes = await client.request(rpc, serverPeerId)
|
|
||||||
|
|
||||||
require await handlerFuture.withTimeout(100.millis)
|
require await handlerFuture.withTimeout(100.millis)
|
||||||
|
|
||||||
@ -105,8 +104,7 @@ suite "Waku Lightpush":
|
|||||||
message = fakeWakuMessage()
|
message = fakeWakuMessage()
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let rpc = PushRequest(pubSubTopic: topic, message: message)
|
let requestRes = await client.publish(topic, message, peer=serverPeerId)
|
||||||
let requestRes = await client.request(rpc, serverPeerId)
|
|
||||||
|
|
||||||
require await handlerFuture.withTimeout(100.millis)
|
require await handlerFuture.withTimeout(100.millis)
|
||||||
|
|
||||||
|
@ -209,3 +209,39 @@ procSuite "WakuNode":
|
|||||||
check:
|
check:
|
||||||
node.announcedAddresses.len == 1
|
node.announcedAddresses.len == 1
|
||||||
node.announcedAddresses.contains(expectedDns4Addr)
|
node.announcedAddresses.contains(expectedDns4Addr)
|
||||||
|
|
||||||
|
|
||||||
|
asyncTest "Agent string is set and advertised correctly":
|
||||||
|
let
|
||||||
|
# custom agent string
|
||||||
|
expectedAgentString1 = "node1-agent-string"
|
||||||
|
|
||||||
|
# bump when updating nim-libp2p
|
||||||
|
expectedAgentString2 = "nim-libp2p/0.0.1"
|
||||||
|
let
|
||||||
|
# node with custom agent string
|
||||||
|
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
|
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000),
|
||||||
|
agentString = some(expectedAgentString1))
|
||||||
|
|
||||||
|
# node with default agent string from libp2p
|
||||||
|
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
|
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||||
|
|
||||||
|
await node1.start()
|
||||||
|
await node1.mountRelay()
|
||||||
|
|
||||||
|
await node2.start()
|
||||||
|
await node2.mountRelay()
|
||||||
|
|
||||||
|
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
|
||||||
|
await node2.connectToNodes(@[node1.switch.peerInfo.toRemotePeerInfo()])
|
||||||
|
|
||||||
|
let node1Agent = node2.switch.peerStore[AgentBook][node1.switch.peerInfo.toRemotePeerInfo().peerId]
|
||||||
|
let node2Agent = node1.switch.peerStore[AgentBook][node2.switch.peerInfo.toRemotePeerInfo().peerId]
|
||||||
|
|
||||||
|
check:
|
||||||
|
node1Agent == expectedAgentString1
|
||||||
|
node2Agent == expectedAgentString2
|
||||||
|
|
||||||
|
await allFutures(node1.stop(), node2.stop())
|
@ -35,7 +35,7 @@ procSuite "WakuNode - Lightpush":
|
|||||||
await destNode.mountRelay(@[DefaultPubsubTopic])
|
await destNode.mountRelay(@[DefaultPubsubTopic])
|
||||||
await bridgeNode.mountRelay(@[DefaultPubsubTopic])
|
await bridgeNode.mountRelay(@[DefaultPubsubTopic])
|
||||||
await bridgeNode.mountLightPush()
|
await bridgeNode.mountLightPush()
|
||||||
await lightNode.mountLightPush()
|
lightNode.mountLightPushClient()
|
||||||
|
|
||||||
discard await lightNode.peerManager.dialPeer(bridgeNode.peerInfo.toRemotePeerInfo(), WakuLightPushCodec)
|
discard await lightNode.peerManager.dialPeer(bridgeNode.peerInfo.toRemotePeerInfo(), WakuLightPushCodec)
|
||||||
await sleepAsync(100.milliseconds)
|
await sleepAsync(100.milliseconds)
|
||||||
@ -57,16 +57,10 @@ procSuite "WakuNode - Lightpush":
|
|||||||
await sleepAsync(100.millis)
|
await sleepAsync(100.millis)
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let lightpushRes = await lightNode.lightpush(DefaultPubsubTopic, message)
|
await lightNode.lightpushPublish(DefaultPubsubTopic, message)
|
||||||
|
|
||||||
require await completionFutRelay.withTimeout(5.seconds)
|
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check lightpushRes.isOk()
|
check await completionFutRelay.withTimeout(5.seconds)
|
||||||
|
|
||||||
let response = lightpushRes.get()
|
|
||||||
check:
|
|
||||||
response.isSuccess == true
|
|
||||||
|
|
||||||
## Cleanup
|
## Cleanup
|
||||||
await allFutures(lightNode.stop(), bridgeNode.stop(), destNode.stop())
|
await allFutures(lightNode.stop(), bridgeNode.stop(), destNode.stop())
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
# libtool - Provide generalized library-building support services.
|
# libtool - Provide generalized library-building support services.
|
||||||
# Generated automatically by config.status (libbacktrace) version-unused
|
# Generated automatically by config.status (libbacktrace) version-unused
|
||||||
# Libtool was configured on host fv-az201-830:
|
# Libtool was configured on host fv-az133-57:
|
||||||
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
||||||
#
|
#
|
||||||
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
||||||
|
@ -21,6 +21,7 @@ import
|
|||||||
../protocol/waku_swap/waku_swap,
|
../protocol/waku_swap/waku_swap,
|
||||||
../protocol/waku_filter,
|
../protocol/waku_filter,
|
||||||
../protocol/waku_lightpush,
|
../protocol/waku_lightpush,
|
||||||
|
../protocol/waku_lightpush/client,
|
||||||
../protocol/waku_rln_relay/waku_rln_relay_types,
|
../protocol/waku_rln_relay/waku_rln_relay_types,
|
||||||
../protocol/waku_peer_exchange,
|
../protocol/waku_peer_exchange,
|
||||||
../utils/[peers, requests, wakuenr],
|
../utils/[peers, requests, wakuenr],
|
||||||
@ -37,6 +38,8 @@ declarePublicGauge waku_version, "Waku version info (in git describe format)", [
|
|||||||
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
|
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
|
||||||
declarePublicGauge waku_node_filters, "number of content filter subscriptions"
|
declarePublicGauge waku_node_filters, "number of content filter subscriptions"
|
||||||
declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"]
|
declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"]
|
||||||
|
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
|
||||||
|
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "wakunode"
|
topics = "wakunode"
|
||||||
@ -76,6 +79,7 @@ type
|
|||||||
wakuSwap*: WakuSwap
|
wakuSwap*: WakuSwap
|
||||||
wakuRlnRelay*: WakuRLNRelay
|
wakuRlnRelay*: WakuRLNRelay
|
||||||
wakuLightPush*: WakuLightPush
|
wakuLightPush*: WakuLightPush
|
||||||
|
wakuLightpushClient*: WakuLightPushClient
|
||||||
wakuPeerExchange*: WakuPeerExchange
|
wakuPeerExchange*: WakuPeerExchange
|
||||||
enr*: enr.Record
|
enr*: enr.Record
|
||||||
libp2pPing*: Ping
|
libp2pPing*: Ping
|
||||||
@ -129,7 +133,9 @@ proc new*(T: type WakuNode,
|
|||||||
nameResolver: NameResolver = nil,
|
nameResolver: NameResolver = nil,
|
||||||
sendSignedPeerRecord = false,
|
sendSignedPeerRecord = false,
|
||||||
dns4DomainName = none(string),
|
dns4DomainName = none(string),
|
||||||
discv5UdpPort = none(Port)): T {.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} =
|
discv5UdpPort = none(Port),
|
||||||
|
agentString = none(string), # defaults to nim-libp2p version
|
||||||
|
): T {.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} =
|
||||||
## Creates a Waku Node instance.
|
## Creates a Waku Node instance.
|
||||||
|
|
||||||
## Initialize addresses
|
## Initialize addresses
|
||||||
@ -198,7 +204,8 @@ proc new*(T: type WakuNode,
|
|||||||
secureKeyPath = secureKey,
|
secureKeyPath = secureKey,
|
||||||
secureCertPath = secureCert,
|
secureCertPath = secureCert,
|
||||||
nameResolver = nameResolver,
|
nameResolver = nameResolver,
|
||||||
sendSignedPeerRecord = sendSignedPeerRecord
|
sendSignedPeerRecord = sendSignedPeerRecord,
|
||||||
|
agentString = agentString
|
||||||
)
|
)
|
||||||
|
|
||||||
let wakuNode = WakuNode(
|
let wakuNode = WakuNode(
|
||||||
@ -586,13 +593,18 @@ proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[Re
|
|||||||
|
|
||||||
## Waku lightpush
|
## Waku lightpush
|
||||||
|
|
||||||
|
proc mountLightPushClient*(node: WakuNode) =
|
||||||
|
info "mounting light push client"
|
||||||
|
|
||||||
|
node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng)
|
||||||
|
|
||||||
|
|
||||||
proc mountLightPush*(node: WakuNode) {.async.} =
|
proc mountLightPush*(node: WakuNode) {.async.} =
|
||||||
info "mounting light push"
|
info "mounting light push"
|
||||||
|
|
||||||
var pushHandler: PushMessageHandler
|
var pushHandler: PushMessageHandler
|
||||||
if node.wakuRelay.isNil():
|
if node.wakuRelay.isNil():
|
||||||
debug "mounting lightpush without relay (nil)"
|
debug "mounting lightpush without relay (nil)"
|
||||||
# TODO: Remove after using waku lightpush client
|
|
||||||
pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
|
pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
|
||||||
return err("no waku relay found")
|
return err("no waku relay found")
|
||||||
else:
|
else:
|
||||||
@ -609,29 +621,46 @@ proc mountLightPush*(node: WakuNode) {.async.} =
|
|||||||
|
|
||||||
node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec))
|
node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec))
|
||||||
|
|
||||||
proc setLightPushPeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError].} =
|
proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage, peer: RemotePeerInfo): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
||||||
if node.wakuLightPush.isNil():
|
|
||||||
error "could not set peer, waku lightpush is nil"
|
|
||||||
return
|
|
||||||
|
|
||||||
info "Set lightpush peer", peer=peer
|
|
||||||
|
|
||||||
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
|
|
||||||
else: peer
|
|
||||||
node.wakuLightPush.setPeer(remotePeer)
|
|
||||||
|
|
||||||
proc lightpush*(node: WakuNode, topic: PubsubTopic, message: WakuMessage): Future[WakuLightpushResult[PushResponse]] {.async, gcsafe.} =
|
|
||||||
## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
|
## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
|
||||||
## Returns whether relaying was successful or not.
|
## Returns whether relaying was successful or not.
|
||||||
## `WakuMessage` should contain a `contentTopic` field for light node
|
## `WakuMessage` should contain a `contentTopic` field for light node
|
||||||
## functionality.
|
## functionality.
|
||||||
debug "Publishing with lightpush", topic=topic, contentTopic=message.contentTopic
|
if node.wakuLightpushClient.isNil():
|
||||||
|
return err("waku lightpush client is nil")
|
||||||
|
|
||||||
let rpc = PushRequest(pubSubTopic: topic, message: message)
|
debug "publishing message with lightpush", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, peer=peer
|
||||||
return await node.wakuLightPush.request(rpc)
|
|
||||||
|
|
||||||
proc lightpush2*(node: WakuNode, topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
|
return await node.wakuLightpushClient.publish(pubsubTopic, message, peer)
|
||||||
discard await node.lightpush(topic, message)
|
|
||||||
|
|
||||||
|
# TODO: Move to application module (e.g., wakunode2.nim)
|
||||||
|
proc setLightPushPeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError],
|
||||||
|
deprecated: "Use 'node.lightpushPublish()' instead".} =
|
||||||
|
debug "seting lightpush client peer", peer=peer
|
||||||
|
|
||||||
|
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
|
||||||
|
else: peer
|
||||||
|
node.peerManager.addPeer(remotePeer, WakuLightPushCodec)
|
||||||
|
waku_lightpush_peers.inc()
|
||||||
|
|
||||||
|
# TODO: Move to application module (e.g., wakunode2.nim)
|
||||||
|
proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.async, gcsafe,
|
||||||
|
deprecated: "Use 'node.lightpushPublish()' instead".} =
|
||||||
|
if node.wakuLightpushClient.isNil():
|
||||||
|
error "failed to publish message", error="waku lightpush client is nil"
|
||||||
|
return
|
||||||
|
|
||||||
|
let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec)
|
||||||
|
if peerOpt.isNone():
|
||||||
|
error "failed to publish message", error="no suitable remote peers"
|
||||||
|
return
|
||||||
|
|
||||||
|
let publishRes = await node.lightpushPublish(pubsubTopic, message, peer=peerOpt.get())
|
||||||
|
if publishRes.isOk():
|
||||||
|
return
|
||||||
|
|
||||||
|
error "failed to publish message", error=publishRes.error
|
||||||
|
|
||||||
|
|
||||||
## Waku peer-exchange
|
## Waku peer-exchange
|
||||||
|
@ -70,7 +70,9 @@ proc newWakuSwitch*(
|
|||||||
sendSignedPeerRecord = false,
|
sendSignedPeerRecord = false,
|
||||||
wssEnabled: bool = false,
|
wssEnabled: bool = false,
|
||||||
secureKeyPath: string = "",
|
secureKeyPath: string = "",
|
||||||
secureCertPath: string = ""): Switch
|
secureCertPath: string = "",
|
||||||
|
agentString = none(string), # defaults to nim-libp2p version
|
||||||
|
): Switch
|
||||||
{.raises: [Defect, IOError, LPError].} =
|
{.raises: [Defect, IOError, LPError].} =
|
||||||
|
|
||||||
var b = SwitchBuilder
|
var b = SwitchBuilder
|
||||||
@ -86,6 +88,8 @@ proc newWakuSwitch*(
|
|||||||
.withNameResolver(nameResolver)
|
.withNameResolver(nameResolver)
|
||||||
.withSignedPeerRecord(sendSignedPeerRecord)
|
.withSignedPeerRecord(sendSignedPeerRecord)
|
||||||
|
|
||||||
|
if agentString.isSome():
|
||||||
|
b = b.withAgentVersion(agentString.get())
|
||||||
if privKey.isSome():
|
if privKey.isSome():
|
||||||
b = b.withPrivateKey(privKey.get())
|
b = b.withPrivateKey(privKey.get())
|
||||||
if wsAddress.isSome():
|
if wsAddress.isSome():
|
||||||
|
@ -10,6 +10,7 @@ import
|
|||||||
import
|
import
|
||||||
../../node/peer_manager/peer_manager,
|
../../node/peer_manager/peer_manager,
|
||||||
../../utils/requests,
|
../../utils/requests,
|
||||||
|
../waku_message,
|
||||||
./protocol,
|
./protocol,
|
||||||
./protocol_metrics,
|
./protocol_metrics,
|
||||||
./rpc,
|
./rpc,
|
||||||
@ -31,7 +32,7 @@ proc new*(T: type WakuLightPushClient,
|
|||||||
WakuLightPushClient(peerManager: peerManager, rng: rng)
|
WakuLightPushClient(peerManager: peerManager, rng: rng)
|
||||||
|
|
||||||
|
|
||||||
proc request*(wl: WakuLightPushClient, req: PushRequest, peer: RemotePeerInfo): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
proc sendPushRequest(wl: WakuLightPushClient, req: PushRequest, peer: PeerId|RemotePeerInfo): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
||||||
let connOpt = await wl.peerManager.dialPeer(peer, WakuLightPushCodec)
|
let connOpt = await wl.peerManager.dialPeer(peer, WakuLightPushCodec)
|
||||||
if connOpt.isNone():
|
if connOpt.isNone():
|
||||||
waku_lightpush_errors.inc(labelValues = [dialFailure])
|
waku_lightpush_errors.inc(labelValues = [dialFailure])
|
||||||
@ -41,8 +42,8 @@ proc request*(wl: WakuLightPushClient, req: PushRequest, peer: RemotePeerInfo):
|
|||||||
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: req)
|
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: req)
|
||||||
await connection.writeLP(rpc.encode().buffer)
|
await connection.writeLP(rpc.encode().buffer)
|
||||||
|
|
||||||
var message = await connection.readLp(MaxRpcSize.int)
|
var buffer = await connection.readLp(MaxRpcSize.int)
|
||||||
let decodeRespRes = PushRPC.init(message)
|
let decodeRespRes = PushRPC.init(buffer)
|
||||||
if decodeRespRes.isErr():
|
if decodeRespRes.isErr():
|
||||||
error "failed to decode response"
|
error "failed to decode response"
|
||||||
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
|
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
|
||||||
@ -62,18 +63,6 @@ proc request*(wl: WakuLightPushClient, req: PushRequest, peer: RemotePeerInfo):
|
|||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
|
proc publish*(wl: WakuLightPushClient, pubsubTopic: string, message: WakuMessage, peer: PeerId|RemotePeerInfo): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
||||||
### Set lightpush peer and send push requests
|
let pushRequest = PushRequest(pubsubTopic: pubsubTopic, message: message)
|
||||||
|
return await wl.sendPushRequest(pushRequest, peer)
|
||||||
proc setPeer*(wl: WakuLightPushClient, peer: RemotePeerInfo) =
|
|
||||||
wl.peerManager.addPeer(peer, WakuLightPushCodec)
|
|
||||||
waku_lightpush_peers.inc()
|
|
||||||
|
|
||||||
proc request*(wl: WakuLightPushClient, req: PushRequest): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
|
||||||
let peerOpt = wl.peerManager.selectPeer(WakuLightPushCodec)
|
|
||||||
if peerOpt.isNone():
|
|
||||||
error "no suitable remote peers"
|
|
||||||
waku_lightpush_errors.inc(labelValues = [peerNotFoundFailure])
|
|
||||||
return err(peerNotFoundFailure)
|
|
||||||
|
|
||||||
return await wl.request(req, peerOpt.get())
|
|
||||||
|
@ -8,10 +8,8 @@ import
|
|||||||
metrics,
|
metrics,
|
||||||
bearssl/rand
|
bearssl/rand
|
||||||
import
|
import
|
||||||
../waku_message,
|
|
||||||
../waku_relay,
|
|
||||||
../../node/peer_manager/peer_manager,
|
../../node/peer_manager/peer_manager,
|
||||||
../../utils/requests,
|
../waku_message,
|
||||||
./rpc,
|
./rpc,
|
||||||
./rpc_codec,
|
./rpc_codec,
|
||||||
./protocol_metrics
|
./protocol_metrics
|
||||||
@ -77,43 +75,3 @@ proc new*(T: type WakuLightPush,
|
|||||||
let wl = WakuLightPush(rng: rng, peerManager: peerManager, pushHandler: pushHandler)
|
let wl = WakuLightPush(rng: rng, peerManager: peerManager, pushHandler: pushHandler)
|
||||||
wl.initProtocolHandler()
|
wl.initProtocolHandler()
|
||||||
return wl
|
return wl
|
||||||
|
|
||||||
|
|
||||||
proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) {.
|
|
||||||
deprecated: "Use 'WakuLightPushClient.setPeer()' instead" .} =
|
|
||||||
wlp.peerManager.addPeer(peer, WakuLightPushCodec)
|
|
||||||
waku_lightpush_peers.inc()
|
|
||||||
|
|
||||||
proc request(wl: WakuLightPush, req: PushRequest, peer: RemotePeerInfo): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe,
|
|
||||||
deprecated: "Use 'WakuLightPushClient.request()' instead" .} =
|
|
||||||
let connOpt = await wl.peerManager.dialPeer(peer, WakuLightPushCodec)
|
|
||||||
if connOpt.isNone():
|
|
||||||
waku_lightpush_errors.inc(labelValues = [dialFailure])
|
|
||||||
return err(dialFailure)
|
|
||||||
|
|
||||||
let connection = connOpt.get()
|
|
||||||
|
|
||||||
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: req)
|
|
||||||
await connection.writeLP(rpc.encode().buffer)
|
|
||||||
|
|
||||||
var message = await connection.readLp(MaxRpcSize.int)
|
|
||||||
let res = PushRPC.init(message)
|
|
||||||
|
|
||||||
if res.isErr():
|
|
||||||
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
|
|
||||||
return err(decodeRpcFailure)
|
|
||||||
|
|
||||||
let rpcRes = res.get()
|
|
||||||
if rpcRes.response == PushResponse():
|
|
||||||
return err("empty response body")
|
|
||||||
|
|
||||||
return ok(rpcRes.response)
|
|
||||||
|
|
||||||
proc request*(wl: WakuLightPush, req: PushRequest): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe,
|
|
||||||
deprecated: "Use 'WakuLightPushClient.request()' instead" .} =
|
|
||||||
let peerOpt = wl.peerManager.selectPeer(WakuLightPushCodec)
|
|
||||||
if peerOpt.isNone():
|
|
||||||
waku_lightpush_errors.inc(labelValues = [dialFailure])
|
|
||||||
return err(dialFailure)
|
|
||||||
|
|
||||||
return await wl.request(req, peerOpt.get())
|
|
||||||
|
@ -3,7 +3,6 @@
|
|||||||
import metrics
|
import metrics
|
||||||
|
|
||||||
|
|
||||||
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
|
|
||||||
declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"]
|
declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"]
|
||||||
declarePublicGauge waku_lightpush_messages, "number of lightpush messages received", ["type"]
|
declarePublicGauge waku_lightpush_messages, "number of lightpush messages received", ["type"]
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user