Simplify send (#271)

* PubSubPeer.send single message

* gossipsub: simplify send further
This commit is contained in:
Jacek Sieka 2020-07-16 12:06:57 +02:00 committed by GitHub
parent 4112e04036
commit c76152f2c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 69 additions and 92 deletions

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import tables, sets, options, sequtils, random import std/[tables, sets, options, sequtils, random]
import chronos, chronicles, metrics import chronos, chronicles, metrics
import pubsub, import pubsub,
floodsub, floodsub,
@ -232,7 +232,7 @@ proc heartbeat(g: GossipSub) {.async.} =
var sent: seq[Future[void]] var sent: seq[Future[void]]
for peer in peers.keys: for peer in peers.keys:
if peer in g.peers: if peer in g.peers:
sent &= g.peers[peer].send(@[RPCMsg(control: some(peers[peer]))]) sent &= g.peers[peer].send(RPCMsg(control: some(peers[peer])))
checkFutures(await allFinished(sent)) checkFutures(await allFinished(sent))
g.mcache.shift() # shift the cache g.mcache.shift() # shift the cache
@ -436,8 +436,7 @@ method rpcHandler*(g: GossipSub,
if respControl.graft.len > 0 or respControl.prune.len > 0 or if respControl.graft.len > 0 or respControl.prune.len > 0 or
respControl.ihave.len > 0 or respControl.iwant.len > 0: respControl.ihave.len > 0 or respControl.iwant.len > 0:
await peer.send( await peer.send(
@[RPCMsg(control: some(respControl), RPCMsg(control: some(respControl), messages: messages))
messages: messages)])
method subscribe*(g: GossipSub, method subscribe*(g: GossipSub,
topic: string, topic: string,

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import tables, sequtils, sets import std/[tables, sequtils, sets]
import chronos, chronicles import chronos, chronicles
import pubsubpeer, import pubsubpeer,
rpc/[message, messages], rpc/[message, messages],
@ -77,16 +77,6 @@ proc sendSubs*(p: PubSub,
topics: seq[string], topics: seq[string],
subscribe: bool) {.async.} = subscribe: bool) {.async.} =
## send subscriptions to remote peer ## send subscriptions to remote peer
trace "sending subscriptions", peer = peer.id,
subscribe = subscribe,
topicIDs = topics
var msg: RPCMsg
for t in topics:
trace "sending topic", peer = peer.id,
subscribe = subscribe,
topicName = t
msg.subscriptions.add(SubOpts(topic: t, subscribe: subscribe))
try: try:
# wait for a connection before publishing # wait for a connection before publishing
@ -95,7 +85,7 @@ proc sendSubs*(p: PubSub,
trace "awaiting send connection" trace "awaiting send connection"
await peer.onConnect.wait() await peer.onConnect.wait()
await peer.send(@[msg]) await peer.sendSubOpts(topics, subscribe)
except CancelledError as exc: except CancelledError as exc:
p.handleDisconnect(peer) p.handleDisconnect(peer)
raise exc raise exc
@ -107,6 +97,7 @@ method subscribeTopic*(p: PubSub,
topic: string, topic: string,
subscribe: bool, subscribe: bool,
peerId: string) {.base, async.} = peerId: string) {.base, async.} =
# called when remote peer subscribes to a topic
discard discard
method rpcHandler*(p: PubSub, method rpcHandler*(p: PubSub,
@ -258,7 +249,7 @@ proc sendHelper*(p: PubSub,
continue continue
trace "sending messages to peer", peer = sendPeer.id, msgs trace "sending messages to peer", peer = sendPeer.id, msgs
sent.add((id: sendPeer.id, fut: sendPeer.send(@[RPCMsg(messages: msgs)]))) sent.add((id: sendPeer.id, fut: sendPeer.send(RPCMsg(messages: msgs))))
var published: seq[string] var published: seq[string]
var failed: seq[string] var failed: seq[string]

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import options, hashes, strutils, tables, hashes import std/[hashes, options, sequtils, strutils, tables]
import chronos, chronicles, nimcrypto/sha2, metrics import chronos, chronicles, nimcrypto/sha2, metrics
import rpc/[messages, message, protobuf], import rpc/[messages, message, protobuf],
timedcache, timedcache,
@ -141,81 +141,69 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
finally: finally:
p.refs.dec() p.refs.dec()
proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = proc send*(p: PubSubPeer, msg: RPCMsg) {.async.} =
logScope: logScope:
peer = p.id peer = p.id
msgs = $msgs msg = shortLog(msg)
for m in msgs.items: trace "sending msg to peer"
trace "sending msgs to peer", toPeer = p.id, msgs = $msgs
# trigger send hooks # trigger send hooks
var mm = m # hooks can modify the message var mm = msg # hooks can modify the message
p.sendObservers(mm) p.sendObservers(mm)
let encoded = encodeRpcMsg(mm) let encoded = encodeRpcMsg(mm)
if encoded.len <= 0: if encoded.len <= 0:
trace "empty message, skipping", peer = p.id trace "empty message, skipping"
return return
let digest = $(sha256.digest(encoded)) logScope:
if digest in p.sentRpcCache: encoded = shortLog(encoded)
trace "message already sent to peer, skipping", peer = p.id
libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id])
continue
try: let digest = $(sha256.digest(encoded))
trace "about to send message", peer = p.id, if digest in p.sentRpcCache:
encoded = digest trace "message already sent to peer, skipping"
if p.connected: # this can happen if the remote disconnected libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id])
trace "sending encoded msgs to peer", peer = p.id, return
encoded = encoded.shortLog
await p.sendConn.writeLp(encoded)
p.sentRpcCache.put(digest)
for m in msgs:
for mm in m.messages:
for t in mm.topicIDs:
# metrics
libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t])
except CatchableError as exc:
trace "unable to send to remote", exc = exc.msg
if not(isNil(p.sendConn)):
await p.sendConn.close()
p.sendConn = nil
p.onConnect.clear()
p.refs.dec()
raise exc
proc sendMsg*(p: PubSubPeer,
peerId: PeerID,
topic: string,
data: seq[byte],
seqno: uint64,
sign: bool): Future[void] {.gcsafe.} =
p.send(@[RPCMsg(messages: @[Message.init(p.peerInfo, data, topic, seqno, sign)])])
proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async.} =
try: try:
for topic in topics: trace "about to send message"
trace "sending graft msg to peer", peer = p.id, topicID = topic if p.connected: # this can happen if the remote disconnected
await p.send(@[RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))]) trace "sending encoded msgs to peer"
except CancelledError as exc:
raise exc await p.sendConn.writeLp(encoded)
except CatchableError as exc: p.sentRpcCache.put(digest)
trace "Could not send graft", msg = exc.msg
for x in mm.messages:
for t in x.topicIDs:
# metrics
libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t])
proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async.} =
try:
for topic in topics:
trace "sending prune msg to peer", peer = p.id, topicID = topic
await p.send(@[RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))])
except CancelledError as exc:
raise exc
except CatchableError as exc: except CatchableError as exc:
trace "Could not send prune", msg = exc.msg trace "unable to send to remote", exc = exc.msg
if not(isNil(p.sendConn)):
await p.sendConn.close()
p.sendConn = nil
p.onConnect.clear()
p.refs.dec()
raise exc
proc sendSubOpts*(p: PubSubPeer, topics: seq[string], subscribe: bool): Future[void] =
trace "sending subscriptions", peer = p.id, subscribe, topicIDs = topics
p.send(RPCMsg(
subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))))
proc sendGraft*(p: PubSubPeer, topics: seq[string]): Future[void] =
trace "sending graft msg to peer", peer = p.id, topicIDs = topics
p.send(RPCMsg(control: some(
ControlMessage(graft: topics.mapIt(ControlGraft(topicID: it))))))
proc sendPrune*(p: PubSubPeer, topics: seq[string]): Future[void] =
trace "sending prune msg to peer", peer = p.id, topicIDs = topics
p.send(RPCMsg(control: some(
ControlMessage(prune: topics.mapIt(ControlPrune(topicID: it))))))
proc `$`*(p: PubSubPeer): string = proc `$`*(p: PubSubPeer): string =
p.id p.id

View File

@ -9,17 +9,14 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import options import chronicles, metrics, stew/[byteutils, endians2]
import chronicles, stew/byteutils import ./messages, ./protobuf,
import metrics
import chronicles
import nimcrypto/sysrand
import messages, protobuf,
../../../peerid, ../../../peerid,
../../../peerinfo, ../../../peerinfo,
../../../crypto/crypto, ../../../crypto/crypto,
../../../protobuf/minprotobuf ../../../protobuf/minprotobuf
import stew/endians2
export messages
logScope: logScope:
topics = "pubsubmessage" topics = "pubsubmessage"
@ -32,8 +29,8 @@ declareCounter(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messag
func defaultMsgIdProvider*(m: Message): string = func defaultMsgIdProvider*(m: Message): string =
byteutils.toHex(m.seqno) & m.fromPeer.pretty byteutils.toHex(m.seqno) & m.fromPeer.pretty
proc sign*(msg: Message, p: PeerInfo): seq[byte] {.gcsafe, raises: [ResultError[CryptoError], Defect].} = proc sign*(msg: Message, p: PeerInfo): CryptoResult[seq[byte]] =
p.privateKey.sign(PubSubPrefix & encodeMessage(msg)).tryGet().getBytes() ok((? p.privateKey.sign(PubSubPrefix & encodeMessage(msg))).getBytes())
proc verify*(m: Message, p: PeerInfo): bool = proc verify*(m: Message, p: PeerInfo): bool =
if m.signature.len > 0 and m.key.len > 0: if m.signature.len > 0 and m.key.len > 0:
@ -66,5 +63,5 @@ proc init*(
topicIDs: @[topic]) topicIDs: @[topic])
if sign and p.publicKey.isSome: if sign and p.publicKey.isSome:
result.signature = sign(result, p) result.signature = sign(result, p).tryGet()
result.key = p.publicKey.get().getBytes().tryGet() result.key = p.publicKey.get().getBytes().tryGet()

View File

@ -11,6 +11,8 @@ import options, sequtils
import ../../../utility import ../../../utility
import ../../../peerid import ../../../peerid
export options
type type
SubOpts* = object SubOpts* = object
subscribe*: bool subscribe*: bool