Simplified gossipsub.broadcast further

This commit is contained in:
Zahary Karadjov 2019-03-28 13:08:56 +02:00 committed by zah
parent dd72218f48
commit 6d4470a8c8
2 changed files with 13 additions and 19 deletions

View File

@ -1,6 +1,5 @@
import
options, chronos, json_serialization, strutils,
chronicles,
spec/digest, version, conf
const

View File

@ -64,30 +64,18 @@ p2pProtocol GossipSub(version = 1,
if handler != nil:
handler(msg)
proc broadcastIMPL(node: EthereumNode, topic: string, msg: string): seq[Future[void]] {.gcsafe.} =
var randBytes: array[10, byte];
if randomBytes(randBytes) != 10:
warn "Failed to generate random message id"
let msgId = base64.encode(randBytes)
trace "Sending GossipSub message", msgId
for peer in node.peers(GossipSub):
if topic in peer.state(GossipSub).subscribedFor:
result.add peer.tryEmitting(topic, msgId, msg)
proc trySubscribing(peer: Peer, topic: string) =
var fut = peer.subscribeFor(topic)
fut.addCallback do (arg: pointer):
if fut.failed:
warn "Failed to subscribe to topic with GossipSub peer", topic, peer
debug "Failed to subscribe to topic with GossipSub peer", topic, peer
proc tryEmitting(peer: Peer, topic: string,
msgId: string, msg: string): Future[void] =
var fut = peer.emit(topic, msgId, msg)
fut.addCallback do (arg: pointer):
if fut.failed:
warn "GossipSub message not delivered to Peer", peer
debug "GossipSub message not delivered to Peer", peer
return fut
proc subscribe*[MsgType](node: EthereumNode,
@ -101,8 +89,15 @@ proc subscribe*[MsgType](node: EthereumNode,
peer.trySubscribing(topic)
proc broadcast*(node: EthereumNode, topic: string, msg: auto) =
# We are intentionally using `yield` here, so the broadcast call can
# never fail. Please note that errors are logged through a callback
# set in `tryEmitting`
traceAsyncErrors all(node.broadcastIMPL(topic, Json.encode(msg)))
var randBytes: array[10, byte];
if randomBytes(randBytes) != 10:
warn "Failed to generate random message id"
let msg = Json.encode(msg)
let msgId = base64.encode(randBytes)
trace "Sending GossipSub message", msgId
for peer in node.peers(GossipSub):
if topic in peer.state(GossipSub).subscribedFor:
traceAsyncErrors peer.tryEmitting(topic, msgId, msg)