From 68d50a97f8f178c3c8ee62d9e0b58cf81cc807e4 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Thu, 12 Sep 2019 04:08:11 -0600 Subject: [PATCH] properly initialize hashsets --- libp2p/protocols/pubsub/floodsub.nim | 23 +++++++++++++---------- libp2p/protocols/pubsub/rpcmsg.nim | 9 ++++++--- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 16a3d1899..1046a5658 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -7,7 +7,8 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import sequtils, tables, options, sets, sequtils, strutils, sets +import sequtils, tables, options, + sets, sequtils, strutils, sets import chronos, chronicles import pubsub, pubsubpeer, @@ -65,15 +66,15 @@ proc rpcHandler(f: FloodSub, f.peerTopics[s.topic].incl(id) else: debug "unsubscribing to topic", peer = id, subscriptions = m.subscriptions, topic = s.topic - # unsubscribe the peer to the topic + # unsubscribe the peer from the topic f.peerTopics[s.topic].excl(id) # send subscriptions to every peer for p in f.peers.values: await p.send(@[RPCMsg(subscriptions: m.subscriptions)]) + var toSendPeers: HashSet[string] = initSet[string]() if m.messages.len > 0: # if there are any messages - var toSendPeers: HashSet[string] = initSet[string]() for msg in m.messages: # for every message for t in msg.topicIDs: # for every topic in the message toSendPeers.incl(f.peerTopics[t]) # get all the peers interested in this topic @@ -84,7 +85,9 @@ proc rpcHandler(f: FloodSub, for p in toSendPeers: await f.peers[p].send(@[RPCMsg(messages: m.messages)]) -proc handleConn(f: FloodSub, conn: Connection) {.async, gcsafe.} = +proc handleConn(f: FloodSub, + conn: Connection) + {.async, gcsafe.} = ## handle incoming/outgoing connections ## ## this proc will: @@ -110,8 +113,8 @@ proc handleConn(f: FloodSub, conn: Connection) {.async, gcsafe.} = asyncCheck peer.handle() method init(f: FloodSub) = - proc handler(conn: Connection, proto: string) {.async, gcsafe.} = - ## main protocol handler that gets triggered on every + proc handler(conn: Connection, proto: string) {.async, gcsafe.} = + ## main protocol handler that gets triggered on every ## connection for a protocol string ## e.g. ``/floodsub/1.0.0``, etc... ## @@ -127,7 +130,7 @@ method subscribeToPeer*(f: FloodSub, conn: Connection) {.async, gcsafe.} = method publish*(f: FloodSub, topic: string, data: seq[byte]) - {.async, gcsafe.} = + {.async, gcsafe.} = debug "about to publish message on topic", topic = topic, data = data let msg = makeMessage(f.peerInfo.peerId.get(), data, topic) if topic in f.peerTopics: @@ -135,10 +138,10 @@ method publish*(f: FloodSub, debug "pubslishing message", topic = topic, peer = p, data = data await f.peers[p].send(@[RPCMsg(messages: @[msg])]) -method subscribe*(f: FloodSub, - topic: string, +method subscribe*(f: FloodSub, + topic: string, handler: TopicHandler) - {.async, gcsafe.} = + {.async, gcsafe.} = await procCall PubSub(f).subscribe(topic, handler) for p in f.peers.values: await f.sendSubs(p, @[topic], true) diff --git a/libp2p/protocols/pubsub/rpcmsg.nim b/libp2p/protocols/pubsub/rpcmsg.nim index 4f3a95c49..73c2de270 100644 --- a/libp2p/protocols/pubsub/rpcmsg.nim +++ b/libp2p/protocols/pubsub/rpcmsg.nim @@ -44,8 +44,11 @@ proc encodeMessage(msg: Message, buff: var ProtoBuffer) {.gcsafe.} = for t in msg.topicIDs: buff.write(initProtoField(4, t)) - buff.write(initProtoField(5, msg.signature)) - buff.write(initProtoField(6, msg.key)) + if msg.signature.len > 0: + buff.write(initProtoField(5, msg.signature)) + + if msg.key.len > 0: + buff.write(initProtoField(6, msg.key)) buff.finish() @@ -134,7 +137,7 @@ proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} = result.messages.add(msg) else: - raise newException(CatchableError, "message type not recognizedd") + raise newException(CatchableError, "message type not recognized") var prefix {.threadvar.}: seq[byte] proc getPreix(): var seq[byte] =