mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-02-18 17:37:33 +00:00
Faux implementation of GossipSub based on RLPx
This commit is contained in:
parent
10ed2bd5b9
commit
c71f89e211
@ -189,7 +189,7 @@ proc processBlocks*(node: BeaconNode) {.async.} =
|
|||||||
|
|
||||||
if b.slot mod EPOCH_LENGTH == 0:
|
if b.slot mod EPOCH_LENGTH == 0:
|
||||||
node.scheduleCycleActions()
|
node.scheduleCycleActions()
|
||||||
node.attestations.discardHistoryToSlot(b.slot)
|
node.attestations.discardHistoryToSlot(b.slot.int)
|
||||||
|
|
||||||
node.network.subscribe(topicAttestations) do (a: Attestation):
|
node.network.subscribe(topicAttestations) do (a: Attestation):
|
||||||
# TODO
|
# TODO
|
||||||
|
@ -51,8 +51,9 @@ iterator each*(pool: AttestationPool,
|
|||||||
|
|
||||||
proc discardHistoryToSlot*(pool: var AttestationPool, slot: int) =
|
proc discardHistoryToSlot*(pool: var AttestationPool, slot: int) =
|
||||||
## The index is treated inclusively
|
## The index is treated inclusively
|
||||||
let slotIdx = slot - pool.startingSlot
|
if slot < pool.startingSlot:
|
||||||
if slotIdx < 0: return
|
return
|
||||||
|
let slotIdx = int(slot - pool.startingSlot)
|
||||||
pool.attestations.shrink(fromFirst = slotIdx + 1)
|
pool.attestations.shrink(fromFirst = slotIdx + 1)
|
||||||
|
|
||||||
proc getLatestAttestation*(pool: AttestationPool, validator: ValidatorRecord) =
|
proc getLatestAttestation*(pool: AttestationPool, validator: ValidatorRecord) =
|
||||||
|
@ -1,16 +1,23 @@
|
|||||||
import
|
import
|
||||||
tables, sets,
|
tables, sets, macros, base64,
|
||||||
asyncdispatch2, chronicles, rlp, eth_p2p, eth_p2p/rlpx
|
asyncdispatch2, nimcrypto/sysrand, chronicles, rlp, eth_p2p, eth_p2p/rlpx
|
||||||
|
|
||||||
type
|
type
|
||||||
TopicMsgHandler = proc(data: seq[byte]): Future[void]
|
TopicMsgHandler = proc(data: seq[byte]): Future[void]
|
||||||
|
|
||||||
GossibSubPeer = ref object
|
GossibSubPeer = ref object
|
||||||
sentMessages: HashSet[string]
|
sentMessages: HashSet[string]
|
||||||
|
subscribedFor: HashSet[string]
|
||||||
|
|
||||||
GossipSubNetwork = ref object
|
GossipSubNetwork = ref object
|
||||||
deliveredMessages: Table[Peer, HashSet[string]]
|
topicSubscribers: Table[string, TopicMsgHandler]
|
||||||
topicSubscribers: Table[string, seq[TopicMsgHandler]]
|
|
||||||
|
proc initProtocolState*(network: GossipSubNetwork, node: EthereumNode) =
|
||||||
|
network.topicSubscribers = initTable[string, TopicMsgHandler]()
|
||||||
|
|
||||||
|
proc initProtocolState*(peer: GossibSubPeer, node: EthereumNode) =
|
||||||
|
peer.sentMessages = initSet[string]()
|
||||||
|
peer.subscribedFor = initSet[string]()
|
||||||
|
|
||||||
p2pProtocol GossipSub(version = 1,
|
p2pProtocol GossipSub(version = 1,
|
||||||
shortName = "gss",
|
shortName = "gss",
|
||||||
@ -19,20 +26,44 @@ p2pProtocol GossipSub(version = 1,
|
|||||||
# This is a very barebones emulation of the GossipSub protocol
|
# This is a very barebones emulation of the GossipSub protocol
|
||||||
# available in LibP2P:
|
# available in LibP2P:
|
||||||
|
|
||||||
proc interestedIn(peer: Peer, topic: string)
|
proc subscribeFor(peer: Peer, topic: string) =
|
||||||
proc emit(peer: Peer, topic: string, msgId: string, data: openarray[byte])
|
peer.state.subscribedFor.incl topic
|
||||||
|
|
||||||
|
proc emit(peer: Peer, topic: string, msgId: string, data: openarray[byte]) =
|
||||||
|
for p in peer.network.peers(GossipSub):
|
||||||
|
if msgId notin p.state.sentMessages and topic in p.state.subscribedFor:
|
||||||
|
asyncCheck p.emit(topic, msgId, data)
|
||||||
|
|
||||||
|
let handler = peer.networkState.topicSubscribers.getOrDefault(topic)
|
||||||
|
if handler != nil:
|
||||||
|
await handler(data)
|
||||||
|
|
||||||
proc subscribeImpl(node: EthereumNode,
|
proc subscribeImpl(node: EthereumNode,
|
||||||
topic: string,
|
topic: string,
|
||||||
subscriber: TopicMsgHandler) =
|
subscriber: TopicMsgHandler) =
|
||||||
discard
|
var gossipNet = node.protocolState(GossipSub)
|
||||||
|
gossipNet.topicSubscribers[topic] = subscriber
|
||||||
|
for peer in node.peers(GossipSub): peer.subscribeFor(topic)
|
||||||
|
|
||||||
proc broadcastImpl(node: EthereumNode, topic: string, data: seq[byte]) =
|
proc broadcastImpl(node: EthereumNode, topic: string, msgBytes: seq[byte]): seq[Future[void]] {.gcsafe.} =
|
||||||
discard
|
var randBytes: array[10, byte];
|
||||||
|
if randomBytes(randBytes) != 10:
|
||||||
|
warn "Failed to generate random message id"
|
||||||
|
let msgId = base64.encode(randBytes)
|
||||||
|
|
||||||
|
for peer in node.peers(GossipSub):
|
||||||
|
if topic in peer.state(GossipSub).subscribedFor:
|
||||||
|
result.add peer.emit(topic, msgId, msgBytes)
|
||||||
|
|
||||||
|
proc makeMessageHandler[MsgType](userHandler: proc(msg: MsgType): Future[void]): TopicMsgHandler =
|
||||||
|
result = proc (data: seq[byte]): Future[void] =
|
||||||
|
userHandler rlp.decode(data, MsgType)
|
||||||
|
|
||||||
macro subscribe*(node: EthereumNode, topic: string, handler: untyped): untyped =
|
macro subscribe*(node: EthereumNode, topic: string, handler: untyped): untyped =
|
||||||
discard
|
handler.addPragma ident"async"
|
||||||
|
result = newCall(bindSym"subscribeImpl",
|
||||||
|
node, topic, newCall(bindSym"makeMessageHandler", handler))
|
||||||
|
|
||||||
proc broadcast*(node: EthereumNode, topic: string, data: auto) {.async.} =
|
proc broadcast*(node: EthereumNode, topic: string, data: auto) {.async.} =
|
||||||
discard
|
await all(node.broadcastImpl(topic, rlp.encode(data)))
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user