mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-07 16:33:08 +00:00
Adding parsePeerInfo and deprecating 'parseRemotePeerInfo' (#1658)
This commit is contained in:
parent
d5979e94da
commit
b2dcb07751
@ -482,7 +482,12 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
|||||||
var storenode: Option[RemotePeerInfo]
|
var storenode: Option[RemotePeerInfo]
|
||||||
|
|
||||||
if conf.storenode != "":
|
if conf.storenode != "":
|
||||||
storenode = some(parseRemotePeerInfo(conf.storenode))
|
let peerInfo = parsePeerInfo(conf.storenode)
|
||||||
|
if peerInfo.isOk():
|
||||||
|
storenode = some(peerInfo.value)
|
||||||
|
else:
|
||||||
|
error "Incorrect conf.storenode", error = peerInfo.error
|
||||||
|
|
||||||
elif discoveredNodes.len > 0:
|
elif discoveredNodes.len > 0:
|
||||||
echo "Store enabled, but no store nodes configured. Choosing one at random from discovered peers"
|
echo "Store enabled, but no store nodes configured. Choosing one at random from discovered peers"
|
||||||
storenode = some(discoveredNodes[rand(0..len(discoveredNodes) - 1)])
|
storenode = some(discoveredNodes[rand(0..len(discoveredNodes) - 1)])
|
||||||
@ -509,23 +514,31 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
|||||||
|
|
||||||
# NOTE Must be mounted after relay
|
# NOTE Must be mounted after relay
|
||||||
if conf.lightpushnode != "":
|
if conf.lightpushnode != "":
|
||||||
await mountLightPush(node)
|
let peerInfo = parsePeerInfo(conf.lightpushnode)
|
||||||
|
if peerInfo.isOk():
|
||||||
node.mountLightPushClient()
|
await mountLightPush(node)
|
||||||
node.peerManager.addServicePeer(parseRemotePeerInfo(conf.lightpushnode), WakuLightpushCodec)
|
node.mountLightPushClient()
|
||||||
|
node.peerManager.addServicePeer(peerInfo.value, WakuLightpushCodec)
|
||||||
|
else:
|
||||||
|
error "LightPush not mounted. Couldn't parse conf.lightpushnode",
|
||||||
|
error = peerInfo.error
|
||||||
|
|
||||||
if conf.filternode != "":
|
if conf.filternode != "":
|
||||||
await node.mountFilter()
|
let peerInfo = parsePeerInfo(conf.filternode)
|
||||||
await node.mountFilterClient()
|
if peerInfo.isOk():
|
||||||
|
await node.mountFilter()
|
||||||
|
await node.mountFilterClient()
|
||||||
|
node.peerManager.addServicePeer(peerInfo.value, WakuFilterCodec)
|
||||||
|
|
||||||
node.peerManager.addServicePeer(parseRemotePeerInfo(conf.filternode), WakuFilterCodec)
|
proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe.} =
|
||||||
|
trace "Hit filter handler", contentTopic=msg.contentTopic
|
||||||
|
chat.printReceivedMessage(msg)
|
||||||
|
|
||||||
proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe.} =
|
await node.subscribe(pubsubTopic=DefaultPubsubTopic, contentTopics=chat.contentTopic, filterHandler)
|
||||||
trace "Hit filter handler", contentTopic=msg.contentTopic
|
|
||||||
|
|
||||||
chat.printReceivedMessage(msg)
|
else:
|
||||||
|
error "Filter not mounted. Couldn't parse conf.filternode",
|
||||||
await node.subscribe(pubsubTopic=DefaultPubsubTopic, contentTopics=chat.contentTopic, filterHandler)
|
error = peerInfo.error
|
||||||
|
|
||||||
# Subscribe to a topic, if relay is mounted
|
# Subscribe to a topic, if relay is mounted
|
||||||
if conf.relay:
|
if conf.relay:
|
||||||
|
|||||||
@ -286,12 +286,18 @@ when isMainModule:
|
|||||||
waitFor connectToNodes(bridge.nodev2, conf.staticnodes)
|
waitFor connectToNodes(bridge.nodev2, conf.staticnodes)
|
||||||
|
|
||||||
if conf.storenode != "":
|
if conf.storenode != "":
|
||||||
let storePeer = parseRemotePeerInfo(conf.storenode)
|
let storePeer = parsePeerInfo(conf.storenode)
|
||||||
bridge.nodev2.peerManager.addServicePeer(storePeer, WakuStoreCodec)
|
if storePeer.isOk():
|
||||||
|
bridge.nodev2.peerManager.addServicePeer(storePeer.value, WakuStoreCodec)
|
||||||
|
else:
|
||||||
|
error "Error parsing conf.storenode", error = storePeer.error
|
||||||
|
|
||||||
if conf.filternode != "":
|
if conf.filternode != "":
|
||||||
let filterPeer = parseRemotePeerInfo(conf.filternode)
|
let filterPeer = parsePeerInfo(conf.filternode)
|
||||||
bridge.nodev2.peerManager.addServicePeer(filterPeer, WakuFilterCodec)
|
if filterPeer.isOk():
|
||||||
|
bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuFilterCodec)
|
||||||
|
else:
|
||||||
|
error "Error parsing conf.filternode", error = filterPeer.error
|
||||||
|
|
||||||
if conf.rpc:
|
if conf.rpc:
|
||||||
let ta = initTAddress(conf.rpcAddress,
|
let ta = initTAddress(conf.rpcAddress,
|
||||||
|
|||||||
@ -431,13 +431,19 @@ when isMainModule:
|
|||||||
|
|
||||||
if conf.storenode != "":
|
if conf.storenode != "":
|
||||||
mountStoreClient(bridge.nodev2)
|
mountStoreClient(bridge.nodev2)
|
||||||
let storeNode = parseRemotePeerInfo(conf.storenode)
|
let storeNode = parsePeerInfo(conf.storenode)
|
||||||
bridge.nodev2.peerManager.addServicePeer(storeNode, WakuStoreCodec)
|
if storeNode.isOk():
|
||||||
|
bridge.nodev2.peerManager.addServicePeer(storeNode.value, WakuStoreCodec)
|
||||||
|
else:
|
||||||
|
error "Couldn't parse conf.storenode", error = storeNode.error
|
||||||
|
|
||||||
if conf.filternode != "":
|
if conf.filternode != "":
|
||||||
waitFor mountFilterClient(bridge.nodev2)
|
waitFor mountFilterClient(bridge.nodev2)
|
||||||
let filterNode = parseRemotePeerInfo(conf.filternode)
|
let filterNode = parsePeerInfo(conf.filternode)
|
||||||
bridge.nodev2.peerManager.addServicePeer(filterNode, WakuFilterCodec)
|
if filterNode.isOk():
|
||||||
|
bridge.nodev2.peerManager.addServicePeer(filterNode.value, WakuFilterCodec)
|
||||||
|
else:
|
||||||
|
error "Couldn't parse conf.filternode", error = filterNode.error
|
||||||
|
|
||||||
if conf.rpc:
|
if conf.rpc:
|
||||||
let ta = initTAddress(conf.rpcAddress,
|
let ta = initTAddress(conf.rpcAddress,
|
||||||
|
|||||||
@ -491,11 +491,11 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
|
|||||||
|
|
||||||
mountStoreClient(node)
|
mountStoreClient(node)
|
||||||
if conf.storenode != "":
|
if conf.storenode != "":
|
||||||
try:
|
let storeNode = parsePeerInfo(conf.storenode)
|
||||||
let storenode = parseRemotePeerInfo(conf.storenode)
|
if storeNode.isOk():
|
||||||
node.peerManager.addServicePeer(storenode, WakuStoreCodec)
|
node.peerManager.addServicePeer(storeNode.value, WakuStoreCodec)
|
||||||
except CatchableError:
|
else:
|
||||||
return err("failed to set node waku store peer: " & getCurrentExceptionMsg())
|
return err("failed to set node waku store peer: " & storeNode.error)
|
||||||
|
|
||||||
# NOTE Must be mounted after relay
|
# NOTE Must be mounted after relay
|
||||||
if conf.lightpush:
|
if conf.lightpush:
|
||||||
@ -505,12 +505,12 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
|
|||||||
return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg())
|
return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
if conf.lightpushnode != "":
|
if conf.lightpushnode != "":
|
||||||
try:
|
let lightPushNode = parsePeerInfo(conf.lightpushnode)
|
||||||
|
if lightPushNode.isOk():
|
||||||
mountLightPushClient(node)
|
mountLightPushClient(node)
|
||||||
let lightpushnode = parseRemotePeerInfo(conf.lightpushnode)
|
node.peerManager.addServicePeer(lightPushNode.value, WakuLightPushCodec)
|
||||||
node.peerManager.addServicePeer(lightpushnode, WakuLightPushCodec)
|
else:
|
||||||
except CatchableError:
|
return err("failed to set node waku lightpush peer: " & lightPushNode.error)
|
||||||
return err("failed to set node waku lightpush peer: " & getCurrentExceptionMsg())
|
|
||||||
|
|
||||||
# Filter setup. NOTE Must be mounted after relay
|
# Filter setup. NOTE Must be mounted after relay
|
||||||
if conf.filter:
|
if conf.filter:
|
||||||
@ -520,12 +520,12 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
|
|||||||
return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg())
|
return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
if conf.filternode != "":
|
if conf.filternode != "":
|
||||||
try:
|
let filterNode = parsePeerInfo(conf.filternode)
|
||||||
|
if filterNode.isOk():
|
||||||
await mountFilterClient(node)
|
await mountFilterClient(node)
|
||||||
let filternode = parseRemotePeerInfo(conf.filternode)
|
node.peerManager.addServicePeer(filterNode.value, WakuFilterCodec)
|
||||||
node.peerManager.addServicePeer(filternode, WakuFilterCodec)
|
else:
|
||||||
except CatchableError:
|
return err("failed to set node waku filter peer: " & filterNode.error)
|
||||||
return err("failed to set node waku filter peer: " & getCurrentExceptionMsg())
|
|
||||||
|
|
||||||
# waku peer exchange setup
|
# waku peer exchange setup
|
||||||
if (conf.peerExchangeNode != "") or (conf.peerExchange):
|
if (conf.peerExchangeNode != "") or (conf.peerExchange):
|
||||||
@ -535,11 +535,11 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
|
|||||||
return err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg())
|
return err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
if conf.peerExchangeNode != "":
|
if conf.peerExchangeNode != "":
|
||||||
try:
|
let peerExchangeNode = parsePeerInfo(conf.peerExchangeNode)
|
||||||
let peerExchangeNode = parseRemotePeerInfo(conf.peerExchangeNode)
|
if peerExchangeNode.isOk():
|
||||||
node.peerManager.addServicePeer(peerExchangeNode, WakuPeerExchangeCodec)
|
node.peerManager.addServicePeer(peerExchangeNode.value, WakuPeerExchangeCodec)
|
||||||
except CatchableError:
|
else:
|
||||||
return err("failed to set node waku peer-exchange peer: " & getCurrentExceptionMsg())
|
return err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error)
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
@ -601,14 +601,12 @@ when defined(waku_exp_store_resume):
|
|||||||
if address != "":
|
if address != "":
|
||||||
return err("empty peer multiaddres")
|
return err("empty peer multiaddres")
|
||||||
|
|
||||||
var remotePeer: RemotePeerInfo
|
let remotePeer = parsePeerInfo(address)
|
||||||
try:
|
if remotePeer.isErr():
|
||||||
remotePeer = parseRemotePeerInfo(address)
|
return err("invalid peer multiaddress: " & remotePeer.error)
|
||||||
except CatchableError:
|
|
||||||
return err("invalid peer multiaddress: " & getCurrentExceptionMsg())
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await node.resume(some(@[remotePeer]))
|
await node.resume(some(@[remotePeer.value]))
|
||||||
except CatchableError:
|
except CatchableError:
|
||||||
return err("failed to resume messages history: " & getCurrentExceptionMsg())
|
return err("failed to resume messages history: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
|||||||
@ -4,7 +4,6 @@ import
|
|||||||
std/[options, sequtils],
|
std/[options, sequtils],
|
||||||
stew/shims/net as stewNet,
|
stew/shims/net as stewNet,
|
||||||
testutils/unittests,
|
testutils/unittests,
|
||||||
chronicles,
|
|
||||||
chronos,
|
chronos,
|
||||||
json_rpc/rpcserver,
|
json_rpc/rpcserver,
|
||||||
json_rpc/rpcclient,
|
json_rpc/rpcclient,
|
||||||
@ -76,7 +75,10 @@ procSuite "Peer Manager":
|
|||||||
await allFutures(nodes.mapIt(it.start()))
|
await allFutures(nodes.mapIt(it.start()))
|
||||||
await allFutures(nodes.mapIt(it.mountRelay()))
|
await allFutures(nodes.mapIt(it.mountRelay()))
|
||||||
|
|
||||||
let nonExistentPeer = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/1000/p2p/16Uiu2HAmL5okWopX7NqZWBUKVqW8iUxCEmd5GMHLVPwCgzYzQv3e")
|
let nonExistentPeerRes = parsePeerInfo("/ip4/0.0.0.0/tcp/1000/p2p/16Uiu2HAmL5okWopX7NqZWBUKVqW8iUxCEmd5GMHLVPwCgzYzQv3e")
|
||||||
|
require nonExistentPeerRes.isOk()
|
||||||
|
|
||||||
|
let nonExistentPeer = nonExistentPeerRes.value
|
||||||
|
|
||||||
# Dial non-existent peer from node1
|
# Dial non-existent peer from node1
|
||||||
let conn1 = await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuFilterCodec)
|
let conn1 = await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuFilterCodec)
|
||||||
@ -136,9 +138,14 @@ procSuite "Peer Manager":
|
|||||||
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == NotConnected
|
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == NotConnected
|
||||||
|
|
||||||
# Failed connection
|
# Failed connection
|
||||||
let nonExistentPeer = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/1000/p2p/16Uiu2HAmL5okWopX7NqZWBUKVqW8iUxCEmd5GMHLVPwCgzYzQv3e")
|
let nonExistentPeerRes = parsePeerInfo("/ip4/0.0.0.0/tcp/1000/p2p/16Uiu2HAmL5okWopX7NqZWBUKVqW8iUxCEmd5GMHLVPwCgzYzQv3e")
|
||||||
|
require:
|
||||||
|
nonExistentPeerRes.isOk()
|
||||||
|
|
||||||
|
let nonExistentPeer = nonExistentPeerRes.value
|
||||||
require:
|
require:
|
||||||
(await nodes[0].peerManager.connectRelay(nonExistentPeer)) == false
|
(await nodes[0].peerManager.connectRelay(nonExistentPeer)) == false
|
||||||
|
|
||||||
check:
|
check:
|
||||||
# Cannot connect to node2
|
# Cannot connect to node2
|
||||||
nodes[0].peerManager.peerStore.connectedness(nonExistentPeer.peerId) == CannotConnect
|
nodes[0].peerManager.peerStore.connectedness(nonExistentPeer.peerId) == CannotConnect
|
||||||
@ -165,7 +172,10 @@ procSuite "Peer Manager":
|
|||||||
await allFutures(nodes.mapIt(it.start()))
|
await allFutures(nodes.mapIt(it.start()))
|
||||||
await allFutures(nodes.mapIt(it.mountRelay()))
|
await allFutures(nodes.mapIt(it.mountRelay()))
|
||||||
|
|
||||||
let nonExistentPeer = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/1000/p2p/16Uiu2HAmL5okWopX7NqZWBUKVqW8iUxCEmd5GMHLVPwCgzYzQv3e")
|
let nonExistentPeerRes = parsePeerInfo("/ip4/0.0.0.0/tcp/1000/p2p/16Uiu2HAmL5okWopX7NqZWBUKVqW8iUxCEmd5GMHLVPwCgzYzQv3e")
|
||||||
|
require nonExistentPeerRes.isOk()
|
||||||
|
|
||||||
|
let nonExistentPeer = nonExistentPeerRes.value
|
||||||
|
|
||||||
nodes[0].peerManager.addPeer(nonExistentPeer)
|
nodes[0].peerManager.addPeer(nonExistentPeer)
|
||||||
|
|
||||||
@ -413,37 +423,41 @@ procSuite "Peer Manager":
|
|||||||
|
|
||||||
let
|
let
|
||||||
node = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
|
node = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||||
peer1 = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & "1")
|
peers = toSeq(1..5)
|
||||||
peer2 = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30301/p2p/" & basePeerId & "2")
|
.mapIt(
|
||||||
peer3 = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30302/p2p/" & basePeerId & "3")
|
parsePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & $it)
|
||||||
peer4 = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30303/p2p/" & basePeerId & "4")
|
)
|
||||||
peer5 = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30303/p2p/" & basePeerId & "5")
|
.filterIt(it.isOk())
|
||||||
|
.mapIt(it.value)
|
||||||
|
|
||||||
|
require:
|
||||||
|
peers.len == 5
|
||||||
|
|
||||||
# service peers
|
# service peers
|
||||||
node.peerManager.addServicePeer(peer1, WakuStoreCodec)
|
node.peerManager.addServicePeer(peers[0], WakuStoreCodec)
|
||||||
node.peerManager.addServicePeer(peer2, WakuFilterCodec)
|
node.peerManager.addServicePeer(peers[1], WakuFilterCodec)
|
||||||
node.peerManager.addServicePeer(peer3, WakuLightPushCodec)
|
node.peerManager.addServicePeer(peers[2], WakuLightPushCodec)
|
||||||
node.peerManager.addServicePeer(peer4, WakuPeerExchangeCodec)
|
node.peerManager.addServicePeer(peers[3], WakuPeerExchangeCodec)
|
||||||
|
|
||||||
# relay peers (should not be added)
|
# relay peers (should not be added)
|
||||||
node.peerManager.addServicePeer(peer5, WakuRelayCodec)
|
node.peerManager.addServicePeer(peers[4], WakuRelayCodec)
|
||||||
|
|
||||||
# all peers are stored in the peerstore
|
# all peers are stored in the peerstore
|
||||||
check:
|
check:
|
||||||
node.peerManager.peerStore.peers().anyIt(it.peerId == peer1.peerId)
|
node.peerManager.peerStore.peers().anyIt(it.peerId == peers[0].peerId)
|
||||||
node.peerManager.peerStore.peers().anyIt(it.peerId == peer2.peerId)
|
node.peerManager.peerStore.peers().anyIt(it.peerId == peers[1].peerId)
|
||||||
node.peerManager.peerStore.peers().anyIt(it.peerId == peer3.peerId)
|
node.peerManager.peerStore.peers().anyIt(it.peerId == peers[2].peerId)
|
||||||
node.peerManager.peerStore.peers().anyIt(it.peerId == peer4.peerId)
|
node.peerManager.peerStore.peers().anyIt(it.peerId == peers[3].peerId)
|
||||||
|
|
||||||
# but the relay peer is not
|
# but the relay peer is not
|
||||||
node.peerManager.peerStore.peers().anyIt(it.peerId == peer5.peerId) == false
|
node.peerManager.peerStore.peers().anyIt(it.peerId == peers[4].peerId) == false
|
||||||
|
|
||||||
# all service peers are added to its service slot
|
# all service peers are added to its service slot
|
||||||
check:
|
check:
|
||||||
node.peerManager.serviceSlots[WakuStoreCodec].peerId == peer1.peerId
|
node.peerManager.serviceSlots[WakuStoreCodec].peerId == peers[0].peerId
|
||||||
node.peerManager.serviceSlots[WakuFilterCodec].peerId == peer2.peerId
|
node.peerManager.serviceSlots[WakuFilterCodec].peerId == peers[1].peerId
|
||||||
node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peer3.peerId
|
node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peers[2].peerId
|
||||||
node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peer4.peerId
|
node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peers[3].peerId
|
||||||
|
|
||||||
# but the relay peer is not
|
# but the relay peer is not
|
||||||
node.peerManager.serviceSlots.hasKey(WakuRelayCodec) == false
|
node.peerManager.serviceSlots.hasKey(WakuRelayCodec) == false
|
||||||
@ -458,7 +472,12 @@ procSuite "Peer Manager":
|
|||||||
storage = nil)
|
storage = nil)
|
||||||
|
|
||||||
# Create 3 peer infos
|
# Create 3 peer infos
|
||||||
let peers = toSeq(1..3).mapIt(parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & $it))
|
let peers = toSeq(1..3)
|
||||||
|
.mapIt(parsePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & $it))
|
||||||
|
.filterIt(it.isOk())
|
||||||
|
.mapIt(it.value)
|
||||||
|
require:
|
||||||
|
peers.len == 3
|
||||||
|
|
||||||
# Add a peer[0] to the peerstore
|
# Add a peer[0] to the peerstore
|
||||||
pm.peerStore[AddressBook][peers[0].peerId] = peers[0].addrs
|
pm.peerStore[AddressBook][peers[0].peerId] = peers[0].addrs
|
||||||
@ -520,8 +539,12 @@ procSuite "Peer Manager":
|
|||||||
storage = nil)
|
storage = nil)
|
||||||
|
|
||||||
# Create 15 peers and add them to the peerstore
|
# Create 15 peers and add them to the peerstore
|
||||||
let peers = toSeq(1..15).mapIt(parseRemotePeerInfo("/ip4/0.0.0.0/tcp/0/p2p/" & $PeerId.random().get()))
|
let peers = toSeq(1..15)
|
||||||
for p in peers: pm.addPeer(p)
|
.mapIt(parsePeerInfo("/ip4/0.0.0.0/tcp/0/p2p/" & $PeerId.random().get()))
|
||||||
|
.filterIt(it.isOk())
|
||||||
|
.mapIt(it.value)
|
||||||
|
for p in peers:
|
||||||
|
pm.addPeer(p)
|
||||||
|
|
||||||
# Check that we have 15 peers in the peerstore
|
# Check that we have 15 peers in the peerstore
|
||||||
check:
|
check:
|
||||||
|
|||||||
@ -16,8 +16,11 @@ suite "Utils - Peers":
|
|||||||
let address = "/ip4/127.0.0.1/tcp/65002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
|
let address = "/ip4/127.0.0.1/tcp/65002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let remotePeerInfo = parseRemotePeerInfo(address)
|
let remotePeerInfoRes = parsePeerInfo(address)
|
||||||
|
require remotePeerInfoRes.isOk()
|
||||||
|
|
||||||
|
let remotePeerInfo = remotePeerInfoRes.value
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check:
|
check:
|
||||||
$(remotePeerInfo.peerId) == "16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
|
$(remotePeerInfo.peerId) == "16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
|
||||||
@ -29,7 +32,10 @@ suite "Utils - Peers":
|
|||||||
let address = "/dns/localhost/tcp/65012/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
|
let address = "/dns/localhost/tcp/65012/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let dnsPeer = parseRemotePeerInfo(address)
|
let dnsPeerRes = parsePeerInfo(address)
|
||||||
|
require dnsPeerRes.isOk()
|
||||||
|
|
||||||
|
let dnsPeer = dnsPeerRes.value
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check:
|
check:
|
||||||
@ -42,7 +48,10 @@ suite "Utils - Peers":
|
|||||||
let address = "/dnsaddr/localhost/tcp/65022/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
|
let address = "/dnsaddr/localhost/tcp/65022/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let dnsAddrPeer = parseRemotePeerInfo(address)
|
let dnsAddrPeerRes = parsePeerInfo(address)
|
||||||
|
require dnsAddrPeerRes.isOk()
|
||||||
|
|
||||||
|
let dnsAddrPeer = dnsAddrPeerRes.value
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check:
|
check:
|
||||||
@ -55,7 +64,10 @@ suite "Utils - Peers":
|
|||||||
let address = "/dns4/localhost/tcp/65032/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
|
let address = "/dns4/localhost/tcp/65032/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let dns4Peer = parseRemotePeerInfo(address)
|
let dns4PeerRes = parsePeerInfo(address)
|
||||||
|
require dns4PeerRes.isOk()
|
||||||
|
|
||||||
|
let dns4Peer = dns4PeerRes.value
|
||||||
|
|
||||||
# Then
|
# Then
|
||||||
check:
|
check:
|
||||||
@ -68,7 +80,10 @@ suite "Utils - Peers":
|
|||||||
let address = "/dns6/localhost/tcp/65042/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
|
let address = "/dns6/localhost/tcp/65042/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let dns6Peer = parseRemotePeerInfo(address)
|
let dns6PeerRes = parsePeerInfo(address)
|
||||||
|
require dns6PeerRes.isOk()
|
||||||
|
|
||||||
|
let dns6Peer = dns6PeerRes.value
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check:
|
check:
|
||||||
@ -81,46 +96,46 @@ suite "Utils - Peers":
|
|||||||
let address = "/p2p/$UCH GIBBER!SH"
|
let address = "/p2p/$UCH GIBBER!SH"
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
expect LPError:
|
check:
|
||||||
discard parseRemotePeerInfo(address)
|
parsePeerInfo(address).isErr()
|
||||||
|
|
||||||
test "Multiaddr parsing should fail with leading whitespace":
|
test "Multiaddr parsing should fail with leading whitespace":
|
||||||
## Given
|
## Given
|
||||||
let address = " /ip4/127.0.0.1/tcp/65062/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
|
let address = " /ip4/127.0.0.1/tcp/65062/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
expect LPError:
|
check:
|
||||||
discard parseRemotePeerInfo(address)
|
parsePeerInfo(address).isErr()
|
||||||
|
|
||||||
test "Multiaddr parsing should fail with trailing whitespace":
|
test "Multiaddr parsing should fail with trailing whitespace":
|
||||||
## Given
|
## Given
|
||||||
let address = "/ip4/127.0.0.1/tcp/65072/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc "
|
let address = "/ip4/127.0.0.1/tcp/65072/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc "
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
expect LPError:
|
check:
|
||||||
discard parseRemotePeerInfo(address)
|
parsePeerInfo(address).isErr()
|
||||||
|
|
||||||
test "Multiaddress parsing should fail with invalid IP address":
|
test "Multiaddress parsing should fail with invalid IP address":
|
||||||
## Given
|
## Given
|
||||||
let address = "/ip4/127.0.0.0.1/tcp/65082/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
|
let address = "/ip4/127.0.0.0.1/tcp/65082/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
expect LPError:
|
check:
|
||||||
discard parseRemotePeerInfo(address)
|
parsePeerInfo(address).isErr()
|
||||||
|
|
||||||
test "Multiaddress parsing should fail with no peer ID":
|
test "Multiaddress parsing should fail with no peer ID":
|
||||||
## Given
|
## Given
|
||||||
let address = "/ip4/127.0.0.1/tcp/65092"
|
let address = "/ip4/127.0.0.1/tcp/65092"
|
||||||
|
|
||||||
# Then
|
# Then
|
||||||
expect LPError:
|
check:
|
||||||
discard parseRemotePeerInfo(address)
|
parsePeerInfo(address).isErr()
|
||||||
|
|
||||||
test "Multiaddress parsing should fail with unsupported transport":
|
test "Multiaddress parsing should fail with unsupported transport":
|
||||||
## Given
|
## Given
|
||||||
let address = "/ip4/127.0.0.1/udp/65102/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
|
let address = "/ip4/127.0.0.1/udp/65102/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
expect ValueError:
|
check:
|
||||||
discard parseRemotePeerInfo(address)
|
parsePeerInfo(address).isErr()
|
||||||
|
|
||||||
|
|||||||
@ -391,7 +391,7 @@ procSuite "Waku v2 Rest API - Store":
|
|||||||
$response.contentType == $MIMETYPE_TEXT
|
$response.contentType == $MIMETYPE_TEXT
|
||||||
response.data.messages.len == 0
|
response.data.messages.len == 0
|
||||||
response.data.error_message.get ==
|
response.data.error_message.get ==
|
||||||
"Failed parsing remote peer info [multiaddress: Invalid MultiAddress, must start with `/`]"
|
"Failed parsing remote peer info [MultiAddress.init [multiaddress: Invalid MultiAddress, must start with `/`]]"
|
||||||
|
|
||||||
await restServer.stop()
|
await restServer.stop()
|
||||||
await restServer.closeWait()
|
await restServer.closeWait()
|
||||||
|
|||||||
@ -122,8 +122,14 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} =
|
|||||||
protocols = conf.protocols,
|
protocols = conf.protocols,
|
||||||
logLevel = conf.logLevel
|
logLevel = conf.logLevel
|
||||||
|
|
||||||
|
let peerRes = parsePeerInfo(conf.address)
|
||||||
|
if peerRes.isErr():
|
||||||
|
error "Couldn't parse 'conf.address'", error = peerRes.error
|
||||||
|
return 1
|
||||||
|
|
||||||
|
let peer = peerRes.value
|
||||||
|
|
||||||
let
|
let
|
||||||
peer: RemotePeerInfo = parseRemotePeerInfo(conf.address)
|
|
||||||
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
bindIp = ValidIpAddress.init("0.0.0.0")
|
bindIp = ValidIpAddress.init("0.0.0.0")
|
||||||
wsBindPort = Port(conf.nodePort + WebSocketPortOffset)
|
wsBindPort = Port(conf.nodePort + WebSocketPortOffset)
|
||||||
|
|||||||
@ -45,7 +45,11 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
|
|||||||
debug "post_waku_v2_admin_v1_peers"
|
debug "post_waku_v2_admin_v1_peers"
|
||||||
|
|
||||||
for i, peer in peers:
|
for i, peer in peers:
|
||||||
let connOk = await node.peerManager.connectRelay(parseRemotePeerInfo(peer), source="rpc")
|
let peerInfo = parsePeerInfo(peer)
|
||||||
|
if peerInfo.isErr():
|
||||||
|
raise newException(ValueError, "Couldn't parse remote peer info: " & peerInfo.error)
|
||||||
|
|
||||||
|
let connOk = await node.peerManager.connectRelay(peerInfo.value, source="rpc")
|
||||||
if not connOk:
|
if not connOk:
|
||||||
raise newException(ValueError, "Failed to connect to peer at index: " & $i & " " & $peer)
|
raise newException(ValueError, "Failed to connect to peer at index: " & $i & " " & $peer)
|
||||||
|
|
||||||
|
|||||||
@ -411,9 +411,11 @@ proc connectToNodes*(pm: PeerManager,
|
|||||||
|
|
||||||
var futConns: seq[Future[bool]]
|
var futConns: seq[Future[bool]]
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
let node = when node is string: parseRemotePeerInfo(node)
|
let node = parsePeerInfo(node)
|
||||||
else: node
|
if node.isOk():
|
||||||
futConns.add(pm.connectRelay(node))
|
futConns.add(pm.connectRelay(node.value))
|
||||||
|
else:
|
||||||
|
error "Couldn't parse node info", error = node.error
|
||||||
|
|
||||||
await allFutures(futConns)
|
await allFutures(futConns)
|
||||||
let successfulConns = futConns.mapIt(it.read()).countIt(true)
|
let successfulConns = futConns.mapIt(it.read()).countIt(true)
|
||||||
|
|||||||
@ -439,8 +439,12 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: C
|
|||||||
error "cannot register filter subscription to topic", error="waku filter client is nil"
|
error "cannot register filter subscription to topic", error="waku filter client is nil"
|
||||||
return
|
return
|
||||||
|
|
||||||
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
|
let remotePeerRes = parsePeerInfo(peer)
|
||||||
else: peer
|
if remotePeerRes.isErr():
|
||||||
|
error "Couldn't parse the peer info properly", error = remotePeerRes.error
|
||||||
|
return
|
||||||
|
|
||||||
|
let remotePeer = remotePeerRes.value
|
||||||
|
|
||||||
info "registering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer.peerId
|
info "registering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer.peerId
|
||||||
|
|
||||||
@ -466,8 +470,12 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics:
|
|||||||
error "cannot unregister filter subscription to content", error="waku filter client is nil"
|
error "cannot unregister filter subscription to content", error="waku filter client is nil"
|
||||||
return
|
return
|
||||||
|
|
||||||
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
|
let remotePeerRes = parsePeerInfo(peer)
|
||||||
else: peer
|
if remotePeerRes.isErr():
|
||||||
|
error "couldn't parse remotePeerInfo", error = remotePeerRes.error
|
||||||
|
return
|
||||||
|
|
||||||
|
let remotePeer = remotePeerRes.value
|
||||||
|
|
||||||
info "deregistering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer.peerId
|
info "deregistering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer.peerId
|
||||||
|
|
||||||
@ -783,16 +791,19 @@ proc fetchPeerExchangePeers*(node: Wakunode, amount: uint64) {.async, raises: [D
|
|||||||
warn "Failed to retrieve peer info via peer exchange protocol", error = pxPeersRes.error
|
warn "Failed to retrieve peer info via peer exchange protocol", error = pxPeersRes.error
|
||||||
|
|
||||||
# TODO: Move to application module (e.g., wakunode2.nim)
|
# TODO: Move to application module (e.g., wakunode2.nim)
|
||||||
proc setPeerExchangePeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError].} =
|
proc setPeerExchangePeer*(node: WakuNode, peer: RemotePeerInfo|string) =
|
||||||
if node.wakuPeerExchange.isNil():
|
if node.wakuPeerExchange.isNil():
|
||||||
error "could not set peer, waku peer-exchange is nil"
|
error "could not set peer, waku peer-exchange is nil"
|
||||||
return
|
return
|
||||||
|
|
||||||
info "Set peer-exchange peer", peer=peer
|
info "Set peer-exchange peer", peer=peer
|
||||||
|
|
||||||
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
|
let remotePeerRes = parsePeerInfo(peer)
|
||||||
else: peer
|
if remotePeerRes.isErr():
|
||||||
node.peerManager.addPeer(remotePeer, WakuPeerExchangeCodec)
|
error "could not parse peer info", error = remotePeerRes.error
|
||||||
|
return
|
||||||
|
|
||||||
|
node.peerManager.addPeer(remotePeerRes.value, WakuPeerExchangeCodec)
|
||||||
waku_px_peers.inc()
|
waku_px_peers.inc()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -110,37 +110,42 @@ func getTransportProtocol(typedR: TypedRecord): Option[IpTransportProtocol] =
|
|||||||
|
|
||||||
return none(IpTransportProtocol)
|
return none(IpTransportProtocol)
|
||||||
|
|
||||||
## Parses a fully qualified peer multiaddr, in the
|
|
||||||
## format `(ip4|ip6)/tcp/p2p`, into dialable PeerInfo
|
|
||||||
proc parseRemotePeerInfo*(address: string): RemotePeerInfo {.raises: [Defect, ValueError, LPError].}=
|
|
||||||
let multiAddr = MultiAddress.init(address).tryGet()
|
|
||||||
|
|
||||||
var
|
proc parsePeerInfo*(peer: RemotePeerInfo|string):
|
||||||
nwPart, tcpPart, p2pPart, wsPart, wssPart: MultiAddress
|
Result[RemotePeerInfo, string] =
|
||||||
|
## Parses a fully qualified peer multiaddr, in the
|
||||||
|
## format `(ip4|ip6)/tcp/p2p`, into dialable PeerInfo
|
||||||
|
|
||||||
|
if peer is RemotePeerInfo:
|
||||||
|
return ok(cast[RemotePeerInfo](peer))
|
||||||
|
|
||||||
|
let multiAddr = ? MultiAddress.init(cast[string](peer))
|
||||||
|
.mapErr(proc(err: string):
|
||||||
|
string = "MultiAddress.init [" & err & "]")
|
||||||
|
|
||||||
|
var p2pPart: MultiAddress
|
||||||
|
var wireAddr = MultiAddress()
|
||||||
for addrPart in multiAddr.items():
|
for addrPart in multiAddr.items():
|
||||||
case addrPart[].protoName()[]
|
case addrPart[].protoName()[]
|
||||||
# All protocols listed here: https://github.com/multiformats/multiaddr/blob/b746a7d014e825221cc3aea6e57a92d78419990f/protocols.csv
|
# All protocols listed here: https://github.com/multiformats/multiaddr/blob/b746a7d014e825221cc3aea6e57a92d78419990f/protocols.csv
|
||||||
of "ip4", "ip6", "dns", "dnsaddr", "dns4", "dns6":
|
|
||||||
nwPart = addrPart.tryGet()
|
|
||||||
of "tcp":
|
|
||||||
tcpPart = addrPart.tryGet()
|
|
||||||
of "p2p":
|
of "p2p":
|
||||||
p2pPart = addrPart.tryGet()
|
p2pPart = ? addrPart.mapErr(proc(err: string):string = "Error getting p2pPart [" & err & "]")
|
||||||
of "ws":
|
of "ip4", "ip6", "dns", "dnsaddr", "dns4", "dns6", "tcp", "ws", "wss":
|
||||||
wsPart = addrPart.tryGet()
|
let val = ? addrPart.mapErr(proc(err: string):string = "Error getting addrPart [" & err & "]")
|
||||||
of "wss":
|
? wireAddr.append(val).mapErr(proc(err: string):string = "Error appending addrPart [" & err & "]")
|
||||||
wssPart = addrPart.tryGet()
|
|
||||||
|
|
||||||
# nim-libp2p dialing requires remote peers to be initialised with a peerId and a wire address
|
let p2pPartStr = p2pPart.toString()[]
|
||||||
let
|
if not p2pPartStr.contains("/"):
|
||||||
peerIdStr = p2pPart.toString()[].split("/")[^1]
|
return err("Error in parsePeerInfo: p2p part should contain /")
|
||||||
|
|
||||||
wireAddr = nwPart & tcpPart & wsPart & wssPart
|
let peerId = ? PeerID.init(p2pPartStr.split("/")[^1])
|
||||||
if (not wireAddr.validWireAddr()):
|
.mapErr(proc (e:cstring):string = cast[string](e))
|
||||||
raise newException(ValueError, "Invalid node multi-address")
|
|
||||||
|
if not wireAddr.validWireAddr():
|
||||||
|
return err("Error in parsePeerInfo: Invalid node multiaddress")
|
||||||
|
|
||||||
|
return ok(RemotePeerInfo.init(peerId, @[wireAddr]))
|
||||||
|
|
||||||
return RemotePeerInfo.init(peerIdStr, @[wireAddr])
|
|
||||||
|
|
||||||
# Checks whether the peerAddr parameter represents a valid p2p multiaddress.
|
# Checks whether the peerAddr parameter represents a valid p2p multiaddress.
|
||||||
# The param must be in the format `(ip4|ip6)/tcp/p2p/$peerId` but URL-encoded
|
# The param must be in the format `(ip4|ip6)/tcp/p2p/$peerId` but URL-encoded
|
||||||
@ -150,12 +155,14 @@ proc parseUrlPeerAddr*(peerAddr: Option[string]):
|
|||||||
if not peerAddr.isSome() or peerAddr.get() == "":
|
if not peerAddr.isSome() or peerAddr.get() == "":
|
||||||
return ok(none(RemotePeerInfo))
|
return ok(none(RemotePeerInfo))
|
||||||
|
|
||||||
try:
|
let parsedAddr = decodeUrl(peerAddr.get())
|
||||||
let parsedAddr = decodeUrl(peerAddr.get())
|
let parsedPeerInfo = parsePeerInfo(parsedAddr)
|
||||||
return ok(some(parseRemotePeerInfo(parsedAddr)))
|
|
||||||
except Exception:
|
if parsedPeerInfo.isOk():
|
||||||
|
return ok(some(parsedPeerInfo.value))
|
||||||
|
else:
|
||||||
return err("Failed parsing remote peer info [" &
|
return err("Failed parsing remote peer info [" &
|
||||||
getCurrentExceptionMsg() & "]")
|
parsedPeerInfo.error & "]")
|
||||||
|
|
||||||
## Converts an ENR to dialable RemotePeerInfo
|
## Converts an ENR to dialable RemotePeerInfo
|
||||||
proc toRemotePeerInfo*(enr: enr.Record): Result[RemotePeerInfo, cstring] =
|
proc toRemotePeerInfo*(enr: enr.Record): Result[RemotePeerInfo, cstring] =
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user