diff --git a/CHANGELOG.md b/CHANGELOG.md index e9e8ba57d..a83888322 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Docs: Add information on how to query Status test fleet for node addresses; how to view logs and how to update submodules. - PubSub topic `subscribe` and `unsubscribe` no longer returns a future (removed `async` designation) - Added a peer manager for `relay`, `filter`, `store` and `swap` peers. +- `relay`, `filter`, `store` and `swap` peers are now stored in a common, shared peer store and no longer in separate sets. ## 2021-01-05 v0.2 diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index 1ad4527d4..b11f96c48 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -12,6 +12,9 @@ import ../../waku/v2/node/wakunode2, ../../waku/v2/node/peer_manager, ../../waku/v2/protocol/waku_relay, + ../../waku/v2/protocol/waku_filter/waku_filter, + ../../waku/v2/protocol/waku_store/waku_store, + ../../waku/v2/protocol/waku_swap/waku_swap, ../test_helpers procSuite "Peer Manager": @@ -72,3 +75,46 @@ procSuite "Peer Manager": connOpt.isNone() await node1.stop() + + asyncTest "Adding, selecting and filtering peers work": + let + nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"), + Port(60000)) + # Create filter peer + filterLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet() + filterKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get() + filterPeer = PeerInfo.init(filterKey, @[filterLoc]) + # Create swap peer + swapLoc = MultiAddress.init("/ip4/127.0.0.2/tcp/2").tryGet() + swapKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get() + swapPeer = PeerInfo.init(swapKey, @[swapLoc]) + # Create store peer + storeLoc = MultiAddress.init("/ip4/127.0.0.3/tcp/4").tryGet() + storeKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get() + storePeer = PeerInfo.init(storeKey, @[storeLoc]) + + await node.start() + + node.mountFilter() + node.mountSwap() + node.mountStore() + + node.wakuFilter.setPeer(filterPeer) + node.wakuSwap.setPeer(swapPeer) + node.wakuStore.setPeer(storePeer) + + # Check peers were successfully added to peer manager + check: + node.peerManager.peers().len == 3 + node.peerManager.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and + it.addrs.contains(filterLoc) and + it.protos.contains(WakuFilterCodec)) + node.peerManager.peers(WakuSwapCodec).allIt(it.peerId == swapPeer.peerId and + it.addrs.contains(swapLoc) and + it.protos.contains(WakuSwapCodec)) + node.peerManager.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and + it.addrs.contains(storeLoc) and + it.protos.contains(WakuStoreCodec)) + + await node.stop() diff --git a/waku/v2/node/jsonrpc/admin_api.nim b/waku/v2/node/jsonrpc/admin_api.nim index ebc482071..709c9f628 100644 --- a/waku/v2/node/jsonrpc/admin_api.nim +++ b/waku/v2/node/jsonrpc/admin_api.nim @@ -7,6 +7,7 @@ import ../../protocol/waku_store/[waku_store_types, waku_store], ../../protocol/waku_swap/[waku_swap_types, waku_swap], ../../protocol/waku_filter/[waku_filter_types, waku_filter], + ../../protocol/waku_relay, ../wakunode2, ../peer_manager, ./jsonrpc_types @@ -37,36 +38,36 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = ## Managed peers if not node.wakuRelay.isNil: - # Map all managed peers to WakuPeers and add to return list - wPeers.insert(node.peerManager.peers.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId), - protocol: toSeq(it.protos.items)[0], - connected: node.peerManager.connectedness(it.peerId))), + # Map managed peers to WakuPeers and add to return list + wPeers.insert(node.peerManager.peers(WakuRelayCodec) + .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId), + protocol: WakuRelayCodec, + connected: node.peerManager.connectedness(it.peerId))), + wPeers.len) # Append to the end of the sequence + + if not node.wakuFilter.isNil: + # Map WakuFilter peers to WakuPeers and add to return list + wPeers.insert(node.peerManager.peers(WakuFilterCodec) + .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId), + protocol: WakuFilterCodec, + connected: node.peerManager.connectedness(it.peerId))), wPeers.len) # Append to the end of the sequence - - ## Unmanaged peers - ## @TODO add these peers to peer manager if not node.wakuSwap.isNil: # Map WakuSwap peers to WakuPeers and add to return list - wPeers.insert(node.wakuSwap.peers.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it.peerInfo), - protocol: WakuSwapCodec, - connected: node.switch.isConnected(it.peerInfo))), - wPeers.len) # Append to the end of the sequence - - if not node.wakuFilter.isNil: - # Map WakuFilter peers to WakuPeers and add to return list - wPeers.insert(node.wakuFilter.peers.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it.peerInfo), - protocol: WakuFilterCodec, - connected: node.switch.isConnected(it.peerInfo))), + wPeers.insert(node.peerManager.peers(WakuSwapCodec) + .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId), + protocol: WakuSwapCodec, + connected: node.peerManager.connectedness(it.peerId))), wPeers.len) # Append to the end of the sequence if not node.wakuStore.isNil: # Map WakuStore peers to WakuPeers and add to return list - wPeers.insert(node.wakuStore.peers.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it.peerInfo), - protocol: WakuStoreCodec, - connected: node.switch.isConnected(it.peerInfo))), + wPeers.insert(node.peerManager.peers(WakuStoreCodec) + .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId), + protocol: WakuStoreCodec, + connected: node.peerManager.connectedness(it.peerId))), wPeers.len) # Append to the end of the sequence - # @TODO filter output on protocol/connected-status return wPeers diff --git a/waku/v2/node/peer_manager.nim b/waku/v2/node/peer_manager.nim index 9cf785c03..58d86666f 100644 --- a/waku/v2/node/peer_manager.nim +++ b/waku/v2/node/peer_manager.nim @@ -1,7 +1,7 @@ {.push raises: [Defect, Exception].} import - std/[options, sets], + std/[options, sets, sequtils], chronos, chronicles, metrics, libp2p/standard_setup, libp2p/peerstore @@ -29,12 +29,42 @@ proc new*(T: type PeerManager, switch: Switch): PeerManager = # Helper functions # #################### -proc hasPeer(pm: PeerManager, peerInfo: PeerInfo, proto: string): bool = +proc toPeerInfo(storedInfo: StoredInfo): PeerInfo = + PeerInfo.init(peerId = storedInfo.peerId, + addrs = toSeq(storedInfo.addrs), + protocols = toSeq(storedInfo.protos)) + +##################### +# Manager interface # +##################### + +proc peers*(pm: PeerManager): seq[StoredInfo] = + # Return the known info for all peers + pm.peerStore.peers() + +proc peers*(pm: PeerManager, proto: string): seq[StoredInfo] = + # Return the known info for all peers registered on the specified protocol + pm.peers.filterIt(it.protos.contains(proto)) + +proc connectedness*(pm: PeerManager, peerId: PeerId): bool = + # Return the connection state of the given, managed peer + # @TODO the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc. + # @TODO richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts + + let storedInfo = pm.peerStore.get(peerId) + + if (storedInfo == StoredInfo()): + # Peer is not managed, therefore not connected + return false + else: + pm.switch.isConnected(peerId) + +proc hasPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string): bool = # Returns `true` if peer is included in manager for the specified protocol pm.peerStore.get(peerInfo.peerId).protos.contains(proto) -proc addPeer(pm: PeerManager, peerInfo: PeerInfo, proto: string) = +proc addPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string) = # Adds peer to manager for the specified protocol debug "Adding peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto @@ -52,6 +82,17 @@ proc addPeer(pm: PeerManager, peerInfo: PeerInfo, proto: string) = # ...associated protocols pm.peerStore.protoBook.add(peerInfo.peerId, proto) +proc selectPeer*(pm: PeerManager, proto: string): Option[PeerInfo] = + # Selects the best peer for a given protocol + let peers = pm.peers.filterIt(it.protos.contains(proto)) + + if peers.len >= 1: + # @TODO proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned + let peerStored = peers[0] + + return some(peerStored.toPeerInfo()) + else: + return none(PeerInfo) #################### # Dialer interface # @@ -89,24 +130,3 @@ proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout = debug "Dialing remote peer failed", msg = e.msg waku_peers_dials.inc(labelValues = ["failed"]) return none(Connection) - -##################### -# Manager interface # -##################### - -proc peers*(pm: PeerManager): seq[StoredInfo] = - # Return the known info for all peers - pm.peerStore.peers() - -proc connectedness*(pm: PeerManager, peerId: PeerId): bool = - # Return the connection state of the given, managed peer - # @TODO the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc. - # @TODO richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts - - let storedInfo = pm.peerStore.get(peerId) - - if (storedInfo == StoredInfo()): - # Peer is not managed, therefore not connected - return false - else: - pm.switch.isConnected(peerId) diff --git a/waku/v2/protocol/waku_filter/waku_filter.nim b/waku/v2/protocol/waku_filter/waku_filter.nim index 5433e6d04..0f760db64 100644 --- a/waku/v2/protocol/waku_filter/waku_filter.nim +++ b/waku/v2/protocol/waku_filter/waku_filter.nim @@ -30,7 +30,6 @@ logScope: const WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1" - # Error types (metric label values) const dialFailure = "dial_failure" @@ -198,9 +197,8 @@ proc init*(T: type WakuFilter, peerManager: PeerManager, rng: ref BrHmacDrbgCont result.pushHandler = handler result.init() -# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY proc setPeer*(wf: WakuFilter, peer: PeerInfo) = - wf.peers.add(FilterPeer(peerInfo: peer)) + wf.peerManager.addPeer(peer, WakuFilterCodec) waku_filter_peers.inc() proc subscription*(proto: WakuFilter): MessageNotificationSubscription = @@ -228,8 +226,10 @@ proc subscription*(proto: WakuFilter): MessageNotificationSubscription = MessageNotificationSubscription.init(@[], handle) proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]] {.async, gcsafe.} = - if wf.peers.len >= 1: - let peer = wf.peers[0].peerInfo # @TODO: select peer from manager rather than from local set + let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec) + + if peerOpt.isSome: + let peer = peerOpt.get() let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) @@ -246,9 +246,13 @@ proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]] proc unsubscribe*(wf: WakuFilter, request: FilterRequest) {.async, gcsafe.} = # @TODO: NO REAL REASON TO GENERATE REQUEST ID FOR UNSUBSCRIBE OTHER THAN CREATING SANE-LOOKING RPC. - let id = generateRequestId(wf.rng) - if wf.peers.len >= 1: - let peer = wf.peers[0].peerInfo # @TODO: select peer from manager rather than from local set + let + id = generateRequestId(wf.rng) + peerOpt = wf.peerManager.selectPeer(WakuFilterCodec) + + if peerOpt.isSome: + # @TODO: if there are more than one WakuFilter peer, WakuFilter should unsubscribe from all peers + let peer = peerOpt.get() let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) diff --git a/waku/v2/protocol/waku_filter/waku_filter_types.nim b/waku/v2/protocol/waku_filter/waku_filter_types.nim index b4fb25c9a..548406b1c 100644 --- a/waku/v2/protocol/waku_filter/waku_filter_types.nim +++ b/waku/v2/protocol/waku_filter/waku_filter_types.nim @@ -41,12 +41,8 @@ type MessagePushHandler* = proc(requestId: string, msg: MessagePush) {.gcsafe, closure.} - FilterPeer* = object - peerInfo*: PeerInfo - WakuFilter* = ref object of LPProtocol rng*: ref BrHmacDrbgContext peerManager*: PeerManager - peers*: seq[FilterPeer] subscribers*: seq[Subscriber] pushHandler*: MessagePushHandler diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 482e67151..79f5ddee3 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -361,7 +361,7 @@ proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgConte # @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY proc setPeer*(ws: WakuStore, peer: PeerInfo) = - ws.peers.add(HistoryPeer(peerInfo: peer)) + ws.peerManager.addPeer(peer, WakuStoreCodec) waku_store_peers.inc() proc subscription*(proto: WakuStore): MessageNotificationSubscription = @@ -391,8 +391,14 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn # - latency? # - default store peer? - let peer = w.peers[0] - let connOpt = await w.peerManager.dialPeer(peer.peerInfo, WakuStoreCodec) + let peerOpt = w.peerManager.selectPeer(WakuStoreCodec) + + if peerOpt.isNone(): + error "failed to connect to remote peer" + waku_store_errors.inc(labelValues = [dialFailure]) + return + + let connOpt = await w.peerManager.dialPeer(peerOpt.get(), WakuStoreCodec) if connOpt.isNone(): # @TODO more sophisticated error handling here @@ -424,8 +430,14 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand # - latency? # - default store peer? - let peer = ws.peers[0] - let connOpt = await ws.peerManager.dialPeer(peer.peerInfo, WakuStoreCodec) + let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec) + + if peerOpt.isNone(): + error "failed to connect to remote peer" + waku_store_errors.inc(labelValues = [dialFailure]) + return + + let connOpt = await ws.peerManager.dialPeer(peerOpt.get(), WakuStoreCodec) if connOpt.isNone(): # @TODO more sophisticated error handling here @@ -446,7 +458,7 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand # NOTE Perform accounting operation # Assumes wakuSwap protocol is mounted - let peerId = peer.peerInfo.peerId + let peerId = peerOpt.get().peerId let messages = response.value.response.messages ws.wakuSwap.debit(peerId, messages.len) diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index 17f9c4ebb..09157bece 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -49,13 +49,9 @@ type query*: HistoryQuery response*: HistoryResponse - HistoryPeer* = object - peerInfo*: PeerInfo - WakuStore* = ref object of LPProtocol peerManager*: PeerManager rng*: ref BrHmacDrbgContext - peers*: seq[HistoryPeer] messages*: seq[IndexedWakuMessage] store*: MessageStore wakuSwap*: WakuSwap diff --git a/waku/v2/protocol/waku_swap/waku_swap.nim b/waku/v2/protocol/waku_swap/waku_swap.nim index ec30ac95f..7f29b9b87 100644 --- a/waku/v2/protocol/waku_swap/waku_swap.nim +++ b/waku/v2/protocol/waku_swap/waku_swap.nim @@ -90,9 +90,16 @@ proc init*(T: type Cheque, buffer: seq[byte]): ProtoResult[T] = # TODO Test for credit/debit operations in succession proc sendCheque*(ws: WakuSwap) {.async.} = - # TODO Better peer selection, for now using hardcoded peer - let peer = ws.peers[0] - let connOpt = await ws.peerManager.dialPeer(peer.peerInfo, WakuSwapCodec) + let peerOpt = ws.peerManager.selectPeer(WakuSwapCodec) + + if peerOpt.isNone(): + error "failed to connect to remote peer" + waku_swap_errors.inc(labelValues = [dialFailure]) + return + + let peer = peerOpt.get() + + let connOpt = await ws.peerManager.dialPeer(peer, WakuSwapCodec) if connOpt.isNone(): # @TODO more sophisticated error handling here @@ -107,8 +114,7 @@ proc sendCheque*(ws: WakuSwap) {.async.} = await connOpt.get().writeLP(Cheque(amount: 1).encode().buffer) # Set new balance - # XXX Assume peerId is first peer - let peerId = ws.peers[0].peerInfo.peerId + let peerId = peer.peerId ws.accounting[peerId] -= 1 info "New accounting state", accounting = ws.accounting[peerId] @@ -116,7 +122,8 @@ proc sendCheque*(ws: WakuSwap) {.async.} = proc handleCheque*(ws: WakuSwap, cheque: Cheque) = info "handle incoming cheque" # XXX Assume peerId is first peer - let peerId = ws.peers[0].peerInfo.peerId + let peerOpt = ws.peerManager.selectPeer(WakuSwapCodec) + let peerId = peerOpt.get().peerId ws.accounting[peerId] += int(cheque.amount) info "New accounting state", accounting = ws.accounting[peerId] @@ -185,7 +192,7 @@ proc init*(T: type WakuSwap, peerManager: PeerManager, rng: ref BrHmacDrbgContex result.init() proc setPeer*(ws: WakuSwap, peer: PeerInfo) = - ws.peers.add(SwapPeer(peerInfo: peer)) + ws.peerManager.addPeer(peer, WakuSwapCodec) waku_swap_peers.inc() # TODO End to end communication diff --git a/waku/v2/protocol/waku_swap/waku_swap_types.nim b/waku/v2/protocol/waku_swap/waku_swap_types.nim index 6d644b115..09ce46c9c 100644 --- a/waku/v2/protocol/waku_swap/waku_swap_types.nim +++ b/waku/v2/protocol/waku_swap/waku_swap_types.nim @@ -20,13 +20,9 @@ type CreditHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.} DebitHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.} - SwapPeer* = object - peerInfo*: PeerInfo - WakuSwap* = ref object of LPProtocol peerManager*: PeerManager rng*: ref BrHmacDrbgContext - peers*: seq[SwapPeer] text*: string accounting*: Table[PeerId, int] credit*: CreditHandler