properly initialize hashsets

This commit is contained in:
Dmitriy Ryajov 2019-09-12 04:08:11 -06:00
parent 9f3b80b60c
commit 68d50a97f8
2 changed files with 19 additions and 13 deletions

View File

@ -7,7 +7,8 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import sequtils, tables, options, sets, sequtils, strutils, sets import sequtils, tables, options,
sets, sequtils, strutils, sets
import chronos, chronicles import chronos, chronicles
import pubsub, import pubsub,
pubsubpeer, pubsubpeer,
@ -65,15 +66,15 @@ proc rpcHandler(f: FloodSub,
f.peerTopics[s.topic].incl(id) f.peerTopics[s.topic].incl(id)
else: else:
debug "unsubscribing to topic", peer = id, subscriptions = m.subscriptions, topic = s.topic debug "unsubscribing to topic", peer = id, subscriptions = m.subscriptions, topic = s.topic
# unsubscribe the peer to the topic # unsubscribe the peer from the topic
f.peerTopics[s.topic].excl(id) f.peerTopics[s.topic].excl(id)
# send subscriptions to every peer # send subscriptions to every peer
for p in f.peers.values: for p in f.peers.values:
await p.send(@[RPCMsg(subscriptions: m.subscriptions)]) await p.send(@[RPCMsg(subscriptions: m.subscriptions)])
var toSendPeers: HashSet[string] = initSet[string]()
if m.messages.len > 0: # if there are any messages if m.messages.len > 0: # if there are any messages
var toSendPeers: HashSet[string] = initSet[string]()
for msg in m.messages: # for every message for msg in m.messages: # for every message
for t in msg.topicIDs: # for every topic in the message for t in msg.topicIDs: # for every topic in the message
toSendPeers.incl(f.peerTopics[t]) # get all the peers interested in this topic toSendPeers.incl(f.peerTopics[t]) # get all the peers interested in this topic
@ -84,7 +85,9 @@ proc rpcHandler(f: FloodSub,
for p in toSendPeers: for p in toSendPeers:
await f.peers[p].send(@[RPCMsg(messages: m.messages)]) await f.peers[p].send(@[RPCMsg(messages: m.messages)])
proc handleConn(f: FloodSub, conn: Connection) {.async, gcsafe.} = proc handleConn(f: FloodSub,
conn: Connection)
{.async, gcsafe.} =
## handle incoming/outgoing connections ## handle incoming/outgoing connections
## ##
## this proc will: ## this proc will:
@ -110,8 +113,8 @@ proc handleConn(f: FloodSub, conn: Connection) {.async, gcsafe.} =
asyncCheck peer.handle() asyncCheck peer.handle()
method init(f: FloodSub) = method init(f: FloodSub) =
proc handler(conn: Connection, proto: string) {.async, gcsafe.} = proc handler(conn: Connection, proto: string) {.async, gcsafe.} =
## main protocol handler that gets triggered on every ## main protocol handler that gets triggered on every
## connection for a protocol string ## connection for a protocol string
## e.g. ``/floodsub/1.0.0``, etc... ## e.g. ``/floodsub/1.0.0``, etc...
## ##
@ -127,7 +130,7 @@ method subscribeToPeer*(f: FloodSub, conn: Connection) {.async, gcsafe.} =
method publish*(f: FloodSub, method publish*(f: FloodSub,
topic: string, topic: string,
data: seq[byte]) data: seq[byte])
{.async, gcsafe.} = {.async, gcsafe.} =
debug "about to publish message on topic", topic = topic, data = data debug "about to publish message on topic", topic = topic, data = data
let msg = makeMessage(f.peerInfo.peerId.get(), data, topic) let msg = makeMessage(f.peerInfo.peerId.get(), data, topic)
if topic in f.peerTopics: if topic in f.peerTopics:
@ -135,10 +138,10 @@ method publish*(f: FloodSub,
debug "pubslishing message", topic = topic, peer = p, data = data debug "pubslishing message", topic = topic, peer = p, data = data
await f.peers[p].send(@[RPCMsg(messages: @[msg])]) await f.peers[p].send(@[RPCMsg(messages: @[msg])])
method subscribe*(f: FloodSub, method subscribe*(f: FloodSub,
topic: string, topic: string,
handler: TopicHandler) handler: TopicHandler)
{.async, gcsafe.} = {.async, gcsafe.} =
await procCall PubSub(f).subscribe(topic, handler) await procCall PubSub(f).subscribe(topic, handler)
for p in f.peers.values: for p in f.peers.values:
await f.sendSubs(p, @[topic], true) await f.sendSubs(p, @[topic], true)

View File

@ -44,8 +44,11 @@ proc encodeMessage(msg: Message, buff: var ProtoBuffer) {.gcsafe.} =
for t in msg.topicIDs: for t in msg.topicIDs:
buff.write(initProtoField(4, t)) buff.write(initProtoField(4, t))
buff.write(initProtoField(5, msg.signature)) if msg.signature.len > 0:
buff.write(initProtoField(6, msg.key)) buff.write(initProtoField(5, msg.signature))
if msg.key.len > 0:
buff.write(initProtoField(6, msg.key))
buff.finish() buff.finish()
@ -134,7 +137,7 @@ proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} =
result.messages.add(msg) result.messages.add(msg)
else: else:
raise newException(CatchableError, "message type not recognizedd") raise newException(CatchableError, "message type not recognized")
var prefix {.threadvar.}: seq[byte] var prefix {.threadvar.}: seq[byte]
proc getPreix(): var seq[byte] = proc getPreix(): var seq[byte] =