mirror of https://github.com/waku-org/nwaku.git

Commit 5102576234 (parent 7a732e7cc6)

Integrate persistent peer storage (#437)

* Integrate persistent peer storage
@@ -8,7 +8,7 @@
 - Added a peer manager for `relay`, `filter`, `store` and `swap` peers.
 - `relay`, `filter`, `store` and `swap` peers are now stored in a common, shared peer store and no longer in separate sets.
 - Admin API now provides a `post` method to connect to peers on an ad-hoc basis
-- Added persistent peer storage
+- Added persistent peer storage. A node will now attempt to reconnect to `relay` peers after a restart.
 
 ## 2021-01-05 v0.2
 
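To illustrate the new behaviour documented above: a node opts into persistence by passing a peer storage instance at construction time, and mounting `relay` then redials the stored peers. This is a minimal sketch assembled from the APIs touched in this commit; the database path and port are illustrative, and `rng` is assumed to be set up as in the test suite:

    let
      database = SqliteDatabase.init("peers.db")[]   # illustrative on-disk path
      storage  = WakuPeerStorage.new(database)[]
      nodeKey  = crypto.PrivateKey.random(Secp256k1, rng[])[]
      node     = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"),
                               Port(60000), peerStorage = storage)

    waitFor node.start()
    node.mountRelay() # dials any `relay` peers loaded back from storage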
@@ -13,7 +13,8 @@ import
   ./v2/test_peer_manager,
   ./v2/test_web3, # TODO remove it when rln-relay tests get finalized
   ./v2/test_waku_rln_relay,
-  ./v2/test_waku_bridge
+  ./v2/test_waku_bridge,
+  ./v2/test_peer_storage
 
 # TODO Only enable this once swap module is integrated more nicely as a dependency, i.e. as submodule with CI etc
 # For PoC execute it manually and run separate module here: https://github.com/vacp2p/swap-contracts-module
@@ -12,7 +12,8 @@ import
   libp2p/protocols/pubsub/pubsub,
   libp2p/protocols/pubsub/rpc/message,
   ../../waku/v2/node/wakunode2,
-  ../../waku/v2/node/peer_manager,
+  ../../waku/v2/node/peer_manager/peer_manager,
+  ../../waku/v2/node/storage/peer/waku_peer_storage,
   ../../waku/v2/protocol/waku_relay,
   ../../waku/v2/protocol/waku_filter/waku_filter,
   ../../waku/v2/protocol/waku_store/waku_store,
@@ -162,3 +163,53 @@ procSuite "Peer Manager":
     check:
       # Not currently connected to node2, but had recent, successful connection.
       node1.peerManager.connectedness(peerInfo2.peerId) == CanConnect
+
+    await node2.stop()
+
+  asyncTest "Peer manager can use persistent storage and survive restarts":
+    let
+      database = SqliteDatabase.init("", inMemory = true)[]
+      storage = WakuPeerStorage.new(database)[]
+      nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
+      node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
+        Port(60000), peerStorage = storage)
+      nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
+      node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
+        Port(60002))
+      peerInfo2 = node2.peerInfo
+
+    await node1.start()
+    await node2.start()
+
+    node1.mountRelay()
+    node2.mountRelay()
+
+    discard await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec, 2.seconds)
+    check:
+      # Currently connected to node2
+      node1.peerManager.peers().len == 1
+      node1.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
+      node1.peerManager.connectedness(peerInfo2.peerId) == Connected
+
+    # Simulate restart by initialising a new node using the same storage
+    let
+      nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
+      node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"),
+        Port(60004), peerStorage = storage)
+
+    await node3.start()
+    check:
+      # Node2 has been loaded after "restart", but we have not yet reconnected
+      node3.peerManager.peers().len == 1
+      node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
+      node3.peerManager.connectedness(peerInfo2.peerId) == NotConnected
+
+    node3.mountRelay() # This should trigger a reconnect
+
+    check:
+      # Reconnected to node2 after "restart"
+      node3.peerManager.peers().len == 1
+      node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
+      node3.peerManager.connectedness(peerInfo2.peerId) == Connected
+
+    await allFutures([node1.stop(), node2.stop(), node3.stop()])
@@ -4,15 +4,15 @@ import
   std/[unittest, sets],
   libp2p/crypto/crypto,
   ../test_helpers,
-  ../../waku/v2/node/peer_manager,
+  ../../waku/v2/node/peer_manager/peer_manager,
   ../../waku/v2/node/storage/peer/waku_peer_storage
 
 suite "Peer Storage":
 
-  test "Store and retrieve from persistent peer storage":
+  test "Store, replace and retrieve from persistent peer storage":
     let
       database = SqliteDatabase.init("", inMemory = true)[]
-      storage = WakuPeerStorage.init(database)[]
+      storage = WakuPeerStorage.new(database)[]
 
       # Test Peer
       peerLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet()
@@ -24,6 +24,8 @@ suite "Peer Storage":
 
     defer: storage.close()
 
+    # Test insert and retrieve
+
     discard storage.put(peer.peerId, stored, conn)
 
     var responseCount = 0
@@ -40,3 +42,21 @@
     check:
       res.isErr == false
       responseCount == 1
+
+    # Test replace and retrieve (update an existing entry)
+    discard storage.put(peer.peerId, stored, Connectedness.CannotConnect)
+
+    responseCount = 0
+    proc replacedData(peerId: PeerID, storedInfo: StoredInfo,
+                      connectedness: Connectedness) =
+      responseCount += 1
+      check:
+        peerId == peer.peerId
+        storedInfo == stored
+        connectedness == CannotConnect
+
+    let repRes = storage.getAll(replacedData)
+
+    check:
+      repRes.isErr == false
+      responseCount == 1
@@ -8,7 +8,7 @@ import
   libp2p/stream/[bufferstream, connection],
   libp2p/crypto/crypto,
   libp2p/multistream,
-  ../../waku/v2/node/peer_manager,
+  ../../waku/v2/node/peer_manager/peer_manager,
   ../../waku/v2/protocol/message_notifier,
   ../../waku/v2/protocol/waku_filter/waku_filter,
   ../test_helpers, ./utils
@@ -11,7 +11,7 @@ import
   ../../waku/v2/protocol/[waku_message, message_notifier],
   ../../waku/v2/protocol/waku_store/waku_store,
   ../../waku/v2/node/storage/message/waku_message_store,
-  ../../waku/v2/node/peer_manager,
+  ../../waku/v2/node/peer_manager/peer_manager,
   ../test_helpers, ./utils
 
 procSuite "Waku Store":
@@ -9,7 +9,7 @@ import
   ../../protocol/waku_filter/[waku_filter_types, waku_filter],
   ../../protocol/waku_relay,
   ../wakunode2,
-  ../peer_manager,
+  ../peer_manager/peer_manager,
   ./jsonrpc_types
 
 export jsonrpc_types
@@ -3,66 +3,26 @@
 import
   std/[options, sets, sequtils],
   chronos, chronicles, metrics,
-  libp2p/standard_setup,
-  libp2p/peerstore
+  ./waku_peer_store,
+  ../storage/peer/peer_storage
 
-export peerstore, standard_setup
+export waku_peer_store
 
 declareCounter waku_peers_dials, "Number of peer dials", ["outcome"]
+declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"]
 
 logScope:
   topics = "wakupeers"
 
 type
-  Connectedness* = enum
-    # NotConnected: default state for a new peer. No connection and no further information on connectedness.
-    NotConnected,
-    # CannotConnect: attempted to connect to peer, but failed.
-    CannotConnect,
-    # CanConnect: was recently connected to peer and disconnected gracefully.
-    CanConnect,
-    # Connected: actively connected to peer.
-    Connected
-
-  ConnectionBook* = object of PeerBook[Connectedness]
-
-  WakuPeerStore* = ref object of PeerStore
-    connectionBook*: ConnectionBook
-
   PeerManager* = ref object of RootObj
     switch*: Switch
     peerStore*: WakuPeerStore
+    storage: PeerStorage
 
 const
   defaultDialTimeout = 1.minutes # @TODO should this be made configurable?
 
-proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
-  case event.kind
-  of ConnEventKind.Connected:
-    pm.peerStore.connectionBook.set(peerId, Connected)
-    return
-  of ConnEventKind.Disconnected:
-    pm.peerStore.connectionBook.set(peerId, CanConnect)
-    return
-
-proc new*(T: type WakuPeerStore): WakuPeerStore =
-  var p: WakuPeerStore
-  new(p)
-  return p
-
-proc new*(T: type PeerManager, switch: Switch): PeerManager =
-  let pm = PeerManager(switch: switch,
-                       peerStore: WakuPeerStore.new())
-
-  proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
-    onConnEvent(pm, peerId, event)
-
-  pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected)
-  pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected)
-
-  return pm
-
 ####################
 # Helper functions #
 ####################
@@ -72,6 +32,93 @@ proc toPeerInfo(storedInfo: StoredInfo): PeerInfo =
     addrs = toSeq(storedInfo.addrs),
     protocols = toSeq(storedInfo.protos))
 
+proc insertOrReplace(ps: PeerStorage, peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness) =
+  # Insert peer entry into persistent storage, or replace existing entry with updated info
+  let res = ps.put(peerId, storedInfo, connectedness)
+  if res.isErr:
+    warn "failed to store peers", err = res.error
+    waku_peers_errors.inc(labelValues = ["storage_failure"])
+
+proc dialPeer(pm: PeerManager, peerId: PeerID,
+              addrs: seq[MultiAddress], proto: string,
+              dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
+  info "Dialing peer from manager", wireAddr = addrs[0], peerId = peerId
+
+  # Dial Peer
+  let dialFut = pm.switch.dial(peerId, addrs, proto)
+
+  try:
+    # Attempt to dial remote peer
+    if (await dialFut.withTimeout(dialTimeout)):
+      waku_peers_dials.inc(labelValues = ["successful"])
+      return some(dialFut.read())
+    else:
+      # @TODO any redial attempts?
+      debug "Dialing remote peer timed out"
+      waku_peers_dials.inc(labelValues = ["timeout"])
+
+      pm.peerStore.connectionBook.set(peerId, CannotConnect)
+      if not pm.storage.isNil:
+        pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect)
+
+      return none(Connection)
+  except CatchableError as e:
+    # @TODO any redial attempts?
+    debug "Dialing remote peer failed", msg = e.msg
+    waku_peers_dials.inc(labelValues = ["failed"])
+
+    pm.peerStore.connectionBook.set(peerId, CannotConnect)
+    if not pm.storage.isNil:
+      pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect)
+
+    return none(Connection)
+
+proc loadFromStorage(pm: PeerManager) =
+  # Load peers from storage, if available
+  proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness) =
+    pm.peerStore.addressBook.set(peerId, storedInfo.addrs)
+    pm.peerStore.protoBook.set(peerId, storedInfo.protos)
+    pm.peerStore.keyBook.set(peerId, storedInfo.publicKey)
+    pm.peerStore.connectionBook.set(peerId, NotConnected) # Reset connectedness state
+
+  let res = pm.storage.getAll(onData)
+  if res.isErr:
+    warn "failed to load peers from storage", err = res.error
+    waku_peers_errors.inc(labelValues = ["storage_load_failure"])
+
+##################
+# Initialisation #
+##################
+
+proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
+  case event.kind
+  of ConnEventKind.Connected:
+    pm.peerStore.connectionBook.set(peerId, Connected)
+    if not pm.storage.isNil:
+      pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), Connected)
+    return
+  of ConnEventKind.Disconnected:
+    pm.peerStore.connectionBook.set(peerId, CanConnect)
+    if not pm.storage.isNil:
+      pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect)
+    return
+
+proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): PeerManager =
+  let pm = PeerManager(switch: switch,
+                       peerStore: WakuPeerStore.new(),
+                       storage: storage)
+
+  proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
+    onConnEvent(pm, peerId, event)
+
+  pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected)
+  pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected)
+
+  if not storage.isNil:
+    pm.loadFromStorage() # Load previously managed peers.
+
+  return pm
+
 #####################
 # Manager interface #
 #####################
@@ -120,6 +167,10 @@ proc addPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string) =
   # ...associated protocols
   pm.peerStore.protoBook.add(peerInfo.peerId, proto)
 
+  # Add peer to storage. Entry will subsequently be updated with connectedness information
+  if not pm.storage.isNil:
+    pm.storage.insertOrReplace(peerInfo.peerId, pm.peerStore.get(peerInfo.peerId), NotConnected)
+
 proc selectPeer*(pm: PeerManager, proto: string): Option[PeerInfo] =
   # Selects the best peer for a given protocol
   let peers = pm.peers.filterIt(it.protos.contains(proto))
@@ -132,6 +183,16 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[PeerInfo] =
   else:
     return none(PeerInfo)
 
+proc reconnectPeers*(pm: PeerManager, proto: string) {.async.} =
+  ## Reconnect to peers registered for this protocol. This will update connectedness.
+  ## Especially useful to resume connections from persistent storage after a restart.
+
+  debug "Reconnecting peers", proto=proto
+
+  for storedInfo in pm.peers(proto):
+    trace "Reconnecting to peer", peerId=storedInfo.peerId
+    discard await pm.dialPeer(storedInfo.peerId, toSeq(storedInfo.addrs), proto)
+
 ####################
 # Dialer interface #
 ####################
@@ -145,28 +206,4 @@ proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout =
   trace "Adding newly dialed peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto
   pm.addPeer(peerInfo, proto)
 
-  info "Dialing peer from manager", wireAddr = peerInfo.addrs[0], peerId = peerInfo.peerId
-
-  # Dial Peer
-  # @TODO Keep track of conn and connected state in peer store
-  let dialFut = pm.switch.dial(peerInfo.peerId, peerInfo.addrs, proto)
-
-  try:
-    # Attempt to dial remote peer
-    if (await dialFut.withTimeout(dialTimeout)):
-      waku_peers_dials.inc(labelValues = ["successful"])
-      return some(dialFut.read())
-    else:
-      # @TODO any redial attempts?
-      # @TODO indicate CannotConnect on peer metadata
-      debug "Dialing remote peer timed out"
-      waku_peers_dials.inc(labelValues = ["timeout"])
-      pm.peerStore.connectionBook.set(peerInfo.peerId, CannotConnect)
-      return none(Connection)
-  except CatchableError as e:
-    # @TODO any redial attempts?
-    # @TODO indicate CannotConnect on peer metadata
-    debug "Dialing remote peer failed", msg = e.msg
-    waku_peers_dials.inc(labelValues = ["failed"])
-    pm.peerStore.connectionBook.set(peerInfo.peerId, CannotConnect)
-    return none(Connection)
+  return await pm.dialPeer(peerInfo.peerId, peerInfo.addrs, proto, dialTimeout)
waku/v2/node/peer_manager/waku_peer_store.nim (new file, 28 lines)

@@ -0,0 +1,28 @@
+{.push raises: [Defect].}
+
+import
+  libp2p/standard_setup,
+  libp2p/peerstore
+
+export peerstore, standard_setup
+
+type
+  Connectedness* = enum
+    # NotConnected: default state for a new peer. No connection and no further information on connectedness.
+    NotConnected,
+    # CannotConnect: attempted to connect to peer, but failed.
+    CannotConnect,
+    # CanConnect: was recently connected to peer and disconnected gracefully.
+    CanConnect,
+    # Connected: actively connected to peer.
+    Connected
+
+  ConnectionBook* = object of PeerBook[Connectedness]
+
+  WakuPeerStore* = ref object of PeerStore
+    connectionBook*: ConnectionBook
+
+proc new*(T: type WakuPeerStore): WakuPeerStore =
+  var p: WakuPeerStore
+  new(p)
+  return p
waku/v2/node/storage/peer/peer_storage.nim (new file, 23 lines)

@@ -0,0 +1,23 @@
+import
+  stew/results,
+  chronos,
+  ../../peer_manager/waku_peer_store
+
+## This module defines a peer storage interface. Implementations of
+## PeerStorage are used to store and retrieve peers
+
+type
+  PeerStorage* = ref object of RootObj
+
+  PeerStorageResult*[T] = Result[T, string]
+
+  DataProc* = proc(peerId: PeerID, storedInfo: StoredInfo,
+                   connectedness: Connectedness) {.closure.}
+
+# PeerStorage interface
+method put*(db: PeerStorage,
+            peerId: PeerID,
+            storedInfo: StoredInfo,
+            connectedness: Connectedness): PeerStorageResult[void] {.base.} = discard
+
+method getAll*(db: PeerStorage, onData: DataProc): PeerStorageResult[bool] {.base.} = discard
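The storage abstraction relies on Nim's dynamic dispatch: `put` and `getAll` are declared as overridable `{.base.}` methods, and a concrete backend subclasses PeerStorage and overrides both, as the SQLite-backed WakuPeerStorage does in the next hunks. To sketch the contract, a hypothetical in-memory backend (the `InMemoryPeerStorage` type is illustrative only and is not part of this commit):

    import std/tables

    type InMemoryPeerStorage* = ref object of PeerStorage
      peers: Table[PeerID, (StoredInfo, Connectedness)]

    proc new*(T: type InMemoryPeerStorage): T =
      T(peers: initTable[PeerID, (StoredInfo, Connectedness)]())

    method put*(db: InMemoryPeerStorage, peerId: PeerID, storedInfo: StoredInfo,
                connectedness: Connectedness): PeerStorageResult[void] =
      db.peers[peerId] = (storedInfo, connectedness) # insert or replace
      ok()

    method getAll*(db: InMemoryPeerStorage, onData: DataProc): PeerStorageResult[bool] =
      for peerId, entry in db.peers.pairs:
        onData(peerId, entry[0], entry[1])
      ok(db.peers.len > 0)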
@@ -4,20 +4,15 @@ import
   chronos, metrics,
   libp2p/protobuf/minprotobuf,
   stew/results,
+  ./peer_storage,
   ../sqlite,
-  ../../peer_manager
+  ../../peer_manager/waku_peer_store
 
 export sqlite
 
 type
-  WakuPeerStorage* = ref object of RootObj
+  WakuPeerStorage* = ref object of PeerStorage
     database*: SqliteDatabase
 
-  WakuPeerStorageResult*[T] = Result[T, string]
-
-  DataProc* = proc(peerId: PeerID, storedInfo: StoredInfo,
-                   connectedness: Connectedness) {.closure.}
-
 ##########################
 # Protobuf Serialisation #
@@ -60,7 +55,7 @@ proc encode*(storedInfo: StoredInfo): ProtoBuffer =
 # Storage implementation #
 ##########################
 
-proc init*(T: type WakuPeerStorage, db: SqliteDatabase): WakuPeerStorageResult[T] =
+proc new*(T: type WakuPeerStorage, db: SqliteDatabase): PeerStorageResult[T] =
   ## Create the "Peers" table
   ## It contains:
   ## - peer id as primary key, stored as a blob
@@ -84,14 +79,14 @@ proc init*(T: type WakuPeerStorage, db: SqliteDatabase): WakuPeerStorageResult[T
   ok(WakuPeerStorage(database: db))
 
 
-proc put*(db: WakuPeerStorage,
+method put*(db: WakuPeerStorage,
           peerId: PeerID,
           storedInfo: StoredInfo,
-          connectedness: Connectedness): WakuPeerStorageResult[void] =
+          connectedness: Connectedness): PeerStorageResult[void] =
 
-  ## Adds a peer to storage
+  ## Adds a peer to storage or replaces existing entry if it already exists
   let prepare = db.database.prepareStmt(
-    "INSERT INTO Peers (peerId, storedInfo, connectedness) VALUES (?, ?, ?);",
+    "REPLACE INTO Peers (peerId, storedInfo, connectedness) VALUES (?, ?, ?);",
     (seq[byte], seq[byte], int32),
     void
   )
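With put now issuing REPLACE INTO, the operation is effectively an upsert keyed on the peerId primary key: calling it twice for the same peer leaves a single row carrying the latest connectedness. A minimal sketch mirroring the updated test above (the `db`, `peer` and `stored` names are illustrative):

    let storage = WakuPeerStorage.new(db)[]
    discard storage.put(peer.peerId, stored, Connected)
    discard storage.put(peer.peerId, stored, CannotConnect) # replaces the first row
    # getAll now reports exactly one entry for this peer, with CannotConnect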
@@ -105,7 +100,7 @@ proc put*(db: WakuPeerStorage,
 
   ok()
 
-proc getAll*(db: WakuPeerStorage, onData: DataProc): WakuPeerStorageResult[bool] =
+method getAll*(db: WakuPeerStorage, onData: peer_storage.DataProc): PeerStorageResult[bool] =
   ## Retrieves all peers from storage
   var gotPeers = false
 
@@ -18,8 +18,9 @@ import
   ../protocol/waku_rln_relay/[rln,waku_rln_relay_utils],
   ../utils/peers,
   ./storage/message/message_store,
+  ./storage/peer/peer_storage,
   ../utils/requests,
-  ./peer_manager
+  ./peer_manager/peer_manager
 
 declarePublicCounter waku_node_messages, "number of messages received", ["type"]
 declarePublicGauge waku_node_filters, "number of content filter subscriptions"
@@ -110,7 +111,8 @@ template tcpEndPoint(address, port): auto =
 
 proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey,
     bindIp: ValidIpAddress, bindPort: Port,
-    extIp = none[ValidIpAddress](), extPort = none[Port]()): T =
+    extIp = none[ValidIpAddress](), extPort = none[Port](),
+    peerStorage: PeerStorage = nil): T =
   ## Creates a Waku Node.
   ##
   ## Status: Implemented.
@@ -127,7 +129,7 @@ proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey,
   peerInfo.addrs.add(hostAddress) # Index 0
   for multiaddr in announcedAddresses:
     peerInfo.addrs.add(multiaddr) # Announced addresses in index > 0
 
   var switch = newStandardSwitch(some(nodekey), hostAddress,
     transportFlags = {ServerFlags.ReuseAddr}, rng = rng)
   # TODO Untested - verify behavior after switch interface change
@@ -138,7 +140,7 @@ proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey,
   #   triggerSelf = true, sign = false,
   #   verifySignature = false).PubSub
   result = WakuNode(
-    peerManager: PeerManager.new(switch),
+    peerManager: PeerManager.new(switch, peerStorage),
     switch: switch,
     rng: rng,
     peerInfo: peerInfo,
@@ -388,6 +390,9 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela
   node.wakuRelay = wakuRelay
   node.switch.mount(wakuRelay)
 
+  # Reconnect to previous relay peers
+  waitFor node.peerManager.reconnectPeers(WakuRelayCodec)
+
   info "mounting relay"
   proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
     let msg = WakuMessage.init(data)
@@ -476,6 +481,7 @@ when isMainModule:
       relay_api,
       store_api],
     ./storage/message/waku_message_store,
+    ./storage/peer/waku_peer_storage,
     ../../common/utils/nat
 
   proc startRpc(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port, conf: WakuNodeConf) =
@@ -524,9 +530,32 @@ when isMainModule:
       info "Node metrics", totalMessages
       discard setTimer(Moment.fromNow(2.seconds), logMetrics)
     discard setTimer(Moment.fromNow(2.seconds), logMetrics)
 
   let
     conf = WakuNodeConf.load()
+
+  # Storage setup
+  var sqliteDatabase: SqliteDatabase
+
+  if conf.dbpath != "":
+    let dbRes = SqliteDatabase.init(conf.dbpath)
+    if dbRes.isErr:
+      warn "failed to init database", err = dbRes.error
+      waku_node_errors.inc(labelValues = ["init_db_failure"])
+    else:
+      sqliteDatabase = dbRes.value
+
+  var pStorage: WakuPeerStorage
+
+  if not sqliteDatabase.isNil:
+    let res = WakuPeerStorage.new(sqliteDatabase)
+    if res.isErr:
+      warn "failed to init new WakuPeerStorage", err = res.error
+      waku_node_errors.inc(labelValues = ["init_store_failure"])
+    else:
+      pStorage = res.value
+
+  let
     (extIp, extTcpPort, extUdpPort) = setupNat(conf.nat, clientId,
       Port(uint16(conf.tcpPort) + conf.portsShift),
       Port(uint16(conf.udpPort) + conf.portsShift))
@@ -535,8 +564,10 @@ when isMainModule:
     ## config, the external port is the same as the bind port.
     extPort = if extIp.isSome() and extTcpPort.isNone(): some(Port(uint16(conf.tcpPort) + conf.portsShift))
               else: extTcpPort
-    node = WakuNode.init(conf.nodeKey, conf.listenAddress,
-      Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extPort)
+    node = WakuNode.init(conf.nodeKey,
+      conf.listenAddress, Port(uint16(conf.tcpPort) + conf.portsShift),
+      extIp, extPort,
+      pStorage)
 
   waitFor node.start()
 
@@ -548,13 +579,8 @@ when isMainModule:
   if conf.store:
     var store: WakuMessageStore
 
-    if conf.dbpath != "":
-      let dbRes = SqliteDatabase.init(conf.dbpath)
-      if dbRes.isErr:
-        warn "failed to init database", err = dbRes.error
-        waku_node_errors.inc(labelValues = ["init_db_failure"])
-
-      let res = WakuMessageStore.init(dbRes.value)
+    if not sqliteDatabase.isNil:
+      let res = WakuMessageStore.init(sqliteDatabase)
       if res.isErr:
         warn "failed to init WakuMessageStore", err = res.error
         waku_node_errors.inc(labelValues = ["init_store_failure"])
@@ -12,7 +12,7 @@ import
   ../message_notifier,
   waku_filter_types,
   ../../utils/requests,
-  ../../node/peer_manager
+  ../../node/peer_manager/peer_manager
 
 # NOTE This is just a start, the design of this protocol isn't done yet. It
 # should be direct payload exchange (a la req-resp), not be coupled with the
@@ -3,7 +3,7 @@ import
   bearssl,
   libp2p/peerinfo,
   libp2p/protocols/protocol,
-  ../../node/peer_manager,
+  ../../node/peer_manager/peer_manager,
   ../waku_message
 
 export waku_message
@@ -15,7 +15,7 @@ import
   ../waku_swap/waku_swap,
   ./waku_store_types,
   ../../utils/requests,
-  ../../node/peer_manager
+  ../../node/peer_manager/peer_manager
 
 export waku_store_types
 
@@ -8,7 +8,7 @@ import
   ../waku_message,
   ../../node/storage/message/message_store,
   ../../utils/pagination,
-  ../../node/peer_manager
+  ../../node/peer_manager/peer_manager
 
 export waku_message
 export pagination
@@ -29,7 +29,7 @@ import
   libp2p/protocols/protocol,
   libp2p/protobuf/minprotobuf,
   libp2p/stream/connection,
-  ../../node/peer_manager,
+  ../../node/peer_manager/peer_manager,
   ../message_notifier,
   ./waku_swap_types,
   ../../waku/v2/protocol/waku_swap/waku_swap_contracts
@@ -3,7 +3,7 @@ import
   bearssl,
   libp2p/protocols/protocol,
   libp2p/peerinfo,
-  ../../node/peer_manager
+  ../../node/peer_manager/peer_manager
 
 type
   Beneficiary* = seq[byte]