nimbus-eth2/beacon_chain/gossipsub_protocol.nim

104 lines
3.5 KiB
Nim
Raw Normal View History

import
tables, sets, macros, base64,
chronos, nimcrypto/sysrand, chronicles, json_serialization,
eth/[p2p, rlp, async_utils], eth/p2p/[rlpx, peer_pool],
spec/[datatypes, crypto],
tracing/stacktraces
type
  # Callback run for a message that arrived on a locally subscribed topic;
  # it receives the message payload as a string.
  TopicMsgHandler = proc (msg: string)

  # Per-peer protocol state:
  #   sentMessages  - ids of messages already forwarded to this peer
  #   subscribedFor - topics recorded for this peer by `subscribeFor`
  GossipSubPeer* = ref object
    sentMessages: HashSet[string]
    subscribedFor: HashSet[string]

  # Node-wide protocol state:
  #   topicSubscribers - the local handler registered for each topic
  #   handledMessages  - ids of messages this node has already processed
  GossipSubNetwork* = ref object
    topicSubscribers: Table[string, TopicMsgHandler]
    handledMessages: HashSet[string]
proc initProtocolState*(network: GossipSubNetwork, _: EthereumNode) =
  ## Resets the node-wide GossipSub state to empty collections: no topic
  ## handlers registered and no message ids remembered.
  ## The `EthereumNode` argument is not used.
  network.topicSubscribers = initTable[string, TopicMsgHandler]()
  # `initSet` is deprecated since Nim 0.20; `initHashSet` is its replacement.
  network.handledMessages = initHashSet[string]()
proc initProtocolState*(peer: GossipSubPeer, _: Peer) =
  ## Resets the per-peer GossipSub state to empty sets: nothing sent to the
  ## peer yet and no topic subscriptions recorded for it.
  ## The `Peer` argument is not used.
  # `initSet` is deprecated since Nim 0.20; `initHashSet` is its replacement.
  peer.sentMessages = initHashSet[string]()
  peer.subscribedFor = initHashSet[string]()
# Forward declarations: the `p2pProtocol` block that follows calls these
# procs before their bodies appear further down in the file.
# `trySubscribing` returns nothing; `tryEmitting` returns the future
# produced by `peer.emit` so the caller can observe the outcome.
proc trySubscribing(peer: Peer, topic: string) {.gcsafe.}
proc tryEmitting(peer: Peer, topic: string,
msgId: string, msg: string): Future[void] {.gcsafe.}
2018-11-29 01:08:34 +00:00
p2pProtocol GossipSub(version = 1,
                      rlpxName = "gss",
                      peerState = GossipSubPeer,
                      networkState = GossipSubNetwork):
  # This is a very barebones emulation of the GossipSub protocol
  # available in LibP2P:

  onPeerConnected do (peer: Peer):
    info "GossipSub Peer connected", peer
    # Ask the new peer for every topic that has a local handler.
    for topic in peer.networkState.topicSubscribers.keys:
      peer.trySubscribing(topic)

  onPeerDisconnected do (peer: Peer, reason: DisconnectionReason):
    info "GossipSub Peer disconnected", peer, reason

  proc subscribeFor(peer: Peer, topic: string) =
    # Record that `peer` is subscribed to `topic`.
    peer.state.subscribedFor.incl topic

  proc emit(peer: Peer, topic: string, msgId: string, msg: string) =
    let net = peer.networkState

    # Each message id is processed at most once; `containsOrIncl` both
    # checks for the id and records it when it was not yet known.
    if net.handledMessages.containsOrIncl(msgId):
      trace "Ignored previously handled message", msgId
      return

    # Forward to every peer subscribed to the topic that has not yet been
    # sent this message id.
    for other in peer.network.peers(GossipSub):
      let alreadySent = msgId in other.state.sentMessages
      if alreadySent or topic notin other.state.subscribedFor:
        continue
      other.state.sentMessages.incl msgId
      traceAsyncErrors other.tryEmitting(topic, msgId, msg)

    # Finally hand the payload to the local handler for the topic, if any.
    {.gcsafe.}:
      let localHandler = net.topicSubscribers.getOrDefault(topic)
      if not localHandler.isNil:
        localHandler(msg)
proc trySubscribing(peer: Peer, topic: string) =
  ## Sends a `subscribeFor(topic)` message to `peer` without waiting for
  ## the result; a failed send is only reported in the debug log.
  let request = peer.subscribeFor(topic)
  request.addCallback do (udata: pointer):
    if not request.failed:
      return
    debug "Failed to subscribe to topic with GossipSub peer", topic, peer
proc tryEmitting(peer: Peer, topic: string,
                 msgId: string, msg: string): Future[void] =
  ## Sends an `emit` message to `peer` and returns the resulting future.
  ## A callback on that future writes a debug log entry when the send
  ## fails; the failure itself stays in the returned future.
  let delivery = peer.emit(topic, msgId, msg)
  delivery.addCallback do (udata: pointer):
    if not delivery.failed:
      return
    debug "GossipSub message not delivered to Peer", peer
  delivery
2018-11-26 13:33:06 +00:00
proc subscribe*[MsgType](node: EthereumNode,
                         topic: string,
                         userHandler: proc(msg: MsgType)) {.async.} =
  ## Registers `userHandler` as the local handler for `topic` and asks
  ## every currently connected GossipSub peer to send messages on it.
  ##
  ## Incoming payloads are JSON-decoded into `MsgType` before being passed
  ## to `userHandler`. An existing handler for the same topic is replaced.
  let net = node.protocolState(GossipSub)

  # Wrap the typed handler so it fits the string-based handler table.
  net.topicSubscribers[topic] = proc (payload: string) =
    userHandler(Json.decode(payload, MsgType))

  for remote in node.peers(GossipSub):
    remote.trySubscribing(topic)
2018-11-26 13:33:06 +00:00
proc broadcast*(node: EthereumNode, topic: string, msg: auto) =
  ## JSON-encodes `msg`, tags it with a fresh base64 message id built from
  ## 10 random bytes and sends it to every connected GossipSub peer that is
  ## subscribed to `topic`.
  var idBytes: array[10, byte]
  # NOTE(review): when the RNG fails this only warns and keeps going, so
  # the id is built from whatever `idBytes` holds at that point and may
  # collide with another id - confirm this best-effort behaviour is wanted.
  if randomBytes(idBytes) != idBytes.len:
    warn "Failed to generate random message id"

  let
    payload = Json.encode(msg)
    msgId = base64.encode(idBytes)

  trace "Sending GossipSub message", msgId

  for remote in node.peers(GossipSub):
    if topic notin remote.state(GossipSub).subscribedFor:
      continue
    traceAsyncErrors remote.tryEmitting(topic, msgId, payload)