From c76152f2c1a71a6455c27825b1d7b0aed3899312 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Thu, 16 Jul 2020 12:06:57 +0200 Subject: [PATCH] Simplify send (#271) * PubSubPeer.send single message * gossipsub: simplify send further --- libp2p/protocols/pubsub/gossipsub.nim | 7 +- libp2p/protocols/pubsub/pubsub.nim | 17 +--- libp2p/protocols/pubsub/pubsubpeer.nim | 118 ++++++++++------------- libp2p/protocols/pubsub/rpc/message.nim | 17 ++-- libp2p/protocols/pubsub/rpc/messages.nim | 2 + 5 files changed, 69 insertions(+), 92 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 0da4ca41f..05f7ab009 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import tables, sets, options, sequtils, random +import std/[tables, sets, options, sequtils, random] import chronos, chronicles, metrics import pubsub, floodsub, @@ -232,7 +232,7 @@ proc heartbeat(g: GossipSub) {.async.} = var sent: seq[Future[void]] for peer in peers.keys: if peer in g.peers: - sent &= g.peers[peer].send(@[RPCMsg(control: some(peers[peer]))]) + sent &= g.peers[peer].send(RPCMsg(control: some(peers[peer]))) checkFutures(await allFinished(sent)) g.mcache.shift() # shift the cache @@ -436,8 +436,7 @@ method rpcHandler*(g: GossipSub, if respControl.graft.len > 0 or respControl.prune.len > 0 or respControl.ihave.len > 0 or respControl.iwant.len > 0: await peer.send( - @[RPCMsg(control: some(respControl), - messages: messages)]) + RPCMsg(control: some(respControl), messages: messages)) method subscribe*(g: GossipSub, topic: string, diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 4ec749381..5f6888510 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import tables, sequtils, sets +import std/[tables, sequtils, sets] import chronos, chronicles import pubsubpeer, rpc/[message, messages], @@ -77,16 +77,6 @@ proc sendSubs*(p: PubSub, topics: seq[string], subscribe: bool) {.async.} = ## send subscriptions to remote peer - trace "sending subscriptions", peer = peer.id, - subscribe = subscribe, - topicIDs = topics - - var msg: RPCMsg - for t in topics: - trace "sending topic", peer = peer.id, - subscribe = subscribe, - topicName = t - msg.subscriptions.add(SubOpts(topic: t, subscribe: subscribe)) try: # wait for a connection before publishing @@ -95,7 +85,7 @@ proc sendSubs*(p: PubSub, trace "awaiting send connection" await peer.onConnect.wait() - await peer.send(@[msg]) + await peer.sendSubOpts(topics, subscribe) except CancelledError as exc: p.handleDisconnect(peer) raise exc @@ -107,6 +97,7 @@ method subscribeTopic*(p: PubSub, topic: string, subscribe: bool, peerId: string) {.base, async.} = + # called when remote peer subscribes to a topic discard method rpcHandler*(p: PubSub, @@ -258,7 +249,7 @@ proc sendHelper*(p: PubSub, continue trace "sending messages to peer", peer = sendPeer.id, msgs - sent.add((id: sendPeer.id, fut: sendPeer.send(@[RPCMsg(messages: msgs)]))) + sent.add((id: sendPeer.id, fut: sendPeer.send(RPCMsg(messages: msgs)))) var published: seq[string] var failed: seq[string] diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index c59b41930..c0f25870a 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import options, hashes, strutils, tables, hashes +import std/[hashes, options, sequtils, strutils, tables] import chronos, chronicles, nimcrypto/sha2, metrics import rpc/[messages, message, protobuf], timedcache, @@ -141,81 +141,69 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = finally: p.refs.dec() -proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = +proc send*(p: PubSubPeer, msg: RPCMsg) {.async.} = logScope: peer = p.id - msgs = $msgs + msg = shortLog(msg) - for m in msgs.items: - trace "sending msgs to peer", toPeer = p.id, msgs = $msgs + trace "sending msg to peer" - # trigger send hooks - var mm = m # hooks can modify the message - p.sendObservers(mm) + # trigger send hooks + var mm = msg # hooks can modify the message + p.sendObservers(mm) - let encoded = encodeRpcMsg(mm) - if encoded.len <= 0: - trace "empty message, skipping", peer = p.id - return + let encoded = encodeRpcMsg(mm) + if encoded.len <= 0: + trace "empty message, skipping" + return - let digest = $(sha256.digest(encoded)) - if digest in p.sentRpcCache: - trace "message already sent to peer, skipping", peer = p.id - libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id]) - continue + logScope: + encoded = shortLog(encoded) - try: - trace "about to send message", peer = p.id, - encoded = digest - if p.connected: # this can happen if the remote disconnected - trace "sending encoded msgs to peer", peer = p.id, - encoded = encoded.shortLog - await p.sendConn.writeLp(encoded) - p.sentRpcCache.put(digest) + let digest = $(sha256.digest(encoded)) + if digest in p.sentRpcCache: + trace "message already sent to peer, skipping" + libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id]) + return - for m in msgs: - for mm in m.messages: - for t in mm.topicIDs: - # metrics - libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t]) - - except CatchableError as exc: - trace "unable to send to remote", exc = exc.msg - if not(isNil(p.sendConn)): - await p.sendConn.close() - p.sendConn = nil - p.onConnect.clear() - - p.refs.dec() - raise exc - -proc sendMsg*(p: PubSubPeer, - peerId: PeerID, - topic: string, - data: seq[byte], - seqno: uint64, - sign: bool): Future[void] {.gcsafe.} = - p.send(@[RPCMsg(messages: @[Message.init(p.peerInfo, data, topic, seqno, sign)])]) - -proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async.} = try: - for topic in topics: - trace "sending graft msg to peer", peer = p.id, topicID = topic - await p.send(@[RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))]) - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "Could not send graft", msg = exc.msg + trace "about to send message" + if p.connected: # this can happen if the remote disconnected + trace "sending encoded msgs to peer" + + await p.sendConn.writeLp(encoded) + p.sentRpcCache.put(digest) + + for x in mm.messages: + for t in x.topicIDs: + # metrics + libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t]) -proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async.} = - try: - for topic in topics: - trace "sending prune msg to peer", peer = p.id, topicID = topic - await p.send(@[RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))]) - except CancelledError as exc: - raise exc except CatchableError as exc: - trace "Could not send prune", msg = exc.msg + trace "unable to send to remote", exc = exc.msg + if not(isNil(p.sendConn)): + await p.sendConn.close() + p.sendConn = nil + p.onConnect.clear() + + p.refs.dec() + raise exc + +proc sendSubOpts*(p: PubSubPeer, topics: seq[string], subscribe: bool): Future[void] = + trace "sending subscriptions", peer = p.id, subscribe, topicIDs = topics + + p.send(RPCMsg( + subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it)))) + +proc sendGraft*(p: PubSubPeer, topics: seq[string]): Future[void] = + trace "sending graft msg to peer", peer = p.id, topicIDs = topics + p.send(RPCMsg(control: some( + ControlMessage(graft: topics.mapIt(ControlGraft(topicID: it)))))) + +proc sendPrune*(p: PubSubPeer, topics: seq[string]): Future[void] = + trace "sending prune msg to peer", peer = p.id, topicIDs = topics + p.send(RPCMsg(control: some( + ControlMessage(prune: topics.mapIt(ControlPrune(topicID: it)))))) proc `$`*(p: PubSubPeer): string = p.id diff --git a/libp2p/protocols/pubsub/rpc/message.nim b/libp2p/protocols/pubsub/rpc/message.nim index 0eecd548c..29d700500 100644 --- a/libp2p/protocols/pubsub/rpc/message.nim +++ b/libp2p/protocols/pubsub/rpc/message.nim @@ -9,17 +9,14 @@ {.push raises: [Defect].} -import options -import chronicles, stew/byteutils -import metrics -import chronicles -import nimcrypto/sysrand -import messages, protobuf, +import chronicles, metrics, stew/[byteutils, endians2] +import ./messages, ./protobuf, ../../../peerid, ../../../peerinfo, ../../../crypto/crypto, ../../../protobuf/minprotobuf -import stew/endians2 + +export messages logScope: topics = "pubsubmessage" @@ -32,8 +29,8 @@ declareCounter(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messag func defaultMsgIdProvider*(m: Message): string = byteutils.toHex(m.seqno) & m.fromPeer.pretty -proc sign*(msg: Message, p: PeerInfo): seq[byte] {.gcsafe, raises: [ResultError[CryptoError], Defect].} = - p.privateKey.sign(PubSubPrefix & encodeMessage(msg)).tryGet().getBytes() +proc sign*(msg: Message, p: PeerInfo): CryptoResult[seq[byte]] = + ok((? p.privateKey.sign(PubSubPrefix & encodeMessage(msg))).getBytes()) proc verify*(m: Message, p: PeerInfo): bool = if m.signature.len > 0 and m.key.len > 0: @@ -66,5 +63,5 @@ proc init*( topicIDs: @[topic]) if sign and p.publicKey.isSome: - result.signature = sign(result, p) + result.signature = sign(result, p).tryGet() result.key = p.publicKey.get().getBytes().tryGet() diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index 5a0ae615d..9f62f5bcb 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -11,6 +11,8 @@ import options, sequtils import ../../../utility import ../../../peerid +export options + type SubOpts* = object subscribe*: bool