mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-05-17 20:29:29 +00:00
chore: remove metadata protocol dependency on enr, relax check when nwaku is edge node (#3519)
* remove metadata protocol dep on enr, do not disconnect peers based on shards mismatch
This commit is contained in:
parent
9de9f9e115
commit
accf6a2561
@ -652,7 +652,7 @@ when isMainModule:
|
|||||||
error "failed to setup RLN", err = getCurrentExceptionMsg()
|
error "failed to setup RLN", err = getCurrentExceptionMsg()
|
||||||
quit 1
|
quit 1
|
||||||
|
|
||||||
node.mountMetadata(conf.clusterId).isOkOr:
|
node.mountMetadata(conf.clusterId, conf.shards).isOkOr:
|
||||||
error "failed to mount waku metadata protocol: ", err = error
|
error "failed to mount waku metadata protocol: ", err = error
|
||||||
quit 1
|
quit 1
|
||||||
|
|
||||||
|
|||||||
@ -256,7 +256,7 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} =
|
|||||||
error "failed to mount libp2p ping protocol: " & getCurrentExceptionMsg()
|
error "failed to mount libp2p ping protocol: " & getCurrentExceptionMsg()
|
||||||
quit(QuitFailure)
|
quit(QuitFailure)
|
||||||
|
|
||||||
node.mountMetadata(conf.clusterId).isOkOr:
|
node.mountMetadata(conf.clusterId, conf.shards).isOkOr:
|
||||||
error "failed to mount metadata protocol", error
|
error "failed to mount metadata protocol", error
|
||||||
quit(QuitFailure)
|
quit(QuitFailure)
|
||||||
|
|
||||||
|
|||||||
@ -76,7 +76,9 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
|
|||||||
builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet()
|
builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet()
|
||||||
let node = builder.build().tryGet()
|
let node = builder.build().tryGet()
|
||||||
|
|
||||||
node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol")
|
node.mountMetadata(clusterId, shardId).expect(
|
||||||
|
"failed to mount waku metadata protocol"
|
||||||
|
)
|
||||||
await node.mountFilterClient()
|
await node.mountFilterClient()
|
||||||
|
|
||||||
await node.start()
|
await node.start()
|
||||||
|
|||||||
@ -68,7 +68,9 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
|
|||||||
builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet()
|
builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet()
|
||||||
let node = builder.build().tryGet()
|
let node = builder.build().tryGet()
|
||||||
|
|
||||||
node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol")
|
node.mountMetadata(clusterId, shardId).expect(
|
||||||
|
"failed to mount waku metadata protocol"
|
||||||
|
)
|
||||||
node.mountLegacyLightPushClient()
|
node.mountLegacyLightPushClient()
|
||||||
|
|
||||||
await node.start()
|
await node.start()
|
||||||
|
|||||||
@ -34,8 +34,10 @@ suite "Peer Manager":
|
|||||||
)
|
)
|
||||||
|
|
||||||
# And both mount metadata and filter
|
# And both mount metadata and filter
|
||||||
discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic
|
discard
|
||||||
discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic
|
client.mountMetadata(0, @[1'u16]) # clusterId irrelevant, overridden by topic
|
||||||
|
discard
|
||||||
|
server.mountMetadata(0, @[0'u16]) # clusterId irrelevant, overridden by topic
|
||||||
await client.mountFilterClient()
|
await client.mountFilterClient()
|
||||||
await server.mountFilter()
|
await server.mountFilter()
|
||||||
|
|
||||||
@ -69,8 +71,10 @@ suite "Peer Manager":
|
|||||||
)
|
)
|
||||||
|
|
||||||
# And both mount metadata and relay
|
# And both mount metadata and relay
|
||||||
discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic
|
discard
|
||||||
discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic
|
client.mountMetadata(0, @[1'u16]) # clusterId irrelevant, overridden by topic
|
||||||
|
discard
|
||||||
|
server.mountMetadata(0, @[0'u16]) # clusterId irrelevant, overridden by topic
|
||||||
(await client.mountRelay()).isOkOr:
|
(await client.mountRelay()).isOkOr:
|
||||||
assert false, "Failed to mount relay"
|
assert false, "Failed to mount relay"
|
||||||
(await server.mountRelay()).isOkOr:
|
(await server.mountRelay()).isOkOr:
|
||||||
@ -105,8 +109,10 @@ suite "Peer Manager":
|
|||||||
)
|
)
|
||||||
|
|
||||||
# And both mount metadata and relay
|
# And both mount metadata and relay
|
||||||
discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic
|
discard
|
||||||
discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic
|
client.mountMetadata(0, @[1'u16]) # clusterId irrelevant, overridden by topic
|
||||||
|
discard
|
||||||
|
server.mountMetadata(0, @[0'u16]) # clusterId irrelevant, overridden by topic
|
||||||
(await client.mountRelay()).isOkOr:
|
(await client.mountRelay()).isOkOr:
|
||||||
assert false, "Failed to mount relay"
|
assert false, "Failed to mount relay"
|
||||||
(await server.mountRelay()).isOkOr:
|
(await server.mountRelay()).isOkOr:
|
||||||
|
|||||||
@ -274,8 +274,8 @@ procSuite "Peer Manager":
|
|||||||
)
|
)
|
||||||
node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(34023))
|
node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(34023))
|
||||||
|
|
||||||
node1.mountMetadata(0).expect("Mounted Waku Metadata")
|
node1.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata")
|
||||||
node2.mountMetadata(0).expect("Mounted Waku Metadata")
|
node2.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata")
|
||||||
|
|
||||||
await node1.start()
|
await node1.start()
|
||||||
await node2.start()
|
await node2.start()
|
||||||
@ -313,7 +313,7 @@ procSuite "Peer Manager":
|
|||||||
peerStorage = storage,
|
peerStorage = storage,
|
||||||
)
|
)
|
||||||
|
|
||||||
node3.mountMetadata(0).expect("Mounted Waku Metadata")
|
node3.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata")
|
||||||
|
|
||||||
await node3.start()
|
await node3.start()
|
||||||
|
|
||||||
@ -347,8 +347,8 @@ procSuite "Peer Manager":
|
|||||||
)
|
)
|
||||||
node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(34023))
|
node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(34023))
|
||||||
|
|
||||||
node1.mountMetadata(0).expect("Mounted Waku Metadata")
|
node1.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata")
|
||||||
node2.mountMetadata(0).expect("Mounted Waku Metadata")
|
node2.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata")
|
||||||
|
|
||||||
await node1.start()
|
await node1.start()
|
||||||
await node2.start()
|
await node2.start()
|
||||||
@ -386,7 +386,7 @@ procSuite "Peer Manager":
|
|||||||
peerStorage = storage,
|
peerStorage = storage,
|
||||||
)
|
)
|
||||||
|
|
||||||
node3.mountMetadata(0).expect("Mounted Waku Metadata")
|
node3.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata")
|
||||||
|
|
||||||
await node3.start()
|
await node3.start()
|
||||||
|
|
||||||
@ -439,9 +439,9 @@ procSuite "Peer Manager":
|
|||||||
subscribeShards = @[uint16(0)],
|
subscribeShards = @[uint16(0)],
|
||||||
)
|
)
|
||||||
|
|
||||||
node1.mountMetadata(3).expect("Mounted Waku Metadata")
|
node1.mountMetadata(3, @[0'u16]).expect("Mounted Waku Metadata")
|
||||||
node2.mountMetadata(4).expect("Mounted Waku Metadata")
|
node2.mountMetadata(4, @[0'u16]).expect("Mounted Waku Metadata")
|
||||||
node3.mountMetadata(4).expect("Mounted Waku Metadata")
|
node3.mountMetadata(4, @[0'u16]).expect("Mounted Waku Metadata")
|
||||||
|
|
||||||
# Start nodes
|
# Start nodes
|
||||||
await allFutures([node1.start(), node2.start(), node3.start()])
|
await allFutures([node1.start(), node2.start(), node3.start()])
|
||||||
@ -548,7 +548,7 @@ procSuite "Peer Manager":
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Start them
|
# Start them
|
||||||
discard nodes.mapIt(it.mountMetadata(0))
|
discard nodes.mapIt(it.mountMetadata(0, @[0'u16]))
|
||||||
await allFutures(nodes.mapIt(it.mountRelay()))
|
await allFutures(nodes.mapIt(it.mountRelay()))
|
||||||
await allFutures(nodes.mapIt(it.start()))
|
await allFutures(nodes.mapIt(it.start()))
|
||||||
|
|
||||||
@ -621,7 +621,7 @@ procSuite "Peer Manager":
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Start them
|
# Start them
|
||||||
discard nodes.mapIt(it.mountMetadata(0))
|
discard nodes.mapIt(it.mountMetadata(0, @[0'u16]))
|
||||||
await allFutures(nodes.mapIt(it.mountRelay()))
|
await allFutures(nodes.mapIt(it.mountRelay()))
|
||||||
await allFutures(nodes.mapIt(it.start()))
|
await allFutures(nodes.mapIt(it.start()))
|
||||||
|
|
||||||
|
|||||||
@ -43,11 +43,12 @@ suite "Waku v2 Rest API - Admin":
|
|||||||
node3 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(60604))
|
node3 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(60604))
|
||||||
|
|
||||||
let clusterId = 1.uint16
|
let clusterId = 1.uint16
|
||||||
node1.mountMetadata(clusterId).isOkOr:
|
let shards: seq[uint16] = @[0]
|
||||||
|
node1.mountMetadata(clusterId, shards).isOkOr:
|
||||||
assert false, "Failed to mount metadata: " & $error
|
assert false, "Failed to mount metadata: " & $error
|
||||||
node2.mountMetadata(clusterId).isOkOr:
|
node2.mountMetadata(clusterId, shards).isOkOr:
|
||||||
assert false, "Failed to mount metadata: " & $error
|
assert false, "Failed to mount metadata: " & $error
|
||||||
node3.mountMetadata(clusterId).isOkOr:
|
node3.mountMetadata(clusterId, shards).isOkOr:
|
||||||
assert false, "Failed to mount metadata: " & $error
|
assert false, "Failed to mount metadata: " & $error
|
||||||
|
|
||||||
await allFutures(node1.start(), node2.start(), node3.start())
|
await allFutures(node1.start(), node2.start(), node3.start())
|
||||||
|
|||||||
@ -154,7 +154,8 @@ proc setupProtocols(
|
|||||||
## Optionally include persistent message storage.
|
## Optionally include persistent message storage.
|
||||||
## No protocols are started yet.
|
## No protocols are started yet.
|
||||||
|
|
||||||
node.mountMetadata(conf.clusterId).isOkOr:
|
var allShards = conf.subscribeShards
|
||||||
|
node.mountMetadata(conf.clusterId, allShards).isOkOr:
|
||||||
return err("failed to mount waku metadata protocol: " & error)
|
return err("failed to mount waku metadata protocol: " & error)
|
||||||
|
|
||||||
var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
|
var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
|
||||||
|
|||||||
@ -654,17 +654,6 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
|
|||||||
$clusterId
|
$clusterId
|
||||||
break guardClauses
|
break guardClauses
|
||||||
|
|
||||||
if (
|
|
||||||
pm.switch.peerStore.hasPeer(peerId, WakuRelayCodec) and
|
|
||||||
not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it))
|
|
||||||
):
|
|
||||||
let myShardsString = "[ " & toSeq(pm.wakuMetadata.shards).join(", ") & " ]"
|
|
||||||
let otherShardsString = "[ " & metadata.shards.join(", ") & " ]"
|
|
||||||
reason =
|
|
||||||
"no shards in common: my_shards = " & myShardsString & " others_shards = " &
|
|
||||||
otherShardsString
|
|
||||||
break guardClauses
|
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
info "disconnecting from peer", peerId = peerId, reason = reason
|
info "disconnecting from peer", peerId = peerId, reason = reason
|
||||||
@ -799,6 +788,7 @@ proc getOnlineStateObserver*(pm: PeerManager): OnOnlineStateChange =
|
|||||||
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
|
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
|
||||||
|
|
||||||
proc manageRelayPeers*(pm: PeerManager) {.async.} =
|
proc manageRelayPeers*(pm: PeerManager) {.async.} =
|
||||||
|
#TODO: this check should not be based on whether shards are present, but rather if relay is mounted
|
||||||
if pm.wakuMetadata.shards.len == 0:
|
if pm.wakuMetadata.shards.len == 0:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|||||||
@ -183,11 +183,14 @@ proc disconnectNode*(node: WakuNode, remotePeer: RemotePeerInfo) {.async.} =
|
|||||||
|
|
||||||
## Waku Metadata
|
## Waku Metadata
|
||||||
|
|
||||||
proc mountMetadata*(node: WakuNode, clusterId: uint32): Result[void, string] =
|
proc mountMetadata*(
|
||||||
|
node: WakuNode, clusterId: uint32, shards: seq[uint16]
|
||||||
|
): Result[void, string] =
|
||||||
if not node.wakuMetadata.isNil():
|
if not node.wakuMetadata.isNil():
|
||||||
return err("Waku metadata already mounted, skipping")
|
return err("Waku metadata already mounted, skipping")
|
||||||
|
let shards32 = shards.mapIt(it.uint32)
|
||||||
let metadata = WakuMetadata.new(clusterId, node.enr, node.topicSubscriptionQueue)
|
let metadata =
|
||||||
|
WakuMetadata.new(clusterId, shards32.toHashSet(), some(node.topicSubscriptionQueue))
|
||||||
|
|
||||||
node.wakuMetadata = metadata
|
node.wakuMetadata = metadata
|
||||||
node.peerManager.wakuMetadata = metadata
|
node.peerManager.wakuMetadata = metadata
|
||||||
|
|||||||
@ -10,7 +10,7 @@ import
|
|||||||
libp2p/stream/connection,
|
libp2p/stream/connection,
|
||||||
libp2p/crypto/crypto,
|
libp2p/crypto/crypto,
|
||||||
eth/p2p/discoveryv5/enr
|
eth/p2p/discoveryv5/enr
|
||||||
import ../common/nimchronos, ../common/enr, ../waku_core, ../waku_enr, ./rpc
|
import ../common/nimchronos, ../waku_core, ./rpc
|
||||||
|
|
||||||
from ../waku_core/codecs import WakuMetadataCodec
|
from ../waku_core/codecs import WakuMetadataCodec
|
||||||
export WakuMetadataCodec
|
export WakuMetadataCodec
|
||||||
@ -23,7 +23,7 @@ const RpcResponseMaxBytes* = 1024
|
|||||||
type WakuMetadata* = ref object of LPProtocol
|
type WakuMetadata* = ref object of LPProtocol
|
||||||
clusterId*: uint32
|
clusterId*: uint32
|
||||||
shards*: HashSet[uint32]
|
shards*: HashSet[uint32]
|
||||||
topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent]
|
topicSubscriptionQueue: Option[AsyncEventQueue[SubscriptionEvent]]
|
||||||
|
|
||||||
proc respond(
|
proc respond(
|
||||||
m: WakuMetadata, conn: Connection
|
m: WakuMetadata, conn: Connection
|
||||||
@ -49,7 +49,7 @@ proc request*(
|
|||||||
let readRes = catch:
|
let readRes = catch:
|
||||||
await conn.readLp(RpcResponseMaxBytes)
|
await conn.readLp(RpcResponseMaxBytes)
|
||||||
|
|
||||||
# close no watter what
|
# close no matter what
|
||||||
let closeRes = catch:
|
let closeRes = catch:
|
||||||
await conn.closeWithEof()
|
await conn.closeWithEof()
|
||||||
if closeRes.isErr():
|
if closeRes.isErr():
|
||||||
@ -104,21 +104,11 @@ proc initProtocolHandler(m: WakuMetadata) =
|
|||||||
proc new*(
|
proc new*(
|
||||||
T: type WakuMetadata,
|
T: type WakuMetadata,
|
||||||
clusterId: uint32,
|
clusterId: uint32,
|
||||||
enr: Record,
|
shards: HashSet[uint32],
|
||||||
queue: AsyncEventQueue[SubscriptionEvent],
|
queue: Option[AsyncEventQueue[SubscriptionEvent]],
|
||||||
): T =
|
): T =
|
||||||
var (cluster, shards) = (clusterId, initHashSet[uint32]())
|
|
||||||
|
|
||||||
let enrRes = enr.toTyped()
|
|
||||||
if enrRes.isOk():
|
|
||||||
let shardingRes = enrRes.get().relaySharding()
|
|
||||||
if shardingRes.isSome():
|
|
||||||
let relayShard = shardingRes.get()
|
|
||||||
cluster = uint32(relayShard.clusterId)
|
|
||||||
shards = toHashSet(relayShard.shardIds.mapIt(uint32(it)))
|
|
||||||
|
|
||||||
let wm =
|
let wm =
|
||||||
WakuMetadata(clusterId: cluster, shards: shards, topicSubscriptionQueue: queue)
|
WakuMetadata(clusterId: clusterId, shards: shards, topicSubscriptionQueue: queue)
|
||||||
|
|
||||||
wm.initProtocolHandler()
|
wm.initProtocolHandler()
|
||||||
|
|
||||||
@ -128,32 +118,31 @@ proc new*(
|
|||||||
|
|
||||||
proc subscriptionsListener(wm: WakuMetadata) {.async.} =
|
proc subscriptionsListener(wm: WakuMetadata) {.async.} =
|
||||||
## Listen for pubsub topics subscriptions changes
|
## Listen for pubsub topics subscriptions changes
|
||||||
|
if wm.topicSubscriptionQueue.isSome():
|
||||||
|
let key = wm.topicSubscriptionQueue.get().register()
|
||||||
|
|
||||||
let key = wm.topicSubscriptionQueue.register()
|
while wm.started:
|
||||||
|
let events = await wm.topicSubscriptionQueue.get().waitEvents(key)
|
||||||
|
|
||||||
while wm.started:
|
for event in events:
|
||||||
let events = await wm.topicSubscriptionQueue.waitEvents(key)
|
let parsedShard = RelayShard.parse(event.topic).valueOr:
|
||||||
|
continue
|
||||||
|
|
||||||
for event in events:
|
if parsedShard.clusterId != wm.clusterId:
|
||||||
let parsedShard = RelayShard.parse(event.topic).valueOr:
|
continue
|
||||||
continue
|
|
||||||
|
|
||||||
if parsedShard.clusterId != wm.clusterId:
|
case event.kind
|
||||||
continue
|
of PubsubSub:
|
||||||
|
wm.shards.incl(parsedShard.shardId)
|
||||||
|
of PubsubUnsub:
|
||||||
|
wm.shards.excl(parsedShard.shardId)
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
|
||||||
case event.kind
|
wm.topicSubscriptionQueue.get().unregister(key)
|
||||||
of PubsubSub:
|
|
||||||
wm.shards.incl(parsedShard.shardId)
|
|
||||||
of PubsubUnsub:
|
|
||||||
wm.shards.excl(parsedShard.shardId)
|
|
||||||
else:
|
|
||||||
continue
|
|
||||||
|
|
||||||
wm.topicSubscriptionQueue.unregister(key)
|
|
||||||
|
|
||||||
proc start*(wm: WakuMetadata) =
|
proc start*(wm: WakuMetadata) =
|
||||||
wm.started = true
|
wm.started = true
|
||||||
|
|
||||||
asyncSpawn wm.subscriptionsListener()
|
asyncSpawn wm.subscriptionsListener()
|
||||||
|
|
||||||
proc stop*(wm: WakuMetadata) =
|
proc stop*(wm: WakuMetadata) =
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user