mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-05-20 13:49:51 +00:00
fix(discv5): add bootnode filter exception (#2267)
This commit is contained in:
parent
bd7570faa3
commit
7b4f4bb18c
@ -52,7 +52,7 @@ type WakuDiscoveryV5* = ref object
|
|||||||
peerManager: Option[PeerManager]
|
peerManager: Option[PeerManager]
|
||||||
topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent]
|
topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent]
|
||||||
|
|
||||||
proc shardingPredicate*(record: Record): Option[WakuDiscv5Predicate] =
|
proc shardingPredicate*(record: Record, bootnodes: seq[Record] = @[]): Option[WakuDiscv5Predicate] =
|
||||||
## Filter peers based on relay sharding information
|
## Filter peers based on relay sharding information
|
||||||
let typedRecord = record.toTyped().valueOr:
|
let typedRecord = record.toTyped().valueOr:
|
||||||
debug "peer filtering failed", reason=error
|
debug "peer filtering failed", reason=error
|
||||||
@ -65,8 +65,9 @@ proc shardingPredicate*(record: Record): Option[WakuDiscv5Predicate] =
|
|||||||
debug "peer filtering updated"
|
debug "peer filtering updated"
|
||||||
|
|
||||||
let predicate = proc(record: waku_enr.Record): bool =
|
let predicate = proc(record: waku_enr.Record): bool =
|
||||||
record.getCapabilities().len > 0 and #RFC 31 requirement
|
bootnodes.contains(record) or # Temp. Bootnode exception
|
||||||
nodeShard.shardIds.anyIt(record.containsShard(nodeShard.clusterId, it)) #RFC 64 guideline
|
(record.getCapabilities().len > 0 and #RFC 31 requirement
|
||||||
|
nodeShard.shardIds.anyIt(record.containsShard(nodeShard.clusterId, it))) #RFC 64 guideline
|
||||||
|
|
||||||
return some(predicate)
|
return some(predicate)
|
||||||
|
|
||||||
@ -78,26 +79,13 @@ proc new*(
|
|||||||
peerManager: Option[PeerManager] = none(PeerManager),
|
peerManager: Option[PeerManager] = none(PeerManager),
|
||||||
queue: AsyncEventQueue[SubscriptionEvent] = newAsyncEventQueue[SubscriptionEvent](30),
|
queue: AsyncEventQueue[SubscriptionEvent] = newAsyncEventQueue[SubscriptionEvent](30),
|
||||||
): T =
|
): T =
|
||||||
let shardPredOp =
|
|
||||||
if record.isSome(): shardingPredicate(record.get())
|
|
||||||
else: none(WakuDiscv5Predicate)
|
|
||||||
|
|
||||||
var bootstrapRecords = conf.bootstrapRecords
|
|
||||||
|
|
||||||
# Remove bootstrap nodes with which we don't share shards.
|
|
||||||
if shardPredOp.isSome():
|
|
||||||
bootstrapRecords.keepIf(shardPredOp.get())
|
|
||||||
|
|
||||||
if conf.bootstrapRecords.len > 0 and bootstrapRecords.len == 0:
|
|
||||||
warn "No discv5 bootstrap nodes share this node configured shards"
|
|
||||||
|
|
||||||
let protocol = newProtocol(
|
let protocol = newProtocol(
|
||||||
rng = rng,
|
rng = rng,
|
||||||
config = conf.discv5Config.get(protocol.defaultDiscoveryConfig),
|
config = conf.discv5Config.get(protocol.defaultDiscoveryConfig),
|
||||||
bindPort = conf.port,
|
bindPort = conf.port,
|
||||||
bindIp = conf.address,
|
bindIp = conf.address,
|
||||||
privKey = conf.privateKey,
|
privKey = conf.privateKey,
|
||||||
bootstrapRecords = bootstrapRecords,
|
bootstrapRecords = conf.bootstrapRecords,
|
||||||
enrAutoUpdate = conf.autoupdateRecord,
|
enrAutoUpdate = conf.autoupdateRecord,
|
||||||
previousRecord = record,
|
previousRecord = record,
|
||||||
enrIp = none(IpAddress),
|
enrIp = none(IpAddress),
|
||||||
@ -105,6 +93,10 @@ proc new*(
|
|||||||
enrUdpPort = none(Port),
|
enrUdpPort = none(Port),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
let shardPredOp =
|
||||||
|
if record.isSome(): shardingPredicate(record.get(), conf.bootstrapRecords)
|
||||||
|
else: none(WakuDiscv5Predicate)
|
||||||
|
|
||||||
WakuDiscoveryV5(
|
WakuDiscoveryV5(
|
||||||
conf: conf,
|
conf: conf,
|
||||||
protocol: protocol,
|
protocol: protocol,
|
||||||
@ -239,7 +231,7 @@ proc subscriptionsListener(wd: WakuDiscoveryV5) {.async.} =
|
|||||||
|
|
||||||
debug "ENR updated successfully"
|
debug "ENR updated successfully"
|
||||||
|
|
||||||
wd.predicate = shardingPredicate(wd.protocol.localNode.record)
|
wd.predicate = shardingPredicate(wd.protocol.localNode.record, wd.protocol.bootstrapRecords)
|
||||||
|
|
||||||
wd.topicSubscriptionQueue.unregister(key)
|
wd.topicSubscriptionQueue.unregister(key)
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user