Update submodules: differentiate between local and remote PeerInfo (#730)

* Update submodules

* Remove local PeerInfo usage where only a RemotePeerInfo or PeerID is needed
Hanno Cornelius, 2021-10-06 14:29:08 +02:00 (committed by GitHub)
parent 5811e80ee7
commit 54bcca7402
44 changed files with 266 additions and 194 deletions
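
The heart of the change is a new RemotePeerInfo type in waku/v2/utils/peers (the last file in this diff): libp2p's PeerInfo now describes only the local node, while any peer that is dialed, stored or configured is represented as a RemotePeerInfo, obtained either by parsing a multiaddr string (parseRemotePeerInfo) or by converting a local PeerInfo (toRemotePeerInfo). A minimal sketch of the new calling convention, in the style of the tests below; the proc name configurePeers is illustrative, import paths are written as seen from tests/v2, and node1 is assumed to have the store client mounted:

import
  chronos,
  ../../waku/v2/node/wakunode2,
  ../../waku/v2/protocol/waku_store/waku_store,
  ../../waku/v2/utils/peers

proc configurePeers(node1, node2: WakuNode) {.async.} =
  # Peers supplied as multiaddr strings (config, CLI) are parsed directly
  # into a RemotePeerInfo, where parsePeerInfo used to return a PeerInfo.
  let storePeer = parseRemotePeerInfo(
    "/ip4/127.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc")
  node1.wakuStore.setPeer(storePeer)

  # A peer known only through a local PeerInfo (e.g. another in-process node
  # in a test) is converted explicitly before being dialed.
  await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()])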

.gitmodules vendored (4 changed lines)
View File

@ -124,7 +124,7 @@
path = vendor/nim-websock
url = https://github.com/status-im/nim-websock.git
ignore = untracked
branch = master
branch = main
[submodule "vendor/nim-zlib"]
path = vendor/nim-zlib
url = https://github.com/status-im/nim-zlib.git
@ -137,6 +137,6 @@
branch = main
[submodule "vendor/dnsclient.nim"]
path = vendor/dnsclient.nim
url = https://github.com/jm-clius/dnsclient.nim.git
url = https://github.com/ba0f3/dnsclient.nim.git
ignore = untracked
branch = master

View File

@ -12,6 +12,7 @@ This release contains the following:
- GossipSub [prune backoff period](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#prune-backoff-and-peer-exchange) is now the recommended 1 minute
- Bridge now uses content topic format according to [23/WAKU2-TOPICS](https://rfc.vac.dev/spec/23/)
- Better internal differentiation between local and remote peer info
#### General refactoring

View File

@ -412,7 +412,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
# We have a viable storenode. Let's query it for historical messages.
echo "Connecting to storenode: " & storenode.get()
node.wakuStore.setPeer(parsePeerInfo(storenode.get()))
node.wakuStore.setPeer(parseRemotePeerInfo(storenode.get()))
proc storeHandler(response: HistoryResponse) {.gcsafe.} =
for msg in response.messages:
@ -429,12 +429,12 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
if conf.lightpushnode != "":
mountLightPush(node)
node.wakuLightPush.setPeer(parsePeerInfo(conf.lightpushnode))
node.wakuLightPush.setPeer(parseRemotePeerInfo(conf.lightpushnode))
if conf.filternode != "":
node.mountFilter()
node.wakuFilter.setPeer(parsePeerInfo(conf.filternode))
node.wakuFilter.setPeer(parseRemotePeerInfo(conf.filternode))
proc filterHandler(msg: WakuMessage) {.gcsafe.} =
trace "Hit filter handler", contentTopic=msg.contentTopic

View File

@ -23,6 +23,7 @@ import
../../waku/v2/protocol/waku_store/[waku_store, waku_store_types],
../../waku/v2/protocol/waku_swap/waku_swap,
../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/utils/peers,
../test_helpers
template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0]
@ -139,8 +140,8 @@ procSuite "Waku v2 JSON-RPC API":
await node3.start()
node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.peerInfo])
await node3.connectToNodes(@[node2.peerInfo])
await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()])
# RPC server setup
let
@ -229,7 +230,7 @@ procSuite "Waku v2 JSON-RPC API":
var listenSwitch = newStandardSwitch(some(key))
discard waitFor listenSwitch.start()
node.wakuStore.setPeer(listenSwitch.peerInfo)
node.wakuStore.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
listenSwitch.mount(node.wakuRelay)
listenSwitch.mount(node.wakuStore)
@ -530,9 +531,9 @@ procSuite "Waku v2 JSON-RPC API":
storeKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get()
storePeer = PeerInfo.init(storeKey, @[locationAddr])
node.wakuFilter.setPeer(filterPeer)
node.wakuSwap.setPeer(swapPeer)
node.wakuStore.setPeer(storePeer)
node.wakuFilter.setPeer(filterPeer.toRemotePeerInfo())
node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo())
node.wakuStore.setPeer(storePeer.toRemotePeerInfo())
let response = await client.get_waku_v2_admin_v1_peers()
@ -572,8 +573,8 @@ procSuite "Waku v2 JSON-RPC API":
await node3.start()
node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.peerInfo])
await node3.connectToNodes(@[node2.peerInfo])
await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()])
# Setup two servers so we can see both sides of encrypted communication
let
@ -662,8 +663,8 @@ procSuite "Waku v2 JSON-RPC API":
await node3.start()
node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.peerInfo])
await node3.connectToNodes(@[node2.peerInfo])
await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()])
# Setup two servers so we can see both sides of encrypted communication
let

View File

@ -34,12 +34,12 @@ procSuite "Peer Manager":
node2.mountRelay()
# Dial node2 from node1
let conn = (await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec)).get()
let conn = (await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec)).get()
# Check connection
check:
conn.activity
conn.peerInfo.peerId == peerInfo2.peerId
conn.peerId == peerInfo2.peerId
# Check that node2 is being managed in node1
check:
@ -68,7 +68,7 @@ procSuite "Peer Manager":
node2.mountRelay()
# Dial node2 from node1
let connOpt = await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec, 2.seconds)
let connOpt = await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
# Check connection failed gracefully
check:
@ -100,9 +100,9 @@ procSuite "Peer Manager":
node.mountSwap()
node.mountStore(persistMessages = true)
node.wakuFilter.setPeer(filterPeer)
node.wakuSwap.setPeer(swapPeer)
node.wakuStore.setPeer(storePeer)
node.wakuFilter.setPeer(filterPeer.toRemotePeerInfo())
node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo())
node.wakuStore.setPeer(storePeer.toRemotePeerInfo())
# Check peers were successfully added to peer manager
check:
@ -136,21 +136,21 @@ procSuite "Peer Manager":
node2.mountRelay()
# Test default connectedness for new peers
node1.peerManager.addPeer(peerInfo2, WakuRelayCodec)
node1.peerManager.addPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec)
check:
# No information about node2's connectedness
node1.peerManager.connectedness(peerInfo2.peerId) == NotConnected
# Purposefully don't start node2
# Attempt dialing node2 from node1
discard await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec, 2.seconds)
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
check:
# Cannot connect to node2
node1.peerManager.connectedness(peerInfo2.peerId) == CannotConnect
# Successful connection
await node2.start()
discard await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec, 2.seconds)
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
check:
# Currently connected to node2
node1.peerManager.connectedness(peerInfo2.peerId) == Connected
@ -181,7 +181,7 @@ procSuite "Peer Manager":
node1.mountRelay()
node2.mountRelay()
discard await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec, 2.seconds)
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
check:
# Currently connected to node2
node1.peerManager.peers().len == 1
@ -233,7 +233,7 @@ asyncTest "Peer manager support multiple protocol IDs when reconnecting to peers
node2.mountRelay()
node2.wakuRelay.codec = betaCodec
discard await node1.peerManager.dialPeer(peerInfo2, node2.wakuRelay.codec, 2.seconds)
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), node2.wakuRelay.codec, 2.seconds)
check:
# Currently connected to node2
node1.peerManager.peers().len == 1

View File

@ -19,6 +19,7 @@ import
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/node/[wakunode2, waku_payload],
../../waku/v2/utils/peers,
../test_helpers
procSuite "WakuBridge":
@ -107,7 +108,7 @@ procSuite "WakuBridge":
v2Node.mountRelay(@[DefaultBridgeTopic], triggerSelf = false)
discard waitFor v1Node.rlpxConnect(newNode(bridge.nodev1.toENode()))
waitFor v2Node.connectToNodes(@[bridge.nodev2.peerInfo])
waitFor v2Node.connectToNodes(@[bridge.nodev2.peerInfo.toRemotePeerInfo()])
var completionFut = newFuture[bool]()

View File

@ -41,7 +41,7 @@ procSuite "Waku Filter":
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true)
dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
discard
@ -86,7 +86,7 @@ procSuite "Waku Filter":
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true)
dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
discard

View File

@ -11,6 +11,7 @@ import
libp2p/crypto/crypto,
libp2p/multistream,
../../waku/v2/node/wakunode2,
../../waku/v2/utils/peers,
../test_helpers, ./utils
procSuite "Waku Keepalive":
@ -24,11 +25,11 @@ procSuite "Waku Keepalive":
var completionFut = newFuture[bool]()
proc pingHandler(peer: PeerInfo) {.async, gcsafe, raises: [Defect].} =
proc pingHandler(peerId: PeerID) {.async, gcsafe, raises: [Defect].} =
debug "Ping received"
check:
peer.peerId == node1.switch.peerInfo.peerId
peerId == node1.switch.peerInfo.peerId
completionFut.complete(true)
@ -40,7 +41,7 @@ procSuite "Waku Keepalive":
node2.mountRelay()
node2.switch.mount(Ping.new(handler = pingHandler))
await node1.connectToNodes(@[node2.peerInfo])
await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()])
node1.startKeepalive()

View File

@ -47,7 +47,7 @@ procSuite "Waku Light Push":
rpc = PushRequest(pubSubTopic: defaultTopic, message: wm)
dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
# TODO Can possibly get rid of this if it isn't dynamic

View File

@ -35,7 +35,7 @@ procSuite "Waku Store":
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)])
proto.setPeer(listenSwitch.peerInfo)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
listenSwitch.mount(proto)
@ -75,7 +75,7 @@ procSuite "Waku Store":
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic1), HistoryContentFilter(contentTopic: topic3)])
proto.setPeer(listenSwitch.peerInfo)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
listenSwitch.mount(proto)
@ -121,7 +121,7 @@ procSuite "Waku Store":
# this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3)
rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic1), HistoryContentFilter(contentTopic: contentTopic3)], pubsubTopic: pubsubTopic1)
proto.setPeer(listenSwitch.peerInfo)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
listenSwitch.mount(proto)
@ -165,7 +165,7 @@ procSuite "Waku Store":
# this query targets: pubsubtopic1
rpc = HistoryQuery(pubsubTopic: pubsubTopic1)
proto.setPeer(listenSwitch.peerInfo)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
listenSwitch.mount(proto)
@ -205,7 +205,7 @@ procSuite "Waku Store":
# this query targets: pubsubtopic
rpc = HistoryQuery(pubsubTopic: pubsubtopic)
proto.setPeer(listenSwitch.peerInfo)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
listenSwitch.mount(proto)
@ -249,7 +249,7 @@ procSuite "Waku Store":
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store)
rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)])
proto.setPeer(listenSwitch.peerInfo)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
listenSwitch.mount(proto)
@ -277,7 +277,7 @@ procSuite "Waku Store":
var listenSwitch2 = newStandardSwitch(some(key2))
discard await listenSwitch2.start()
proto2.setPeer(listenSwitch2.peerInfo)
proto2.setPeer(listenSwitch2.peerInfo.toRemotePeerInfo())
listenSwitch2.mount(proto2)
@ -319,7 +319,7 @@ procSuite "Waku Store":
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) )
proto.setPeer(listenSwitch.peerInfo)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
listenSwitch.mount(proto)
@ -366,7 +366,7 @@ procSuite "Waku Store":
let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
proto.setPeer(listenSwitch.peerInfo)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
listenSwitch.mount(proto)
@ -413,7 +413,7 @@ procSuite "Waku Store":
let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
proto.setPeer(listenSwitch.peerInfo)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
listenSwitch.mount(proto)
@ -551,7 +551,7 @@ procSuite "Waku Store":
let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
proto.setPeer(listenSwitch.peerInfo)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
listenSwitch.mount(proto)
@ -630,7 +630,7 @@ procSuite "Waku Store":
discard await dialSwitch2.start()
let proto2 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng())
proto2.setPeer(listenSwitch.peerInfo)
proto2.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
let successResult = await proto2.resume()
check:
@ -648,7 +648,7 @@ procSuite "Waku Store":
completionFut.complete(true)
let rpc = HistoryQuery(startTime: float(2), endTime: float(5))
let successResult = await proto.queryFrom(rpc, handler, listenSwitch.peerInfo)
let successResult = await proto.queryFrom(rpc, handler, listenSwitch.peerInfo.toRemotePeerInfo())
check:
(await completionFut.withTimeout(5.seconds)) == true
@ -659,7 +659,7 @@ procSuite "Waku Store":
let rpc = HistoryQuery(startTime: float(2), endTime: float(5))
let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo)
let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo())
check:
messagesResult.isOk
@ -669,7 +669,7 @@ procSuite "Waku Store":
var pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: 1)
let rpc = HistoryQuery(startTime: float(2), endTime: float(5), pagingInfo: pinfo)
let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo)
let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo())
check:
messagesResult.isOk
@ -684,7 +684,9 @@ procSuite "Waku Store":
discard await dialSwitch3.start()
let proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng())
let successResult = await proto3.resume(some(@[offListenSwitch.peerInfo, listenSwitch.peerInfo, listenSwitch.peerInfo]))
let successResult = await proto3.resume(some(@[offListenSwitch.peerInfo.toRemotePeerInfo(),
listenSwitch.peerInfo.toRemotePeerInfo(),
listenSwitch.peerInfo.toRemotePeerInfo()]))
check:
proto3.messages.len == 10
successResult.isOk

View File

@ -14,6 +14,7 @@ import
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/protocol/waku_swap/waku_swap,
../../waku/v2/node/wakunode2,
../../waku/v2/utils/peers,
../test_helpers, ./utils
procSuite "Waku SWAP Accounting":
@ -71,9 +72,9 @@ procSuite "Waku SWAP Accounting":
await sleepAsync(2000.millis)
node1.wakuStore.setPeer(node2.peerInfo)
node1.wakuSwap.setPeer(node2.peerInfo)
node2.wakuSwap.setPeer(node1.peerInfo)
node1.wakuStore.setPeer(node2.peerInfo.toRemotePeerInfo())
node1.wakuSwap.setPeer(node2.peerInfo.toRemotePeerInfo())
node2.wakuSwap.setPeer(node1.peerInfo.toRemotePeerInfo())
proc storeHandler(response: HistoryResponse) {.gcsafe, closure.} =
debug "storeHandler hit"
@ -121,9 +122,9 @@ procSuite "Waku SWAP Accounting":
await sleepAsync(2000.millis)
node1.wakuStore.setPeer(node2.peerInfo)
node1.wakuSwap.setPeer(node2.peerInfo)
node2.wakuSwap.setPeer(node1.peerInfo)
node1.wakuStore.setPeer(node2.peerInfo.toRemotePeerInfo())
node1.wakuSwap.setPeer(node2.peerInfo.toRemotePeerInfo())
node2.wakuSwap.setPeer(node1.peerInfo.toRemotePeerInfo())
proc handler1(response: HistoryResponse) {.gcsafe, closure.} =
futures[0].complete(true)

View File

@ -118,7 +118,7 @@ procSuite "WakuNode":
node1.subscribe(pubSubTopic, relayHandler)
# Subscribe a contentFilter to trigger a specific application handler when
# WakuMessages with that content are received
node1.wakuFilter.setPeer(node2.peerInfo)
node1.wakuFilter.setPeer(node2.peerInfo.toRemotePeerInfo())
await node1.subscribe(filterRequest, contentHandler)
await sleepAsync(2000.millis)
@ -161,7 +161,7 @@ procSuite "WakuNode":
await node2.start()
node2.mountRelay()
node2.mountFilter()
node2.wakuFilter.setPeer(node1.peerInfo)
node2.wakuFilter.setPeer(node1.peerInfo.toRemotePeerInfo())
var defaultComplete = newFuture[bool]()
var otherComplete = newFuture[bool]()
@ -232,7 +232,7 @@ procSuite "WakuNode":
await node2.start()
node2.mountRelay(relayMessages=false) # Do not start WakuRelay or subscribe to any topics
node2.mountFilter()
node2.wakuFilter.setPeer(node1.peerInfo)
node2.wakuFilter.setPeer(node1.peerInfo.toRemotePeerInfo())
check:
node1.wakuRelay.isNil == false # Node1 is a full node
@ -283,7 +283,7 @@ procSuite "WakuNode":
await sleepAsync(2000.millis)
node1.wakuStore.setPeer(node2.peerInfo)
node1.wakuStore.setPeer(node2.peerInfo.toRemotePeerInfo())
proc storeHandler(response: HistoryResponse) {.gcsafe, closure.} =
check:
@ -315,7 +315,7 @@ procSuite "WakuNode":
await node2.start()
node2.mountFilter()
node1.wakuFilter.setPeer(node2.peerInfo)
node1.wakuFilter.setPeer(node2.peerInfo.toRemotePeerInfo())
proc handler(msg: WakuMessage) {.gcsafe, closure.} =
check:
@ -360,8 +360,8 @@ procSuite "WakuNode":
await node3.start()
node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.peerInfo])
await node3.connectToNodes(@[node2.peerInfo])
await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()])
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
@ -418,7 +418,7 @@ procSuite "WakuNode":
# Now verify that protocol matcher returns `true` and relay works
await node1.connectToNodes(@[node2.peerInfo])
await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()])
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
@ -449,37 +449,37 @@ procSuite "WakuNode":
# First test the `happy path` expected case
let
addrStr = "/ip4/127.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
peerInfo = parsePeerInfo(addrStr)
remotePeerInfo = parseRemotePeerInfo(addrStr)
check:
$(peerInfo.peerId) == "16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
$(peerInfo.addrs[0][0].tryGet()) == "/ip4/127.0.0.1"
$(peerInfo.addrs[0][1].tryGet()) == "/tcp/60002"
$(remotePeerInfo.peerId) == "16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
$(remotePeerInfo.addrs[0][0].tryGet()) == "/ip4/127.0.0.1"
$(remotePeerInfo.addrs[0][1].tryGet()) == "/tcp/60002"
# Now test some common corner cases
expect LPError:
# gibberish
discard parsePeerInfo("/p2p/$UCH GIBBER!SH")
discard parseRemotePeerInfo("/p2p/$UCH GIBBER!SH")
expect LPError:
# leading whitespace
discard parsePeerInfo(" /ip4/127.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc")
discard parseRemotePeerInfo(" /ip4/127.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc")
expect LPError:
# trailing whitespace
discard parsePeerInfo("/ip4/127.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc ")
discard parseRemotePeerInfo("/ip4/127.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc ")
expect LPError:
# invalid IP address
discard parsePeerInfo("/ip4/127.0.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc")
discard parseRemotePeerInfo("/ip4/127.0.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc")
expect ValueError:
# no PeerID
discard parsePeerInfo("/ip4/127.0.0.1/tcp/60002")
discard parseRemotePeerInfo("/ip4/127.0.0.1/tcp/60002")
expect ValueError:
# unsupported transport
discard parsePeerInfo("/ip4/127.0.0.1/udp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc")
discard parseRemotePeerInfo("/ip4/127.0.0.1/udp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc")
asyncTest "filtering relayed messages using topic validators":
## test scenario:
@ -519,8 +519,8 @@ procSuite "WakuNode":
await node3.start()
node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.peerInfo])
await node3.connectToNodes(@[node2.peerInfo])
await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()])
var completionFutValidatorAcc = newFuture[bool]()
@ -610,8 +610,8 @@ procSuite "WakuNode":
await node3.start()
node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.peerInfo])
await node3.connectToNodes(@[node2.peerInfo])
await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()])
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
@ -703,9 +703,9 @@ procSuite "WakuNode":
await node3.start()
node3.mountRelay(@[pubSubTopic])
discard await node1.peerManager.dialPeer(node2.peerInfo, WakuLightPushCodec)
discard await node1.peerManager.dialPeer(node2.peerInfo.toRemotePeerInfo(), WakuLightPushCodec)
await sleepAsync(5.seconds)
await node3.connectToNodes(@[node2.peerInfo])
await node3.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()])
var completionFutLightPush = newFuture[bool]()
var completionFutRelay = newFuture[bool]()
@ -764,7 +764,7 @@ procSuite "WakuNode":
await sleepAsync(2000.millis)
node1.wakuStore.setPeer(node2.peerInfo)
node1.wakuStore.setPeer(node2.peerInfo.toRemotePeerInfo())
await node1.resume()
@ -805,7 +805,7 @@ procSuite "WakuNode":
await sleepAsync(2000.millis)
node1.wakuStore.setPeer(node2.peerInfo)
node1.wakuStore.setPeer(node2.peerInfo.toRemotePeerInfo())
# populate db with msg1 to be a duplicate

@ -1 +1 @@
Subproject commit c3ddd26a2eece2a7bb558cb67d2f92846f9b8402
Subproject commit 536cc6b7933e5f86590bb27083c0ffeab31255f9

vendor/nim-chronos vendored (2 changed lines)

@ -1 +1 @@
Subproject commit 14ebf269e9322de5a7f1fa455033b0bcf18144c6
Subproject commit 59b91bf0ca2d1334bab1e0ed4e02de18fa62f360

@ -1 +1 @@
Subproject commit ab4ba1cbfdccdb8c0398894ffc25169bc61faeed
Subproject commit 7176de4ddb3a628a5c3abfcd430010bf0229deb1

vendor/nim-dnsdisc vendored (2 changed lines)

@ -1 +1 @@
Subproject commit dcb9290d004476fb0a5389baa88121b072abf135
Subproject commit 2d448241fdb8f8e806089ef4dc978d0eff211117

vendor/nim-eth vendored (2 changed lines)

@ -1 +1 @@
Subproject commit 20ad6504b7e7869fbf2ce19a7e7a476a80f94cc4
Subproject commit 1babe382265329a440b6b69a8b0f8b2c2b9a306f

@ -1 +1 @@
Subproject commit 5eb7fd0c90d3f03b6778688a5893fdd2715e9fe2
Subproject commit 3a0ab42573e566ce52625760f6bbf7e0bbb6ebc4

@ -1 +1 @@
Subproject commit 9a56559ae3ce7e81b75ae150c1030adf991bf39c
Subproject commit 689da19e9e9cfff4ced85e2b25c6b2b5598ed079

vendor/nim-json-rpc vendored (2 changed lines)

@ -1 +1 @@
Subproject commit 318949a4013504f4ec8931f14bc1b5d6e00dee78
Subproject commit b2417fc0719a6d5069437a3097645d1fae6954d6

@ -1 +1 @@
Subproject commit 652099a95960be7790e2f4b4c925d0dd703cc9aa
Subproject commit 4f3775ddf48d9abee30c51a53862cea84a09fa78

@ -1 +1 @@
Subproject commit b70db54e073988f334904cddbfc840c9698ba74e
Subproject commit 284b3aac05a9d96c27044c389a5d27a84d8e8f4b

vendor/nim-libp2p vendored (2 changed lines)

@ -1 +1 @@
Subproject commit f274bfe19db5a39ffbca177b52db7e8a7eb44537
Subproject commit 75bfc1b5f7679afc104bd1ceee1a0dc3bab7a316

@ -1 +1 @@
Subproject commit 5213d397f9d85c69279961256e19a859cd32df30
Subproject commit fcd0eadadde0ee000a63df8ab21dc4e9f015a790

vendor/nim-stew vendored (2 changed lines)

@ -1 +1 @@
Subproject commit 3c91b8694e15137a81ec7db37c6c58194ec94a6a
Subproject commit 478cc6efdefaabadf0666a3351fb959b78009bcc

@ -1 +1 @@
Subproject commit 91d4eaa4ccb4bfddf179fe2ee4247ae000e2587f
Subproject commit f1d70dbb8c7b5e2474b0bd5ac52f42c8c4318fd2

vendor/nim-web3 vendored (2 changed lines)

@ -1 +1 @@
Subproject commit 97e05aea6573d2630e318e7777a54d95db6ec40e
Subproject commit 9a23474afb7e2a14798ec0bf0e69e96cd5895e55

vendor/nim-websock vendored (2 changed lines)

@ -1 +1 @@
Subproject commit d60df8176d187683cbfd0945b37fc8c885594ac9
Subproject commit 1abf5f2f91ae3e8483c2acbb108dea521879c6e2

@ -1 +1 @@
Subproject commit 77747657f65a5fe26c281445b6ee9a1d6e72b1eb
Subproject commit 9d6b4b6e98515af8248127f51889c24308006096

vendor/nimcrypto vendored (2 changed lines)

@ -1 +1 @@
Subproject commit b602bd469b66f6968f1d1b474f843a75d1ca6627
Subproject commit a5742a9a214ac33f91615f3862c7b099aec43b00

View File

@ -15,9 +15,10 @@ import
eth/p2p/discoveryv5/enr,
libp2p/crypto/crypto,
libp2p/crypto/secp,
libp2p/peerinfo,
libp2p/multiaddress,
discovery/dnsdisc/client
libp2p/peerid,
discovery/dnsdisc/client,
../../utils/peers
export client
@ -45,7 +46,7 @@ func getTransportProtocol(typedR: TypedRecord): Option[IpTransportProtocol] =
return none(IpTransportProtocol)
func toPeerInfo*(enr: enr.Record): Result[PeerInfo, cstring] =
func toRemotePeerInfo*(enr: enr.Record): Result[RemotePeerInfo, cstring] =
let typedR = ? enr.toTypedRecord
if not typedR.secp256k1.isSome:
@ -94,7 +95,7 @@ func toPeerInfo*(enr: enr.Record): Result[PeerInfo, cstring] =
if addrs.len == 0:
return err("enr: no addresses in record")
return ok(PeerInfo.init(peerId, addrs))
return ok(RemotePeerInfo.init(peerId, addrs))
func createEnr*(privateKey: crypto.PrivateKey,
enrIp: Option[ValidIpAddress],
@ -117,7 +118,7 @@ proc emptyResolver*(domain: string): Future[string] {.async, gcsafe.} =
debug "Empty resolver called", domain=domain
return ""
proc findPeers*(wdd: var WakuDnsDiscovery): Result[seq[PeerInfo], cstring] =
proc findPeers*(wdd: var WakuDnsDiscovery): Result[seq[RemotePeerInfo], cstring] =
## Find peers to connect to using DNS based discovery
info "Finding peers using Waku DNS discovery"
@ -138,11 +139,11 @@ proc findPeers*(wdd: var WakuDnsDiscovery): Result[seq[PeerInfo], cstring] =
else:
trace "No ENR retrieved from client tree"
var discoveredNodes: seq[PeerInfo]
var discoveredNodes: seq[RemotePeerInfo]
for enr in discoveredEnr:
# Convert discovered ENR to PeerInfo and add to discovered nodes
let res = enr.toPeerInfo()
# Convert discovered ENR to RemotePeerInfo and add to discovered nodes
let res = enr.toRemotePeerInfo()
if res.isOk():
discoveredNodes.add(res.get())
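
Since findPeers now yields RemotePeerInfo values, DNS-discovered peers can be handed straight to a node's connect logic. A small usage sketch with hypothetical variable names, assuming chronos is imported, node is a started WakuNode and wakuDnsDisc is a WakuDnsDiscovery instance declared with var:

# findPeers returns Result[seq[RemotePeerInfo], cstring]
let discoveredRes = wakuDnsDisc.findPeers()
if discoveredRes.isOk():
  # Each discovered ENR has already been converted via toRemotePeerInfo(),
  # so the list can be passed to connectToNodes without further translation.
  waitFor node.connectToNodes(discoveredRes.get())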

View File

@ -24,6 +24,10 @@ proc constructMultiaddrStr*(peerInfo: PeerInfo): string =
# Constructs a multiaddress with both location (wire) address and p2p identity
constructMultiaddrStr(peerInfo.addrs[0], peerInfo.peerId)
proc constructMultiaddrStr*(remotePeerInfo: RemotePeerInfo): string =
# Constructs a multiaddress with both location (wire) address and p2p identity
constructMultiaddrStr(remotePeerInfo.addrs[0], remotePeerInfo.peerId)
proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
## Admin API version 1 definitions

View File

@ -5,9 +5,10 @@ import
chronos, chronicles, metrics,
libp2p/multistream,
./waku_peer_store,
../storage/peer/peer_storage
../storage/peer/peer_storage,
../../utils/peers
export waku_peer_store, peer_storage
export waku_peer_store, peer_storage, peers
declareCounter waku_peers_dials, "Number of peer dials", ["outcome"]
declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"]
@ -28,10 +29,10 @@ let
# Helper functions #
####################
proc toPeerInfo*(storedInfo: StoredInfo): PeerInfo =
PeerInfo.init(peerId = storedInfo.peerId,
addrs = toSeq(storedInfo.addrs),
protocols = toSeq(storedInfo.protos))
proc toRemotePeerInfo*(storedInfo: StoredInfo): RemotePeerInfo =
RemotePeerInfo.init(peerId = storedInfo.peerId,
addrs = toSeq(storedInfo.addrs),
protocols = toSeq(storedInfo.protos))
proc insertOrReplace(ps: PeerStorage,
peerId: PeerID,
@ -125,8 +126,8 @@ proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): Peer
debug "creating new PeerManager"
proc peerHook(peerInfo: PeerInfo, event: ConnEvent): Future[void] {.gcsafe.} =
onConnEvent(pm, peerInfo.peerId, event)
proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
onConnEvent(pm, peerId, event)
pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected)
pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected)
@ -155,7 +156,7 @@ proc peers*(pm: PeerManager, protocolMatcher: Matcher): seq[StoredInfo] =
# Return the known info for all peers matching the provided protocolMatcher
pm.peers.filter(proc (storedInfo: StoredInfo): bool = storedInfo.protos.anyIt(protocolMatcher(it)))
proc connectedness*(pm: PeerManager, peerId: PeerId): Connectedness =
proc connectedness*(pm: PeerManager, peerId: PeerID): Connectedness =
# Return the connection state of the given, managed peer
# @TODO the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
# @TODO richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
@ -168,10 +169,10 @@ proc connectedness*(pm: PeerManager, peerId: PeerId): Connectedness =
else:
pm.peerStore.connectionBook.get(peerId)
proc hasPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string): bool =
proc hasPeer*(pm: PeerManager, peerId: PeerID, proto: string): bool =
# Returns `true` if peer is included in manager for the specified protocol
pm.peerStore.get(peerInfo.peerId).protos.contains(proto)
pm.peerStore.get(peerId).protos.contains(proto)
proc hasPeers*(pm: PeerManager, proto: string): bool =
# Returns `true` if manager has any peers for the specified protocol
@ -181,33 +182,33 @@ proc hasPeers*(pm: PeerManager, protocolMatcher: Matcher): bool =
# Returns `true` if manager has any peers matching the protocolMatcher
pm.peers.any(proc (storedInfo: StoredInfo): bool = storedInfo.protos.anyIt(protocolMatcher(it)))
proc addPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string) =
proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
# Adds peer to manager for the specified protocol
if peerInfo.peerId == pm.switch.peerInfo.peerId:
if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
# Do not attempt to manage our unmanageable self
return
debug "Adding peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto
debug "Adding peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto
# ...known addresses
for multiaddr in peerInfo.addrs:
pm.peerStore.addressBook.add(peerInfo.peerId, multiaddr)
for multiaddr in remotePeerInfo.addrs:
pm.peerStore.addressBook.add(remotePeerInfo.peerId, multiaddr)
# ...public key
var publicKey: PublicKey
discard peerInfo.peerId.extractPublicKey(publicKey)
discard remotePeerInfo.peerId.extractPublicKey(publicKey)
pm.peerStore.keyBook.set(peerInfo.peerId, publicKey)
pm.peerStore.keyBook.set(remotePeerInfo.peerId, publicKey)
# ...associated protocols
pm.peerStore.protoBook.add(peerInfo.peerId, proto)
pm.peerStore.protoBook.add(remotePeerInfo.peerId, proto)
# Add peer to storage. Entry will subsequently be updated with connectedness information
if not pm.storage.isNil:
pm.storage.insertOrReplace(peerInfo.peerId, pm.peerStore.get(peerInfo.peerId), NotConnected)
pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected)
proc selectPeer*(pm: PeerManager, proto: string): Option[PeerInfo] =
proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
# Selects the best peer for a given protocol
let peers = pm.peers.filterIt(it.protos.contains(proto))
@ -215,9 +216,9 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[PeerInfo] =
# @TODO proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
let peerStored = peers[0]
return some(peerStored.toPeerInfo())
return some(peerStored.toRemotePeerInfo())
else:
return none(PeerInfo)
return none(RemotePeerInfo)
proc reconnectPeers*(pm: PeerManager,
proto: string,
@ -249,9 +250,9 @@ proc reconnectPeers*(pm: PeerManager,
# Add to protos for peer, if it has not been added yet
if not pm.peerStore.get(storedInfo.peerId).protos.contains(proto):
let peerInfo = storedInfo.toPeerInfo()
trace "Adding newly dialed peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto
pm.addPeer(peerInfo, proto)
let remotePeerInfo = storedInfo.toRemotePeerInfo()
trace "Adding newly dialed peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto
pm.addPeer(remotePeerInfo, proto)
trace "Reconnecting to peer", peerId=storedInfo.peerId
discard await pm.dialPeer(storedInfo.peerId, toSeq(storedInfo.addrs), proto)
@ -260,17 +261,31 @@ proc reconnectPeers*(pm: PeerManager,
# Dialer interface #
####################
proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
# Dial a given peer and add it to the list of known peers
# @TODO check peer validity and score before continuing. Limit number of peers to be managed.
# First add dialed peer info to peer store, if it does not exist yet...
if not pm.hasPeer(peerInfo, proto):
trace "Adding newly dialed peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto
pm.addPeer(peerInfo, proto)
if not pm.hasPeer(remotePeerInfo.peerId, proto):
trace "Adding newly dialed peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto
pm.addPeer(remotePeerInfo, proto)
if peerInfo.peerId == pm.switch.peerInfo.peerId:
if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
# Do not attempt to dial self
return none(Connection)
return await pm.dialPeer(peerInfo.peerId, peerInfo.addrs, proto, dialTimeout)
return await pm.dialPeer(remotePeerInfo.peerId, remotePeerInfo.addrs, proto, dialTimeout)
proc dialPeer*(pm: PeerManager, peerId: PeerID, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
# Dial an existing peer by looking up its existing addrs in the switch's peerStore
# @TODO check peer validity and score before continuing. Limit number of peers to be managed.
if peerId == pm.switch.peerInfo.peerId:
# Do not attempt to dial self
return none(Connection)
let addrs = toSeq(pm.switch.peerStore.addressBook.get(peerId))
if addrs.len == 0:
return none(Connection)
return await pm.dialPeer(peerId, addrs, proto, dialTimeout)
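
The manager now offers two dial paths: dialing a RemotePeerInfo first registers the peer (addresses, public key, protocol) and then dials its advertised addresses, while dialing a bare PeerID resolves addresses from the switch's peer store, as in the overload added above. A brief sketch; the proc name dialBothWays and the codec parameter are illustrative (the tests above pass WakuRelayCodec), and the import path is written as seen from tests/v2:

import
  chronos, std/options,
  ../../waku/v2/node/peer_manager/peer_manager

proc dialBothWays(pm: PeerManager, remotePeer: RemotePeerInfo, codec: string) {.async.} =
  # Path 1: dial a peer known only from config or discovery.
  let viaInfo = await pm.dialPeer(remotePeer, codec, 2.seconds)

  # Path 2: re-dial a peer the switch's peer store already knows, by PeerID.
  let viaId = await pm.dialPeer(remotePeer.peerId, codec)

  if viaInfo.isNone() and viaId.isNone():
    echo "both dial attempts failed"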

View File

@ -28,6 +28,13 @@ type
keyBook*: KeyBook
connectionBook*: ConnectionBook
disconnectBook*: DisconnectBook
StoredInfo* = object
# Collates stored info about a peer
peerId*: PeerID
addrs*: HashSet[MultiAddress]
protos*: HashSet[string]
publicKey*: PublicKey
proc new*(T: type WakuPeerStore): WakuPeerStore =
var p: WakuPeerStore

View File

@ -347,7 +347,7 @@ proc query*(node: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.as
# TODO wakuSwap now part of wakuStore object
await node.wakuStore.queryWithAccounting(query, handler)
proc resume*(node: WakuNode, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo])) {.async, gcsafe.} =
proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} =
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online
## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages)
## messages are stored in the the wakuStore's messages field and in the message db
@ -627,7 +627,7 @@ proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
# First get a list of connected peer infos
let peers = node.peerManager.peers()
.filterIt(node.peerManager.connectedness(it.peerId) == Connected)
.mapIt(it.toPeerInfo())
.mapIt(it.toRemotePeerInfo())
# Attempt to retrieve and ping the active outgoing connection for each peer
for peer in peers:
@ -654,7 +654,7 @@ proc startKeepalive*(node: WakuNode) =
proc dialPeer*(n: WakuNode, address: string) {.async.} =
info "dialPeer", address = address
# XXX: This turns ipfs into p2p, not quite sure why
let remotePeer = parsePeerInfo(address)
let remotePeer = parseRemotePeerInfo(address)
info "Dialing peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
# NOTE This is dialing on WakuRelay protocol specifically
@ -664,21 +664,21 @@ proc dialPeer*(n: WakuNode, address: string) {.async.} =
proc setStorePeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
info "Set store peer", address = address
let remotePeer = parsePeerInfo(address)
let remotePeer = parseRemotePeerInfo(address)
n.wakuStore.setPeer(remotePeer)
proc setFilterPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
info "Set filter peer", address = address
let remotePeer = parsePeerInfo(address)
let remotePeer = parseRemotePeerInfo(address)
n.wakuFilter.setPeer(remotePeer)
proc setLightPushPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
info "Set lightpush peer", address = address
let remotePeer = parsePeerInfo(address)
let remotePeer = parseRemotePeerInfo(address)
n.wakuLightPush.setPeer(remotePeer)
@ -696,10 +696,10 @@ proc connectToNodes*(n: WakuNode, nodes: seq[string]) {.async.} =
# later.
await sleepAsync(5.seconds)
proc connectToNodes*(n: WakuNode, nodes: seq[PeerInfo]) {.async.} =
for peerInfo in nodes:
info "connectToNodes", peer = peerInfo
discard await n.peerManager.dialPeer(peerInfo, WakuRelayCodec)
proc connectToNodes*(n: WakuNode, nodes: seq[RemotePeerInfo]) {.async.} =
for remotePeerInfo in nodes:
info "connectToNodes", peer = remotePeerInfo
discard await n.peerManager.dialPeer(remotePeerInfo, WakuRelayCodec)
# The issue seems to be around peers not being fully connected when
# trying to subscribe. So what we do is sleep to guarantee nodes are

View File

@ -59,7 +59,7 @@ proc unsubscribeFilters(subscribers: var seq[Subscriber], request: FilterRequest
debug "unsubscribing", peerId=peerId, unsubscribeTopics=unsubscribeTopics
for subscriber in subscribers.mitems:
if subscriber.peer.peerId != peerId: continue
if subscriber.peer != peerId: continue
# make sure we delete the content filter
# if no more topics are left
@ -179,9 +179,9 @@ method init*(wf: WakuFilter) =
wf.pushHandler(value.requestId, value.push)
if value.request != FilterRequest():
if value.request.subscribe:
wf.subscribers.add(Subscriber(peer: conn.peerInfo, requestId: value.requestId, filter: value.request))
wf.subscribers.add(Subscriber(peer: conn.peerId, requestId: value.requestId, filter: value.request))
else:
wf.subscribers.unsubscribeFilters(value.request, conn.peerInfo.peerId)
wf.subscribers.unsubscribeFilters(value.request, conn.peerId)
waku_filter_subscribers.set(wf.subscribers.len.int64)
@ -197,7 +197,7 @@ proc init*(T: type WakuFilter, peerManager: PeerManager, rng: ref BrHmacDrbgCont
return wf
proc setPeer*(wf: WakuFilter, peer: PeerInfo) =
proc setPeer*(wf: WakuFilter, peer: RemotePeerInfo) =
wf.peerManager.addPeer(peer, WakuFilterCodec)
waku_filter_peers.inc()

View File

@ -1,7 +1,6 @@
import
std/[tables],
bearssl,
libp2p/peerinfo,
libp2p/protocols/protocol,
../../node/peer_manager/peer_manager,
../waku_message
@ -35,7 +34,7 @@ type
push*: MessagePush
Subscriber* = object
peer*: PeerInfo
peer*: PeerID
requestId*: string
filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN?

View File

@ -117,7 +117,7 @@ proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref BrHmacDrbgC
return wl
proc setPeer*(wlp: WakuLightPush, peer: PeerInfo) =
proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) =
wlp.peerManager.addPeer(peer, WakuLightPushCodec)
waku_lightpush_peers.inc()

View File

@ -1,7 +1,6 @@
import
std/[tables],
bearssl,
libp2p/peerinfo,
libp2p/protocols/protocol,
../../node/peer_manager/peer_manager,
../waku_message,

View File

@ -429,9 +429,9 @@ proc init*(ws: WakuStore) =
if not ws.wakuSwap.isNil:
info "handle store swap test", text=ws.wakuSwap.text
# NOTE Perform accounting operation
let peerInfo = conn.peerInfo
let peerId = conn.peerId
let messages = response.messages
ws.wakuSwap.credit(peerInfo, messages.len)
ws.wakuSwap.credit(peerId, messages.len)
else:
info "handle store swap is nil"
@ -467,7 +467,7 @@ proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgConte
return output
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
proc setPeer*(ws: WakuStore, peer: PeerInfo) =
proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) =
ws.peerManager.addPeer(peer, WakuStoreCodec)
waku_store_peers.inc()
@ -527,7 +527,7 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
handler(response.value.response)
proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, peer: PeerInfo): Future[QueryResult] {.async, gcsafe.} =
proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, peer: RemotePeerInfo): Future[QueryResult] {.async, gcsafe.} =
## sends the query to the given peer
## returns the number of retrieved messages if no error occurs, otherwise returns the error string
# TODO dialPeer add it to the list of known peers, while it does not cause any issue but might be unnecessary
@ -556,7 +556,7 @@ proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, pe
handler(response.value.response)
return ok(response.value.response.messages.len.uint64)
proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: PeerInfo): Future[MessagesResult] {.async, gcsafe.} =
proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[MessagesResult] {.async, gcsafe.} =
## a thin wrapper for queryFrom
## sends the query to the given peer
## when the query has a valid pagingInfo, it retrieves the historical messages in pages
@ -588,7 +588,7 @@ proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: PeerInfo): Fu
return ok(messageList)
proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[PeerInfo]): Future[MessagesResult] {.async, gcsafe.} =
proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[MessagesResult] {.async, gcsafe.} =
## loops through the candidateList in order and sends the query to each until one of the query gets resolved successfully
## returns the retrieved messages, or error if all the requests fail
for peer in candidateList.items:
@ -612,7 +612,7 @@ proc isDuplicate(message: WakuMessage, list: seq[WakuMessage]): bool =
if message in list: return true
return false
proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]), pageSize: uint64 = DefaultPageSize): Future[QueryResult] {.async, gcsafe.} =
proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]), pageSize: uint64 = DefaultPageSize): Future[QueryResult] {.async, gcsafe.} =
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
## messages are stored in the store node's messages field and in the message db
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
@ -689,8 +689,8 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]
return err("no suitable remote peers")
debug "a peer is selected from peer manager"
let peerInfo = peerOpt.get()
let successResult = await ws.queryFromWithPaging(rpc, peerInfo)
let remotePeerInfo = peerOpt.get()
let successResult = await ws.queryFromWithPaging(rpc, remotePeerInfo)
if successResult.isErr:
debug "failed to resume the history"
return err("failed to resume the history")
@ -735,9 +735,9 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand
# NOTE Perform accounting operation
# Assumes wakuSwap protocol is mounted
let peerInfo = peerOpt.get()
let remotePeerInfo = peerOpt.get()
let messages = response.value.response.messages
ws.wakuSwap.debit(peerInfo, messages.len)
ws.wakuSwap.debit(remotePeerInfo.peerId, messages.len)
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
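
On the querying side the store client now names its target peers as RemotePeerInfo throughout. A sketch mirroring the tests earlier in this commit; storeClientExample is an illustrative name, proto is a client-side WakuStore, storePeer would typically come from listenSwitch.peerInfo.toRemotePeerInfo() or parseRemotePeerInfo, and import paths are written as seen from tests/v2:

import
  chronos, std/options,
  ../../waku/v2/protocol/waku_store/[waku_store, waku_store_types],
  ../../waku/v2/utils/peers

proc storeClientExample(proto: WakuStore, storePeer: RemotePeerInfo) {.async.} =
  proc storeHandler(response: HistoryResponse) {.gcsafe, closure.} =
    discard   # inspect response.messages here

  # One-off query against an explicitly named store peer.
  let rpc = HistoryQuery(startTime: float(2), endTime: float(5))
  discard await proto.queryFrom(rpc, storeHandler, storePeer)

  # Resume history from an explicit candidate list of store peers.
  discard await proto.resume(some(@[storePeer]))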

View File

@ -6,7 +6,6 @@
import
# external imports
bearssl,
libp2p/peerinfo,
libp2p/protocols/protocol,
stew/results,
# internal imports

View File

@ -114,8 +114,8 @@ proc init*(T: type Cheque, buffer: seq[byte]): ProtoResult[T] =
# TODO Assume we calculated cheque
proc sendCheque*(ws: WakuSwap, peerInfo : PeerInfo) {.async.} =
let connOpt = await ws.peerManager.dialPeer(peerInfo, WakuSwapCodec)
proc sendCheque*(ws: WakuSwap, peerId: PeerID) {.async.} =
let connOpt = await ws.peerManager.dialPeer(peerId, WakuSwapCodec)
if connOpt.isNone():
# @TODO more sophisticated error handling here
@ -145,16 +145,13 @@ proc sendCheque*(ws: WakuSwap, peerInfo : PeerInfo) {.async.} =
await connOpt.get().writeLP(Cheque(amount: 1, signature: sigBytes, issuerAddress: aliceWalletAddress).encode().buffer)
# Set new balance
let peerId = peerInfo.peerId
ws.accounting[peerId] -= 1
info "New accounting state", accounting = ws.accounting[peerId]
# TODO Authenticate cheque, check beneficiary etc
proc handleCheque*(ws: WakuSwap, cheque: Cheque, peerInfo : PeerInfo) {.raises: [Defect, KeyError].} =
proc handleCheque*(ws: WakuSwap, cheque: Cheque, peerId: PeerID) {.raises: [Defect, KeyError].} =
info "handle incoming cheque"
let peerId = peerInfo.peerId
# Get the original signer using web3. For now, a static value (0x6C3d502f1a97d4470b881015b83D9Dd1062172e1) will be used.
# Check if web3.eth.personal.ecRecover(messageHash, signature); or an equivalent function has been implemented in nim-web3
let signer = "0x6C3d502f1a97d4470b881015b83D9Dd1062172e1"
@ -227,34 +224,34 @@ proc init*(wakuSwap: WakuSwap) =
return
info "received cheque", value=res.value
wakuSwap.handleCheque(res.value, conn.peerInfo)
wakuSwap.handleCheque(res.value, conn.peerId)
proc credit(peerInfo: PeerInfo, n: int)
proc credit(peerId: PeerID, n: int)
{.gcsafe, closure, raises: [Defect, KeyError, Exception].} =
let peerId = peerInfo.peerId
info "Crediting peer: ", peer=peerId, amount=n
if wakuSwap.accounting.hasKey(peerId):
wakuSwap.accounting[peerId] -= n
else:
wakuSwap.accounting[peerId] = -n
info "Accounting state", accounting = wakuSwap.accounting[peerId]
wakuSwap.applyPolicy(peerInfo)
wakuSwap.applyPolicy(peerId)
# TODO Debit and credit here for Karma asset
proc debit(peerInfo: PeerInfo, n: int)
proc debit(peerId: PeerID, n: int)
{.gcsafe, closure, raises: [Defect, KeyError, Exception].} =
let peerId = peerInfo.peerId
info "Debiting peer: ", peer=peerId, amount=n
if wakuSwap.accounting.hasKey(peerId):
wakuSwap.accounting[peerId] += n
else:
wakuSwap.accounting[peerId] = n
info "Accounting state", accounting = wakuSwap.accounting[peerId]
wakuSwap.applyPolicy(peerInfo)
wakuSwap.applyPolicy(peerId)
proc applyPolicy(peerInfo: PeerInfo)
proc applyPolicy(peerId: PeerID)
{.gcsafe, closure, raises: [Defect, KeyError, Exception].} =
let peerId = peerInfo.peerId
# TODO Separate out depending on if policy is soft (accounting only) mock (send cheque but don't cash/verify) hard (actually send funds over testnet)
#Check if the Disconnect Threshold has been hit. Account Balance nears the disconnectThreshold after a Credit has been done
@ -268,7 +265,7 @@ proc init*(wakuSwap: WakuSwap) =
warn "Payment threshhold has been reached: ", threshold=wakuSwap.config.paymentThreshold, balance=wakuSwap.accounting[peerId]
#In soft phase we don't send cheques yet
if wakuSwap.config.mode == Mock:
discard wakuSwap.sendCheque(peerInfo)
discard wakuSwap.sendCheque(peerId)
else:
info "Payment threshhold not hit"
@ -296,7 +293,7 @@ proc init*(T: type WakuSwap, peerManager: PeerManager, rng: ref BrHmacDrbgContex
return ws
proc setPeer*(ws: WakuSwap, peer: PeerInfo) =
proc setPeer*(ws: WakuSwap, peer: RemotePeerInfo) =
ws.peerManager.addPeer(peer, WakuSwapCodec)
waku_swap_peers_count.inc()

View File

@ -4,7 +4,6 @@ import
std/tables,
bearssl,
libp2p/protocols/protocol,
libp2p/peerinfo,
../../node/peer_manager/peer_manager
type
@ -36,9 +35,9 @@ type
amount*: uint32
signature*: seq[byte]
CreditHandler* = proc (peerInfo: PeerInfo, amount: int) {.gcsafe, closure.}
DebitHandler* = proc (peerInfo: PeerInfo, amount: int) {.gcsafe, closure.}
ApplyPolicyHandler* = proc(peerInfo: PeerInfo) {.gcsafe, closure.}
CreditHandler* = proc (peerId: PeerID, amount: int) {.gcsafe, closure.}
DebitHandler* = proc (peerId: PeerID, amount: int) {.gcsafe, closure.}
ApplyPolicyHandler* = proc(peerId: PeerID) {.gcsafe, closure.}
WakuSwap* = ref object of LPProtocol
peerManager*: PeerManager

View File

@ -3,8 +3,46 @@
# Collection of utilities related to Waku peers
import
std/strutils,
libp2p/multiaddress,
libp2p/peerinfo
stew/results,
libp2p/[errors,
multiaddress,
peerid,
peerinfo]
type
RemotePeerInfo* = ref object of RootObj
peerId*: PeerID
addrs*: seq[MultiAddress]
protocols*: seq[string]
func `$`*(remotePeerInfo: RemotePeerInfo): string =
$remotePeerInfo.peerId
proc init*(
p: typedesc[RemotePeerInfo],
peerId: PeerID,
addrs: seq[MultiAddress] = @[],
protocols: seq[string] = @[]): RemotePeerInfo =
let remotePeerInfo = RemotePeerInfo(
peerId: peerId,
addrs: addrs,
protocols: protocols)
return remotePeerInfo
proc init*(p: typedesc[RemotePeerInfo],
peerId: string,
addrs: seq[MultiAddress] = @[],
protocols: seq[string] = @[]): RemotePeerInfo
{.raises: [Defect, ResultError[cstring], LPError].} =
let remotePeerInfo = RemotePeerInfo(
peerId: PeerID.init(peerId).tryGet(),
addrs: addrs,
protocols: protocols)
return remotePeerInfo
proc initAddress(T: type MultiAddress, str: string): T {.raises: [Defect, ValueError, LPError].}=
# @TODO: Rather than raising exceptions, this should return a Result
@ -17,7 +55,7 @@ proc initAddress(T: type MultiAddress, str: string): T {.raises: [Defect, ValueE
## Parses a fully qualified peer multiaddr, in the
## format `(ip4|ip6)/tcp/p2p`, into dialable PeerInfo
proc parsePeerInfo*(address: string): PeerInfo {.raises: [Defect, ValueError, LPError].}=
proc parseRemotePeerInfo*(address: string): RemotePeerInfo {.raises: [Defect, ValueError, LPError].}=
let multiAddr = MultiAddress.initAddress(address)
var
@ -40,4 +78,11 @@ proc parsePeerInfo*(address: string): PeerInfo {.raises: [Defect, ValueError, LP
if (not wireAddr.isWire()):
raise newException(ValueError, "Invalid node multi-address")
return PeerInfo.init(peerIdStr, [wireAddr])
return RemotePeerInfo.init(peerIdStr, @[wireAddr])
## Converts the local peerInfo to dialable RemotePeerInfo
## Useful for testing or internal connections
proc toRemotePeerInfo*(peerInfo: PeerInfo): RemotePeerInfo =
RemotePeerInfo.init(peerInfo.peerId,
peerInfo.addrs,
peerInfo.protocols)
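
Taken together, the utility module now gives three routes to a RemotePeerInfo. A short standalone sketch; it assumes it is compiled from within waku/v2/utils (adjust the relative import otherwise) and that nim-libp2p resolves as in the vendor tree:

import ./peers

# 1. Parse a fully qualified multiaddr string (config values, CLI flags).
let fromString = parseRemotePeerInfo(
  "/ip4/127.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc")

# 2. Construct one directly from a PeerID and its known multiaddresses.
let fromParts = RemotePeerInfo.init(fromString.peerId, fromString.addrs)

# 3. Convert a local PeerInfo, e.g. a test node's own identity:
#    let fromLocal = someSwitch.peerInfo.toRemotePeerInfo()

doAssert $fromParts == $fromString   # `$` prints only the peer ID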