70 lines
2.6 KiB
Nim
70 lines
2.6 KiB
Nim
import
|
|
tables, sets, macros, base64,
|
|
asyncdispatch2, nimcrypto/sysrand, chronicles, rlp, eth_p2p, eth_p2p/rlpx
|
|
|
|
type
|
|
TopicMsgHandler = proc(data: seq[byte]): Future[void]
|
|
|
|
GossibSubPeer = ref object
|
|
sentMessages: HashSet[string]
|
|
subscribedFor: HashSet[string]
|
|
|
|
GossipSubNetwork = ref object
|
|
topicSubscribers: Table[string, TopicMsgHandler]
|
|
|
|
proc initProtocolState*(network: GossipSubNetwork, node: EthereumNode) =
|
|
network.topicSubscribers = initTable[string, TopicMsgHandler]()
|
|
|
|
proc initProtocolState*(peer: GossibSubPeer, node: EthereumNode) =
|
|
peer.sentMessages = initSet[string]()
|
|
peer.subscribedFor = initSet[string]()
|
|
|
|
p2pProtocol GossipSub(version = 1,
|
|
shortName = "gss",
|
|
peerState = GossibSubPeer,
|
|
networkState = GossipSubNetwork):
|
|
# This is a very barebones emulation of the GossipSub protocol
|
|
# available in LibP2P:
|
|
|
|
proc subscribeFor(peer: Peer, topic: string) =
|
|
peer.state.subscribedFor.incl topic
|
|
|
|
proc emit(peer: Peer, topic: string, msgId: string, data: openarray[byte]) =
|
|
for p in peer.network.peers(GossipSub):
|
|
if msgId notin p.state.sentMessages and topic in p.state.subscribedFor:
|
|
asyncCheck p.emit(topic, msgId, data)
|
|
|
|
let handler = peer.networkState.topicSubscribers.getOrDefault(topic)
|
|
if handler != nil:
|
|
await handler(data)
|
|
|
|
proc subscribeImpl(node: EthereumNode,
|
|
topic: string,
|
|
subscriber: TopicMsgHandler) =
|
|
var gossipNet = node.protocolState(GossipSub)
|
|
gossipNet.topicSubscribers[topic] = subscriber
|
|
for peer in node.peers(GossipSub): discard peer.subscribeFor(topic)
|
|
|
|
proc broadcastImpl(node: EthereumNode, topic: string, msgBytes: seq[byte]): seq[Future[void]] {.gcsafe.} =
|
|
var randBytes: array[10, byte];
|
|
if randomBytes(randBytes) != 10:
|
|
warn "Failed to generate random message id"
|
|
let msgId = base64.encode(randBytes)
|
|
|
|
for peer in node.peers(GossipSub):
|
|
if topic in peer.state(GossipSub).subscribedFor:
|
|
result.add peer.emit(topic, msgId, msgBytes)
|
|
|
|
proc makeMessageHandler[MsgType](userHandler: proc(msg: MsgType): Future[void]): TopicMsgHandler =
|
|
result = proc (data: seq[byte]): Future[void] =
|
|
userHandler rlp.decode(data, MsgType)
|
|
|
|
macro subscribe*(node: EthereumNode, topic: string, handler: untyped): untyped =
|
|
handler.addPragma ident"async"
|
|
result = newCall(bindSym"subscribeImpl",
|
|
node, topic, newCall(bindSym"makeMessageHandler", handler))
|
|
|
|
proc broadcast*(node: EthereumNode, topic: string, data: auto) {.async.} =
|
|
await all(node.broadcastImpl(topic, rlp.encode(data)))
|
|
|