chore: saving peers enr capabilities (#3127)

This commit is contained in:
gabrielmer 2024-10-24 15:31:04 +03:00 committed by GitHub
parent dbf02226cd
commit 69d9524fa4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 209 additions and 50 deletions

View File

@ -1,7 +1,7 @@
{.used.}
import std/[options, sequtils], stew/results, testutils/unittests
import waku/waku_core, waku/waku_enr, ./testlib/wakucore
import waku/waku_core, waku/waku_enr, ./testlib/wakucore, waku/waku_core/codecs
suite "Waku ENR - Capabilities bitfield":
test "check capabilities support":
@ -97,6 +97,28 @@ suite "Waku ENR - Capabilities bitfield":
bitfield.supportsCapability(Capabilities.Lightpush) == false
bitfield.toCapabilities() == @[Capabilities.Relay, Capabilities.Store]
test "get capabilities codecs from record":
## Given
let
enrSeqNum = 1u64
enrPrivKey = generatesecp256k1key()
## When
var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
builder.withWakuCapabilities(Capabilities.Relay, Capabilities.Store)
let recordRes = builder.build()
## Then
assert recordRes.isOk(), $recordRes.error
let record = recordRes.tryGet()
let codecs = record.getCapabilitiesCodecs()
check:
codecs.len == 2
codecs.contains(WakuRelayCodec)
codecs.contains(WakuStoreCodec)
test "check capabilities on a non-waku node record":
## Given
# non waku enr, i.e. Ethereum one

View File

@ -1,8 +1,13 @@
{.used.}
import
stew/results, testutils/unittests, libp2p/multiaddress, libp2p/peerid, libp2p/errors
import waku/waku_core
stew/results,
testutils/unittests,
libp2p/multiaddress,
libp2p/peerid,
libp2p/errors,
confutils/toml/std/net
import waku/[waku_core, waku_core/codecs, waku_enr], ../testlib/wakucore
suite "Waku Core - Peers":
test "Peer info parses correctly":
@ -141,3 +146,34 @@ suite "Waku Core - Peers":
## Then
check:
parsePeerInfo(address).isErr()
test "ENRs capabilities are filled when creating RemotePeerInfo":
let
enrSeqNum = 1u64
enrPrivKey = generatesecp256k1key()
## When
var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
builder.withIpAddressAndPorts(
ipAddr = some(parseIpAddress("127.0.0.1")),
tcpPort = some(Port(0)),
udpPort = some(Port(0)),
)
builder.withWakuCapabilities(Capabilities.Relay, Capabilities.Store)
let recordRes = builder.build()
## Then
assert recordRes.isOk(), $recordRes.error
let record = recordRes.tryGet()
let remotePeerInfoRes = record.toRemotePeerInfo()
assert remotePeerInfoRes.isOk(),
"failed creating RemotePeerInfo: " & $remotePeerInfoRes.error()
let remotePeerInfo = remotePeerInfoRes.get()
check:
remotePeerInfo.protocols.len == 2
remotePeerInfo.protocols.contains(WakuRelayCodec)
remotePeerInfo.protocols.contains(WakuStoreCodec)

View File

@ -12,26 +12,26 @@ import
import
waku/[waku_core/topics, waku_enr, discovery/waku_discv5, common/enr],
../testlib/[wakucore, testasync, assertions, futures],
../testlib/[wakucore, testasync, assertions, futures, wakunode],
../waku_enr/utils,
./utils
import eth/p2p/discoveryv5/enr as ethEnr
procSuite "Waku Discovery v5":
include waku/factory/waku
suite "Waku Discovery v5":
const validEnr =
"enr:-K64QGAvsATunmvMT5c3LFjKS0tG39zlQ1195Z2pWu6RoB5fWP3EXz9QPlRXN" &
"wOtDoRLgm4bATUB53AC8uml-ZtUE_kBgmlkgnY0gmlwhApkZgOKbXVsdGlhZGRyc4" &
"CCcnOTAAAIAAAAAQACAAMABAAFAAYAB4lzZWNwMjU2azGhAwG-CMmXpAPj84f6dCt" &
"MZ6xVYOa6bdmgAiKYG6LKGQlbg3RjcILqYIV3YWt1MgE"
let
rng = eth_keys.newRng()
pk1 = eth_keys.PrivateKey.random(rng[])
pk2 = eth_keys.PrivateKey.random(rng[])
enrNoCapabilities =
initRecord(1, pk1, {"rs": @[0.byte, 0.byte, 1.byte, 0.byte, 0.byte]}).value()
enrRelay = initRecord(
1, pk2, {"waku2": @[1.byte], "rs": @[0.byte, 1.byte, 1.byte, 0.byte, 1.byte]}
)
.value()
enrNoShardingInfo = initRecord(1, pk1, {"waku2": @[1.byte]}).value()
suite "shardingPredicate":
var
recordCluster21 {.threadvar.}: Record
@ -120,6 +120,14 @@ procSuite "Waku Discovery v5":
asyncTest "filter peer per bootnode":
let
enrRelay = initRecord(
1,
pk2,
{"waku2": @[1.byte], "rs": @[0.byte, 1.byte, 1.byte, 0.byte, 1.byte]},
)
.value()
enrNoCapabilities =
initRecord(1, pk1, {"rs": @[0.byte, 0.byte, 1.byte, 0.byte, 0.byte]}).value()
predicateNoCapabilities =
shardingPredicate(enrNoCapabilities, @[enrNoCapabilities]).get()
predicateNoCapabilitiesWithBoth =
@ -151,8 +159,10 @@ procSuite "Waku Discovery v5":
predicateRecord.isNone()
asyncTest "no relay sharding info":
let predicateNoShardingInfo =
shardingPredicate(enrNoShardingInfo, @[enrNoShardingInfo])
let
enrNoShardingInfo = initRecord(1, pk1, {"waku2": @[1.byte]}).value()
predicateNoShardingInfo =
shardingPredicate(enrNoShardingInfo, @[enrNoShardingInfo])
check:
predicateNoShardingInfo.isNone()
@ -166,7 +176,7 @@ procSuite "Waku Discovery v5":
indices: seq[uint64] = @[],
recordFlags: Option[CapabilitiesBitfield] = none(CapabilitiesBitfield),
bootstrapRecords: seq[waku_enr.Record] = @[],
): (WakuDiscoveryV5, Record) =
): (WakuDiscoveryV5, Record) {.raises: [ValueError, LPError].} =
let
privKey = generateSecp256k1Key()
record = newTestEnrRecord(
@ -188,17 +198,6 @@ procSuite "Waku Discovery v5":
(node, record)
let filterForStore: WakuDiscv5Predicate = proc(record: waku_enr.Record): bool =
let typedRecord = record.toTyped()
if typedRecord.isErr():
return false
let capabilities = typedRecord.value.waku2
if capabilities.isNone():
return false
return capabilities.get().supportsCapability(Capabilities.Store)
asyncTest "find random peers without predicate":
# Given 3 nodes
let
@ -234,6 +233,17 @@ procSuite "Waku Discovery v5":
await allFutures(node1.stop(), node2.stop(), node3.stop())
asyncTest "find random peers with parameter predicate":
let filterForStore: WakuDiscv5Predicate = proc(record: waku_enr.Record): bool =
let typedRecord = record.toTyped()
if typedRecord.isErr():
return false
let capabilities = typedRecord.value.waku2
if capabilities.isNone():
return false
return capabilities.get().supportsCapability(Capabilities.Store)
# Given 4 nodes
let
(node3, record3) = buildNode(
@ -346,11 +356,6 @@ procSuite "Waku Discovery v5":
await allFutures(node1.stop(), node2.stop(), node3.stop(), node4.stop())
suite "addBoostrapNode":
let validEnr =
"enr:-I-4QG3mX250ArniAs2DLpW-QHOLKSD5x_Ibp8AYcQZbz1HhHFJtl2dNDGcha" &
"U5ugLbDKRgtTDZH8NsxXlTXDpYAgzgBgmlkgnY0gnJzjwAVBgABAAIABQAHAAkAC4" &
"lzZWNwMjU2azGhA4_KwN0NRRmmfQ-B9B2h2PZjoJvBnaIOi6sR_b2UTQBBhXdha3U" & "yAQ"
asyncTest "address is valid":
# Given an empty list of enrs
var enrs: seq[Record] = @[]
@ -400,3 +405,42 @@ procSuite "Waku Discovery v5":
# Then the enr is not added to the list
check:
enrs.len == 0
suite "waku discv5 initialization":
asyncTest "Discv5 bootstrap nodes should be added to the peer store":
var conf = defaultTestWakuNodeConf()
conf.discv5BootstrapNodes = @[validEnr]
let waku = Waku.init(conf).valueOr:
raiseAssert error
discard setupDiscoveryV5(
waku.node.enr, waku.node.peerManager, waku.node.topicSubscriptionQueue,
waku.conf, waku.dynamicBootstrapNodes, waku.rng, waku.key,
)
check:
waku.node.peerManager.wakuPeerStore.peers().anyIt(
it.enr.isSome() and it.enr.get().toUri() == validEnr
)
asyncTest "Invalid discv5 bootstrap node ENRs are ignored":
var conf = defaultTestWakuNodeConf()
let invalidEnr = "invalid-enr"
conf.discv5BootstrapNodes = @[invalidEnr]
let waku = Waku.init(conf).valueOr:
raiseAssert error
discard setupDiscoveryV5(
waku.node.enr, waku.node.peerManager, waku.node.topicSubscriptionQueue,
waku.conf, waku.dynamicBootstrapNodes, waku.rng, waku.key,
)
check:
not waku.node.peerManager.wakuPeerStore.peers().anyIt(
it.enr.isSome() and it.enr.get().toUri() == invalidEnr
)

View File

@ -381,6 +381,14 @@ proc setupDiscoveryV5*(
for enrUri in conf.discv5BootstrapNodes:
addBootstrapNode(enrUri, discv5BootstrapEnrs)
for enr in discv5BootstrapEnrs:
let peerInfoRes = enr.toRemotePeerInfo()
if peerInfoRes.isOk():
nodePeerManager.addPeer(peerInfoRes.get(), PeerOrigin.Discv5)
else:
debug "could not convert discv5 bootstrap node to peerInfo, not adding peer to Peer Store",
enr = enr.toUri(), error = peerInfoRes.error
discv5BootstrapEnrs.add(dynamicBootstrapEnrs)
let discv5Config = DiscoveryConfig.init(

View File

@ -399,12 +399,18 @@ proc startNode*(
# Connect to configured static nodes
if conf.staticnodes.len > 0:
if not conf.relay:
return err("waku relay (--relay=true) should be set when configuring staticnodes")
try:
await connectToNodes(node, conf.staticnodes, "static")
except CatchableError:
return err("failed to connect to static nodes: " & getCurrentExceptionMsg())
if dynamicBootstrapNodes.len > 0:
if not conf.relay:
return err(
"waku relay (--relay=true) should be set when configuring dynamicBootstrapNodes"
)
info "Connecting to dynamic bootstrap peers"
try:
await connectToNodes(node, dynamicBootstrapNodes, "dynamic bootstrap")

10
waku/waku_core/codecs.nim Normal file
View File

@ -0,0 +1,10 @@
const
WakuRelayCodec* = "/vac/waku/relay/2.0.0"
WakuStoreCodec* = "/vac/waku/store-query/3.0.0"
WakuFilterSubscribeCodec* = "/vac/waku/filter-subscribe/2.0.0-beta1"
WakuFilterPushCodec* = "/vac/waku/filter-push/2.0.0-beta1"
WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
WakuSyncCodec* = "/vac/waku/sync/1.0.0"
WakuMetadataCodec* = "/vac/waku/metadata/1.0.0"
WakuPeerExchangeCodec* = "/vac/waku/peer-exchange/2.0.0-alpha1"
WakuLegacyStoreCodec* = "/vac/waku/store/2.0.0-beta4"

View File

@ -4,6 +4,7 @@ import
std/[options, sequtils, strutils, uri, net],
results,
chronos,
chronicles,
eth/keys,
eth/p2p/discoveryv5/enr,
eth/net/utils,
@ -16,6 +17,7 @@ import
libp2p/peerinfo,
libp2p/routing_record,
json_serialization
import ../waku_enr/capabilities
type
Connectedness* = enum
@ -243,7 +245,17 @@ proc toRemotePeerInfo*(enr: enr.Record): Result[RemotePeerInfo, cstring] =
if addrs.len == 0:
return err("enr: no addresses in record")
return ok(RemotePeerInfo.init(peerId, addrs, some(enr)))
let protocolsRes = catch:
enr.getCapabilitiesCodecs()
var protocols: seq[string]
if not protocolsRes.isErr():
protocols = protocolsRes.get()
else:
error "Could not retrieve supported protocols from enr",
peerId = peerId, msg = protocolsRes.error.msg
return ok(RemotePeerInfo.init(peerId, addrs, some(enr), protocols))
converter toRemotePeerInfo*(peerRecord: PeerRecord): RemotePeerInfo =
## Converts peer records to dialable RemotePeerInfo
@ -256,7 +268,7 @@ converter toRemotePeerInfo*(peerInfo: PeerInfo): RemotePeerInfo =
RemotePeerInfo(
peerId: peerInfo.peerId,
addrs: peerInfo.listenAddrs,
enr: none(Record),
enr: none(enr.Record),
protocols: peerInfo.protocols,
agent: peerInfo.agentVersion,
protoVersion: peerInfo.protoVersion,

View File

@ -1,7 +1,8 @@
{.push raises: [].}
import std/[options, bitops, sequtils, net], results, eth/keys, libp2p/crypto/crypto
import ../common/enr
import
std/[options, bitops, sequtils, net, tables], results, eth/keys, libp2p/crypto/crypto
import ../common/enr, ../waku_core/codecs
const CapabilitiesEnrField* = "waku2"
@ -20,6 +21,14 @@ type
Lightpush = 3
Sync = 4
const capabilityToCodec = {
Capabilities.Relay: WakuRelayCodec,
Capabilities.Store: WakuStoreCodec,
Capabilities.Filter: WakuFilterSubscribeCodec,
Capabilities.Lightpush: WakuLightPushCodec,
Capabilities.Sync: WakuSyncCodec,
}.toTable
func init*(
T: type CapabilitiesBitfield, lightpush, filter, store, relay, sync: bool = false
): T =
@ -101,3 +110,7 @@ proc getCapabilities*(r: Record): seq[Capabilities] =
let bitfield = bitfieldOpt.get()
bitfield.toCapabilities()
proc getCapabilitiesCodecs*(r: Record): seq[string] {.raises: [ValueError].} =
let capabilities = r.getCapabilities()
return capabilities.mapIt(capabilityToCodec[it])

View File

@ -2,9 +2,8 @@
import results
const
WakuFilterSubscribeCodec* = "/vac/waku/filter-subscribe/2.0.0-beta1"
WakuFilterPushCodec* = "/vac/waku/filter-push/2.0.0-beta1"
from ../waku_core/codecs import WakuFilterSubscribeCodec, WakuFilterPushCodec
export WakuFilterSubscribeCodec, WakuFilterPushCodec
type
FilterSubscribeErrorKind* {.pure.} = enum

View File

@ -3,7 +3,8 @@
import results, chronos, libp2p/peerid
import ../waku_core
const WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
from ../waku_core/codecs import WakuLightPushCodec
export WakuLightPushCodec
type WakuLightPushResult*[T] = Result[T, string]

View File

@ -12,10 +12,12 @@ import
eth/p2p/discoveryv5/enr
import ../common/nimchronos, ../common/enr, ../waku_core, ../waku_enr, ./rpc
from ../waku_core/codecs import WakuMetadataCodec
export WakuMetadataCodec
logScope:
topics = "waku metadata"
const WakuMetadataCodec* = "/vac/waku/metadata/1.0.0"
const RpcResponseMaxBytes* = 1024
type WakuMetadata* = ref object of LPProtocol

View File

@ -16,6 +16,9 @@ import
./rpc_codec,
../common/rate_limit/request_limiter
from ../waku_core/codecs import WakuPeerExchangeCodec
export WakuPeerExchangeCodec
declarePublicGauge waku_px_peers_received_total,
"number of ENRs received via peer exchange"
declarePublicGauge waku_px_peers_received_unknown,
@ -37,8 +40,6 @@ const
CacheRefreshInterval = 10.minutes
DefaultPXNumPeersReq* = 5.uint64()
WakuPeerExchangeCodec* = "/vac/waku/peer-exchange/2.0.0-alpha1"
# Error types (metric label values)
const
dialFailure = "dial_failure"

View File

@ -20,11 +20,12 @@ import
libp2p/switch
import ../waku_core, ./message_id, ../node/delivery_monitor/publish_observer
from ../waku_core/codecs import WakuRelayCodec
export WakuRelayCodec
logScope:
topics = "waku relay"
const WakuRelayCodec* = "/vac/waku/relay/2.0.0"
# see: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#overview-of-new-parameters
const TopicParameters = TopicParams(
topicWeight: 1,

View File

@ -3,9 +3,10 @@
import std/[options], results
import ../waku_core, ../common/paging
const
WakuStoreCodec* = "/vac/waku/store-query/3.0.0"
from ../waku_core/codecs import WakuStoreCodec
export WakuStoreCodec
const
DefaultPageSize*: uint64 = 20
MaxPageSize*: uint64 = 100

View File

@ -3,9 +3,10 @@
import std/[options, sequtils], results, stew/byteutils, nimcrypto/sha2
import ../waku_core, ../common/paging
const
WakuLegacyStoreCodec* = "/vac/waku/store/2.0.0-beta4"
from ../waku_core/codecs import WakuLegacyStoreCodec
export WakuLegacyStoreCodec
const
DefaultPageSize*: uint64 = 20
MaxPageSize*: uint64 = 100

View File

@ -3,11 +3,13 @@
import std/[options], chronos, libp2p/peerId
import ../waku_core
from ../waku_core/codecs import WakuSyncCodec
export WakuSyncCodec
const
DefaultSyncInterval*: Duration = 5.minutes
DefaultSyncRange*: Duration = 1.hours
RetryDelay*: Duration = 30.seconds
WakuSyncCodec* = "/vac/waku/sync/1.0.0"
DefaultMaxFrameSize* = 1048576 # 1 MiB
DefaultGossipSubJitter*: Duration = 20.seconds