feat: add new metadata protocol (#2062)

Alvaro Revuelta 2023-10-11 08:58:45 +02:00 committed by GitHub
parent 25d6e52e38
commit d5c3ade5e2
12 changed files with 408 additions and 13 deletions

View File

@ -54,6 +54,11 @@ type
name: "log-format" .}: logging.LogFormat
## General node config
clusterId* {.
desc: "Cluster id that the node is running in. Node in a different cluster id is disconnected."
defaultValue: 0
name: "cluster-id" }: uint32
agentString* {.
defaultValue: "nwaku",
desc: "Node agent string which is used as identifier in network"

View File

@ -22,12 +22,12 @@ proc validateExtMultiAddrs*(vals: seq[string]):
return ok(multiaddrs)
proc dnsResolve*(domain: string, conf: WakuNodeConf): Future[Result[string, string]] {.async} =
# Use conf's DNS servers
var nameServers: seq[TransportAddress]
for ip in conf.dnsAddrsNameServers:
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
let dnsResolver = DnsResolver.new(nameServers)
# Resolve domain IP
@ -93,18 +93,19 @@ proc networkConfiguration*(conf: WakuNodeConf,
if dns4DomainName.isSome() and extIp.isNone():
try:
let dnsRes = waitFor dnsResolve(conf.dns4DomainName, conf)
if dnsRes.isErr():
return err($dnsRes.error) # Pass error down the stack
extIp = some(ValidIpAddress.init(dnsRes.get()))
except CatchableError:
return err("Could not update extIp to resolved DNS IP: " & getCurrentExceptionMsg())
# Wrap in none because NetConfig does not have a default constructor
# TODO: We could change bindIp in NetConfig to be something less restrictive
# than ValidIpAddress, which doesn't allow default construction
let netConfigRes = NetConfig.init(
clusterId = conf.clusterId,
bindIp = conf.listenAddress,
bindPort = Port(uint16(conf.tcpPort) + conf.portsShift),
extIp = extIp,

View File

@ -26,6 +26,7 @@ import
../../waku/waku_filter,
../../waku/waku_lightpush,
../../waku/waku_peer_exchange,
../../waku/waku_metadata,
./testlib/common,
./testlib/testutils,
./testlib/wakucore,
@ -38,6 +39,8 @@ procSuite "Peer Manager":
await allFutures(nodes.mapIt(it.start()))
let connOk = await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo())
await sleepAsync(chronos.milliseconds(500))
check:
connOk == true
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[1].peerInfo.peerId)
@ -53,6 +56,8 @@ procSuite "Peer Manager":
# Dial node2 from node1
let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
await sleepAsync(chronos.milliseconds(500))
# Check connection
check:
conn.isSome()
@ -145,6 +150,7 @@ procSuite "Peer Manager":
let nonExistentPeer = nonExistentPeerRes.value
require:
(await nodes[0].peerManager.connectRelay(nonExistentPeer)) == false
await sleepAsync(chronos.milliseconds(500))
check:
# Cannot connect to node2
@ -153,6 +159,8 @@ procSuite "Peer Manager":
# Successful connection
require:
(await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo())) == true
await sleepAsync(chronos.milliseconds(500))
check:
# Currently connected to node2
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connected
@ -229,6 +237,8 @@ procSuite "Peer Manager":
require:
(await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true
await sleepAsync(chronos.milliseconds(500))
check:
# Currently connected to node2
node1.peerManager.peerStore.peers().len == 1
@ -257,6 +267,36 @@ procSuite "Peer Manager":
await allFutures([node1.stop(), node2.stop(), node3.stop()])
asyncTest "Peer manager drops conections to peers on different networks":
let clusterId1 = 1.uint32
let clusterId2 = 2.uint32
let
# different network
node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId1)
# same network
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId2)
node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId2)
# Start nodes
await allFutures([node1.start(), node2.start(), node3.start()])
# 1->2 (fails)
let conn1 = await node1.peerManager.dialPeer(node2.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec)
# 1->3 (fails)
let conn2 = await node1.peerManager.dialPeer(node3.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec)
# 2->3 (succeeds)
let conn3 = await node2.peerManager.dialPeer(node3.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec)
check:
conn1.isNone
conn2.isNone
conn3.isSome
# TODO: nwaku/issues/1377
xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
let
@ -370,6 +410,8 @@ procSuite "Peer Manager":
(await nodes[2].peerManager.connectRelay(peerInfos[0])) == true
(await nodes[3].peerManager.connectRelay(peerInfos[0])) == true
await sleepAsync(chronos.milliseconds(500))
check:
# Peerstore tracks all three peers
nodes[0].peerManager.peerStore.peers().len == 3
@ -749,6 +791,7 @@ procSuite "Peer Manager":
# 2 in connections
discard await nodes[1].peerManager.connectRelay(pInfos[0])
discard await nodes[2].peerManager.connectRelay(pInfos[0])
await sleepAsync(chronos.milliseconds(500))
# but one is pruned
check nodes[0].peerManager.switch.connManager.getConnections().len == 1
@ -756,6 +799,7 @@ procSuite "Peer Manager":
# 2 out connections
discard await nodes[0].peerManager.connectRelay(pInfos[3])
discard await nodes[0].peerManager.connectRelay(pInfos[4])
await sleepAsync(chronos.milliseconds(500))
# they are also pruned
check nodes[0].peerManager.switch.connManager.getConnections().len == 1

View File

@ -0,0 +1,50 @@
{.used.}
import
std/[options, sequtils, tables],
testutils/unittests,
chronos,
chronicles,
stew/shims/net,
libp2p/switch,
libp2p/peerId,
libp2p/crypto/crypto,
libp2p/multistream,
libp2p/muxers/muxer,
eth/keys,
eth/p2p/discoveryv5/enr
import
../../waku/waku_node,
../../waku/node/peer_manager,
../../waku/waku_discv5,
../../waku/waku_metadata,
./testlib/wakucore,
./testlib/wakunode
procSuite "Waku Metadata Protocol":
# TODO: Add tests with shards when ready
asyncTest "request() returns the supported metadata of the peer":
let clusterId = 10.uint32
let
node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId)
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId)
# Start nodes
await allFutures([node1.start(), node2.start()])
# Create connection
let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec)
require:
connOpt.isSome
# Request metadata
let response1 = await node2.wakuMetadata.request(connOpt.get())
# Check the response, or don't even continue
require:
response1.isOk
check:
response1.get().clusterId.get() == clusterId

View File

@ -0,0 +1,45 @@
{.used.}
import
std/[options, sequtils, tables],
testutils/unittests,
chronos,
chronicles
import
../../waku/waku_metadata,
../../waku/waku_metadata/rpc,
./testlib/wakucore,
./testlib/wakunode
procSuite "Waku Protobufs":
# TODO: Missing test coverage in many encode/decode protobuf functions
test "WakuMetadataResponse":
let res = WakuMetadataResponse(
clusterId: some(7'u32),
shards: @[10'u32, 23, 33],
)
let buffer = res.encode()
let decodedBuff = WakuMetadataResponse.decode(buffer.buffer)
check:
decodedBuff.isOk()
decodedBuff.get().clusterId.get() == res.clusterId.get()
decodedBuff.get().shards == res.shards
test "WakuMetadataRequest":
let req = WakuMetadataRequest(
clusterId: some(5'u32),
shards: @[100'u32, 2, 0],
)
let buffer = req.encode()
let decodedBuff = WakuMetadataRequest.decode(buffer.buffer)
check:
decodedBuff.isOk()
decodedBuff.get().clusterId.get() == req.clusterId.get()
decodedBuff.get().shards == req.shards

View File

@ -54,20 +54,21 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
dns4DomainName = none(string),
discv5UdpPort = none(Port),
agentString = none(string),
clusterId: uint32 = 0.uint32,
peerStoreCapacity = none(int)): WakuNode =
var resolvedExtIp = extIp
# Update extPort to default value if it's missing and there's an extIp or a DNS domain
let extPort = if (extIp.isSome() or dns4DomainName.isSome()) and
extPort.isNone():
some(Port(60000))
else:
extPort
if dns4DomainName.isSome() and extIp.isNone():
let conf = defaultTestWakuNodeConf()
# If there's an error resolving the IP, an exception is thrown and test fails
let dnsRes = waitFor dnsResolve(dns4DomainName.get(), conf)
if dnsRes.isErr():
raise newException(Defect, $dnsRes.error)
@ -76,6 +77,7 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
let netConfigRes = NetConfig.init(
bindIp = bindIp,
clusterId = clusterId,
bindPort = bindPort,
extIp = resolvedExtIp,
extPort = extPort,

View File

@ -16,6 +16,7 @@ import
type NetConfig* = object
hostAddress*: MultiAddress
clusterId*: uint32
wsHostAddress*: Option[MultiAddress]
hostExtAddress*: Option[MultiAddress]
wsExtAddress*: Option[MultiAddress]
@ -69,6 +70,7 @@ proc init*(T: type NetConfig,
wssEnabled: bool = false,
dns4DomainName = none(string),
discv5UdpPort = none(Port),
clusterId: uint32 = 0,
wakuFlags = none(CapabilitiesBitfield)): NetConfigResult =
## Initialize and validate waku node network configuration
@ -137,6 +139,7 @@ proc init*(T: type NetConfig,
ok(NetConfig(
hostAddress: hostAddress,
clusterId: clusterId,
wsHostAddress: wsHostAddress,
hostExtAddress: hostExtAddress,
wsExtAddress: wsExtAddress,

View File

@ -17,6 +17,7 @@ import
../../waku_core,
../../waku_relay,
../../waku_enr/sharding,
../../waku_metadata,
./peer_store/peer_storage,
./waku_peer_store
@ -66,6 +67,7 @@ type
PeerManager* = ref object of RootObj
switch*: Switch
peerStore*: PeerStore
wakuMetadata*: WakuMetadata
initialBackoffInSec*: int
backoffFactor*: int
maxFailedAttempts*: int
@ -138,7 +140,7 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, origin = UnknownO
# Add peer to storage. Entry will subsequently be updated with connectedness information
if not pm.storage.isNil:
pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected)
pm.storage.insertOrReplace(remotePeerInfo.peerId, remotePeerInfo, NotConnected)
# Connects to a given node. Note that this function uses `connect` and
# does not provide a protocol. Streams for relay (gossipsub) are created
@ -231,6 +233,7 @@ proc dialPeer(pm: PeerManager,
proc loadFromStorage(pm: PeerManager) =
debug "loading peers from storage"
# Load peers from storage, if available
var amount = 0
proc onData(peerId: PeerID, remotePeerInfo: RemotePeerInfo, connectedness: Connectedness, disconnectTime: int64) =
trace "loading peer", peerId=peerId, connectedness=connectedness
@ -250,12 +253,15 @@ proc loadFromStorage(pm: PeerManager) =
pm.peerStore[DisconnectBook][peerId] = disconnectTime
pm.peerStore[SourceBook][peerId] = remotePeerInfo.origin
amount.inc()
let res = pm.storage.getAll(onData)
if res.isErr:
warn "failed to load peers from storage", err = res.error
waku_peers_errors.inc(labelValues = ["storage_load_failure"])
else:
debug "successfully queried peer storage"
return
debug "successfully queried peer storage", amount = amount
proc canBeConnected*(pm: PeerManager,
peerId: PeerId): bool =
@ -315,6 +321,44 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
direction = if event.initiator: Outbound else: Inbound
connectedness = Connected
var clusterOk = false
var reason = ""
# To avoid the metadata protocol breaking previous nodes, for now we only
# disconnect if a cluster id is specified.
if not pm.wakuMetadata.isNil() and pm.wakuMetadata.clusterId != 0:
block wakuMetadata:
var conn: Connection
try:
conn = await pm.switch.dial(peerId, WakuMetadataCodec)
except CatchableError:
reason = "waku metadata codec not supported"
break wakuMetadata
# request metadata from connecting peer
let metadata = (await pm.wakuMetadata.request(conn)).valueOr:
reason = "failed waku metadata codec request"
break wakuMetadata
# does not report any clusterId
let clusterId = metadata.clusterId.valueOr:
reason = "empty clusterId reported"
break wakuMetadata
# drop it if it doesn't match our network id
if pm.wakuMetadata.clusterId != clusterId:
reason = "different clusterId reported: " & $pm.wakuMetadata.clusterId & " vs " & $clusterId
break wakuMetadata
# reaching here means the clusterId matches
clusterOk = true
if not pm.wakuMetadata.isNil() and pm.wakuMetadata.clusterId != 0 and not clusterOk:
info "disconnecting from peer", peerId=peerId, reason=reason
asyncSpawn(pm.switch.disconnect(peerId))
pm.peerStore.delete(peerId)
# TODO: Take action depending on the supported shards as reported by metadata
let ip = pm.getPeerIp(peerId)
if ip.isSome:
pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId)
@ -346,6 +390,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
proc new*(T: type PeerManager,
switch: Switch,
wakuMetadata: WakuMetadata = nil,
maxRelayPeers: Option[int] = none(int),
storage: PeerStorage = nil,
initialBackoffInSec = InitialBackoffInSec,
@ -388,6 +433,7 @@ proc new*(T: type PeerManager,
let outRelayPeersTarget = max(maxRelayPeersValue div 3, 10)
let pm = PeerManager(switch: switch,
wakuMetadata: wakuMetadata,
peerStore: switch.peerStore,
storage: storage,
initialBackoffInSec: initialBackoffInSec,

View File

@ -35,6 +35,7 @@ import
../waku_filter_v2,
../waku_filter_v2/client as filter_client,
../waku_lightpush,
../waku_metadata,
../waku_lightpush/client as lightpush_client,
../waku_enr,
../waku_dnsdisc,
@ -95,6 +96,7 @@ type
wakuLightPush*: WakuLightPush
wakuLightpushClient*: WakuLightPushClient
wakuPeerExchange*: WakuPeerExchange
wakuMetadata*: WakuMetadata
enr*: enr.Record
libp2pPing*: Ping
rng*: ref rand.HmacDrbgContext
@ -143,7 +145,7 @@ proc new*(T: type WakuNode,
let queue = newAsyncEventQueue[SubscriptionEvent](30)
return WakuNode(
let node = WakuNode(
peerManager: peerManager,
switch: switch,
rng: rng,
@ -152,6 +154,14 @@ proc new*(T: type WakuNode,
topicSubscriptionQueue: queue
)
# mount metadata protocol
let metadata = WakuMetadata.new(netConfig.clusterId)
node.switch.mount(metadata, protocolMatcher(WakuMetadataCodec))
node.wakuMetadata = metadata
peerManager.wakuMetadata = metadata
return node
proc peerInfo*(node: WakuNode): PeerInfo =
node.switch.peerInfo

waku/waku_metadata.nim (new file, 10 lines)
View File

@ -0,0 +1,10 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
./waku_metadata/protocol
export
protocol

View File

@ -0,0 +1,103 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[options, sequtils, random],
stew/results,
chronicles,
chronos,
metrics,
libp2p/protocols/protocol,
libp2p/stream/connection,
libp2p/crypto/crypto,
eth/p2p/discoveryv5/enr
import
../common/nimchronos,
../waku_core,
./rpc
logScope:
topics = "waku metadata"
const WakuMetadataCodec* = "/vac/waku/metadata/1.0.0"
const RpcResponseMaxBytes* = 1024
type
WakuMetadata* = ref object of LPProtocol
clusterId*: uint32
shards*: seq[uint32]
proc respond(m: WakuMetadata, conn: Connection): Future[Result[void, string]] {.async, gcsafe.} =
try:
await conn.writeLP(WakuMetadataResponse(
clusterId: some(m.clusterId),
shards: m.shards
).encode().buffer)
except CatchableError as exc:
return err(exc.msg)
return ok()
proc request*(m: WakuMetadata, conn: Connection): Future[Result[WakuMetadataResponse, string]] {.async, gcsafe.} =
var buffer: seq[byte]
var error: string
try:
await conn.writeLP(WakuMetadataRequest(
clusterId: some(m.clusterId),
shards: m.shards,
).encode().buffer)
buffer = await conn.readLp(RpcResponseMaxBytes)
except CatchableError as exc:
error = $exc.msg
finally:
# close, no more data is expected
await conn.closeWithEof()
if error.len > 0:
return err("write/read failed: " & error)
let decodedBuff = WakuMetadataResponse.decode(buffer)
if decodedBuff.isErr():
return err("decode failed: " & $decodedBuff.error)
return ok(decodedBuff.get())
proc initProtocolHandler*(m: WakuMetadata) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var buffer: seq[byte]
try:
buffer = await conn.readLp(RpcResponseMaxBytes)
except CatchableError as exc:
return
let decBuf = WakuMetadataResponse.decode(buffer)
if decBuf.isErr():
return
let response = decBuf.get()
debug "Received WakuMetadata request",
remoteClusterId=response.clusterId,
remoteShards=response.shards,
localClusterId=m.clusterId,
localShards=m.shards
discard await m.respond(conn)
# close, no data is expected
await conn.closeWithEof()
m.handler = handle
m.codec = WakuMetadataCodec
proc new*(T: type WakuMetadata, clusterId: uint32): T =
let m = WakuMetadata(
clusterId: clusterId,
# TODO: must be updated in real time
shards: @[],
)
m.initProtocolHandler()
info "Created WakuMetadata protocol", clusterId=clusterId
return m
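
Putting it together, a minimal usage sketch (not part of the diff): it assumes an already-started libp2p switch, a peerManager and a target peer, as in the tests earlier in this change, and must run inside an async proc.

  # Sketch: mount the protocol (as done in waku_node.nim) and query a peer's metadata
  let metadata = WakuMetadata.new(clusterId = 1)
  switch.mount(metadata, protocolMatcher(WakuMetadataCodec))

  let connOpt = await peerManager.dialPeer(peer, WakuMetadataCodec)
  if connOpt.isSome():
    let response = await metadata.request(connOpt.get())
    if response.isOk():
      info "peer metadata", clusterId = response.get().clusterId, shards = response.get().shards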

View File

@ -0,0 +1,76 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/options
import
../common/protobuf
type
WakuMetadataRequest* = object
clusterId*: Option[uint32]
shards*: seq[uint32]
type
WakuMetadataResponse* = object
clusterId*: Option[uint32]
shards*: seq[uint32]
proc encode*(rpc: WakuMetadataRequest): ProtoBuffer =
var pb = initProtoBuffer()
pb.write3(1, rpc.clusterId)
for shard in rpc.shards:
pb.write3(2, shard)
pb.finish3()
pb
proc decode*(T: type WakuMetadataRequest, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = WakuMetadataRequest()
var clusterId: uint64
if not ?pb.getField(1, clusterId):
rpc.clusterId = none(uint32)
else:
rpc.clusterId = some(clusterId.uint32)
var shards: seq[uint64]
if ?pb.getRepeatedField(2, shards):
for shard in shards:
rpc.shards.add(shard.uint32)
ok(rpc)
proc encode*(rpc: WakuMetadataResponse): ProtoBuffer =
var pb = initProtoBuffer()
pb.write3(1, rpc.clusterId)
for shard in rpc.shards:
pb.write3(2, shard)
pb.finish3()
pb
proc decode*(T: type WakuMetadataResponse, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = WakuMetadataResponse()
var clusterId: uint64
if not ?pb.getField(1, clusterId):
rpc.clusterId = none(uint32)
else:
rpc.clusterId = some(clusterId.uint32)
var shards: seq[uint64]
if ?pb.getRepeatedField(2, shards):
for shard in shards:
rpc.shards.add(shard.uint32)
ok(rpc)
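
Both RPC messages share the same wire layout: field 1 carries the optional uint32 cluster id and field 2 the repeated uint32 shards, as the encoders above show. A minimal round-trip sketch, same shape as the protobuf tests earlier in this diff:

  let req = WakuMetadataRequest(clusterId: some(5'u32), shards: @[0'u32, 2, 100])
  let decoded = WakuMetadataRequest.decode(req.encode().buffer)
  assert decoded.isOk()
  assert decoded.get().clusterId == req.clusterId and decoded.get().shards == req.shards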