Removed local peer sets for filter, swap and store (#375)

This commit is contained in:
Hanno Cornelius 2021-02-11 10:58:25 +02:00 committed by GitHub
parent 0dac4efa10
commit f364c1a9cd
10 changed files with 157 additions and 78 deletions

View File

@ -6,6 +6,7 @@
- Docs: Add information on how to query Status test fleet for node addresses; how to view logs and how to update submodules. - Docs: Add information on how to query Status test fleet for node addresses; how to view logs and how to update submodules.
- PubSub topic `subscribe` and `unsubscribe` no longer returns a future (removed `async` designation) - PubSub topic `subscribe` and `unsubscribe` no longer returns a future (removed `async` designation)
- Added a peer manager for `relay`, `filter`, `store` and `swap` peers. - Added a peer manager for `relay`, `filter`, `store` and `swap` peers.
- `relay`, `filter`, `store` and `swap` peers are now stored in a common, shared peer store and no longer in separate sets.
## 2021-01-05 v0.2 ## 2021-01-05 v0.2

View File

@ -12,6 +12,9 @@ import
../../waku/v2/node/wakunode2, ../../waku/v2/node/wakunode2,
../../waku/v2/node/peer_manager, ../../waku/v2/node/peer_manager,
../../waku/v2/protocol/waku_relay, ../../waku/v2/protocol/waku_relay,
../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/protocol/waku_swap/waku_swap,
../test_helpers ../test_helpers
procSuite "Peer Manager": procSuite "Peer Manager":
@ -72,3 +75,46 @@ procSuite "Peer Manager":
connOpt.isNone() connOpt.isNone()
await node1.stop() await node1.stop()
asyncTest "Adding, selecting and filtering peers work":
let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"),
Port(60000))
# Create filter peer
filterLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet()
filterKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get()
filterPeer = PeerInfo.init(filterKey, @[filterLoc])
# Create swap peer
swapLoc = MultiAddress.init("/ip4/127.0.0.2/tcp/2").tryGet()
swapKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get()
swapPeer = PeerInfo.init(swapKey, @[swapLoc])
# Create store peer
storeLoc = MultiAddress.init("/ip4/127.0.0.3/tcp/4").tryGet()
storeKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get()
storePeer = PeerInfo.init(storeKey, @[storeLoc])
await node.start()
node.mountFilter()
node.mountSwap()
node.mountStore()
node.wakuFilter.setPeer(filterPeer)
node.wakuSwap.setPeer(swapPeer)
node.wakuStore.setPeer(storePeer)
# Check peers were successfully added to peer manager
check:
node.peerManager.peers().len == 3
node.peerManager.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and
it.addrs.contains(filterLoc) and
it.protos.contains(WakuFilterCodec))
node.peerManager.peers(WakuSwapCodec).allIt(it.peerId == swapPeer.peerId and
it.addrs.contains(swapLoc) and
it.protos.contains(WakuSwapCodec))
node.peerManager.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and
it.addrs.contains(storeLoc) and
it.protos.contains(WakuStoreCodec))
await node.stop()

View File

@ -7,6 +7,7 @@ import
../../protocol/waku_store/[waku_store_types, waku_store], ../../protocol/waku_store/[waku_store_types, waku_store],
../../protocol/waku_swap/[waku_swap_types, waku_swap], ../../protocol/waku_swap/[waku_swap_types, waku_swap],
../../protocol/waku_filter/[waku_filter_types, waku_filter], ../../protocol/waku_filter/[waku_filter_types, waku_filter],
../../protocol/waku_relay,
../wakunode2, ../wakunode2,
../peer_manager, ../peer_manager,
./jsonrpc_types ./jsonrpc_types
@ -37,36 +38,36 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
## Managed peers ## Managed peers
if not node.wakuRelay.isNil: if not node.wakuRelay.isNil:
# Map all managed peers to WakuPeers and add to return list # Map managed peers to WakuPeers and add to return list
wPeers.insert(node.peerManager.peers.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId), wPeers.insert(node.peerManager.peers(WakuRelayCodec)
protocol: toSeq(it.protos.items)[0], .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
connected: node.peerManager.connectedness(it.peerId))), protocol: WakuRelayCodec,
connected: node.peerManager.connectedness(it.peerId))),
wPeers.len) # Append to the end of the sequence
if not node.wakuFilter.isNil:
# Map WakuFilter peers to WakuPeers and add to return list
wPeers.insert(node.peerManager.peers(WakuFilterCodec)
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
protocol: WakuFilterCodec,
connected: node.peerManager.connectedness(it.peerId))),
wPeers.len) # Append to the end of the sequence wPeers.len) # Append to the end of the sequence
## Unmanaged peers
## @TODO add these peers to peer manager
if not node.wakuSwap.isNil: if not node.wakuSwap.isNil:
# Map WakuSwap peers to WakuPeers and add to return list # Map WakuSwap peers to WakuPeers and add to return list
wPeers.insert(node.wakuSwap.peers.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it.peerInfo), wPeers.insert(node.peerManager.peers(WakuSwapCodec)
protocol: WakuSwapCodec, .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
connected: node.switch.isConnected(it.peerInfo))), protocol: WakuSwapCodec,
wPeers.len) # Append to the end of the sequence connected: node.peerManager.connectedness(it.peerId))),
if not node.wakuFilter.isNil:
# Map WakuFilter peers to WakuPeers and add to return list
wPeers.insert(node.wakuFilter.peers.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it.peerInfo),
protocol: WakuFilterCodec,
connected: node.switch.isConnected(it.peerInfo))),
wPeers.len) # Append to the end of the sequence wPeers.len) # Append to the end of the sequence
if not node.wakuStore.isNil: if not node.wakuStore.isNil:
# Map WakuStore peers to WakuPeers and add to return list # Map WakuStore peers to WakuPeers and add to return list
wPeers.insert(node.wakuStore.peers.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it.peerInfo), wPeers.insert(node.peerManager.peers(WakuStoreCodec)
protocol: WakuStoreCodec, .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
connected: node.switch.isConnected(it.peerInfo))), protocol: WakuStoreCodec,
connected: node.peerManager.connectedness(it.peerId))),
wPeers.len) # Append to the end of the sequence wPeers.len) # Append to the end of the sequence
# @TODO filter output on protocol/connected-status # @TODO filter output on protocol/connected-status
return wPeers return wPeers

View File

@ -1,7 +1,7 @@
{.push raises: [Defect, Exception].} {.push raises: [Defect, Exception].}
import import
std/[options, sets], std/[options, sets, sequtils],
chronos, chronicles, metrics, chronos, chronicles, metrics,
libp2p/standard_setup, libp2p/standard_setup,
libp2p/peerstore libp2p/peerstore
@ -29,12 +29,42 @@ proc new*(T: type PeerManager, switch: Switch): PeerManager =
# Helper functions # # Helper functions #
#################### ####################
proc hasPeer(pm: PeerManager, peerInfo: PeerInfo, proto: string): bool = proc toPeerInfo(storedInfo: StoredInfo): PeerInfo =
PeerInfo.init(peerId = storedInfo.peerId,
addrs = toSeq(storedInfo.addrs),
protocols = toSeq(storedInfo.protos))
#####################
# Manager interface #
#####################
proc peers*(pm: PeerManager): seq[StoredInfo] =
# Return the known info for all peers
pm.peerStore.peers()
proc peers*(pm: PeerManager, proto: string): seq[StoredInfo] =
# Return the known info for all peers registered on the specified protocol
pm.peers.filterIt(it.protos.contains(proto))
proc connectedness*(pm: PeerManager, peerId: PeerId): bool =
# Return the connection state of the given, managed peer
# @TODO the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
# @TODO richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
let storedInfo = pm.peerStore.get(peerId)
if (storedInfo == StoredInfo()):
# Peer is not managed, therefore not connected
return false
else:
pm.switch.isConnected(peerId)
proc hasPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string): bool =
# Returns `true` if peer is included in manager for the specified protocol # Returns `true` if peer is included in manager for the specified protocol
pm.peerStore.get(peerInfo.peerId).protos.contains(proto) pm.peerStore.get(peerInfo.peerId).protos.contains(proto)
proc addPeer(pm: PeerManager, peerInfo: PeerInfo, proto: string) = proc addPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string) =
# Adds peer to manager for the specified protocol # Adds peer to manager for the specified protocol
debug "Adding peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto debug "Adding peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto
@ -52,6 +82,17 @@ proc addPeer(pm: PeerManager, peerInfo: PeerInfo, proto: string) =
# ...associated protocols # ...associated protocols
pm.peerStore.protoBook.add(peerInfo.peerId, proto) pm.peerStore.protoBook.add(peerInfo.peerId, proto)
proc selectPeer*(pm: PeerManager, proto: string): Option[PeerInfo] =
# Selects the best peer for a given protocol
let peers = pm.peers.filterIt(it.protos.contains(proto))
if peers.len >= 1:
# @TODO proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
let peerStored = peers[0]
return some(peerStored.toPeerInfo())
else:
return none(PeerInfo)
#################### ####################
# Dialer interface # # Dialer interface #
@ -89,24 +130,3 @@ proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout =
debug "Dialing remote peer failed", msg = e.msg debug "Dialing remote peer failed", msg = e.msg
waku_peers_dials.inc(labelValues = ["failed"]) waku_peers_dials.inc(labelValues = ["failed"])
return none(Connection) return none(Connection)
#####################
# Manager interface #
#####################
proc peers*(pm: PeerManager): seq[StoredInfo] =
# Return the known info for all peers
pm.peerStore.peers()
proc connectedness*(pm: PeerManager, peerId: PeerId): bool =
# Return the connection state of the given, managed peer
# @TODO the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
# @TODO richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
let storedInfo = pm.peerStore.get(peerId)
if (storedInfo == StoredInfo()):
# Peer is not managed, therefore not connected
return false
else:
pm.switch.isConnected(peerId)

View File

@ -30,7 +30,6 @@ logScope:
const const
WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1" WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
# Error types (metric label values) # Error types (metric label values)
const const
dialFailure = "dial_failure" dialFailure = "dial_failure"
@ -198,9 +197,8 @@ proc init*(T: type WakuFilter, peerManager: PeerManager, rng: ref BrHmacDrbgCont
result.pushHandler = handler result.pushHandler = handler
result.init() result.init()
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
proc setPeer*(wf: WakuFilter, peer: PeerInfo) = proc setPeer*(wf: WakuFilter, peer: PeerInfo) =
wf.peers.add(FilterPeer(peerInfo: peer)) wf.peerManager.addPeer(peer, WakuFilterCodec)
waku_filter_peers.inc() waku_filter_peers.inc()
proc subscription*(proto: WakuFilter): MessageNotificationSubscription = proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
@ -228,8 +226,10 @@ proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
MessageNotificationSubscription.init(@[], handle) MessageNotificationSubscription.init(@[], handle)
proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]] {.async, gcsafe.} = proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]] {.async, gcsafe.} =
if wf.peers.len >= 1: let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec)
let peer = wf.peers[0].peerInfo # @TODO: select peer from manager rather than from local set
if peerOpt.isSome:
let peer = peerOpt.get()
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
@ -246,9 +246,13 @@ proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]]
proc unsubscribe*(wf: WakuFilter, request: FilterRequest) {.async, gcsafe.} = proc unsubscribe*(wf: WakuFilter, request: FilterRequest) {.async, gcsafe.} =
# @TODO: NO REAL REASON TO GENERATE REQUEST ID FOR UNSUBSCRIBE OTHER THAN CREATING SANE-LOOKING RPC. # @TODO: NO REAL REASON TO GENERATE REQUEST ID FOR UNSUBSCRIBE OTHER THAN CREATING SANE-LOOKING RPC.
let id = generateRequestId(wf.rng) let
if wf.peers.len >= 1: id = generateRequestId(wf.rng)
let peer = wf.peers[0].peerInfo # @TODO: select peer from manager rather than from local set peerOpt = wf.peerManager.selectPeer(WakuFilterCodec)
if peerOpt.isSome:
# @TODO: if there are more than one WakuFilter peer, WakuFilter should unsubscribe from all peers
let peer = peerOpt.get()
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)

View File

@ -41,12 +41,8 @@ type
MessagePushHandler* = proc(requestId: string, msg: MessagePush) {.gcsafe, closure.} MessagePushHandler* = proc(requestId: string, msg: MessagePush) {.gcsafe, closure.}
FilterPeer* = object
peerInfo*: PeerInfo
WakuFilter* = ref object of LPProtocol WakuFilter* = ref object of LPProtocol
rng*: ref BrHmacDrbgContext rng*: ref BrHmacDrbgContext
peerManager*: PeerManager peerManager*: PeerManager
peers*: seq[FilterPeer]
subscribers*: seq[Subscriber] subscribers*: seq[Subscriber]
pushHandler*: MessagePushHandler pushHandler*: MessagePushHandler

View File

@ -361,7 +361,7 @@ proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgConte
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY # @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
proc setPeer*(ws: WakuStore, peer: PeerInfo) = proc setPeer*(ws: WakuStore, peer: PeerInfo) =
ws.peers.add(HistoryPeer(peerInfo: peer)) ws.peerManager.addPeer(peer, WakuStoreCodec)
waku_store_peers.inc() waku_store_peers.inc()
proc subscription*(proto: WakuStore): MessageNotificationSubscription = proc subscription*(proto: WakuStore): MessageNotificationSubscription =
@ -391,8 +391,14 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn
# - latency? # - latency?
# - default store peer? # - default store peer?
let peer = w.peers[0] let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
let connOpt = await w.peerManager.dialPeer(peer.peerInfo, WakuStoreCodec)
if peerOpt.isNone():
error "failed to connect to remote peer"
waku_store_errors.inc(labelValues = [dialFailure])
return
let connOpt = await w.peerManager.dialPeer(peerOpt.get(), WakuStoreCodec)
if connOpt.isNone(): if connOpt.isNone():
# @TODO more sophisticated error handling here # @TODO more sophisticated error handling here
@ -424,8 +430,14 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand
# - latency? # - latency?
# - default store peer? # - default store peer?
let peer = ws.peers[0] let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec)
let connOpt = await ws.peerManager.dialPeer(peer.peerInfo, WakuStoreCodec)
if peerOpt.isNone():
error "failed to connect to remote peer"
waku_store_errors.inc(labelValues = [dialFailure])
return
let connOpt = await ws.peerManager.dialPeer(peerOpt.get(), WakuStoreCodec)
if connOpt.isNone(): if connOpt.isNone():
# @TODO more sophisticated error handling here # @TODO more sophisticated error handling here
@ -446,7 +458,7 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand
# NOTE Perform accounting operation # NOTE Perform accounting operation
# Assumes wakuSwap protocol is mounted # Assumes wakuSwap protocol is mounted
let peerId = peer.peerInfo.peerId let peerId = peerOpt.get().peerId
let messages = response.value.response.messages let messages = response.value.response.messages
ws.wakuSwap.debit(peerId, messages.len) ws.wakuSwap.debit(peerId, messages.len)

View File

@ -49,13 +49,9 @@ type
query*: HistoryQuery query*: HistoryQuery
response*: HistoryResponse response*: HistoryResponse
HistoryPeer* = object
peerInfo*: PeerInfo
WakuStore* = ref object of LPProtocol WakuStore* = ref object of LPProtocol
peerManager*: PeerManager peerManager*: PeerManager
rng*: ref BrHmacDrbgContext rng*: ref BrHmacDrbgContext
peers*: seq[HistoryPeer]
messages*: seq[IndexedWakuMessage] messages*: seq[IndexedWakuMessage]
store*: MessageStore store*: MessageStore
wakuSwap*: WakuSwap wakuSwap*: WakuSwap

View File

@ -90,9 +90,16 @@ proc init*(T: type Cheque, buffer: seq[byte]): ProtoResult[T] =
# TODO Test for credit/debit operations in succession # TODO Test for credit/debit operations in succession
proc sendCheque*(ws: WakuSwap) {.async.} = proc sendCheque*(ws: WakuSwap) {.async.} =
# TODO Better peer selection, for now using hardcoded peer let peerOpt = ws.peerManager.selectPeer(WakuSwapCodec)
let peer = ws.peers[0]
let connOpt = await ws.peerManager.dialPeer(peer.peerInfo, WakuSwapCodec) if peerOpt.isNone():
error "failed to connect to remote peer"
waku_swap_errors.inc(labelValues = [dialFailure])
return
let peer = peerOpt.get()
let connOpt = await ws.peerManager.dialPeer(peer, WakuSwapCodec)
if connOpt.isNone(): if connOpt.isNone():
# @TODO more sophisticated error handling here # @TODO more sophisticated error handling here
@ -107,8 +114,7 @@ proc sendCheque*(ws: WakuSwap) {.async.} =
await connOpt.get().writeLP(Cheque(amount: 1).encode().buffer) await connOpt.get().writeLP(Cheque(amount: 1).encode().buffer)
# Set new balance # Set new balance
# XXX Assume peerId is first peer let peerId = peer.peerId
let peerId = ws.peers[0].peerInfo.peerId
ws.accounting[peerId] -= 1 ws.accounting[peerId] -= 1
info "New accounting state", accounting = ws.accounting[peerId] info "New accounting state", accounting = ws.accounting[peerId]
@ -116,7 +122,8 @@ proc sendCheque*(ws: WakuSwap) {.async.} =
proc handleCheque*(ws: WakuSwap, cheque: Cheque) = proc handleCheque*(ws: WakuSwap, cheque: Cheque) =
info "handle incoming cheque" info "handle incoming cheque"
# XXX Assume peerId is first peer # XXX Assume peerId is first peer
let peerId = ws.peers[0].peerInfo.peerId let peerOpt = ws.peerManager.selectPeer(WakuSwapCodec)
let peerId = peerOpt.get().peerId
ws.accounting[peerId] += int(cheque.amount) ws.accounting[peerId] += int(cheque.amount)
info "New accounting state", accounting = ws.accounting[peerId] info "New accounting state", accounting = ws.accounting[peerId]
@ -185,7 +192,7 @@ proc init*(T: type WakuSwap, peerManager: PeerManager, rng: ref BrHmacDrbgContex
result.init() result.init()
proc setPeer*(ws: WakuSwap, peer: PeerInfo) = proc setPeer*(ws: WakuSwap, peer: PeerInfo) =
ws.peers.add(SwapPeer(peerInfo: peer)) ws.peerManager.addPeer(peer, WakuSwapCodec)
waku_swap_peers.inc() waku_swap_peers.inc()
# TODO End to end communication # TODO End to end communication

View File

@ -20,13 +20,9 @@ type
CreditHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.} CreditHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.}
DebitHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.} DebitHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.}
SwapPeer* = object
peerInfo*: PeerInfo
WakuSwap* = ref object of LPProtocol WakuSwap* = ref object of LPProtocol
peerManager*: PeerManager peerManager*: PeerManager
rng*: ref BrHmacDrbgContext rng*: ref BrHmacDrbgContext
peers*: seq[SwapPeer]
text*: string text*: string
accounting*: Table[PeerId, int] accounting*: Table[PeerId, int]
credit*: CreditHandler credit*: CreditHandler