refactor(networking): peermanager refactor and cleanups (#1539)
* refactor(networking): use addServicePeer where needed + add metrics
parent 1a968e21ff
commit 68acf82c95
@@ -24,6 +24,7 @@ import libp2p/[switch, # manage transports, a single entry poi
                nameresolving/dnsresolver]# define DNS resolution
 import
   ../../waku/v2/protocol/waku_message,
+  ../../waku/v2/protocol/waku_lightpush,
   ../../waku/v2/protocol/waku_lightpush/rpc,
   ../../waku/v2/protocol/waku_filter,
   ../../waku/v2/protocol/waku_store,
@@ -489,7 +490,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
     echo "Connecting to storenode: " & $(storenode.get())

     node.mountStoreClient()
-    node.setStorePeer(storenode.get())
+    node.peerManager.addServicePeer(storenode.get(), WakuStoreCodec)

     proc storeHandler(response: HistoryResponse) {.gcsafe.} =
       for msg in response.messages:
@@ -509,13 +510,13 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
     await mountLightPush(node)

     node.mountLightPushClient()
-    node.setLightPushPeer(conf.lightpushnode)
+    node.peerManager.addServicePeer(parseRemotePeerInfo(conf.lightpushnode), WakuLightpushCodec)

   if conf.filternode != "":
     await node.mountFilter()
     await node.mountFilterClient()

-    node.setFilterPeer(parseRemotePeerInfo(conf.filternode))
+    node.peerManager.addServicePeer(parseRemotePeerInfo(conf.filternode), WakuFilterCodec)

     proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe.} =
       trace "Hit filter handler", contentTopic=msg.contentTopic
@@ -16,6 +16,10 @@ import
   libp2p/errors,
   ../../../waku/v2/protocol/waku_message,
   ../../../waku/v2/node/waku_node,
+  ../../../waku/v2/utils/peers,
+  ../../../waku/v2/node/peer_manager,
+  ../../waku/v2/protocol/waku_filter,
+  ../../waku/v2/protocol/waku_store,
   # Chat 2 imports
   ../chat2/chat2,
   # Common cli config
@@ -281,10 +285,12 @@ when isMainModule:
     waitFor connectToNodes(bridge.nodev2, conf.staticnodes)

   if conf.storenode != "":
-    setStorePeer(bridge.nodev2, conf.storenode)
+    let storePeer = parseRemotePeerInfo(conf.storenode)
+    bridge.nodev2.peerManager.addServicePeer(storePeer, WakuStoreCodec)

   if conf.filternode != "":
-    setFilterPeer(bridge.nodev2, conf.filternode)
+    let filterPeer = parseRemotePeerInfo(conf.filternode)
+    bridge.nodev2.peerManager.addServicePeer(filterPeer, WakuFilterCodec)

   if conf.rpc:
     let ta = initTAddress(conf.rpcAddress,
@@ -26,6 +26,8 @@ import
   libp2p/nameresolving/nameresolver,
   ../../waku/v2/utils/time,
   ../../waku/v2/protocol/waku_message,
+  ../../waku/v2/protocol/waku_store,
+  ../../waku/v2/protocol/waku_filter,
   ../../waku/v2/node/message_cache,
   ../../waku/v2/node/waku_node,
   ../../waku/v2/node/peer_manager,
@@ -428,11 +430,13 @@ when isMainModule:

   if conf.storenode != "":
     mountStoreClient(bridge.nodev2)
-    setStorePeer(bridge.nodev2, conf.storenode)
+    let storeNode = parseRemotePeerInfo(conf.storenode)
+    bridge.nodev2.peerManager.addServicePeer(storeNode, WakuStoreCodec)

   if conf.filternode != "":
     waitFor mountFilterClient(bridge.nodev2)
-    setFilterPeer(bridge.nodev2, conf.filternode)
+    let filterNode = parseRemotePeerInfo(conf.filternode)
+    bridge.nodev2.peerManager.addServicePeer(filterNode, WakuFilterCodec)

   if conf.rpc:
     let ta = initTAddress(conf.rpcAddress,
@@ -96,10 +96,9 @@ procSuite "Peer Manager":
     await node.mountSwap()
     node.mountStoreClient()

-    node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo())
-    node.setStorePeer(storePeer.toRemotePeerInfo())
-    node.setFilterPeer(filterPeer.toRemotePeerInfo())
+    node.peerManager.addServicePeer(swapPeer.toRemotePeerInfo(), WakuSwapCodec)
+    node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec)
+    node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuFilterCodec)

     # Check peers were successfully added to peer manager
     check:
@@ -127,7 +126,7 @@ procSuite "Peer Manager":
     await allFutures(nodes.mapIt(it.mountRelay()))

     # Test default connectedness for new peers
-    nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec)
+    nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo())
     check:
       # No information about node2's connectedness
       nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == NotConnected
@@ -160,7 +159,7 @@ procSuite "Peer Manager":

     await nodes[0].start()
     await nodes[0].mountRelay()
-    nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec)
+    nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo())

     # Set a low backoff to speed up test: 2, 4, 8, 16
     nodes[0].peerManager.initialBackoffInSec = 2
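The "2, 4, 8, 16" comment above reads like exponential backoff driven by initialBackoffInSec and backoffFactor (the same fields canBeConnected receives later in this diff). A minimal sketch of that reading in plain Nim, assuming a factor of 2 and that the wait before retry attempt n is initial * factor^n; neither assumption is stated explicitly in the diff and the helper name is made up:

import std/[math, sequtils]

proc backoffSeconds(initialBackoffInSec, backoffFactor, failedAttempts: int): int =
  # Hypothetical helper: the wait grows geometrically with the number of failed attempts
  initialBackoffInSec * backoffFactor ^ failedAttempts

when isMainModule:
  # initialBackoffInSec = 2 and backoffFactor = 2 reproduce the 2, 4, 8, 16 sequence
  doAssert @[0, 1, 2, 3].mapIt(backoffSeconds(2, 2, it)) == @[2, 4, 8, 16]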
@@ -236,7 +235,8 @@ procSuite "Peer Manager":
       node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
       node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected

-    await node3.mountRelay() # This should trigger a reconnect
+    await node3.mountRelay()
+    await node3.peerManager.connectToRelayPeers()

     check:
       # Reconnected to node2 after "restart"
@@ -313,12 +313,12 @@ procSuite "Peer Manager":
     let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())

     # Add all peers (but self) to node 0
-    nodes[0].peerManager.addPeer(peerInfos[1], WakuRelayCodec)
-    nodes[0].peerManager.addPeer(peerInfos[2], WakuRelayCodec)
-    nodes[0].peerManager.addPeer(peerInfos[3], WakuRelayCodec)
+    nodes[0].peerManager.addPeer(peerInfos[1])
+    nodes[0].peerManager.addPeer(peerInfos[2])
+    nodes[0].peerManager.addPeer(peerInfos[3])

-    # Attempt to connect to all known peers supporting a given protocol
-    await nodes[0].peerManager.reconnectPeers(WakuRelayCodec, protocolMatcher(WakuRelayCodec))
+    # Connect to relay peers
+    await nodes[0].peerManager.connectToRelayPeers()

     check:
       # Peerstore track all three peers
@@ -512,7 +512,7 @@ procSuite "Peer Manager":

     # Create 15 peers and add them to the peerstore
     let peers = toSeq(1..15).mapIt(parseRemotePeerInfo("/ip4/0.0.0.0/tcp/0/p2p/" & $PeerId.random().get()))
-    for p in peers: pm.addPeer(p, "")
+    for p in peers: pm.addPeer(p)

     # Check that we have 15 peers in the peerstore
     check:
@@ -167,8 +167,12 @@ procSuite "Waku v2 JSON-RPC API - Admin":
       filterPeer = PeerInfo.new(generateEcdsaKey(), @[locationAddr])
       storePeer = PeerInfo.new(generateEcdsaKey(), @[locationAddr])

-    node.setStorePeer(storePeer.toRemotePeerInfo())
-    node.setFilterPeer(filterPeer.toRemotePeerInfo())
+    node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuFilterCodec)
+    node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec)
+
+    # Mock that we connected in the past so Identify populated this
+    node.peerManager.peerStore[ProtoBook][filterPeer.peerId] = @[WakuFilterCodec]
+    node.peerManager.peerStore[ProtoBook][storePeer.peerId] = @[WakuStoreCodec]

     let response = await client.get_waku_v2_admin_v1_peers()

@@ -14,6 +14,7 @@ import
   ../../../waku/v2/node/jsonrpc/filter/handlers as filter_api,
   ../../../waku/v2/node/jsonrpc/filter/client as filter_api_client,
   ../../../waku/v2/protocol/waku_message,
+  ../../../waku/v2/protocol/waku_filter,
   ../../../waku/v2/protocol/waku_filter/rpc,
   ../../../waku/v2/protocol/waku_filter/client,
   ../../../waku/v2/utils/peers,
@@ -40,7 +41,7 @@ procSuite "Waku v2 JSON-RPC API - Filter":
     await node1.mountFilter()
     await node2.mountFilterClient()

-    node2.setFilterPeer(node1.peerInfo.toRemotePeerInfo())
+    node2.peerManager.addServicePeer(node1.peerInfo.toRemotePeerInfo(), WakuFilterCodec)

     # RPC server setup
     let
@@ -16,6 +16,7 @@ import
   ../../../waku/v2/protocol/waku_message,
   ../../../waku/v2/protocol/waku_archive,
   ../../../waku/v2/protocol/waku_archive/driver/queue_driver,
+  ../../../waku/v2/protocol/waku_store,
   ../../../waku/v2/protocol/waku_store/rpc,
   ../../../waku/v2/utils/peers,
   ../../../waku/v2/utils/time,
@@ -66,7 +67,7 @@ procSuite "Waku v2 JSON-RPC API - Store":
     var listenSwitch = newStandardSwitch(some(key))
     await listenSwitch.start()

-    node.setStorePeer(listenSwitch.peerInfo.toRemotePeerInfo())
+    node.peerManager.addServicePeer(listenSwitch.peerInfo.toRemotePeerInfo(), WakuStoreCodec)

     listenSwitch.mount(node.wakuRelay)
     listenSwitch.mount(node.wakuStore)
@@ -5,7 +5,7 @@ else:


 import
-  std/[options, sets, sequtils, times],
+  std/[options, sets, sequtils, times, strutils],
   chronos,
   chronicles,
   metrics,
@@ -22,8 +22,9 @@ declareCounter waku_peers_dials, "Number of peer dials", ["outcome"]
 # TODO: Populate from PeerStore.Source when ready
 declarePublicCounter waku_node_conns_initiated, "Number of connections initiated", ["source"]
 declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"]
-declarePublicGauge waku_connected_peers, "Number of connected peers per direction: inbound|outbound", ["direction"]
+declarePublicGauge waku_connected_peers, "Number of connected peers per direction", ["direction"]
 declarePublicGauge waku_peer_store_size, "Number of peers managed by the peer store"
+declarePublicGauge waku_service_peers, "Service peer protocol and multiaddress ", labels = ["protocol", "peerId"]

 logScope:
   topics = "waku node peer_manager"
@@ -61,6 +62,16 @@ type
     serviceSlots*: Table[string, RemotePeerInfo]
     started: bool

+proc protocolMatcher*(codec: string): Matcher =
+  ## Returns a protocol matcher function for the provided codec
+  proc match(proto: string): bool {.gcsafe.} =
+    ## Matches a proto with any postfix to the provided codec.
+    ## E.g. if the codec is `/vac/waku/filter/2.0.0` it matches the protos:
+    ## `/vac/waku/filter/2.0.0`, `/vac/waku/filter/2.0.0-beta3`, `/vac/waku/filter/2.0.0-actualnonsense`
+    return proto.startsWith(codec)
+
+  return match
+
 ####################
 # Helper functions #
 ####################
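The matcher moved into peer_manager above is small enough to exercise on its own. A self-contained sketch of the same idea, compilable with plain Nim (the Matcher alias below is a stand-in for the libp2p type of the same name), showing the postfix behaviour described in its doc comment:

import std/strutils

type Matcher = proc(proto: string): bool {.gcsafe.}

proc protocolMatcher(codec: string): Matcher =
  ## Returns a matcher that accepts the codec itself and any postfixed variant
  proc match(proto: string): bool {.gcsafe.} =
    return proto.startsWith(codec)
  return match

when isMainModule:
  let isFilter = protocolMatcher("/vac/waku/filter/2.0.0")
  doAssert isFilter("/vac/waku/filter/2.0.0")
  doAssert isFilter("/vac/waku/filter/2.0.0-beta3")
  doAssert not isFilter("/vac/waku/store/2.0.0")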
@@ -244,7 +255,7 @@ proc new*(T: type PeerManager,
 # Manager interface #
 #####################

-proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
+proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo) =
   # Adds peer to manager for the specified protocol

   if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
@@ -260,14 +271,11 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
     # Peer already managed
     return

-  debug "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs, proto = proto
+  trace "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs

   pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
   pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey

-  # TODO: Remove this once service slots is ready
-  pm.peerStore[ProtoBook][remotePeerInfo.peerId] = pm.peerStore[ProtoBook][remotePeerInfo.peerId] & proto
-
   # Add peer to storage. Entry will subsequently be updated with connectedness information
   if not pm.storage.isNil:
     pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected)
@@ -279,23 +287,23 @@ proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: str
     return

   info "Adding peer to service slots", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], service = proto
+  waku_service_peers.set(1, labelValues = [$proto, $remotePeerInfo.addrs[0]])

   # Set peer for service slot
   pm.serviceSlots[proto] = remotePeerInfo

-  # TODO: Remove proto once fully refactored
-  pm.addPeer(remotePeerInfo, proto)
+  pm.addPeer(remotePeerInfo)

 proc reconnectPeers*(pm: PeerManager,
                      proto: string,
-                     protocolMatcher: Matcher,
                      backoff: chronos.Duration = chronos.seconds(0)) {.async.} =
   ## Reconnect to peers registered for this protocol. This will update connectedness.
   ## Especially useful to resume connections from persistent storage after a restart.

   debug "Reconnecting peers", proto=proto

-  for storedInfo in pm.peerStore.peers(protocolMatcher):
+  # Proto is not persisted, we need to iterate over all peers.
+  for storedInfo in pm.peerStore.peers(protocolMatcher(proto)):
     # Check that the peer can be connected
     if storedInfo.connectedness == CannotConnect:
       debug "Not reconnecting to unreachable or non-existing peer", peerId=storedInfo.peerId
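For readers new to the service-slot concept behind addServicePeer, here is a rough standalone model in plain Nim. Every name, the codec string, and the fallback behaviour are simplified stand-ins for illustration only; the real PeerManager, RemotePeerInfo, and selectPeer live in nwaku and also handle metrics, the peer store fallback, and persistence.

import std/[tables, options]

type
  RemotePeerInfo = object
    peerId: string
    addrs: seq[string]
  PeerManager = ref object
    serviceSlots: Table[string, RemotePeerInfo]

const WakuStoreCodec = "/vac/waku/store/2.0.0-beta4"  # illustrative codec string

proc addServicePeer(pm: PeerManager, peer: RemotePeerInfo, proto: string) =
  # One dedicated peer per service protocol; adding again overwrites the slot
  pm.serviceSlots[proto] = peer

proc selectPeer(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
  # Prefer the service slot; the real implementation also falls back to the peer store
  if pm.serviceSlots.hasKey(proto):
    return some(pm.serviceSlots[proto])
  none(RemotePeerInfo)

when isMainModule:
  let pm = PeerManager()
  pm.addServicePeer(
    RemotePeerInfo(peerId: "16Uexample", addrs: @["/ip4/127.0.0.1/tcp/60000"]),
    WakuStoreCodec)
  doAssert pm.selectPeer(WakuStoreCodec).isSome()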
@@ -332,10 +340,11 @@ proc dialPeer*(pm: PeerManager,
   # Dial a given peer and add it to the list of known peers
   # TODO: check peer validity and score before continuing. Limit number of peers to be managed.

-  # First add dialed peer info to peer store, if it does not exist yet...
+  # First add dialed peer info to peer store, if it does not exist yet..
+  # TODO: nim libp2p peerstore already adds them
   if not pm.peerStore.hasPeer(remotePeerInfo.peerId, proto):
     trace "Adding newly dialed peer to manager", peerId= $remotePeerInfo.peerId, address= $remotePeerInfo.addrs[0], proto= proto
-    pm.addPeer(remotePeerInfo, proto)
+    pm.addPeer(remotePeerInfo)

   return await pm.dialPeer(remotePeerInfo.peerId,remotePeerInfo.addrs, proto, dialTimeout, source)

@@ -380,38 +389,32 @@ proc connectToNodes*(pm: PeerManager,
   # later.
   await sleepAsync(chronos.seconds(5))

-proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
-  debug "Starting relay connectivity loop"
-  while pm.started:
-
-    let maxConnections = pm.switch.connManager.inSema.size
-    let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len
-    let numOutPeers = pm.switch.connectedPeers(lpstream.Direction.Out).len
-    let numConPeers = numInPeers + numOutPeers
-
-    # TODO: Enforce a given in/out peers ratio
-
-    # Leave some room for service peers
-    if numConPeers >= (maxConnections - 5):
-      await sleepAsync(ConnectivityLoopInterval)
-      continue
-
-    let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
-    let outsideBackoffPeers = notConnectedPeers.filterIt(pm.peerStore.canBeConnected(it.peerId,
-                                                                                     pm.initialBackoffInSec,
-                                                                                     pm.backoffFactor))
-    let numPeersToConnect = min(min(maxConnections - numConPeers, outsideBackoffPeers.len), MaxParalelDials)
-
-    info "Relay connectivity loop",
-      connectedPeers = numConPeers,
-      targetConnectedPeers = maxConnections,
-      notConnectedPeers = notConnectedPeers.len,
-      outsideBackoffPeers = outsideBackoffPeers.len
-
-    await pm.connectToNodes(outsideBackoffPeers[0..<numPeersToConnect], WakuRelayCodec)
-
-    await sleepAsync(ConnectivityLoopInterval)
+# Ensures a healthy amount of connected relay peers
+proc connectToRelayPeers*(pm: PeerManager) {.async.} =
+  let maxConnections = pm.switch.connManager.inSema.size
+  let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len
+  let numOutPeers = pm.switch.connectedPeers(lpstream.Direction.Out).len
+  let numConPeers = numInPeers + numOutPeers
+
+  # TODO: Enforce a given in/out peers ratio
+
+  # Leave some room for service peers
+  if numConPeers >= (maxConnections - 5):
+    return
+
+  # TODO: Track only relay connections (nwaku/issues/1566)
+  let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
+  let outsideBackoffPeers = notConnectedPeers.filterIt(pm.peerStore.canBeConnected(it.peerId,
+                                                                                   pm.initialBackoffInSec,
+                                                                                   pm.backoffFactor))
+  let numPeersToConnect = min(min(maxConnections - numConPeers, outsideBackoffPeers.len), MaxParalelDials)
+
+  info "Relay peer connections",
+    connectedPeers = numConPeers,
+    targetConnectedPeers = maxConnections,
+    notConnectedPeers = notConnectedPeers.len,
+    outsideBackoffPeers = outsideBackoffPeers.len
+
+  await pm.connectToNodes(outsideBackoffPeers[0..<numPeersToConnect], WakuRelayCodec)

 proc prunePeerStore*(pm: PeerManager) =
   let numPeers = toSeq(pm.peerStore[AddressBook].book.keys).len
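The dial-budget arithmetic in connectToRelayPeers above is compact but easy to misread. A self-contained restatement of just that calculation; the reserve of 5 connections and the min/min/parallel-dial cap mirror the lines above, while the helper name and the concrete numbers in the asserts are made up for illustration:

proc relayDialBudget(maxConnections, connectedPeers,
                     outsideBackoffPeers, maxParallelDials: int): int =
  # Leave some room for service peers
  if connectedPeers >= maxConnections - 5:
    return 0
  # Dial at most: the free connection slots, the candidates outside backoff,
  # or the parallel-dial cap, whichever is smallest
  min(min(maxConnections - connectedPeers, outsideBackoffPeers), maxParallelDials)

when isMainModule:
  doAssert relayDialBudget(50, 48, 10, 10) == 0   # within 5 of the cap: nothing to do
  doAssert relayDialBudget(50, 10, 3, 10) == 3    # limited by candidates outside backoff
  doAssert relayDialBudget(50, 10, 30, 10) == 10  # limited by the parallel-dial cap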
@@ -447,13 +450,6 @@ proc prunePeerStore*(pm: PeerManager) =
     capacity = capacity,
     pruned = pruned

-
-proc prunePeerStoreLoop(pm: PeerManager) {.async.} =
-  while pm.started:
-    pm.prunePeerStore()
-    await sleepAsync(PrunePeerStoreInterval)
-
-
 proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
   debug "Selecting peer from peerstore", protocol=proto

@@ -481,6 +477,20 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
   debug "No peer found for protocol", protocol=proto
   return none(RemotePeerInfo)

+# Prunes peers from peerstore to remove old/stale ones
+proc prunePeerStoreLoop(pm: PeerManager) {.async.} =
+  debug "Starting prune peerstore loop"
+  while pm.started:
+    pm.prunePeerStore()
+    await sleepAsync(PrunePeerStoreInterval)
+
+# Ensures a healthy amount of connected relay peers
+proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
+  debug "Starting relay connectivity loop"
+  while pm.started:
+    await pm.connectToRelayPeers()
+    await sleepAsync(ConnectivityLoopInterval)
+
 proc start*(pm: PeerManager) =
   pm.started = true
   asyncSpawn pm.relayConnectivityLoop()
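Both loops added above follow the same chronos pattern: a started-guarded while loop spawned with asyncSpawn from start(). A minimal runnable sketch of that pattern, with toy intervals in place of PrunePeerStoreInterval / ConnectivityLoopInterval and a generic Manager type standing in for PeerManager:

import chronos

type Manager = ref object
  started: bool

proc maintenanceLoop(m: Manager) {.async.} =
  # Runs one maintenance pass per interval until the flag is cleared
  while m.started:
    echo "running one maintenance pass"
    await sleepAsync(chronos.milliseconds(100))

proc start(m: Manager) =
  m.started = true
  asyncSpawn m.maintenanceLoop()

when isMainModule:
  let m = Manager()
  m.start()
  waitFor sleepAsync(chronos.milliseconds(350))
  m.started = false  # the loop exits on its next iteration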
@@ -104,17 +104,6 @@ type
     announcedAddresses* : seq[MultiAddress]
     started*: bool # Indicates that node has started listening

-
-proc protocolMatcher*(codec: string): Matcher =
-  ## Returns a protocol matcher function for the provided codec
-  proc match(proto: string): bool {.gcsafe.} =
-    ## Matches a proto with any postfix to the provided codec.
-    ## E.g. if the codec is `/vac/waku/filter/2.0.0` it matches the protos:
-    ## `/vac/waku/filter/2.0.0`, `/vac/waku/filter/2.0.0-beta3`, `/vac/waku/filter/2.0.0-actualnonsense`
-    return proto.startsWith(codec)
-
-  return match
-
 template ip4TcpEndPoint(address, port): MultiAddress =
   MultiAddress.init(address, tcpProtocol, port)

@@ -516,7 +505,6 @@ proc startRelay*(node: WakuNode) {.async.} =
   let backoffPeriod = node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime)

   await node.peerManager.reconnectPeers(WakuRelayCodec,
-                                        protocolMatcher(WakuRelayCodec),
                                         backoffPeriod)

   # Start the WakuRelay protocol
@@ -633,22 +621,6 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics:
     error "failed filter unsubscription", error=unsubRes.error
     waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])

-
-# TODO: Move to application module (e.g., wakunode2.nim)
-proc setFilterPeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError],
-  deprecated: "Use the explicit destination peer procedures".} =
-  if node.wakuFilterClient.isNil():
-    error "could not set peer, waku filter client is nil"
-    return
-
-  info "seting filter client peer", peer=peer
-
-  let remotePeer = when peer is string: parseRemotePeerInfo(peer)
-                   else: peer
-  node.peerManager.addPeer(remotePeer, WakuFilterCodec)
-
-  waku_filter_peers.inc()
-
 # TODO: Move to application module (e.g., wakunode2.nim)
 proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic], handler: FilterPushHandler) {.async, gcsafe,
   deprecated: "Use the explicit destination peer procedure. Use 'node.filterSubscribe()' instead.".} =
@@ -808,21 +780,6 @@ proc query*(node: WakuNode, query: HistoryQuery, peer: RemotePeerInfo): Future[W

   return ok(response)

-
-# TODO: Move to application module (e.g., wakunode2.nim)
-proc setStorePeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError],
-  deprecated: "Use 'node.query()' with peer destination instead".} =
-  if node.wakuStoreClient.isNil():
-    error "could not set peer, waku store client is nil"
-    return
-
-  info "set store peer", peer=peer
-
-  let remotePeer = when peer is string: parseRemotePeerInfo(peer)
-                   else: peer
-  node.peerManager.addPeer(remotePeer, WakuStoreCodec)
-  waku_store_peers.inc()
-
 # TODO: Move to application module (e.g., wakunode2.nim)
 proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe,
   deprecated: "Use 'node.query()' with peer destination instead".} =
@@ -901,17 +858,6 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMe

   return await node.wakuLightpushClient.publish(pubsubTopic, message, peer)

-
-# TODO: Move to application module (e.g., wakunode2.nim)
-proc setLightPushPeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError],
-  deprecated: "Use 'node.lightpushPublish()' instead".} =
-  debug "seting lightpush client peer", peer=peer
-
-  let remotePeer = when peer is string: parseRemotePeerInfo(peer)
-                   else: peer
-  node.peerManager.addPeer(remotePeer, WakuLightPushCodec)
-  waku_lightpush_peers.inc()
-
 # TODO: Move to application module (e.g., wakunode2.nim)
 proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.async, gcsafe,
   deprecated: "Use 'node.lightpushPublish()' instead".} =
@@ -977,7 +923,7 @@ proc fetchPeerExchangePeers*(node: Wakunode, amount: uint64) {.async, raises: [D
       var record: enr.Record
       if enr.fromBytes(record, pi.enr):
         # TODO: Add source: PX
-        node.peerManager.addPeer(record.toRemotePeerInfo().get, WakuRelayCodec)
+        node.peerManager.addPeer(record.toRemotePeerInfo().get)
         validPeers += 1
     info "Retrieved peer info via peer exchange protocol", validPeers = validPeers
   else:
@@ -1062,8 +1008,7 @@ proc runDiscv5Loop(node: WakuNode) {.async.} =

     # Add all peers, new ones and already seen (in case their addresses changed)
     for peer in discoveredPeers:
-      # TODO: proto: WakuRelayCodec will be removed from add peer
-      node.peerManager.addPeer(peer, WakuRelayCodec)
+      node.peerManager.addPeer(peer)

     # Discovery `queryRandom` can have a synchronous fast path for example
     # when no peers are in the routing table. Don't run it in continuous loop.
|||||||
@ -238,7 +238,7 @@ proc init*(wakuSwap: WakuSwap) =
|
|||||||
|
|
||||||
proc credit(peerId: PeerID, n: int)
|
proc credit(peerId: PeerID, n: int)
|
||||||
{.gcsafe, closure, raises: [Defect, KeyError, Exception].} =
|
{.gcsafe, closure, raises: [Defect, KeyError, Exception].} =
|
||||||
|
|
||||||
info "Crediting peer: ", peer=peerId, amount=n
|
info "Crediting peer: ", peer=peerId, amount=n
|
||||||
if wakuSwap.accounting.hasKey(peerId):
|
if wakuSwap.accounting.hasKey(peerId):
|
||||||
wakuSwap.accounting[peerId] -= n
|
wakuSwap.accounting[peerId] -= n
|
||||||
@@ -250,7 +250,7 @@ proc init*(wakuSwap: WakuSwap) =
   # TODO Debit and credit here for Karma asset
   proc debit(peerId: PeerID, n: int)
     {.gcsafe, closure, raises: [Defect, KeyError, Exception].} =

     info "Debiting peer: ", peer=peerId, amount=n
     if wakuSwap.accounting.hasKey(peerId):
       wakuSwap.accounting[peerId] += n
@@ -258,10 +258,10 @@ proc init*(wakuSwap: WakuSwap) =
       wakuSwap.accounting[peerId] = n
     info "Accounting state", accounting = wakuSwap.accounting[peerId]
     wakuSwap.applyPolicy(peerId)

   proc applyPolicy(peerId: PeerID)
     {.gcsafe, closure, raises: [Defect, KeyError, Exception].} =

     # TODO Separate out depending on if policy is soft (accounting only) mock (send cheque but don't cash/verify) hard (actually send funds over testnet)

     #Check if the Disconnect Threshold has been hit. Account Balance nears the disconnectThreshold after a Credit has been done
@@ -290,10 +290,10 @@ proc init*(wakuSwap: WakuSwap) =
   # TODO Expression return?
 proc init*(T: type WakuSwap, peerManager: PeerManager, rng: ref rand.HmacDrbgContext, swapConfig: SwapConfig): T =
   info "wakuSwap init 2"
   let
     accounting = initTable[PeerId, int]()
     text = "test"

   var ws = WakuSwap(rng: rng,
     peerManager: peerManager,
     accounting: accounting,
@@ -303,8 +303,4 @@ proc init*(T: type WakuSwap, peerManager: PeerManager, rng: ref rand.HmacDrbgCon

   return ws

-proc setPeer*(ws: WakuSwap, peer: RemotePeerInfo) =
-  ws.peerManager.addPeer(peer, WakuSwapCodec)
-  waku_swap_peers_count.inc()
-
 # TODO End to end communication