Removed local peer sets for filter, swap and store (#375)

This commit is contained in:
Hanno Cornelius 2021-02-11 10:58:25 +02:00 committed by GitHub
parent 0c60e6537f
commit bf0eab4a48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 157 additions and 78 deletions

View File

@ -6,6 +6,7 @@
- Docs: Add information on how to query Status test fleet for node addresses; how to view logs and how to update submodules.
- PubSub topic `subscribe` and `unsubscribe` no longer returns a future (removed `async` designation)
- Added a peer manager for `relay`, `filter`, `store` and `swap` peers.
- `relay`, `filter`, `store` and `swap` peers are now stored in a common, shared peer store and no longer in separate sets.
## 2021-01-05 v0.2

View File

@ -12,6 +12,9 @@ import
../../waku/v2/node/wakunode2,
../../waku/v2/node/peer_manager,
../../waku/v2/protocol/waku_relay,
../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/protocol/waku_swap/waku_swap,
../test_helpers
procSuite "Peer Manager":
@ -72,3 +75,46 @@ procSuite "Peer Manager":
connOpt.isNone()
await node1.stop()
asyncTest "Adding, selecting and filtering peers work":
let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"),
Port(60000))
# Create filter peer
filterLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet()
filterKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get()
filterPeer = PeerInfo.init(filterKey, @[filterLoc])
# Create swap peer
swapLoc = MultiAddress.init("/ip4/127.0.0.2/tcp/2").tryGet()
swapKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get()
swapPeer = PeerInfo.init(swapKey, @[swapLoc])
# Create store peer
storeLoc = MultiAddress.init("/ip4/127.0.0.3/tcp/4").tryGet()
storeKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get()
storePeer = PeerInfo.init(storeKey, @[storeLoc])
await node.start()
node.mountFilter()
node.mountSwap()
node.mountStore()
node.wakuFilter.setPeer(filterPeer)
node.wakuSwap.setPeer(swapPeer)
node.wakuStore.setPeer(storePeer)
# Check peers were successfully added to peer manager
check:
node.peerManager.peers().len == 3
node.peerManager.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and
it.addrs.contains(filterLoc) and
it.protos.contains(WakuFilterCodec))
node.peerManager.peers(WakuSwapCodec).allIt(it.peerId == swapPeer.peerId and
it.addrs.contains(swapLoc) and
it.protos.contains(WakuSwapCodec))
node.peerManager.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and
it.addrs.contains(storeLoc) and
it.protos.contains(WakuStoreCodec))
await node.stop()

View File

@ -7,6 +7,7 @@ import
../../protocol/waku_store/[waku_store_types, waku_store],
../../protocol/waku_swap/[waku_swap_types, waku_swap],
../../protocol/waku_filter/[waku_filter_types, waku_filter],
../../protocol/waku_relay,
../wakunode2,
../peer_manager,
./jsonrpc_types
@ -37,36 +38,36 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
## Managed peers
if not node.wakuRelay.isNil:
# Map all managed peers to WakuPeers and add to return list
wPeers.insert(node.peerManager.peers.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
protocol: toSeq(it.protos.items)[0],
connected: node.peerManager.connectedness(it.peerId))),
# Map managed peers to WakuPeers and add to return list
wPeers.insert(node.peerManager.peers(WakuRelayCodec)
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
protocol: WakuRelayCodec,
connected: node.peerManager.connectedness(it.peerId))),
wPeers.len) # Append to the end of the sequence
if not node.wakuFilter.isNil:
# Map WakuFilter peers to WakuPeers and add to return list
wPeers.insert(node.peerManager.peers(WakuFilterCodec)
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
protocol: WakuFilterCodec,
connected: node.peerManager.connectedness(it.peerId))),
wPeers.len) # Append to the end of the sequence
## Unmanaged peers
## @TODO add these peers to peer manager
if not node.wakuSwap.isNil:
# Map WakuSwap peers to WakuPeers and add to return list
wPeers.insert(node.wakuSwap.peers.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it.peerInfo),
protocol: WakuSwapCodec,
connected: node.switch.isConnected(it.peerInfo))),
wPeers.len) # Append to the end of the sequence
if not node.wakuFilter.isNil:
# Map WakuFilter peers to WakuPeers and add to return list
wPeers.insert(node.wakuFilter.peers.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it.peerInfo),
protocol: WakuFilterCodec,
connected: node.switch.isConnected(it.peerInfo))),
wPeers.insert(node.peerManager.peers(WakuSwapCodec)
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
protocol: WakuSwapCodec,
connected: node.peerManager.connectedness(it.peerId))),
wPeers.len) # Append to the end of the sequence
if not node.wakuStore.isNil:
# Map WakuStore peers to WakuPeers and add to return list
wPeers.insert(node.wakuStore.peers.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it.peerInfo),
protocol: WakuStoreCodec,
connected: node.switch.isConnected(it.peerInfo))),
wPeers.insert(node.peerManager.peers(WakuStoreCodec)
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
protocol: WakuStoreCodec,
connected: node.peerManager.connectedness(it.peerId))),
wPeers.len) # Append to the end of the sequence
# @TODO filter output on protocol/connected-status
return wPeers

View File

@ -1,7 +1,7 @@
{.push raises: [Defect, Exception].}
import
std/[options, sets],
std/[options, sets, sequtils],
chronos, chronicles, metrics,
libp2p/standard_setup,
libp2p/peerstore
@ -29,12 +29,42 @@ proc new*(T: type PeerManager, switch: Switch): PeerManager =
# Helper functions #
####################
proc hasPeer(pm: PeerManager, peerInfo: PeerInfo, proto: string): bool =
proc toPeerInfo(storedInfo: StoredInfo): PeerInfo =
PeerInfo.init(peerId = storedInfo.peerId,
addrs = toSeq(storedInfo.addrs),
protocols = toSeq(storedInfo.protos))
#####################
# Manager interface #
#####################
proc peers*(pm: PeerManager): seq[StoredInfo] =
# Return the known info for all peers
pm.peerStore.peers()
proc peers*(pm: PeerManager, proto: string): seq[StoredInfo] =
# Return the known info for all peers registered on the specified protocol
pm.peers.filterIt(it.protos.contains(proto))
proc connectedness*(pm: PeerManager, peerId: PeerId): bool =
# Return the connection state of the given, managed peer
# @TODO the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
# @TODO richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
let storedInfo = pm.peerStore.get(peerId)
if (storedInfo == StoredInfo()):
# Peer is not managed, therefore not connected
return false
else:
pm.switch.isConnected(peerId)
proc hasPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string): bool =
# Returns `true` if peer is included in manager for the specified protocol
pm.peerStore.get(peerInfo.peerId).protos.contains(proto)
proc addPeer(pm: PeerManager, peerInfo: PeerInfo, proto: string) =
proc addPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string) =
# Adds peer to manager for the specified protocol
debug "Adding peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto
@ -52,6 +82,17 @@ proc addPeer(pm: PeerManager, peerInfo: PeerInfo, proto: string) =
# ...associated protocols
pm.peerStore.protoBook.add(peerInfo.peerId, proto)
proc selectPeer*(pm: PeerManager, proto: string): Option[PeerInfo] =
# Selects the best peer for a given protocol
let peers = pm.peers.filterIt(it.protos.contains(proto))
if peers.len >= 1:
# @TODO proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
let peerStored = peers[0]
return some(peerStored.toPeerInfo())
else:
return none(PeerInfo)
####################
# Dialer interface #
@ -89,24 +130,3 @@ proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout =
debug "Dialing remote peer failed", msg = e.msg
waku_peers_dials.inc(labelValues = ["failed"])
return none(Connection)
#####################
# Manager interface #
#####################
proc peers*(pm: PeerManager): seq[StoredInfo] =
# Return the known info for all peers
pm.peerStore.peers()
proc connectedness*(pm: PeerManager, peerId: PeerId): bool =
# Return the connection state of the given, managed peer
# @TODO the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
# @TODO richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
let storedInfo = pm.peerStore.get(peerId)
if (storedInfo == StoredInfo()):
# Peer is not managed, therefore not connected
return false
else:
pm.switch.isConnected(peerId)

View File

@ -30,7 +30,6 @@ logScope:
const
WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
# Error types (metric label values)
const
dialFailure = "dial_failure"
@ -198,9 +197,8 @@ proc init*(T: type WakuFilter, peerManager: PeerManager, rng: ref BrHmacDrbgCont
result.pushHandler = handler
result.init()
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
proc setPeer*(wf: WakuFilter, peer: PeerInfo) =
wf.peers.add(FilterPeer(peerInfo: peer))
wf.peerManager.addPeer(peer, WakuFilterCodec)
waku_filter_peers.inc()
proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
@ -228,8 +226,10 @@ proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
MessageNotificationSubscription.init(@[], handle)
proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]] {.async, gcsafe.} =
if wf.peers.len >= 1:
let peer = wf.peers[0].peerInfo # @TODO: select peer from manager rather than from local set
let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec)
if peerOpt.isSome:
let peer = peerOpt.get()
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
@ -246,9 +246,13 @@ proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]]
proc unsubscribe*(wf: WakuFilter, request: FilterRequest) {.async, gcsafe.} =
# @TODO: NO REAL REASON TO GENERATE REQUEST ID FOR UNSUBSCRIBE OTHER THAN CREATING SANE-LOOKING RPC.
let id = generateRequestId(wf.rng)
if wf.peers.len >= 1:
let peer = wf.peers[0].peerInfo # @TODO: select peer from manager rather than from local set
let
id = generateRequestId(wf.rng)
peerOpt = wf.peerManager.selectPeer(WakuFilterCodec)
if peerOpt.isSome:
# @TODO: if there are more than one WakuFilter peer, WakuFilter should unsubscribe from all peers
let peer = peerOpt.get()
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)

View File

@ -41,12 +41,8 @@ type
MessagePushHandler* = proc(requestId: string, msg: MessagePush) {.gcsafe, closure.}
FilterPeer* = object
peerInfo*: PeerInfo
WakuFilter* = ref object of LPProtocol
rng*: ref BrHmacDrbgContext
peerManager*: PeerManager
peers*: seq[FilterPeer]
subscribers*: seq[Subscriber]
pushHandler*: MessagePushHandler

View File

@ -361,7 +361,7 @@ proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgConte
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
proc setPeer*(ws: WakuStore, peer: PeerInfo) =
ws.peers.add(HistoryPeer(peerInfo: peer))
ws.peerManager.addPeer(peer, WakuStoreCodec)
waku_store_peers.inc()
proc subscription*(proto: WakuStore): MessageNotificationSubscription =
@ -391,8 +391,14 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn
# - latency?
# - default store peer?
let peer = w.peers[0]
let connOpt = await w.peerManager.dialPeer(peer.peerInfo, WakuStoreCodec)
let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
error "failed to connect to remote peer"
waku_store_errors.inc(labelValues = [dialFailure])
return
let connOpt = await w.peerManager.dialPeer(peerOpt.get(), WakuStoreCodec)
if connOpt.isNone():
# @TODO more sophisticated error handling here
@ -424,8 +430,14 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand
# - latency?
# - default store peer?
let peer = ws.peers[0]
let connOpt = await ws.peerManager.dialPeer(peer.peerInfo, WakuStoreCodec)
let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
error "failed to connect to remote peer"
waku_store_errors.inc(labelValues = [dialFailure])
return
let connOpt = await ws.peerManager.dialPeer(peerOpt.get(), WakuStoreCodec)
if connOpt.isNone():
# @TODO more sophisticated error handling here
@ -446,7 +458,7 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand
# NOTE Perform accounting operation
# Assumes wakuSwap protocol is mounted
let peerId = peer.peerInfo.peerId
let peerId = peerOpt.get().peerId
let messages = response.value.response.messages
ws.wakuSwap.debit(peerId, messages.len)

View File

@ -49,13 +49,9 @@ type
query*: HistoryQuery
response*: HistoryResponse
HistoryPeer* = object
peerInfo*: PeerInfo
WakuStore* = ref object of LPProtocol
peerManager*: PeerManager
rng*: ref BrHmacDrbgContext
peers*: seq[HistoryPeer]
messages*: seq[IndexedWakuMessage]
store*: MessageStore
wakuSwap*: WakuSwap

View File

@ -90,9 +90,16 @@ proc init*(T: type Cheque, buffer: seq[byte]): ProtoResult[T] =
# TODO Test for credit/debit operations in succession
proc sendCheque*(ws: WakuSwap) {.async.} =
# TODO Better peer selection, for now using hardcoded peer
let peer = ws.peers[0]
let connOpt = await ws.peerManager.dialPeer(peer.peerInfo, WakuSwapCodec)
let peerOpt = ws.peerManager.selectPeer(WakuSwapCodec)
if peerOpt.isNone():
error "failed to connect to remote peer"
waku_swap_errors.inc(labelValues = [dialFailure])
return
let peer = peerOpt.get()
let connOpt = await ws.peerManager.dialPeer(peer, WakuSwapCodec)
if connOpt.isNone():
# @TODO more sophisticated error handling here
@ -107,8 +114,7 @@ proc sendCheque*(ws: WakuSwap) {.async.} =
await connOpt.get().writeLP(Cheque(amount: 1).encode().buffer)
# Set new balance
# XXX Assume peerId is first peer
let peerId = ws.peers[0].peerInfo.peerId
let peerId = peer.peerId
ws.accounting[peerId] -= 1
info "New accounting state", accounting = ws.accounting[peerId]
@ -116,7 +122,8 @@ proc sendCheque*(ws: WakuSwap) {.async.} =
proc handleCheque*(ws: WakuSwap, cheque: Cheque) =
info "handle incoming cheque"
# XXX Assume peerId is first peer
let peerId = ws.peers[0].peerInfo.peerId
let peerOpt = ws.peerManager.selectPeer(WakuSwapCodec)
let peerId = peerOpt.get().peerId
ws.accounting[peerId] += int(cheque.amount)
info "New accounting state", accounting = ws.accounting[peerId]
@ -185,7 +192,7 @@ proc init*(T: type WakuSwap, peerManager: PeerManager, rng: ref BrHmacDrbgContex
result.init()
proc setPeer*(ws: WakuSwap, peer: PeerInfo) =
ws.peers.add(SwapPeer(peerInfo: peer))
ws.peerManager.addPeer(peer, WakuSwapCodec)
waku_swap_peers.inc()
# TODO End to end communication

View File

@ -20,13 +20,9 @@ type
CreditHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.}
DebitHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.}
SwapPeer* = object
peerInfo*: PeerInfo
WakuSwap* = ref object of LPProtocol
peerManager*: PeerManager
rng*: ref BrHmacDrbgContext
peers*: seq[SwapPeer]
text*: string
accounting*: Table[PeerId, int]
credit*: CreditHandler