mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-01-11 06:46:10 +00:00
Merge pull request #33 from status-im/gossip_sub
Faux implementation of GossipSub based on RLPx
This commit is contained in:
commit
d48f056be9
@ -190,7 +190,7 @@ proc processBlocks*(node: BeaconNode) {.async.} =
|
||||
|
||||
if b.slot mod EPOCH_LENGTH == 0:
|
||||
node.scheduleCycleActions()
|
||||
node.attestations.discardHistoryToSlot(b.slot)
|
||||
node.attestations.discardHistoryToSlot(b.slot.int)
|
||||
|
||||
node.network.subscribe(topicAttestations) do (a: Attestation):
|
||||
# TODO
|
||||
|
@ -51,8 +51,9 @@ iterator each*(pool: AttestationPool,
|
||||
|
||||
proc discardHistoryToSlot*(pool: var AttestationPool, slot: int) =
|
||||
## The index is treated inclusively
|
||||
let slotIdx = slot - pool.startingSlot
|
||||
if slotIdx < 0: return
|
||||
if slot < pool.startingSlot:
|
||||
return
|
||||
let slotIdx = int(slot - pool.startingSlot)
|
||||
pool.attestations.shrink(fromFirst = slotIdx + 1)
|
||||
|
||||
proc getLatestAttestation*(pool: AttestationPool, validator: ValidatorRecord) =
|
||||
|
@ -1,16 +1,23 @@
|
||||
import
|
||||
tables, sets,
|
||||
asyncdispatch2, chronicles, rlp, eth_p2p, eth_p2p/rlpx
|
||||
tables, sets, macros, base64,
|
||||
asyncdispatch2, nimcrypto/sysrand, chronicles, rlp, eth_p2p, eth_p2p/rlpx
|
||||
|
||||
type
|
||||
TopicMsgHandler = proc(data: seq[byte]): Future[void]
|
||||
|
||||
GossibSubPeer = ref object
|
||||
sentMessages: HashSet[string]
|
||||
subscribedFor: HashSet[string]
|
||||
|
||||
GossipSubNetwork = ref object
|
||||
deliveredMessages: Table[Peer, HashSet[string]]
|
||||
topicSubscribers: Table[string, seq[TopicMsgHandler]]
|
||||
topicSubscribers: Table[string, TopicMsgHandler]
|
||||
|
||||
proc initProtocolState*(network: GossipSubNetwork, node: EthereumNode) =
|
||||
network.topicSubscribers = initTable[string, TopicMsgHandler]()
|
||||
|
||||
proc initProtocolState*(peer: GossibSubPeer, node: EthereumNode) =
|
||||
peer.sentMessages = initSet[string]()
|
||||
peer.subscribedFor = initSet[string]()
|
||||
|
||||
p2pProtocol GossipSub(version = 1,
|
||||
shortName = "gss",
|
||||
@ -19,20 +26,44 @@ p2pProtocol GossipSub(version = 1,
|
||||
# This is a very barebones emulation of the GossipSub protocol
|
||||
# available in LibP2P:
|
||||
|
||||
proc interestedIn(peer: Peer, topic: string)
|
||||
proc emit(peer: Peer, topic: string, msgId: string, data: openarray[byte])
|
||||
proc subscribeFor(peer: Peer, topic: string) =
|
||||
peer.state.subscribedFor.incl topic
|
||||
|
||||
proc emit(peer: Peer, topic: string, msgId: string, data: openarray[byte]) =
|
||||
for p in peer.network.peers(GossipSub):
|
||||
if msgId notin p.state.sentMessages and topic in p.state.subscribedFor:
|
||||
asyncCheck p.emit(topic, msgId, data)
|
||||
|
||||
let handler = peer.networkState.topicSubscribers.getOrDefault(topic)
|
||||
if handler != nil:
|
||||
await handler(data)
|
||||
|
||||
proc subscribeImpl(node: EthereumNode,
|
||||
topic: string,
|
||||
subscriber: TopicMsgHandler) =
|
||||
discard
|
||||
var gossipNet = node.protocolState(GossipSub)
|
||||
gossipNet.topicSubscribers[topic] = subscriber
|
||||
for peer in node.peers(GossipSub): peer.subscribeFor(topic)
|
||||
|
||||
proc broadcastImpl(node: EthereumNode, topic: string, data: seq[byte]) =
|
||||
discard
|
||||
proc broadcastImpl(node: EthereumNode, topic: string, msgBytes: seq[byte]): seq[Future[void]] {.gcsafe.} =
|
||||
var randBytes: array[10, byte];
|
||||
if randomBytes(randBytes) != 10:
|
||||
warn "Failed to generate random message id"
|
||||
let msgId = base64.encode(randBytes)
|
||||
|
||||
for peer in node.peers(GossipSub):
|
||||
if topic in peer.state(GossipSub).subscribedFor:
|
||||
result.add peer.emit(topic, msgId, msgBytes)
|
||||
|
||||
proc makeMessageHandler[MsgType](userHandler: proc(msg: MsgType): Future[void]): TopicMsgHandler =
|
||||
result = proc (data: seq[byte]): Future[void] =
|
||||
userHandler rlp.decode(data, MsgType)
|
||||
|
||||
macro subscribe*(node: EthereumNode, topic: string, handler: untyped): untyped =
|
||||
discard
|
||||
handler.addPragma ident"async"
|
||||
result = newCall(bindSym"subscribeImpl",
|
||||
node, topic, newCall(bindSym"makeMessageHandler", handler))
|
||||
|
||||
proc broadcast*(node: EthereumNode, topic: string, data: auto) {.async.} =
|
||||
discard
|
||||
await all(node.broadcastImpl(topic, rlp.encode(data)))
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user