mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-05-05 14:29:29 +00:00
refactor(networking): unify peer data models, remove StoredInfo (#1597)
This commit is contained in:
parent
f5c77ed76f
commit
ba027aa21c
@ -105,13 +105,13 @@ procSuite "Peer Manager":
|
|||||||
node.peerManager.peerStore.peers().len == 3
|
node.peerManager.peerStore.peers().len == 3
|
||||||
node.peerManager.peerStore.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and
|
node.peerManager.peerStore.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and
|
||||||
it.addrs.contains(filterLoc) and
|
it.addrs.contains(filterLoc) and
|
||||||
it.protos.contains(WakuFilterCodec))
|
it.protocols.contains(WakuFilterCodec))
|
||||||
node.peerManager.peerStore.peers(WakuSwapCodec).allIt(it.peerId == swapPeer.peerId and
|
node.peerManager.peerStore.peers(WakuSwapCodec).allIt(it.peerId == swapPeer.peerId and
|
||||||
it.addrs.contains(swapLoc) and
|
it.addrs.contains(swapLoc) and
|
||||||
it.protos.contains(WakuSwapCodec))
|
it.protocols.contains(WakuSwapCodec))
|
||||||
node.peerManager.peerStore.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and
|
node.peerManager.peerStore.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and
|
||||||
it.addrs.contains(storeLoc) and
|
it.addrs.contains(storeLoc) and
|
||||||
it.protos.contains(WakuStoreCodec))
|
it.protocols.contains(WakuStoreCodec))
|
||||||
|
|
||||||
await node.stop()
|
await node.stop()
|
||||||
|
|
||||||
@ -270,7 +270,7 @@ procSuite "Peer Manager":
|
|||||||
# Currently connected to node2
|
# Currently connected to node2
|
||||||
node1.peerManager.peerStore.peers().len == 1
|
node1.peerManager.peerStore.peers().len == 1
|
||||||
node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
|
node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||||
node1.peerManager.peerStore.peers().anyIt(it.protos.contains(node2.wakuRelay.codec))
|
node1.peerManager.peerStore.peers().anyIt(it.protocols.contains(node2.wakuRelay.codec))
|
||||||
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
|
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
|
||||||
|
|
||||||
# Simulate restart by initialising a new node using the same storage
|
# Simulate restart by initialising a new node using the same storage
|
||||||
@ -286,7 +286,7 @@ procSuite "Peer Manager":
|
|||||||
# Node2 has been loaded after "restart", but we have not yet reconnected
|
# Node2 has been loaded after "restart", but we have not yet reconnected
|
||||||
node3.peerManager.peerStore.peers().len == 1
|
node3.peerManager.peerStore.peers().len == 1
|
||||||
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
|
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||||
node3.peerManager.peerStore.peers().anyIt(it.protos.contains(betaCodec))
|
node3.peerManager.peerStore.peers().anyIt(it.protocols.contains(betaCodec))
|
||||||
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
|
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
|
||||||
|
|
||||||
await node3.start() # This should trigger a reconnect
|
await node3.start() # This should trigger a reconnect
|
||||||
@ -295,8 +295,8 @@ procSuite "Peer Manager":
|
|||||||
# Reconnected to node2 after "restart"
|
# Reconnected to node2 after "restart"
|
||||||
node3.peerManager.peerStore.peers().len == 1
|
node3.peerManager.peerStore.peers().len == 1
|
||||||
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
|
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||||
node3.peerManager.peerStore.peers().anyIt(it.protos.contains(betaCodec))
|
node3.peerManager.peerStore.peers().anyIt(it.protocols.contains(betaCodec))
|
||||||
node3.peerManager.peerStore.peers().anyIt(it.protos.contains(stableCodec))
|
node3.peerManager.peerStore.peers().anyIt(it.protocols.contains(stableCodec))
|
||||||
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
|
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
|
||||||
|
|
||||||
await allFutures([node1.stop(), node2.stop(), node3.stop()])
|
await allFutures([node1.stop(), node2.stop(), node3.stop()])
|
||||||
|
|||||||
@ -22,64 +22,86 @@ suite "Peer Storage":
|
|||||||
peerKey = generateEcdsaKey()
|
peerKey = generateEcdsaKey()
|
||||||
peer = PeerInfo.new(peerKey, @[peerLoc])
|
peer = PeerInfo.new(peerKey, @[peerLoc])
|
||||||
peerProto = "/waku/2/default-waku/codec"
|
peerProto = "/waku/2/default-waku/codec"
|
||||||
stored = StoredInfo(peerId: peer.peerId, addrs: @[peerLoc], protos: @[peerProto], publicKey: peerKey.getPublicKey().tryGet())
|
connectedness = Connectedness.CanConnect
|
||||||
conn = Connectedness.CanConnect
|
|
||||||
disconn = 999999
|
disconn = 999999
|
||||||
|
stored = RemotePeerInfo(
|
||||||
|
peerId: peer.peerId,
|
||||||
|
addrs: @[peerLoc],
|
||||||
|
protocols: @[peerProto],
|
||||||
|
publicKey: peerKey.getPublicKey().tryGet(),
|
||||||
|
connectedness: connectedness,
|
||||||
|
disconnectTime: disconn)
|
||||||
|
|
||||||
defer: storage.close()
|
defer: storage.close()
|
||||||
|
|
||||||
# Test insert and retrieve
|
# Test insert and retrieve
|
||||||
|
|
||||||
discard storage.put(peer.peerId, stored, conn, disconn)
|
require storage.put(peer.peerId, stored, connectedness, disconn).isOk
|
||||||
|
|
||||||
var responseCount = 0
|
var responseCount = 0
|
||||||
# flags to check data matches what was stored (default true)
|
|
||||||
var peerIdFlag, storedInfoFlag, connectednessFlag, disconnectFlag: bool
|
|
||||||
|
|
||||||
proc data(peerId: PeerID, storedInfo: StoredInfo,
|
# Fetched variables from callback
|
||||||
|
var resPeerId: PeerId
|
||||||
|
var resStoredInfo: RemotePeerInfo
|
||||||
|
var resConnectedness: Connectedness
|
||||||
|
var resDisconnect: int64
|
||||||
|
|
||||||
|
proc data(peerId: PeerID, storedInfo: RemotePeerInfo,
|
||||||
connectedness: Connectedness, disconnectTime: int64) {.raises: [Defect].} =
|
connectedness: Connectedness, disconnectTime: int64) {.raises: [Defect].} =
|
||||||
responseCount += 1
|
responseCount += 1
|
||||||
|
|
||||||
# Note: cannot use `check` within `{.raises: [Defect].}` block
|
# Note: cannot use `check` within `{.raises: [Defect].}` block
|
||||||
# @TODO: /Nim/lib/pure/unittest.nim(577, 16) Error: can raise an unlisted exception: Exception
|
# @TODO: /Nim/lib/pure/unittest.nim(577, 16) Error: can raise an unlisted exception: Exception
|
||||||
# These flags are checked outside this block.
|
# These flags are checked outside this block.
|
||||||
peerIdFlag = peerId == peer.peerId
|
resPeerId = peerId
|
||||||
storedInfoFlag = storedInfo == stored
|
resStoredInfo = storedInfo
|
||||||
connectednessFlag = connectedness == conn
|
resConnectedness = connectedness
|
||||||
disconnectFlag = disconnectTime == disconn
|
resDisconnect = disconnectTime
|
||||||
|
|
||||||
let res = storage.getAll(data)
|
let res = storage.getAll(data)
|
||||||
|
|
||||||
check:
|
check:
|
||||||
res.isErr == false
|
res.isErr == false
|
||||||
responseCount == 1
|
responseCount == 1
|
||||||
peerIdFlag
|
resPeerId == peer.peerId
|
||||||
storedInfoFlag
|
resStoredInfo.peerId == peer.peerId
|
||||||
connectednessFlag
|
resStoredInfo.addrs == @[peerLoc]
|
||||||
disconnectFlag
|
resStoredInfo.protocols == @[peerProto]
|
||||||
|
resStoredInfo.publicKey == peerKey.getPublicKey().tryGet()
|
||||||
|
# TODO: For compatibility, we don't store connectedness and disconnectTime
|
||||||
|
#resStoredInfo.connectedness == connectedness
|
||||||
|
#resStoredInfo.disconnectTime == disconn
|
||||||
|
resConnectedness == Connectedness.CanConnect
|
||||||
|
resDisconnect == disconn
|
||||||
|
|
||||||
# Test replace and retrieve (update an existing entry)
|
# Test replace and retrieve (update an existing entry)
|
||||||
discard storage.put(peer.peerId, stored, Connectedness.CannotConnect, disconn + 10)
|
require storage.put(peer.peerId, stored, Connectedness.CannotConnect, disconn + 10).isOk
|
||||||
|
|
||||||
responseCount = 0
|
responseCount = 0
|
||||||
proc replacedData(peerId: PeerID, storedInfo: StoredInfo,
|
proc replacedData(peerId: PeerID, storedInfo: RemotePeerInfo,
|
||||||
connectedness: Connectedness, disconnectTime: int64) {.raises: [Defect].} =
|
connectedness: Connectedness, disconnectTime: int64) {.raises: [Defect].} =
|
||||||
responseCount += 1
|
responseCount += 1
|
||||||
|
|
||||||
# Note: cannot use `check` within `{.raises: [Defect].}` block
|
# Note: cannot use `check` within `{.raises: [Defect].}` block
|
||||||
# @TODO: /Nim/lib/pure/unittest.nim(577, 16) Error: can raise an unlisted exception: Exception
|
# @TODO: /Nim/lib/pure/unittest.nim(577, 16) Error: can raise an unlisted exception: Exception
|
||||||
# These flags are checked outside this block.
|
# These flags are checked outside this block.
|
||||||
peerIdFlag = peerId == peer.peerId
|
resPeerId = peerId
|
||||||
storedInfoFlag = storedInfo == stored
|
resStoredInfo = storedInfo
|
||||||
connectednessFlag = connectedness == CannotConnect
|
resConnectedness = connectedness
|
||||||
disconnectFlag = disconnectTime == disconn + 10
|
resDisconnect = disconnectTime
|
||||||
|
|
||||||
let repRes = storage.getAll(replacedData)
|
let repRes = storage.getAll(replacedData)
|
||||||
|
|
||||||
check:
|
check:
|
||||||
repRes.isErr == false
|
repRes.isErr == false
|
||||||
responseCount == 1
|
responseCount == 1
|
||||||
peerIdFlag
|
resPeerId == peer.peerId
|
||||||
storedInfoFlag
|
resStoredInfo.peerId == peer.peerId
|
||||||
connectednessFlag
|
resStoredInfo.addrs == @[peerLoc]
|
||||||
disconnectFlag
|
resStoredInfo.protocols == @[peerProto]
|
||||||
|
resStoredInfo.publicKey == peerKey.getPublicKey().tryGet()
|
||||||
|
# TODO: For compatibility, we don't store connectedness and disconnectTime
|
||||||
|
#resStoredInfo.connectedness == connectedness
|
||||||
|
#resStoredInfo.disconnectTime == disconn
|
||||||
|
resConnectedness == Connectedness.CannotConnect
|
||||||
|
resDisconnect == disconn + 10
|
||||||
|
|||||||
@ -103,37 +103,37 @@ suite "Extended nim-libp2p Peer Store":
|
|||||||
|
|
||||||
test "get() returns the correct StoredInfo for a given PeerId":
|
test "get() returns the correct StoredInfo for a given PeerId":
|
||||||
# When
|
# When
|
||||||
let storedInfoPeer1 = peerStore.get(p1)
|
let peer1 = peerStore.get(p1)
|
||||||
let storedInfoPeer6 = peerStore.get(p6)
|
let peer6 = peerStore.get(p6)
|
||||||
|
|
||||||
# Then
|
# Then
|
||||||
check:
|
check:
|
||||||
# regression on nim-libp2p fields
|
# regression on nim-libp2p fields
|
||||||
storedInfoPeer1.peerId == p1
|
peer1.peerId == p1
|
||||||
storedInfoPeer1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
|
peer1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
|
||||||
storedInfoPeer1.protos == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
|
peer1.protocols == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
|
||||||
storedInfoPeer1.agent == "nwaku"
|
peer1.agent == "nwaku"
|
||||||
storedInfoPeer1.protoVersion == "protoVersion1"
|
peer1.protoVersion == "protoVersion1"
|
||||||
|
|
||||||
# our extended fields
|
# our extended fields
|
||||||
storedInfoPeer1.connectedness == Connected
|
peer1.connectedness == Connected
|
||||||
storedInfoPeer1.disconnectTime == 0
|
peer1.disconnectTime == 0
|
||||||
storedInfoPeer1.origin == Discv5
|
peer1.origin == Discv5
|
||||||
storedInfoPeer1.numberFailedConn == 1
|
peer1.numberFailedConn == 1
|
||||||
storedInfoPeer1.lastFailedConn == Moment.init(1001, Second)
|
peer1.lastFailedConn == Moment.init(1001, Second)
|
||||||
|
|
||||||
check:
|
check:
|
||||||
# fields are empty, not part of the peerstore
|
# fields are empty, not part of the peerstore
|
||||||
storedInfoPeer6.peerId == p6
|
peer6.peerId == p6
|
||||||
storedInfoPeer6.addrs.len == 0
|
peer6.addrs.len == 0
|
||||||
storedInfoPeer6.protos.len == 0
|
peer6.protocols.len == 0
|
||||||
storedInfoPeer6.agent == default(string)
|
peer6.agent == default(string)
|
||||||
storedInfoPeer6.protoVersion == default(string)
|
peer6.protoVersion == default(string)
|
||||||
storedInfoPeer6.connectedness == default(Connectedness)
|
peer6.connectedness == default(Connectedness)
|
||||||
storedInfoPeer6.disconnectTime == default(int)
|
peer6.disconnectTime == default(int)
|
||||||
storedInfoPeer6.origin == default(PeerOrigin)
|
peer6.origin == default(PeerOrigin)
|
||||||
storedInfoPeer6.numberFailedConn == default(int)
|
peer6.numberFailedConn == default(int)
|
||||||
storedInfoPeer6.lastFailedConn == default(Moment)
|
peer6.lastFailedConn == default(Moment)
|
||||||
|
|
||||||
test "peers() returns all StoredInfo of the PeerStore":
|
test "peers() returns all StoredInfo of the PeerStore":
|
||||||
# When
|
# When
|
||||||
@ -153,7 +153,7 @@ suite "Extended nim-libp2p Peer Store":
|
|||||||
check:
|
check:
|
||||||
# regression on nim-libp2p fields
|
# regression on nim-libp2p fields
|
||||||
p3.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()]
|
p3.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()]
|
||||||
p3.protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
|
p3.protocols == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
|
||||||
p3.agent == "gowaku"
|
p3.agent == "gowaku"
|
||||||
p3.protoVersion == "protoVersion3"
|
p3.protoVersion == "protoVersion3"
|
||||||
|
|
||||||
@ -180,7 +180,7 @@ suite "Extended nim-libp2p Peer Store":
|
|||||||
# Only p3 supports that protocol
|
# Only p3 supports that protocol
|
||||||
lpPeers.len == 1
|
lpPeers.len == 1
|
||||||
lpPeers.anyIt(it.peerId == p3)
|
lpPeers.anyIt(it.peerId == p3)
|
||||||
lpPeers[0].protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
|
lpPeers[0].protocols == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
|
||||||
|
|
||||||
test "peers() returns all StoredInfo matching a given protocolMatcher":
|
test "peers() returns all StoredInfo matching a given protocolMatcher":
|
||||||
# When
|
# When
|
||||||
@ -197,28 +197,25 @@ suite "Extended nim-libp2p Peer Store":
|
|||||||
pMatcherStorePeers.anyIt(it.peerId == p5)
|
pMatcherStorePeers.anyIt(it.peerId == p5)
|
||||||
|
|
||||||
check:
|
check:
|
||||||
pMatcherStorePeers.filterIt(it.peerId == p1)[0].protos == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
|
pMatcherStorePeers.filterIt(it.peerId == p1)[0].protocols == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
|
||||||
pMatcherStorePeers.filterIt(it.peerId == p2)[0].protos == @["/vac/waku/relay/2.0.0", "/vac/waku/store/2.0.0"]
|
pMatcherStorePeers.filterIt(it.peerId == p2)[0].protocols == @["/vac/waku/relay/2.0.0", "/vac/waku/store/2.0.0"]
|
||||||
pMatcherStorePeers.filterIt(it.peerId == p3)[0].protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
|
pMatcherStorePeers.filterIt(it.peerId == p3)[0].protocols == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
|
||||||
pMatcherStorePeers.filterIt(it.peerId == p5)[0].protos == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
|
pMatcherStorePeers.filterIt(it.peerId == p5)[0].protocols == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
|
||||||
|
|
||||||
check:
|
check:
|
||||||
pMatcherSwapPeers.len == 1
|
pMatcherSwapPeers.len == 1
|
||||||
pMatcherSwapPeers.anyIt(it.peerId == p5)
|
pMatcherSwapPeers.anyIt(it.peerId == p5)
|
||||||
pMatcherSwapPeers[0].protos == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
|
pMatcherSwapPeers[0].protocols == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
|
||||||
|
|
||||||
test "toRemotePeerInfo() converts a StoredInfo to a RemotePeerInfo":
|
test "toRemotePeerInfo() converts a StoredInfo to a RemotePeerInfo":
|
||||||
# Given
|
# Given
|
||||||
let storedInfoPeer1 = peerStore.get(p1)
|
let peer1 = peerStore.get(p1)
|
||||||
|
|
||||||
# When
|
|
||||||
let remotePeerInfo1 = storedInfoPeer1.toRemotePeerInfo()
|
|
||||||
|
|
||||||
# Then
|
# Then
|
||||||
check:
|
check:
|
||||||
remotePeerInfo1.peerId == p1
|
peer1.peerId == p1
|
||||||
remotePeerInfo1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
|
peer1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
|
||||||
remotePeerInfo1.protocols == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
|
peer1.protocols == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
|
||||||
|
|
||||||
test "connectedness() returns the connection status of a given PeerId":
|
test "connectedness() returns the connection status of a given PeerId":
|
||||||
check:
|
check:
|
||||||
|
|||||||
@ -38,13 +38,6 @@ proc constructMultiaddrStr*(remotePeerInfo: RemotePeerInfo): string =
|
|||||||
return ""
|
return ""
|
||||||
constructMultiaddrStr(remotePeerInfo.addrs[0], remotePeerInfo.peerId)
|
constructMultiaddrStr(remotePeerInfo.addrs[0], remotePeerInfo.peerId)
|
||||||
|
|
||||||
proc constructMultiaddrStr*(storedInfo: StoredInfo): string =
|
|
||||||
# Constructs a multiaddress with both location (wire) address and p2p identity
|
|
||||||
if storedInfo.addrs.len == 0:
|
|
||||||
return ""
|
|
||||||
constructMultiaddrStr(storedInfo.addrs[0], storedInfo.peerId)
|
|
||||||
|
|
||||||
|
|
||||||
proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
|
proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
|
||||||
|
|
||||||
rpcsrv.rpc("post_waku_v2_admin_v1_peers") do (peers: seq[string]) -> bool:
|
rpcsrv.rpc("post_waku_v2_admin_v1_peers") do (peers: seq[string]) -> bool:
|
||||||
|
|||||||
@ -78,11 +78,11 @@ proc protocolMatcher*(codec: string): Matcher =
|
|||||||
|
|
||||||
proc insertOrReplace(ps: PeerStorage,
|
proc insertOrReplace(ps: PeerStorage,
|
||||||
peerId: PeerID,
|
peerId: PeerID,
|
||||||
storedInfo: StoredInfo,
|
remotePeerInfo: RemotePeerInfo,
|
||||||
connectedness: Connectedness,
|
connectedness: Connectedness,
|
||||||
disconnectTime: int64 = 0) =
|
disconnectTime: int64 = 0) =
|
||||||
# Insert peer entry into persistent storage, or replace existing entry with updated info
|
# Insert peer entry into persistent storage, or replace existing entry with updated info
|
||||||
let res = ps.put(peerId, storedInfo, connectedness, disconnectTime)
|
let res = ps.put(peerId, remotePeerInfo, connectedness, disconnectTime)
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
warn "failed to store peers", err = res.error
|
warn "failed to store peers", err = res.error
|
||||||
waku_peers_errors.inc(labelValues = ["storage_failure"])
|
waku_peers_errors.inc(labelValues = ["storage_failure"])
|
||||||
@ -137,24 +137,24 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
|
|||||||
proc loadFromStorage(pm: PeerManager) =
|
proc loadFromStorage(pm: PeerManager) =
|
||||||
debug "loading peers from storage"
|
debug "loading peers from storage"
|
||||||
# Load peers from storage, if available
|
# Load peers from storage, if available
|
||||||
proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness, disconnectTime: int64) =
|
proc onData(peerId: PeerID, remotePeerInfo: RemotePeerInfo, connectedness: Connectedness, disconnectTime: int64) =
|
||||||
trace "loading peer", peerId= $peerId, storedInfo= $storedInfo, connectedness=connectedness
|
trace "loading peer", peerId=peerId, connectedness=connectedness
|
||||||
|
|
||||||
if peerId == pm.switch.peerInfo.peerId:
|
if peerId == pm.switch.peerInfo.peerId:
|
||||||
# Do not manage self
|
# Do not manage self
|
||||||
return
|
return
|
||||||
|
|
||||||
# nim-libp2p books
|
# nim-libp2p books
|
||||||
pm.peerStore[AddressBook][peerId] = storedInfo.addrs
|
pm.peerStore[AddressBook][peerId] = remotePeerInfo.addrs
|
||||||
pm.peerStore[ProtoBook][peerId] = storedInfo.protos
|
pm.peerStore[ProtoBook][peerId] = remotePeerInfo.protocols
|
||||||
pm.peerStore[KeyBook][peerId] = storedInfo.publicKey
|
pm.peerStore[KeyBook][peerId] = remotePeerInfo.publicKey
|
||||||
pm.peerStore[AgentBook][peerId] = storedInfo.agent
|
pm.peerStore[AgentBook][peerId] = remotePeerInfo.agent
|
||||||
pm.peerStore[ProtoVersionBook][peerId] = storedInfo.protoVersion
|
pm.peerStore[ProtoVersionBook][peerId] = remotePeerInfo.protoVersion
|
||||||
|
|
||||||
# custom books
|
# custom books
|
||||||
pm.peerStore[ConnectionBook][peerId] = NotConnected # Reset connectedness state
|
pm.peerStore[ConnectionBook][peerId] = NotConnected # Reset connectedness state
|
||||||
pm.peerStore[DisconnectBook][peerId] = disconnectTime
|
pm.peerStore[DisconnectBook][peerId] = disconnectTime
|
||||||
pm.peerStore[SourceBook][peerId] = storedInfo.origin
|
pm.peerStore[SourceBook][peerId] = remotePeerInfo.origin
|
||||||
|
|
||||||
let res = pm.storage.getAll(onData)
|
let res = pm.storage.getAll(onData)
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
@ -461,7 +461,7 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
|
|||||||
# TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
|
# TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
|
||||||
if peers.len > 0:
|
if peers.len > 0:
|
||||||
debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto
|
debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto
|
||||||
return some(peers[0].toRemotePeerInfo())
|
return some(peers[0])
|
||||||
debug "No peer found for protocol", protocol=proto
|
debug "No peer found for protocol", protocol=proto
|
||||||
return none(RemotePeerInfo)
|
return none(RemotePeerInfo)
|
||||||
|
|
||||||
@ -473,7 +473,7 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
|
|||||||
# If not slotted, we select a random peer for the given protocol
|
# If not slotted, we select a random peer for the given protocol
|
||||||
if peers.len > 0:
|
if peers.len > 0:
|
||||||
debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto
|
debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto
|
||||||
return some(peers[0].toRemotePeerInfo())
|
return some(peers[0])
|
||||||
debug "No peer found for protocol", protocol=proto
|
debug "No peer found for protocol", protocol=proto
|
||||||
return none(RemotePeerInfo)
|
return none(RemotePeerInfo)
|
||||||
|
|
||||||
|
|||||||
@ -7,24 +7,25 @@ else:
|
|||||||
import
|
import
|
||||||
stew/results
|
stew/results
|
||||||
import
|
import
|
||||||
../waku_peer_store
|
../waku_peer_store,
|
||||||
|
../../../utils/peers
|
||||||
|
|
||||||
## This module defines a peer storage interface. Implementations of
|
## This module defines a peer storage interface. Implementations of
|
||||||
## PeerStorage are used to store and retrieve peers
|
## PeerStorage are used to store and retrieve peers
|
||||||
|
|
||||||
type
|
type
|
||||||
PeerStorage* = ref object of RootObj
|
PeerStorage* = ref object of RootObj
|
||||||
|
|
||||||
PeerStorageResult*[T] = Result[T, string]
|
PeerStorageResult*[T] = Result[T, string]
|
||||||
|
|
||||||
DataProc* = proc(peerId: PeerID, storedInfo: StoredInfo,
|
DataProc* = proc(peerId: PeerID, remotePeerInfo: RemotePeerInfo,
|
||||||
connectedness: Connectedness, disconnectTime: int64) {.closure, raises: [Defect].}
|
connectedness: Connectedness, disconnectTime: int64) {.closure, raises: [Defect].}
|
||||||
|
|
||||||
# PeerStorage interface
|
# PeerStorage interface
|
||||||
method put*(db: PeerStorage,
|
method put*(db: PeerStorage,
|
||||||
peerId: PeerID,
|
peerId: PeerID,
|
||||||
storedInfo: StoredInfo,
|
remotePeerInfo: RemotePeerInfo,
|
||||||
connectedness: Connectedness,
|
connectedness: Connectedness,
|
||||||
disconnectTime: int64): PeerStorageResult[void] {.base.} = discard
|
disconnectTime: int64): PeerStorageResult[void] {.base.} = discard
|
||||||
|
|
||||||
method getAll*(db: PeerStorage, onData: DataProc): PeerStorageResult[bool] {.base.} = discard
|
method getAll*(db: PeerStorage, onData: DataProc): PeerStorageResult[bool] {.base.} = discard
|
||||||
|
|||||||
@ -5,13 +5,14 @@ else:
|
|||||||
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/sets,
|
std/sets,
|
||||||
stew/results,
|
stew/results,
|
||||||
sqlite3_abi,
|
sqlite3_abi,
|
||||||
libp2p/protobuf/minprotobuf
|
libp2p/protobuf/minprotobuf
|
||||||
import
|
import
|
||||||
../../../../common/sqlite,
|
../../../../common/sqlite,
|
||||||
../waku_peer_store,
|
../waku_peer_store,
|
||||||
|
../../../utils/peers,
|
||||||
./peer_storage
|
./peer_storage
|
||||||
|
|
||||||
export sqlite
|
export sqlite
|
||||||
@ -25,11 +26,11 @@ type
|
|||||||
# Protobuf Serialisation #
|
# Protobuf Serialisation #
|
||||||
##########################
|
##########################
|
||||||
|
|
||||||
proc init*(T: type StoredInfo, buffer: seq[byte]): ProtoResult[T] =
|
proc init*(T: type RemotePeerInfo, buffer: seq[byte]): ProtoResult[T] =
|
||||||
var
|
var
|
||||||
multiaddrSeq: seq[MultiAddress]
|
multiaddrSeq: seq[MultiAddress]
|
||||||
protoSeq: seq[string]
|
protoSeq: seq[string]
|
||||||
storedInfo = StoredInfo()
|
storedInfo = RemotePeerInfo()
|
||||||
|
|
||||||
var pb = initProtoBuffer(buffer)
|
var pb = initProtoBuffer(buffer)
|
||||||
|
|
||||||
@ -37,25 +38,27 @@ proc init*(T: type StoredInfo, buffer: seq[byte]): ProtoResult[T] =
|
|||||||
discard ? pb.getRepeatedField(2, multiaddrSeq)
|
discard ? pb.getRepeatedField(2, multiaddrSeq)
|
||||||
discard ? pb.getRepeatedField(3, protoSeq)
|
discard ? pb.getRepeatedField(3, protoSeq)
|
||||||
discard ? pb.getField(4, storedInfo.publicKey)
|
discard ? pb.getField(4, storedInfo.publicKey)
|
||||||
|
|
||||||
|
# TODO: Store the rest of parameters such as connectedness and disconnectTime
|
||||||
|
|
||||||
storedInfo.addrs = multiaddrSeq
|
storedInfo.addrs = multiaddrSeq
|
||||||
storedInfo.protos = protoSeq
|
storedInfo.protocols = protoSeq
|
||||||
|
|
||||||
ok(storedInfo)
|
ok(storedInfo)
|
||||||
|
|
||||||
proc encode*(storedInfo: StoredInfo): PeerStorageResult[ProtoBuffer] =
|
proc encode*(remotePeerInfo: RemotePeerInfo): PeerStorageResult[ProtoBuffer] =
|
||||||
var pb = initProtoBuffer()
|
var pb = initProtoBuffer()
|
||||||
|
|
||||||
pb.write(1, storedInfo.peerId)
|
pb.write(1, remotePeerInfo.peerId)
|
||||||
|
|
||||||
for multiaddr in storedInfo.addrs.items:
|
for multiaddr in remotePeerInfo.addrs.items:
|
||||||
pb.write(2, multiaddr)
|
pb.write(2, multiaddr)
|
||||||
|
|
||||||
for proto in storedInfo.protos.items:
|
for proto in remotePeerInfo.protocols.items:
|
||||||
pb.write(3, proto)
|
pb.write(3, proto)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
pb.write(4, storedInfo.publicKey)
|
pb.write(4, remotePeerInfo.publicKey)
|
||||||
except ResultError[CryptoError] as e:
|
except ResultError[CryptoError] as e:
|
||||||
return err("Failed to encode public key")
|
return err("Failed to encode public key")
|
||||||
|
|
||||||
@ -69,13 +72,15 @@ proc new*(T: type WakuPeerStorage, db: SqliteDatabase): PeerStorageResult[T] =
|
|||||||
## Misconfiguration can lead to nil DB
|
## Misconfiguration can lead to nil DB
|
||||||
if db.isNil():
|
if db.isNil():
|
||||||
return err("db not initialized")
|
return err("db not initialized")
|
||||||
|
|
||||||
## Create the "Peer" table
|
## Create the "Peer" table
|
||||||
## It contains:
|
## It contains:
|
||||||
## - peer id as primary key, stored as a blob
|
## - peer id as primary key, stored as a blob
|
||||||
## - stored info (serialised protobuf), stored as a blob
|
## - stored info (serialised protobuf), stored as a blob
|
||||||
## - last known enumerated connectedness state, stored as an integer
|
## - last known enumerated connectedness state, stored as an integer
|
||||||
## - disconnect time in epoch seconds, if applicable
|
## - disconnect time in epoch seconds, if applicable
|
||||||
|
|
||||||
|
# TODO: connectedness and disconnectTime are now stored in the storedInfo type
|
||||||
let
|
let
|
||||||
createStmt = db.prepareStmt("""
|
createStmt = db.prepareStmt("""
|
||||||
CREATE TABLE IF NOT EXISTS Peer (
|
CREATE TABLE IF NOT EXISTS Peer (
|
||||||
@ -102,19 +107,19 @@ proc new*(T: type WakuPeerStorage, db: SqliteDatabase): PeerStorageResult[T] =
|
|||||||
).expect("this is a valid statement")
|
).expect("this is a valid statement")
|
||||||
|
|
||||||
## General initialization
|
## General initialization
|
||||||
|
|
||||||
ok(WakuPeerStorage(database: db,
|
ok(WakuPeerStorage(database: db,
|
||||||
replaceStmt: replaceStmt))
|
replaceStmt: replaceStmt))
|
||||||
|
|
||||||
|
|
||||||
method put*(db: WakuPeerStorage,
|
method put*(db: WakuPeerStorage,
|
||||||
peerId: PeerID,
|
peerId: PeerID,
|
||||||
storedInfo: StoredInfo,
|
remotePeerInfo: RemotePeerInfo,
|
||||||
connectedness: Connectedness,
|
connectedness: Connectedness,
|
||||||
disconnectTime: int64): PeerStorageResult[void] =
|
disconnectTime: int64): PeerStorageResult[void] =
|
||||||
|
|
||||||
## Adds a peer to storage or replaces existing entry if it already exists
|
## Adds a peer to storage or replaces existing entry if it already exists
|
||||||
let encoded = storedInfo.encode()
|
let encoded = remotePeerInfo.encode()
|
||||||
|
|
||||||
if encoded.isErr:
|
if encoded.isErr:
|
||||||
return err("failed to encode: " & encoded.error())
|
return err("failed to encode: " & encoded.error())
|
||||||
@ -129,7 +134,7 @@ method getAll*(db: WakuPeerStorage, onData: peer_storage.DataProc): PeerStorageR
|
|||||||
## Retrieves all peers from storage
|
## Retrieves all peers from storage
|
||||||
var gotPeers = false
|
var gotPeers = false
|
||||||
|
|
||||||
proc peer(s: ptr sqlite3_stmt) {.raises: [Defect, LPError, ResultError[ProtoError]].} =
|
proc peer(s: ptr sqlite3_stmt) {.raises: [Defect, LPError, ResultError[ProtoError]].} =
|
||||||
gotPeers = true
|
gotPeers = true
|
||||||
let
|
let
|
||||||
# Peer ID
|
# Peer ID
|
||||||
@ -139,7 +144,7 @@ method getAll*(db: WakuPeerStorage, onData: peer_storage.DataProc): PeerStorageR
|
|||||||
# Stored Info
|
# Stored Info
|
||||||
sTo = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 1))
|
sTo = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 1))
|
||||||
sToL = sqlite3_column_bytes(s, 1)
|
sToL = sqlite3_column_bytes(s, 1)
|
||||||
storedInfo = StoredInfo.init(@(toOpenArray(sTo, 0, sToL - 1))).tryGet()
|
storedInfo = RemotePeerInfo.init(@(toOpenArray(sTo, 0, sToL - 1))).tryGet()
|
||||||
# Connectedness
|
# Connectedness
|
||||||
connectedness = Connectedness(sqlite3_column_int(s, 2))
|
connectedness = Connectedness(sqlite3_column_int(s, 2))
|
||||||
# DisconnectTime
|
# DisconnectTime
|
||||||
@ -152,13 +157,13 @@ method getAll*(db: WakuPeerStorage, onData: peer_storage.DataProc): PeerStorageR
|
|||||||
queryResult = db.database.query("SELECT peerId, storedInfo, connectedness, disconnectTime FROM Peer", peer)
|
queryResult = db.database.query("SELECT peerId, storedInfo, connectedness, disconnectTime FROM Peer", peer)
|
||||||
except LPError, ResultError[ProtoError]:
|
except LPError, ResultError[ProtoError]:
|
||||||
return err("failed to extract peer from query result")
|
return err("failed to extract peer from query result")
|
||||||
|
|
||||||
if queryResult.isErr:
|
if queryResult.isErr:
|
||||||
return err("failed")
|
return err("failed")
|
||||||
|
|
||||||
ok gotPeers
|
ok gotPeers
|
||||||
|
|
||||||
proc close*(db: WakuPeerStorage) =
|
proc close*(db: WakuPeerStorage) =
|
||||||
## Closes the database.
|
## Closes the database.
|
||||||
db.replaceStmt.dispose()
|
db.replaceStmt.dispose()
|
||||||
db.database.close()
|
db.database.close()
|
||||||
|
|||||||
@ -6,6 +6,7 @@ else:
|
|||||||
import
|
import
|
||||||
std/[tables, sequtils, sets, options, times, math],
|
std/[tables, sequtils, sets, options, times, math],
|
||||||
chronos,
|
chronos,
|
||||||
|
eth/p2p/discoveryv5/enr,
|
||||||
libp2p/builders,
|
libp2p/builders,
|
||||||
libp2p/peerstore
|
libp2p/peerstore
|
||||||
|
|
||||||
@ -15,26 +16,6 @@ import
|
|||||||
export peerstore, builders
|
export peerstore, builders
|
||||||
|
|
||||||
type
|
type
|
||||||
Connectedness* = enum
|
|
||||||
# NotConnected: default state for a new peer. No connection and no further information on connectedness.
|
|
||||||
NotConnected,
|
|
||||||
# CannotConnect: attempted to connect to peer, but failed.
|
|
||||||
CannotConnect,
|
|
||||||
# CanConnect: was recently connected to peer and disconnected gracefully.
|
|
||||||
CanConnect,
|
|
||||||
# Connected: actively connected to peer.
|
|
||||||
Connected
|
|
||||||
|
|
||||||
PeerOrigin* = enum
|
|
||||||
UnknownOrigin,
|
|
||||||
Discv5,
|
|
||||||
Static,
|
|
||||||
Dns
|
|
||||||
|
|
||||||
PeerDirection* = enum
|
|
||||||
UnknownDirection,
|
|
||||||
Inbound,
|
|
||||||
Outbound
|
|
||||||
|
|
||||||
# Keeps track of the Connectedness state of a peer
|
# Keeps track of the Connectedness state of a peer
|
||||||
ConnectionBook* = ref object of PeerBook[Connectedness]
|
ConnectionBook* = ref object of PeerBook[Connectedness]
|
||||||
@ -54,22 +35,8 @@ type
|
|||||||
# Direction
|
# Direction
|
||||||
DirectionBook* = ref object of PeerBook[PeerDirection]
|
DirectionBook* = ref object of PeerBook[PeerDirection]
|
||||||
|
|
||||||
StoredInfo* = object
|
# ENR Book
|
||||||
# Taken from nim-libp2
|
ENRBook* = ref object of PeerBook[enr.Record]
|
||||||
peerId*: PeerId
|
|
||||||
addrs*: seq[MultiAddress]
|
|
||||||
protos*: seq[string]
|
|
||||||
publicKey*: PublicKey
|
|
||||||
agent*: string
|
|
||||||
protoVersion*: string
|
|
||||||
|
|
||||||
# Extended custom fields
|
|
||||||
connectedness*: Connectedness
|
|
||||||
disconnectTime*: int64
|
|
||||||
origin*: PeerOrigin
|
|
||||||
direction*: PeerDirection
|
|
||||||
lastFailedConn*: Moment
|
|
||||||
numberFailedConn*: int
|
|
||||||
|
|
||||||
##################
|
##################
|
||||||
# Peer Store API #
|
# Peer Store API #
|
||||||
@ -103,16 +70,16 @@ proc delete*(peerStore: PeerStore,
|
|||||||
peerStore.del(peerId)
|
peerStore.del(peerId)
|
||||||
|
|
||||||
proc get*(peerStore: PeerStore,
|
proc get*(peerStore: PeerStore,
|
||||||
peerId: PeerID): StoredInfo =
|
peerId: PeerID): RemotePeerInfo =
|
||||||
## Get the stored information of a given peer.
|
## Get the stored information of a given peer.
|
||||||
StoredInfo(
|
RemotePeerInfo(
|
||||||
# Taken from nim-libp2
|
|
||||||
peerId: peerId,
|
peerId: peerId,
|
||||||
addrs: peerStore[AddressBook][peerId],
|
addrs: peerStore[AddressBook][peerId],
|
||||||
protos: peerStore[ProtoBook][peerId],
|
enr: if peerStore[ENRBook][peerId] != default(enr.Record): some(peerStore[ENRBook][peerId]) else: none(enr.Record),
|
||||||
publicKey: peerStore[KeyBook][peerId],
|
protocols: peerStore[ProtoBook][peerId],
|
||||||
agent: peerStore[AgentBook][peerId],
|
agent: peerStore[AgentBook][peerId],
|
||||||
protoVersion: peerStore[ProtoVersionBook][peerId],
|
protoVersion: peerStore[ProtoVersionBook][peerId],
|
||||||
|
publicKey: peerStore[KeyBook][peerId],
|
||||||
|
|
||||||
# Extended custom fields
|
# Extended custom fields
|
||||||
connectedness: peerStore[ConnectionBook][peerId],
|
connectedness: peerStore[ConnectionBook][peerId],
|
||||||
@ -124,7 +91,7 @@ proc get*(peerStore: PeerStore,
|
|||||||
)
|
)
|
||||||
|
|
||||||
# TODO: Rename peers() to getPeersByProtocol()
|
# TODO: Rename peers() to getPeersByProtocol()
|
||||||
proc peers*(peerStore: PeerStore): seq[StoredInfo] =
|
proc peers*(peerStore: PeerStore): seq[RemotePeerInfo] =
|
||||||
## Get all the stored information of every peer.
|
## Get all the stored information of every peer.
|
||||||
let allKeys = concat(toSeq(peerStore[AddressBook].book.keys()),
|
let allKeys = concat(toSeq(peerStore[AddressBook].book.keys()),
|
||||||
toSeq(peerStore[ProtoBook].book.keys()),
|
toSeq(peerStore[ProtoBook].book.keys()),
|
||||||
@ -132,19 +99,13 @@ proc peers*(peerStore: PeerStore): seq[StoredInfo] =
|
|||||||
|
|
||||||
return allKeys.mapIt(peerStore.get(it))
|
return allKeys.mapIt(peerStore.get(it))
|
||||||
|
|
||||||
proc peers*(peerStore: PeerStore, proto: string): seq[StoredInfo] =
|
proc peers*(peerStore: PeerStore, proto: string): seq[RemotePeerInfo] =
|
||||||
# Return the known info for all peers registered on the specified protocol
|
# Return the known info for all peers registered on the specified protocol
|
||||||
peerStore.peers.filterIt(it.protos.contains(proto))
|
peerStore.peers.filterIt(it.protocols.contains(proto))
|
||||||
|
|
||||||
proc peers*(peerStore: PeerStore, protocolMatcher: Matcher): seq[StoredInfo] =
|
proc peers*(peerStore: PeerStore, protocolMatcher: Matcher): seq[RemotePeerInfo] =
|
||||||
# Return the known info for all peers matching the provided protocolMatcher
|
# Return the known info for all peers matching the provided protocolMatcher
|
||||||
peerStore.peers.filterIt(it.protos.anyIt(protocolMatcher(it)))
|
peerStore.peers.filterIt(it.protocols.anyIt(protocolMatcher(it)))
|
||||||
|
|
||||||
proc toRemotePeerInfo*(storedInfo: StoredInfo): RemotePeerInfo =
|
|
||||||
RemotePeerInfo.init(peerId = storedInfo.peerId,
|
|
||||||
addrs = toSeq(storedInfo.addrs),
|
|
||||||
protocols = toSeq(storedInfo.protos))
|
|
||||||
|
|
||||||
|
|
||||||
proc connectedness*(peerStore: PeerStore, peerId: PeerID): Connectedness =
|
proc connectedness*(peerStore: PeerStore, peerId: PeerID): Connectedness =
|
||||||
# Return the connection state of the given, managed peer
|
# Return the connection state of the given, managed peer
|
||||||
@ -159,7 +120,7 @@ proc isConnected*(peerStore: PeerStore, peerId: PeerID): bool =
|
|||||||
proc hasPeer*(peerStore: PeerStore, peerId: PeerID, proto: string): bool =
|
proc hasPeer*(peerStore: PeerStore, peerId: PeerID, proto: string): bool =
|
||||||
# Returns `true` if peer is included in manager for the specified protocol
|
# Returns `true` if peer is included in manager for the specified protocol
|
||||||
# TODO: What if peer does not exist in the peerStore?
|
# TODO: What if peer does not exist in the peerStore?
|
||||||
peerStore.get(peerId).protos.contains(proto)
|
peerStore.get(peerId).protocols.contains(proto)
|
||||||
|
|
||||||
proc hasPeers*(peerStore: PeerStore, proto: string): bool =
|
proc hasPeers*(peerStore: PeerStore, proto: string): bool =
|
||||||
# Returns `true` if the peerstore has any peer for the specified protocol
|
# Returns `true` if the peerstore has any peer for the specified protocol
|
||||||
@ -169,14 +130,14 @@ proc hasPeers*(peerStore: PeerStore, protocolMatcher: Matcher): bool =
|
|||||||
# Returns `true` if the peerstore has any peer matching the protocolMatcher
|
# Returns `true` if the peerstore has any peer matching the protocolMatcher
|
||||||
toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(protocolMatcher(it)))
|
toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(protocolMatcher(it)))
|
||||||
|
|
||||||
proc getPeersByDirection*(peerStore: PeerStore, direction: PeerDirection): seq[StoredInfo] =
|
proc getPeersByDirection*(peerStore: PeerStore, direction: PeerDirection): seq[RemotePeerInfo] =
|
||||||
return peerStore.peers.filterIt(it.direction == direction)
|
return peerStore.peers.filterIt(it.direction == direction)
|
||||||
|
|
||||||
proc getNotConnectedPeers*(peerStore: PeerStore): seq[StoredInfo] =
|
proc getNotConnectedPeers*(peerStore: PeerStore): seq[RemotePeerInfo] =
|
||||||
return peerStore.peers.filterIt(it.connectedness != Connected)
|
return peerStore.peers.filterIt(it.connectedness != Connected)
|
||||||
|
|
||||||
proc getConnectedPeers*(peerStore: PeerStore): seq[StoredInfo] =
|
proc getConnectedPeers*(peerStore: PeerStore): seq[RemotePeerInfo] =
|
||||||
return peerStore.peers.filterIt(it.connectedness == Connected)
|
return peerStore.peers.filterIt(it.connectedness == Connected)
|
||||||
|
|
||||||
proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[StoredInfo] =
|
proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[RemotePeerInfo] =
|
||||||
return peerStore.peers.filterIt(it.protos.contains(proto))
|
return peerStore.peers.filterIt(it.protocols.contains(proto))
|
||||||
|
|||||||
@ -585,7 +585,7 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: C
|
|||||||
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
|
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
|
||||||
else: peer
|
else: peer
|
||||||
|
|
||||||
info "registering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer
|
info "registering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer.peerId
|
||||||
|
|
||||||
# Add handler wrapper to store the message when pushed, when relay is disabled and filter enabled
|
# Add handler wrapper to store the message when pushed, when relay is disabled and filter enabled
|
||||||
# TODO: Move this logic to wakunode2 app
|
# TODO: Move this logic to wakunode2 app
|
||||||
@ -612,7 +612,7 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics:
|
|||||||
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
|
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
|
||||||
else: peer
|
else: peer
|
||||||
|
|
||||||
info "deregistering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer
|
info "deregistering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer.peerId
|
||||||
|
|
||||||
let unsubRes = await node.wakuFilterClient.unsubscribe(pubsubTopic, contentTopics, peer=remotePeer)
|
let unsubRes = await node.wakuFilterClient.unsubscribe(pubsubTopic, contentTopics, peer=remotePeer)
|
||||||
if unsubRes.isOk():
|
if unsubRes.isOk():
|
||||||
@ -854,7 +854,7 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMe
|
|||||||
if node.wakuLightpushClient.isNil():
|
if node.wakuLightpushClient.isNil():
|
||||||
return err("waku lightpush client is nil")
|
return err("waku lightpush client is nil")
|
||||||
|
|
||||||
debug "publishing message with lightpush", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, peer=peer
|
debug "publishing message with lightpush", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, peer=peer.peerId
|
||||||
|
|
||||||
return await node.wakuLightpushClient.publish(pubsubTopic, message, peer)
|
return await node.wakuLightpushClient.publish(pubsubTopic, message, peer)
|
||||||
|
|
||||||
@ -980,7 +980,6 @@ proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
|
|||||||
# First get a list of connected peer infos
|
# First get a list of connected peer infos
|
||||||
let peers = node.peerManager.peerStore.peers()
|
let peers = node.peerManager.peerStore.peers()
|
||||||
.filterIt(it.connectedness == Connected)
|
.filterIt(it.connectedness == Connected)
|
||||||
.mapIt(it.toRemotePeerInfo())
|
|
||||||
|
|
||||||
for peer in peers:
|
for peer in peers:
|
||||||
try:
|
try:
|
||||||
|
|||||||
@ -5,7 +5,8 @@ else:
|
|||||||
|
|
||||||
# Collection of utilities related to Waku peers
|
# Collection of utilities related to Waku peers
|
||||||
import
|
import
|
||||||
std/[options, sequtils, strutils],
|
std/[options, sequtils, strutils, times],
|
||||||
|
chronos,
|
||||||
stew/results,
|
stew/results,
|
||||||
stew/shims/net,
|
stew/shims/net,
|
||||||
eth/keys,
|
eth/keys,
|
||||||
@ -18,6 +19,32 @@ import
|
|||||||
peerinfo,
|
peerinfo,
|
||||||
routing_record]
|
routing_record]
|
||||||
|
|
||||||
|
#import
|
||||||
|
# ../node/peer_manager/waku_peer_store
|
||||||
|
# todo organize this
|
||||||
|
|
||||||
|
type
|
||||||
|
Connectedness* = enum
|
||||||
|
# NotConnected: default state for a new peer. No connection and no further information on connectedness.
|
||||||
|
NotConnected,
|
||||||
|
# CannotConnect: attempted to connect to peer, but failed.
|
||||||
|
CannotConnect,
|
||||||
|
# CanConnect: was recently connected to peer and disconnected gracefully.
|
||||||
|
CanConnect,
|
||||||
|
# Connected: actively connected to peer.
|
||||||
|
Connected
|
||||||
|
|
||||||
|
PeerOrigin* = enum
|
||||||
|
UnknownOrigin,
|
||||||
|
Discv5,
|
||||||
|
Static,
|
||||||
|
Dns
|
||||||
|
|
||||||
|
PeerDirection* = enum
|
||||||
|
UnknownDirection,
|
||||||
|
Inbound,
|
||||||
|
Outbound
|
||||||
|
|
||||||
type
|
type
|
||||||
RemotePeerInfo* = ref object of RootObj
|
RemotePeerInfo* = ref object of RootObj
|
||||||
peerId*: PeerID
|
peerId*: PeerID
|
||||||
@ -25,6 +52,16 @@ type
|
|||||||
enr*: Option[enr.Record]
|
enr*: Option[enr.Record]
|
||||||
protocols*: seq[string]
|
protocols*: seq[string]
|
||||||
|
|
||||||
|
agent*: string
|
||||||
|
protoVersion*: string
|
||||||
|
publicKey*: crypto.PublicKey
|
||||||
|
connectedness*: Connectedness
|
||||||
|
disconnectTime*: int64
|
||||||
|
origin*: PeerOrigin
|
||||||
|
direction*: PeerDirection
|
||||||
|
lastFailedConn*: Moment
|
||||||
|
numberFailedConn*: int
|
||||||
|
|
||||||
func `$`*(remotePeerInfo: RemotePeerInfo): string =
|
func `$`*(remotePeerInfo: RemotePeerInfo): string =
|
||||||
$remotePeerInfo.peerId
|
$remotePeerInfo.peerId
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user