diff --git a/CHANGELOG.md b/CHANGELOG.md index 6cc9008b5..e9e8ba57d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ - Refactor: Split out `waku_types` types into right place; create utils folder. - Docs: Add information on how to query Status test fleet for node addresses; how to view logs and how to update submodules. - PubSub topic `subscribe` and `unsubscribe` no longer returns a future (removed `async` designation) -- Added a peer manager for `relay` and `filter` peers. +- Added a peer manager for `relay`, `filter`, `store` and `swap` peers. ## 2021-01-05 v0.2 diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 11f835b37..915a4e291 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -421,6 +421,10 @@ procSuite "Waku v2 JSON-RPC API": asyncTest "Admin API: get unmanaged peer information": const cTopic = ContentTopic(1) + let + nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"), + Port(60000)) waitFor node.start() diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 0e3f4cdae..a0c734725 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -11,6 +11,7 @@ import ../../waku/v2/protocol/[waku_message, message_notifier], ../../waku/v2/protocol/waku_store/waku_store, ../../waku/v2/node/message_store/waku_message_store, + ../../waku/v2/node/peer_manager, ../test_helpers, ./utils procSuite "Waku Store": @@ -29,7 +30,7 @@ procSuite "Waku Store": discard await listenSwitch.start() let - proto = WakuStore.init(dialSwitch, crypto.newRng()) + proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) subscription = proto.subscription() rpc = HistoryQuery(topics: @[topic]) @@ -73,7 +74,7 @@ procSuite "Waku Store": discard await listenSwitch.start() let - proto = WakuStore.init(dialSwitch, crypto.newRng(), store) + proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) subscription = proto.subscription() rpc = HistoryQuery(topics: @[topic]) @@ -101,7 +102,7 @@ procSuite "Waku Store": (await completionFut.withTimeout(5.seconds)) == true let - proto2 = WakuStore.init(dialSwitch, crypto.newRng(), store) + proto2 = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) key2 = PrivateKey.random(ECDSA, rng[]).get() var listenSwitch2 = newStandardSwitch(some(key2)) @@ -146,7 +147,7 @@ procSuite "Waku Store": discard await listenSwitch.start() let - proto = WakuStore.init(dialSwitch, crypto.newRng()) + proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) subscription = proto.subscription() rpc = HistoryQuery(topics: @[ContentTopic(1)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) ) @@ -198,7 +199,7 @@ procSuite "Waku Store": discard await listenSwitch.start() let - proto = WakuStore.init(dialSwitch, crypto.newRng()) + proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) subscription = proto.subscription() proto.setPeer(listenSwitch.peerInfo) @@ -248,7 +249,7 @@ procSuite "Waku Store": discard await listenSwitch.start() let - proto = WakuStore.init(dialSwitch, crypto.newRng()) + proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) subscription = proto.subscription() proto.setPeer(listenSwitch.peerInfo) diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 7c1e5fd8e..53a7cb3e8 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -292,7 +292,7 @@ proc mountFilter*(node: WakuNode) = # because store is using a reference to the swap protocol. proc mountSwap*(node: WakuNode) = info "mounting swap" - node.wakuSwap = WakuSwap.init(node.switch, node.rng) + node.wakuSwap = WakuSwap.init(node.peerManager, node.rng) node.switch.mount(node.wakuSwap) # NYI - Do we need this? #node.subscriptions.subscribe(WakuSwapCodec, node.wakuSwap.subscription()) @@ -302,10 +302,10 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil) = if node.wakuSwap.isNil: debug "mounting store without swap" - node.wakuStore = WakuStore.init(node.switch, node.rng, store) + node.wakuStore = WakuStore.init(node.peerManager, node.rng, store) else: debug "mounting store with swap" - node.wakuStore = WakuStore.init(node.switch, node.rng, store, node.wakuSwap) + node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, node.wakuSwap) node.switch.mount(node.wakuStore) node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription()) diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 1d8824361..482e67151 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -6,7 +6,6 @@ import std/[tables, times, sequtils, algorithm, options], bearssl, chronos, chronicles, metrics, stew/[results, byteutils, endians2], - libp2p/switch, libp2p/crypto/crypto, libp2p/protocols/protocol, libp2p/protobuf/minprotobuf, @@ -15,7 +14,8 @@ import ../../node/message_store/message_store, ../waku_swap/waku_swap, ./waku_store_types, - ../../utils/requests + ../../utils/requests, + ../../node/peer_manager export waku_store_types @@ -29,6 +29,11 @@ logScope: const WakuStoreCodec* = "/vac/waku/store/2.0.0-beta1" +# Error types (metric label values) +const + dialFailure = "dial_failure" + decodeRpcFailure = "decode_rpc_failure" + # TODO Move serialization function to separate file, too noisy # TODO Move pagination to separate file, self-contained logic @@ -307,7 +312,7 @@ method init*(ws: WakuStore) = var res = HistoryRPC.init(message) if res.isErr: error "failed to decode rpc" - waku_store_errors.inc(labelValues = ["decode_rpc_failure"]) + waku_store_errors.inc(labelValues = [decodeRpcFailure]) return info "received query" @@ -345,11 +350,11 @@ method init*(ws: WakuStore) = waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"]) -proc init*(T: type WakuStore, switch: Switch, rng: ref BrHmacDrbgContext, +proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext, store: MessageStore = nil, wakuSwap: WakuSwap = nil): T = new result result.rng = rng - result.switch = switch + result.peerManager = peerManager result.store = store result.wakuSwap = wakuSwap result.init() @@ -387,17 +392,23 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn # - default store peer? let peer = w.peers[0] - let conn = await w.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuStoreCodec) + let connOpt = await w.peerManager.dialPeer(peer.peerInfo, WakuStoreCodec) - await conn.writeLP(HistoryRPC(requestId: generateRequestId(w.rng), + if connOpt.isNone(): + # @TODO more sophisticated error handling here + error "failed to connect to remote peer" + waku_store_errors.inc(labelValues = [dialFailure]) + return + + await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(w.rng), query: query).encode().buffer) - var message = await conn.readLp(64*1024) + var message = await connOpt.get().readLp(64*1024) let response = HistoryRPC.init(message) if response.isErr: error "failed to decode response" - waku_store_errors.inc(labelValues = ["decode_rpc_failure"]) + waku_store_errors.inc(labelValues = [decodeRpcFailure]) return waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) @@ -414,17 +425,23 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand # - default store peer? let peer = ws.peers[0] - let conn = await ws.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuStoreCodec) + let connOpt = await ws.peerManager.dialPeer(peer.peerInfo, WakuStoreCodec) - await conn.writeLP(HistoryRPC(requestId: generateRequestId(ws.rng), + if connOpt.isNone(): + # @TODO more sophisticated error handling here + error "failed to connect to remote peer" + waku_store_errors.inc(labelValues = [dialFailure]) + return + + await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(ws.rng), query: query).encode().buffer) - var message = await conn.readLp(64*1024) + var message = await connOpt.get().readLp(64*1024) let response = HistoryRPC.init(message) if response.isErr: error "failed to decode response" - waku_store_errors.inc(labelValues = ["decode_rpc_failure"]) + waku_store_errors.inc(labelValues = [decodeRpcFailure]) return # NOTE Perform accounting operation diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index fac28691e..17f9c4ebb 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -2,12 +2,13 @@ import bearssl, - libp2p/[switch, peerinfo], + libp2p/peerinfo, libp2p/protocols/protocol, ../waku_swap/waku_swap_types, ../waku_message, ../../node/message_store/message_store, - ../../utils/pagination + ../../utils/pagination, + ../../node/peer_manager export waku_message export pagination @@ -52,7 +53,7 @@ type peerInfo*: PeerInfo WakuStore* = ref object of LPProtocol - switch*: Switch + peerManager*: PeerManager rng*: ref BrHmacDrbgContext peers*: seq[HistoryPeer] messages*: seq[IndexedWakuMessage] diff --git a/waku/v2/protocol/waku_swap/waku_swap.nim b/waku/v2/protocol/waku_swap/waku_swap.nim index edb3dc524..ec30ac95f 100644 --- a/waku/v2/protocol/waku_swap/waku_swap.nim +++ b/waku/v2/protocol/waku_swap/waku_swap.nim @@ -25,11 +25,11 @@ import std/[tables, options], bearssl, chronos, chronicles, metrics, stew/results, - libp2p/switch, libp2p/crypto/crypto, libp2p/protocols/protocol, libp2p/protobuf/minprotobuf, libp2p/stream/connection, + ../../node/peer_manager, ../message_notifier, ./waku_swap_types @@ -43,6 +43,11 @@ logScope: const WakuSwapCodec* = "/vac/waku/swap/2.0.0-alpha1" +# Error types (metric label values) +const + dialFailure = "dial_failure" + decodeRpcFailure = "decode_rpc_failure" + # Serialization # ------------------------------------------------------------------------------- proc encode*(handshake: Handshake): ProtoBuffer = @@ -87,13 +92,19 @@ proc init*(T: type Cheque, buffer: seq[byte]): ProtoResult[T] = proc sendCheque*(ws: WakuSwap) {.async.} = # TODO Better peer selection, for now using hardcoded peer let peer = ws.peers[0] - let conn = await ws.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuSwapCodec) + let connOpt = await ws.peerManager.dialPeer(peer.peerInfo, WakuSwapCodec) + + if connOpt.isNone(): + # @TODO more sophisticated error handling here + error "failed to connect to remote peer" + waku_swap_errors.inc(labelValues = [dialFailure]) + return info "sendCheque" # TODO Add beneficiary, etc # XXX Hardcoded amount for now - await conn.writeLP(Cheque(amount: 1).encode().buffer) + await connOpt.get().writeLP(Cheque(amount: 1).encode().buffer) # Set new balance # XXX Assume peerId is first peer @@ -118,7 +129,7 @@ proc init*(wakuSwap: WakuSwap) = var res = Cheque.init(message) if res.isErr: error "failed to decode rpc" - waku_swap_errors.inc(labelValues = ["decode_rpc_failure"]) + waku_swap_errors.inc(labelValues = [decodeRpcFailure]) return info "received cheque", value=res.value @@ -164,11 +175,11 @@ proc init*(wakuSwap: WakuSwap) = wakuSwap.debit = debit # TODO Expression return? -proc init*(T: type WakuSwap, switch: Switch, rng: ref BrHmacDrbgContext): T = +proc init*(T: type WakuSwap, peerManager: PeerManager, rng: ref BrHmacDrbgContext): T = info "wakuSwap init 2" new result result.rng = rng - result.switch = switch + result.peerManager = peerManager result.accounting = initTable[PeerId, int]() result.text = "test" result.init() diff --git a/waku/v2/protocol/waku_swap/waku_swap_types.nim b/waku/v2/protocol/waku_swap/waku_swap_types.nim index e25b255a5..6d644b115 100644 --- a/waku/v2/protocol/waku_swap/waku_swap_types.nim +++ b/waku/v2/protocol/waku_swap/waku_swap_types.nim @@ -2,8 +2,8 @@ import std/tables, bearssl, libp2p/protocols/protocol, - libp2p/switch, - libp2p/peerinfo + libp2p/peerinfo, + ../../node/peer_manager type Beneficiary* = seq[byte] @@ -24,7 +24,7 @@ type peerInfo*: PeerInfo WakuSwap* = ref object of LPProtocol - switch*: Switch + peerManager*: PeerManager rng*: ref BrHmacDrbgContext peers*: seq[SwapPeer] text*: string