Integrate peer manager with Swap and Store (#370)

This commit is contained in:
Hanno Cornelius 2021-02-09 10:31:38 +02:00 committed by GitHub
parent d05692587d
commit 1b96ee0adb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 69 additions and 35 deletions

View File

@ -5,7 +5,7 @@
- Refactor: Split out `waku_types` types into right place; create utils folder.
- Docs: Add information on how to query Status test fleet for node addresses; how to view logs and how to update submodules.
- PubSub topic `subscribe` and `unsubscribe` no longer returns a future (removed `async` designation)
- Added a peer manager for `relay` and `filter` peers.
- Added a peer manager for `relay`, `filter`, `store` and `swap` peers.
## 2021-01-05 v0.2

View File

@ -421,6 +421,10 @@ procSuite "Waku v2 JSON-RPC API":
asyncTest "Admin API: get unmanaged peer information":
const cTopic = ContentTopic(1)
let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"),
Port(60000))
waitFor node.start()

View File

@ -11,6 +11,7 @@ import
../../waku/v2/protocol/[waku_message, message_notifier],
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/node/message_store/waku_message_store,
../../waku/v2/node/peer_manager,
../test_helpers, ./utils
procSuite "Waku Store":
@ -29,7 +30,7 @@ procSuite "Waku Store":
discard await listenSwitch.start()
let
proto = WakuStore.init(dialSwitch, crypto.newRng())
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
subscription = proto.subscription()
rpc = HistoryQuery(topics: @[topic])
@ -73,7 +74,7 @@ procSuite "Waku Store":
discard await listenSwitch.start()
let
proto = WakuStore.init(dialSwitch, crypto.newRng(), store)
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store)
subscription = proto.subscription()
rpc = HistoryQuery(topics: @[topic])
@ -101,7 +102,7 @@ procSuite "Waku Store":
(await completionFut.withTimeout(5.seconds)) == true
let
proto2 = WakuStore.init(dialSwitch, crypto.newRng(), store)
proto2 = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store)
key2 = PrivateKey.random(ECDSA, rng[]).get()
var listenSwitch2 = newStandardSwitch(some(key2))
@ -146,7 +147,7 @@ procSuite "Waku Store":
discard await listenSwitch.start()
let
proto = WakuStore.init(dialSwitch, crypto.newRng())
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
subscription = proto.subscription()
rpc = HistoryQuery(topics: @[ContentTopic(1)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) )
@ -198,7 +199,7 @@ procSuite "Waku Store":
discard await listenSwitch.start()
let
proto = WakuStore.init(dialSwitch, crypto.newRng())
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
subscription = proto.subscription()
proto.setPeer(listenSwitch.peerInfo)
@ -248,7 +249,7 @@ procSuite "Waku Store":
discard await listenSwitch.start()
let
proto = WakuStore.init(dialSwitch, crypto.newRng())
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
subscription = proto.subscription()
proto.setPeer(listenSwitch.peerInfo)

View File

@ -292,7 +292,7 @@ proc mountFilter*(node: WakuNode) =
# because store is using a reference to the swap protocol.
proc mountSwap*(node: WakuNode) =
info "mounting swap"
node.wakuSwap = WakuSwap.init(node.switch, node.rng)
node.wakuSwap = WakuSwap.init(node.peerManager, node.rng)
node.switch.mount(node.wakuSwap)
# NYI - Do we need this?
#node.subscriptions.subscribe(WakuSwapCodec, node.wakuSwap.subscription())
@ -302,10 +302,10 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil) =
if node.wakuSwap.isNil:
debug "mounting store without swap"
node.wakuStore = WakuStore.init(node.switch, node.rng, store)
node.wakuStore = WakuStore.init(node.peerManager, node.rng, store)
else:
debug "mounting store with swap"
node.wakuStore = WakuStore.init(node.switch, node.rng, store, node.wakuSwap)
node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, node.wakuSwap)
node.switch.mount(node.wakuStore)
node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription())

View File

@ -6,7 +6,6 @@ import
std/[tables, times, sequtils, algorithm, options],
bearssl,
chronos, chronicles, metrics, stew/[results, byteutils, endians2],
libp2p/switch,
libp2p/crypto/crypto,
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
@ -15,7 +14,8 @@ import
../../node/message_store/message_store,
../waku_swap/waku_swap,
./waku_store_types,
../../utils/requests
../../utils/requests,
../../node/peer_manager
export waku_store_types
@ -29,6 +29,11 @@ logScope:
const
WakuStoreCodec* = "/vac/waku/store/2.0.0-beta1"
# Error types (metric label values)
const
dialFailure = "dial_failure"
decodeRpcFailure = "decode_rpc_failure"
# TODO Move serialization function to separate file, too noisy
# TODO Move pagination to separate file, self-contained logic
@ -307,7 +312,7 @@ method init*(ws: WakuStore) =
var res = HistoryRPC.init(message)
if res.isErr:
error "failed to decode rpc"
waku_store_errors.inc(labelValues = ["decode_rpc_failure"])
waku_store_errors.inc(labelValues = [decodeRpcFailure])
return
info "received query"
@ -345,11 +350,11 @@ method init*(ws: WakuStore) =
waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"])
proc init*(T: type WakuStore, switch: Switch, rng: ref BrHmacDrbgContext,
proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext,
store: MessageStore = nil, wakuSwap: WakuSwap = nil): T =
new result
result.rng = rng
result.switch = switch
result.peerManager = peerManager
result.store = store
result.wakuSwap = wakuSwap
result.init()
@ -387,17 +392,23 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn
# - default store peer?
let peer = w.peers[0]
let conn = await w.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuStoreCodec)
let connOpt = await w.peerManager.dialPeer(peer.peerInfo, WakuStoreCodec)
await conn.writeLP(HistoryRPC(requestId: generateRequestId(w.rng),
if connOpt.isNone():
# @TODO more sophisticated error handling here
error "failed to connect to remote peer"
waku_store_errors.inc(labelValues = [dialFailure])
return
await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(w.rng),
query: query).encode().buffer)
var message = await conn.readLp(64*1024)
var message = await connOpt.get().readLp(64*1024)
let response = HistoryRPC.init(message)
if response.isErr:
error "failed to decode response"
waku_store_errors.inc(labelValues = ["decode_rpc_failure"])
waku_store_errors.inc(labelValues = [decodeRpcFailure])
return
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
@ -414,17 +425,23 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand
# - default store peer?
let peer = ws.peers[0]
let conn = await ws.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuStoreCodec)
let connOpt = await ws.peerManager.dialPeer(peer.peerInfo, WakuStoreCodec)
await conn.writeLP(HistoryRPC(requestId: generateRequestId(ws.rng),
if connOpt.isNone():
# @TODO more sophisticated error handling here
error "failed to connect to remote peer"
waku_store_errors.inc(labelValues = [dialFailure])
return
await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(ws.rng),
query: query).encode().buffer)
var message = await conn.readLp(64*1024)
var message = await connOpt.get().readLp(64*1024)
let response = HistoryRPC.init(message)
if response.isErr:
error "failed to decode response"
waku_store_errors.inc(labelValues = ["decode_rpc_failure"])
waku_store_errors.inc(labelValues = [decodeRpcFailure])
return
# NOTE Perform accounting operation

View File

@ -2,12 +2,13 @@
import
bearssl,
libp2p/[switch, peerinfo],
libp2p/peerinfo,
libp2p/protocols/protocol,
../waku_swap/waku_swap_types,
../waku_message,
../../node/message_store/message_store,
../../utils/pagination
../../utils/pagination,
../../node/peer_manager
export waku_message
export pagination
@ -52,7 +53,7 @@ type
peerInfo*: PeerInfo
WakuStore* = ref object of LPProtocol
switch*: Switch
peerManager*: PeerManager
rng*: ref BrHmacDrbgContext
peers*: seq[HistoryPeer]
messages*: seq[IndexedWakuMessage]

View File

@ -25,11 +25,11 @@ import
std/[tables, options],
bearssl,
chronos, chronicles, metrics, stew/results,
libp2p/switch,
libp2p/crypto/crypto,
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
libp2p/stream/connection,
../../node/peer_manager,
../message_notifier,
./waku_swap_types
@ -43,6 +43,11 @@ logScope:
const WakuSwapCodec* = "/vac/waku/swap/2.0.0-alpha1"
# Error types (metric label values)
const
dialFailure = "dial_failure"
decodeRpcFailure = "decode_rpc_failure"
# Serialization
# -------------------------------------------------------------------------------
proc encode*(handshake: Handshake): ProtoBuffer =
@ -87,13 +92,19 @@ proc init*(T: type Cheque, buffer: seq[byte]): ProtoResult[T] =
proc sendCheque*(ws: WakuSwap) {.async.} =
# TODO Better peer selection, for now using hardcoded peer
let peer = ws.peers[0]
let conn = await ws.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuSwapCodec)
let connOpt = await ws.peerManager.dialPeer(peer.peerInfo, WakuSwapCodec)
if connOpt.isNone():
# @TODO more sophisticated error handling here
error "failed to connect to remote peer"
waku_swap_errors.inc(labelValues = [dialFailure])
return
info "sendCheque"
# TODO Add beneficiary, etc
# XXX Hardcoded amount for now
await conn.writeLP(Cheque(amount: 1).encode().buffer)
await connOpt.get().writeLP(Cheque(amount: 1).encode().buffer)
# Set new balance
# XXX Assume peerId is first peer
@ -118,7 +129,7 @@ proc init*(wakuSwap: WakuSwap) =
var res = Cheque.init(message)
if res.isErr:
error "failed to decode rpc"
waku_swap_errors.inc(labelValues = ["decode_rpc_failure"])
waku_swap_errors.inc(labelValues = [decodeRpcFailure])
return
info "received cheque", value=res.value
@ -164,11 +175,11 @@ proc init*(wakuSwap: WakuSwap) =
wakuSwap.debit = debit
# TODO Expression return?
proc init*(T: type WakuSwap, switch: Switch, rng: ref BrHmacDrbgContext): T =
proc init*(T: type WakuSwap, peerManager: PeerManager, rng: ref BrHmacDrbgContext): T =
info "wakuSwap init 2"
new result
result.rng = rng
result.switch = switch
result.peerManager = peerManager
result.accounting = initTable[PeerId, int]()
result.text = "test"
result.init()

View File

@ -2,8 +2,8 @@ import
std/tables,
bearssl,
libp2p/protocols/protocol,
libp2p/switch,
libp2p/peerinfo
libp2p/peerinfo,
../../node/peer_manager
type
Beneficiary* = seq[byte]
@ -24,7 +24,7 @@ type
peerInfo*: PeerInfo
WakuSwap* = ref object of LPProtocol
switch*: Switch
peerManager*: PeerManager
rng*: ref BrHmacDrbgContext
peers*: seq[SwapPeer]
text*: string