Feat/pubsub validators (#58)

* feat: adding validator hooks to pubsub

* expose add/remove validators on switch

* do less unnecessary copyng
This commit is contained in:
Dmitriy Ryajov 2019-12-16 23:24:03 -06:00 committed by GitHub
parent b6b0cdea98
commit 68cc57669e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 385 additions and 116 deletions

View File

@ -141,4 +141,5 @@ method getObservedAddrs*(c: Connection): Future[MultiAddress] {.base, async, gcs
result = c.observedAddrs result = c.observedAddrs
proc `$`*(conn: Connection): string = proc `$`*(conn: Connection): string =
result = $(conn.peerInfo) if not isNil(conn.peerInfo):
result = $(conn.peerInfo)

View File

@ -139,7 +139,7 @@ proc identify*(p: Identify,
if peer != remotePeerInfo.peerId: if peer != remotePeerInfo.peerId:
trace "Peer ids don't match", trace "Peer ids don't match",
remote = peer.pretty(), remote = peer.pretty(),
local = remotePeerInfo.get().id local = remotePeerInfo.id
raise newException(IdentityNoMatchError, raise newException(IdentityNoMatchError,
"Peer ids don't match") "Peer ids don't match")

View File

@ -46,26 +46,31 @@ method subscribeTopic*(f: FloodSub,
# unsubscribe the peer from the topic # unsubscribe the peer from the topic
f.floodsub[topic].excl(peerId) f.floodsub[topic].excl(peerId)
method handleDisconnect*(f: FloodSub, peer: PubSubPeer) {.async, gcsafe.} = method handleDisconnect*(f: FloodSub, peer: PubSubPeer) {.async.} =
## handle peer disconnects ## handle peer disconnects
for t in f.floodsub.keys: for t in f.floodsub.keys:
f.floodsub[t].excl(peer.id) f.floodsub[t].excl(peer.id)
method rpcHandler*(f: FloodSub, method rpcHandler*(f: FloodSub,
peer: PubSubPeer, peer: PubSubPeer,
rpcMsgs: seq[RPCMsg]) {.async, gcsafe.} = rpcMsgs: seq[RPCMsg]) {.async.} =
trace "processing RPC message", peer = peer.id, msg = rpcMsgs await procCall PubSub(f).rpcHandler(peer, rpcMsgs)
for m in rpcMsgs: # for all RPC messages
trace "processing message", msg = rpcMsgs
if m.subscriptions.len > 0: # if there are any subscriptions
for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic
f.subscribeTopic(s.topic, s.subscribe, peer.id)
for m in rpcMsgs: # for all RPC messages
if m.messages.len > 0: # if there are any messages if m.messages.len > 0: # if there are any messages
var toSendPeers: HashSet[string] = initHashSet[string]() var toSendPeers: HashSet[string] = initHashSet[string]()
for msg in m.messages: # for every message for msg in m.messages: # for every message
if msg.msgId notin f.seen: if msg.msgId notin f.seen:
f.seen.put(msg.msgId) # add the message to the seen cache f.seen.put(msg.msgId) # add the message to the seen cache
if not msg.verify(peer.peerInfo):
trace "dropping message due to failed signature verification"
continue
if not (await f.validate(msg)):
trace "dropping message due to failed validation"
continue
for t in msg.topicIDs: # for every topic in the message for t in msg.topicIDs: # for every topic in the message
if t in f.floodsub: if t in f.floodsub:
toSendPeers.incl(f.floodsub[t]) # get all the peers interested in this topic toSendPeers.incl(f.floodsub[t]) # get all the peers interested in this topic
@ -79,7 +84,7 @@ method rpcHandler*(f: FloodSub,
await f.peers[p].send(@[RPCMsg(messages: m.messages)]) await f.peers[p].send(@[RPCMsg(messages: m.messages)])
method init(f: FloodSub) = method init(f: FloodSub) =
proc handler(conn: Connection, proto: string) {.async, gcsafe.} = proc handler(conn: Connection, proto: string) {.async.} =
## main protocol handler that gets triggered on every ## main protocol handler that gets triggered on every
## connection for a protocol string ## connection for a protocol string
## e.g. ``/floodsub/1.0.0``, etc... ## e.g. ``/floodsub/1.0.0``, etc...
@ -92,7 +97,7 @@ method init(f: FloodSub) =
method publish*(f: FloodSub, method publish*(f: FloodSub,
topic: string, topic: string,
data: seq[byte]) {.async, gcsafe.} = data: seq[byte]) {.async.} =
await procCall PubSub(f).publish(topic, data) await procCall PubSub(f).publish(topic, data)
if data.len <= 0 or topic.len <= 0: if data.len <= 0 or topic.len <= 0:
@ -110,7 +115,7 @@ method publish*(f: FloodSub,
await f.peers[p].send(@[RPCMsg(messages: @[msg])]) await f.peers[p].send(@[RPCMsg(messages: @[msg])])
method unsubscribe*(f: FloodSub, method unsubscribe*(f: FloodSub,
topics: seq[TopicPair]) {.async, gcsafe.} = topics: seq[TopicPair]) {.async.} =
await procCall PubSub(f).unsubscribe(topics) await procCall PubSub(f).unsubscribe(topics)
for p in f.peers.values: for p in f.peers.values:

View File

@ -73,7 +73,7 @@ proc addInterval(every: Duration, cb: CallbackFunc,
return retFuture return retFuture
method init(g: GossipSub) = method init(g: GossipSub) =
proc handler(conn: Connection, proto: string) {.async, gcsafe.} = proc handler(conn: Connection, proto: string) {.async.} =
## main protocol handler that gets triggered on every ## main protocol handler that gets triggered on every
## connection for a protocol string ## connection for a protocol string
## e.g. ``/floodsub/1.0.0``, etc... ## e.g. ``/floodsub/1.0.0``, etc...
@ -84,7 +84,7 @@ method init(g: GossipSub) =
g.handler = handler g.handler = handler
g.codec = GossipSubCodec g.codec = GossipSubCodec
method handleDisconnect(g: GossipSub, peer: PubSubPeer) {.async, gcsafe.} = method handleDisconnect(g: GossipSub, peer: PubSubPeer) {.async.} =
## handle peer disconnects ## handle peer disconnects
await procCall FloodSub(g).handleDisconnect(peer) await procCall FloodSub(g).handleDisconnect(peer)
for t in g.gossipsub.keys: for t in g.gossipsub.keys:
@ -161,16 +161,10 @@ proc handleIWant(g: GossipSub, peer: PubSubPeer, iwants: seq[
method rpcHandler(g: GossipSub, method rpcHandler(g: GossipSub,
peer: PubSubPeer, peer: PubSubPeer,
rpcMsgs: seq[RPCMsg]) {.async, gcsafe.} = rpcMsgs: seq[RPCMsg]) {.async.} =
await procCall PubSub(g).rpcHandler(peer, rpcMsgs) await procCall PubSub(g).rpcHandler(peer, rpcMsgs)
trace "processing RPC message", peer = peer.id, msg = rpcMsgs for m in rpcMsgs: # for all RPC messages
for m in rpcMsgs: # for all RPC messages
trace "processing messages", msg = rpcMsgs
if m.subscriptions.len > 0: # if there are any subscriptions
for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic
g.subscribeTopic(s.topic, s.subscribe, peer.id)
if m.messages.len > 0: # if there are any messages if m.messages.len > 0: # if there are any messages
var toSendPeers: HashSet[string] = initHashSet[string]() var toSendPeers: HashSet[string] = initHashSet[string]()
for msg in m.messages: # for every message for msg in m.messages: # for every message
@ -181,6 +175,14 @@ method rpcHandler(g: GossipSub,
g.seen.put(msg.msgId) # add the message to the seen cache g.seen.put(msg.msgId) # add the message to the seen cache
if not msg.verify(peer.peerInfo):
trace "dropping message due to failed signature verification"
continue
if not (await g.validate(msg)):
trace "dropping message due to failed validation"
continue
# this shouldn't happen # this shouldn't happen
if g.peerInfo.peerId == msg.fromPeerId(): if g.peerInfo.peerId == msg.fromPeerId():
trace "skipping messages from self", msg = msg.msgId trace "skipping messages from self", msg = msg.msgId
@ -227,10 +229,9 @@ method rpcHandler(g: GossipSub,
if respControl.graft.len > 0 or respControl.prune.len > 0 or if respControl.graft.len > 0 or respControl.prune.len > 0 or
respControl.ihave.len > 0 or respControl.iwant.len > 0: respControl.ihave.len > 0 or respControl.iwant.len > 0:
await peer.send(@[RPCMsg(control: some(respControl), await peer.send(@[RPCMsg(control: some(respControl), messages: messages)])
messages: messages)])
proc replenishFanout(g: GossipSub, topic: string) {.async, gcsafe.} = proc replenishFanout(g: GossipSub, topic: string) {.async.} =
## get fanout peers for a topic ## get fanout peers for a topic
trace "about to replenish fanout" trace "about to replenish fanout"
if topic notin g.fanout: if topic notin g.fanout:
@ -246,7 +247,7 @@ proc replenishFanout(g: GossipSub, topic: string) {.async, gcsafe.} =
trace "fanout replenished with peers", peers = g.fanout[topic].len trace "fanout replenished with peers", peers = g.fanout[topic].len
proc rebalanceMesh(g: GossipSub, topic: string) {.async, gcsafe.} = proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
trace "about to rebalance mesh" trace "about to rebalance mesh"
# create a mesh topic that we're subscribing to # create a mesh topic that we're subscribing to
if topic notin g.mesh: if topic notin g.mesh:
@ -288,7 +289,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async, gcsafe.} =
trace "mesh balanced, got peers", peers = g.mesh[topic].len trace "mesh balanced, got peers", peers = g.mesh[topic].len
proc dropFanoutPeers(g: GossipSub) {.async, gcsafe.} = proc dropFanoutPeers(g: GossipSub) {.async.} =
# drop peers that we haven't published to in # drop peers that we haven't published to in
# GossipSubFanoutTTL seconds # GossipSubFanoutTTL seconds
for topic in g.lastFanoutPubSub.keys: for topic in g.lastFanoutPubSub.keys:
@ -334,7 +335,7 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
result[id] = ControlMessage() result[id] = ControlMessage()
result[id].ihave.add(ihave) result[id].ihave.add(ihave)
proc heartbeat(g: GossipSub) {.async, gcsafe.} = proc heartbeat(g: GossipSub) {.async.} =
trace "running heartbeat" trace "running heartbeat"
await g.heartbeatLock.acquire() await g.heartbeatLock.acquire()
@ -353,12 +354,12 @@ proc heartbeat(g: GossipSub) {.async, gcsafe.} =
method subscribe*(g: GossipSub, method subscribe*(g: GossipSub,
topic: string, topic: string,
handler: TopicHandler) {.async, gcsafe.} = handler: TopicHandler) {.async.} =
await procCall PubSub(g).subscribe(topic, handler) await procCall PubSub(g).subscribe(topic, handler)
asyncCheck g.rebalanceMesh(topic) asyncCheck g.rebalanceMesh(topic)
method unsubscribe*(g: GossipSub, method unsubscribe*(g: GossipSub,
topics: seq[TopicPair]) {.async, gcsafe.} = topics: seq[TopicPair]) {.async.} =
await procCall PubSub(g).unsubscribe(topics) await procCall PubSub(g).unsubscribe(topics)
for pair in topics: for pair in topics:
@ -372,10 +373,11 @@ method unsubscribe*(g: GossipSub,
method publish*(g: GossipSub, method publish*(g: GossipSub,
topic: string, topic: string,
data: seq[byte]) {.async, gcsafe.} = data: seq[byte]) {.async.} =
await procCall PubSub(g).publish(topic, data) await procCall PubSub(g).publish(topic, data)
trace "about to publish message on topic", name = topic, data = data.toHex() trace "about to publish message on topic", name = topic,
data = data.toHex()
if data.len > 0 and topic.len > 0: if data.len > 0 and topic.len > 0:
var peers: HashSet[string] var peers: HashSet[string]
if topic in g.topics: # if we're subscribed to the topic attempt to build a mesh if topic in g.topics: # if we're subscribed to the topic attempt to build a mesh
@ -453,7 +455,7 @@ when isMainModule and not defined(release):
let topic = "foobar" let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[string]() gossipSub.mesh[topic] = initHashSet[string]()
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = proc writeHandler(data: seq[byte]) {.async.} =
discard discard
for i in 0..<15: for i in 0..<15:
@ -480,7 +482,7 @@ when isMainModule and not defined(release):
let topic = "foobar" let topic = "foobar"
gossipSub.gossipsub[topic] = initHashSet[string]() gossipSub.gossipsub[topic] = initHashSet[string]()
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = proc writeHandler(data: seq[byte]) {.async.} =
discard discard
for i in 0..<15: for i in 0..<15:
@ -505,12 +507,12 @@ when isMainModule and not defined(release):
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA))) PeerInfo.init(PrivateKey.random(RSA)))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard discard
let topic = "foobar" let topic = "foobar"
gossipSub.gossipsub[topic] = initHashSet[string]() gossipSub.gossipsub[topic] = initHashSet[string]()
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = proc writeHandler(data: seq[byte]) {.async.} =
discard discard
for i in 0..<15: for i in 0..<15:
@ -535,13 +537,13 @@ when isMainModule and not defined(release):
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA))) PeerInfo.init(PrivateKey.random(RSA)))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard discard
let topic = "foobar" let topic = "foobar"
gossipSub.fanout[topic] = initHashSet[string]() gossipSub.fanout[topic] = initHashSet[string]()
gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(100.millis) gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(100.millis)
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = proc writeHandler(data: seq[byte]) {.async.} =
discard discard
for i in 0..<6: for i in 0..<6:
@ -568,7 +570,7 @@ when isMainModule and not defined(release):
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA))) PeerInfo.init(PrivateKey.random(RSA)))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard discard
let topic1 = "foobar1" let topic1 = "foobar1"
@ -578,7 +580,7 @@ when isMainModule and not defined(release):
gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(100.millis) gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(100.millis)
gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(500.millis) gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(500.millis)
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = proc writeHandler(data: seq[byte]) {.async.} =
discard discard
for i in 0..<6: for i in 0..<6:
@ -608,10 +610,10 @@ when isMainModule and not defined(release):
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA))) PeerInfo.init(PrivateKey.random(RSA)))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard discard
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = proc writeHandler(data: seq[byte]) {.async.} =
discard discard
let topic = "foobar" let topic = "foobar"
@ -657,10 +659,10 @@ when isMainModule and not defined(release):
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA))) PeerInfo.init(PrivateKey.random(RSA)))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard discard
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = proc writeHandler(data: seq[byte]) {.async.} =
discard discard
let topic = "foobar" let topic = "foobar"
@ -689,10 +691,10 @@ when isMainModule and not defined(release):
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA))) PeerInfo.init(PrivateKey.random(RSA)))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard discard
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = proc writeHandler(data: seq[byte]) {.async.} =
discard discard
let topic = "foobar" let topic = "foobar"
@ -721,10 +723,10 @@ when isMainModule and not defined(release):
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA))) PeerInfo.init(PrivateKey.random(RSA)))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard discard
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = proc writeHandler(data: seq[byte]) {.async.} =
discard discard
let topic = "foobar" let topic = "foobar"

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import tables, options, sequtils import tables, sequtils, sets
import chronos, chronicles import chronos, chronicles
import pubsubpeer, import pubsubpeer,
rpc/messages, rpc/messages,
@ -22,8 +22,11 @@ logScope:
topic = "PubSub" topic = "PubSub"
type type
TopicHandler* = proc (topic: string, TopicHandler* = proc(topic: string,
data: seq[byte]): Future[void] {.gcsafe.} data: seq[byte]): Future[void] {.gcsafe.}
ValidatorHandler* = proc(topic: string,
message: Message): Future[bool] {.closure.}
TopicPair* = tuple[topic: string, handler: TopicHandler] TopicPair* = tuple[topic: string, handler: TopicHandler]
@ -37,11 +40,12 @@ type
peers*: Table[string, PubSubPeer] # peerid to peer map peers*: Table[string, PubSubPeer] # peerid to peer map
triggerSelf*: bool # trigger own local handler on publish triggerSelf*: bool # trigger own local handler on publish
cleanupLock: AsyncLock cleanupLock: AsyncLock
validators*: Table[string, HashSet[ValidatorHandler]]
proc sendSubs*(p: PubSub, proc sendSubs*(p: PubSub,
peer: PubSubPeer, peer: PubSubPeer,
topics: seq[string], topics: seq[string],
subscribe: bool) {.async, gcsafe.} = subscribe: bool) {.async.} =
## send subscriptions to remote peer ## send subscriptions to remote peer
trace "sending subscriptions", peer = peer.id, trace "sending subscriptions", peer = peer.id,
subscribe = subscribe, subscribe = subscribe,
@ -56,13 +60,24 @@ proc sendSubs*(p: PubSub,
await peer.send(@[msg]) await peer.send(@[msg])
method rpcHandler*(p: PubSub, method subscribeTopic*(p: PubSub,
peer: PubSubPeer, topic: string,
rpcMsgs: seq[RPCMsg]) {.async, base, gcsafe.} = subscribe: bool,
## handle rpc messages peerId: string) {.base.} =
discard discard
method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.async, base, gcsafe.} = method rpcHandler*(p: PubSub,
peer: PubSubPeer,
rpcMsgs: seq[RPCMsg]) {.async, base.} =
## handle rpc messages
trace "processing RPC message", peer = peer.id, msg = rpcMsgs
for m in rpcMsgs: # for all RPC messages
trace "processing messages", msg = rpcMsgs
if m.subscriptions.len > 0: # if there are any subscriptions
for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic
p.subscribeTopic(s.topic, s.subscribe, peer.id)
method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.async, base.} =
## handle peer disconnects ## handle peer disconnects
if peer.id in p.peers: if peer.id in p.peers:
p.peers.del(peer.id) p.peers.del(peer.id)
@ -90,7 +105,7 @@ proc getPeer(p: PubSub, peerInfo: PeerInfo, proto: string): PubSubPeer =
method handleConn*(p: PubSub, method handleConn*(p: PubSub,
conn: Connection, conn: Connection,
proto: string) {.base, async, gcsafe.} = proto: string) {.base, async.} =
## handle incoming connections ## handle incoming connections
## ##
## this proc will: ## this proc will:
@ -107,7 +122,7 @@ method handleConn*(p: PubSub,
await conn.close() await conn.close()
return return
proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
# call floodsub rpc handler # call floodsub rpc handler
await p.rpcHandler(peer, msgs) await p.rpcHandler(peer, msgs)
@ -122,7 +137,7 @@ method handleConn*(p: PubSub,
await p.cleanUpHelper(peer) await p.cleanUpHelper(peer)
method subscribeToPeer*(p: PubSub, method subscribeToPeer*(p: PubSub,
conn: Connection) {.base, async, gcsafe.} = conn: Connection) {.base, async.} =
var peer = p.getPeer(conn.peerInfo, p.codec) var peer = p.getPeer(conn.peerInfo, p.codec)
trace "setting connection for peer", peerId = conn.peerInfo.id trace "setting connection for peer", peerId = conn.peerInfo.id
if not peer.isConnected: if not peer.isConnected:
@ -137,7 +152,7 @@ method subscribeToPeer*(p: PubSub,
asyncCheck p.cleanUpHelper(peer) asyncCheck p.cleanUpHelper(peer)
method unsubscribe*(p: PubSub, method unsubscribe*(p: PubSub,
topics: seq[TopicPair]) {.base, async, gcsafe.} = topics: seq[TopicPair]) {.base, async.} =
## unsubscribe from a list of ``topic`` strings ## unsubscribe from a list of ``topic`` strings
for t in topics: for t in topics:
for i, h in p.topics[t.topic].handler: for i, h in p.topics[t.topic].handler:
@ -146,19 +161,13 @@ method unsubscribe*(p: PubSub,
method unsubscribe*(p: PubSub, method unsubscribe*(p: PubSub,
topic: string, topic: string,
handler: TopicHandler): Future[void] {.base, gcsafe.} = handler: TopicHandler): Future[void] {.base.} =
## unsubscribe from a ``topic`` string ## unsubscribe from a ``topic`` string
result = p.unsubscribe(@[(topic, handler)]) result = p.unsubscribe(@[(topic, handler)])
method subscribeTopic*(p: PubSub,
topic: string,
subscribe: bool,
peerId: string) {.base, gcsafe.} =
discard
method subscribe*(p: PubSub, method subscribe*(p: PubSub,
topic: string, topic: string,
handler: TopicHandler) {.base, async, gcsafe.} = handler: TopicHandler) {.base, async.} =
## subscribe to a topic ## subscribe to a topic
## ##
## ``topic`` - a string topic to subscribe to ## ``topic`` - a string topic to subscribe to
@ -178,7 +187,7 @@ method subscribe*(p: PubSub,
method publish*(p: PubSub, method publish*(p: PubSub,
topic: string, topic: string,
data: seq[byte]) {.base, async, gcsafe.} = data: seq[byte]) {.base, async.} =
## publish to a ``topic`` ## publish to a ``topic``
if p.triggerSelf and topic in p.topics: if p.triggerSelf and topic in p.topics:
for h in p.topics[topic].handler: for h in p.topics[topic].handler:
@ -190,14 +199,44 @@ method initPubSub*(p: PubSub) {.base.} =
method start*(p: PubSub) {.async, base.} = method start*(p: PubSub) {.async, base.} =
## start pubsub ## start pubsub
## start long running/repeating procedures
discard discard
method stop*(p: PubSub) {.async, base.} = method stop*(p: PubSub) {.async, base.} =
## stopt pubsub ## stopt pubsub
## stop long running/repeating procedures
discard discard
method addValidator*(p: PubSub,
topic: varargs[string],
hook: ValidatorHandler) {.base.} =
for t in topic:
if t notin p.validators:
p.validators[t] = initHashSet[ValidatorHandler]()
trace "adding validator for topic", topicId = t
p.validators[t].incl(hook)
method removeValidator*(p: PubSub,
topic: varargs[string],
hook: ValidatorHandler) {.base.} =
for t in topic:
if t in p.validators:
p.validators[t].excl(hook)
method validate*(p: PubSub, message: Message): Future[bool] {.async, base.} =
var pending: seq[Future[bool]]
trace "about to validate message"
for topic in message.topicIDs:
trace "looking for validators on topic", topicID = topic,
registered = toSeq(p.validators.keys)
if topic in p.validators:
trace "running validators for topic", topicID = topic
# TODO: add timeout to validator
pending.add(p.validators[topic].mapIt(it(topic, message)))
await allFutures(pending) # await all futures
if pending.allIt(it.read()): # if there are failed
result = true
proc newPubSub*(p: typedesc[PubSub], proc newPubSub*(p: typedesc[PubSub],
peerInfo: PeerInfo, peerInfo: PeerInfo,
triggerSelf: bool = false): p = triggerSelf: bool = false): p =

View File

@ -45,7 +45,7 @@ proc `conn=`*(p: PubSubPeer, conn: Connection) =
p.sendConn = conn p.sendConn = conn
p.onConnect.fire() p.onConnect.fire()
proc handle*(p: PubSubPeer, conn: Connection) {.async, gcsafe.} = proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
trace "handling pubsub rpc", peer = p.id, closed = conn.closed trace "handling pubsub rpc", peer = p.id, closed = conn.closed
try: try:
while not conn.closed: while not conn.closed:
@ -66,7 +66,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async, gcsafe.} =
finally: finally:
trace "exiting pubsub peer read loop", peer = p.id trace "exiting pubsub peer read loop", peer = p.id
proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async, gcsafe.} = proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
try: try:
for m in msgs: for m in msgs:
trace "sending msgs to peer", toPeer = p.id trace "sending msgs to peer", toPeer = p.id
@ -105,12 +105,12 @@ proc sendMsg*(p: PubSubPeer,
data: seq[byte]): Future[void] {.gcsafe.} = data: seq[byte]): Future[void] {.gcsafe.} =
p.send(@[RPCMsg(messages: @[newMessage(p.peerInfo, data, topic)])]) p.send(@[RPCMsg(messages: @[newMessage(p.peerInfo, data, topic)])])
proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async, gcsafe.} = proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async.} =
for topic in topics: for topic in topics:
trace "sending graft msg to peer", peer = p.id, topicID = topic trace "sending graft msg to peer", peer = p.id, topicID = topic
await p.send(@[RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))]) await p.send(@[RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))])
proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async, gcsafe.} = proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async.} =
for topic in topics: for topic in topics:
trace "sending prune msg to peer", peer = p.id, topicID = topic trace "sending prune msg to peer", peer = p.id, topicID = topic
await p.send(@[RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))]) await p.send(@[RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))])

View File

@ -27,17 +27,16 @@ proc msgId*(m: Message): string =
proc fromPeerId*(m: Message): PeerId = proc fromPeerId*(m: Message): PeerId =
PeerID.init(m.fromPeer) PeerID.init(m.fromPeer)
proc sign*(p: PeerInfo, msg: Message): Message {.gcsafe.} = proc sign*(msg: Message, p: PeerInfo): Message {.gcsafe.} =
var buff = initProtoBuffer() var buff = initProtoBuffer()
encodeMessage(msg, buff) encodeMessage(msg, buff)
let prefix = cast[seq[byte]](PubSubPrefix)
if buff.buffer.len > 0: if buff.buffer.len > 0:
result = msg result = msg
result.signature = p.privateKey. result.signature = p.privateKey.
sign(prefix & buff.buffer). sign(cast[seq[byte]](PubSubPrefix) & buff.buffer).
getBytes() getBytes()
proc verify*(p: PeerInfo, m: Message): bool = proc verify*(m: Message, p: PeerInfo): bool =
if m.signature.len > 0 and m.key.len > 0: if m.signature.len > 0 and m.key.len > 0:
var msg = m var msg = m
msg.signature = @[] msg.signature = @[]
@ -49,7 +48,8 @@ proc verify*(p: PeerInfo, m: Message): bool =
var remote: Signature var remote: Signature
var key: PublicKey var key: PublicKey
if remote.init(m.signature) and key.init(m.key): if remote.init(m.signature) and key.init(m.key):
result = remote.verify(buff.buffer, key) trace "verifying signature", remoteSignature = remote
result = remote.verify(cast[seq[byte]](PubSubPrefix) & buff.buffer, key)
proc newMessage*(p: PeerInfo, proc newMessage*(p: PeerInfo,
data: seq[byte], data: seq[byte],
@ -64,6 +64,6 @@ proc newMessage*(p: PeerInfo,
seqno: seqno, seqno: seqno,
topicIDs: @[name]) topicIDs: @[name])
if sign: if sign:
result = p.sign(result) result = result.sign(p)
result.key = key result.key = key

View File

@ -437,16 +437,15 @@ proc handleConn(s: Secio, conn: Connection): Future[Connection] {.async, gcsafe.
var stream = newBufferStream(writeHandler) var stream = newBufferStream(writeHandler)
asyncCheck readLoop(sconn, stream) asyncCheck readLoop(sconn, stream)
var secured = newConnection(stream) result = newConnection(stream)
secured.peerInfo = PeerInfo.init(sconn.peerInfo.publicKey.get()) result.closeEvent.wait()
result = secured
secured.closeEvent.wait()
.addCallback do (udata: pointer): .addCallback do (udata: pointer):
trace "wrapped connection closed, closing upstream" trace "wrapped connection closed, closing upstream"
if not isNil(sconn) and not sconn.closed: if not isNil(sconn) and not sconn.closed:
asyncCheck sconn.close() asyncCheck sconn.close()
result.peerInfo = PeerInfo.init(sconn.peerInfo.publicKey.get())
method init(s: Secio) {.gcsafe.} = method init(s: Secio) {.gcsafe.} =
proc handle(conn: Connection, proto: string) {.async, gcsafe.} = proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
trace "handling connection" trace "handling connection"

View File

@ -314,6 +314,24 @@ proc publish*(s: Switch, topic: string, data: seq[byte]): Future[void] {.gcsafe.
result = s.pubSub.get().publish(topic, data) result = s.pubSub.get().publish(topic, data)
proc addValidator*(s: Switch,
topics: varargs[string],
hook: ValidatorHandler) =
# add validator
if s.pubSub.isNone:
raise newNoPubSubException()
s.pubSub.get().addValidator(topics, hook)
proc removeValidator*(s: Switch,
topics: varargs[string],
hook: ValidatorHandler) =
# pubslish to pubsub topic
if s.pubSub.isNone:
raise newNoPubSubException()
s.pubSub.get().removeValidator(topics, hook)
proc newSwitch*(peerInfo: PeerInfo, proc newSwitch*(peerInfo: PeerInfo,
transports: seq[Transport], transports: seq[Transport],
identity: Identify, identity: Identify,

View File

@ -10,11 +10,15 @@
import unittest, sequtils, options import unittest, sequtils, options
import chronos import chronos
import utils, import utils,
../../libp2p/[switch, crypto/crypto] ../../libp2p/[switch,
crypto/crypto,
protocols/pubsub/pubsub,
protocols/pubsub/rpc/messages,
protocols/pubsub/rpc/message]
suite "FloodSub": suite "FloodSub":
test "FloodSub basic publish/subscribe A -> B": test "FloodSub basic publish/subscribe A -> B":
proc testBasicPubSub(): Future[bool] {.async.} = proc runTests(): Future[bool] {.async.} =
var completionFut = newFuture[bool]() var completionFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar" check topic == "foobar"
@ -36,10 +40,10 @@ suite "FloodSub":
await allFutures(awaiters) await allFutures(awaiters)
check: check:
waitFor(testBasicPubSub()) == true waitFor(runTests()) == true
test "FloodSub basic publish/subscribe B -> A": test "FloodSub basic publish/subscribe B -> A":
proc testBasicPubSub(): Future[bool] {.async.} = proc runTests(): Future[bool] {.async.} =
var completionFut = newFuture[bool]() var completionFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar" check topic == "foobar"
@ -61,10 +65,107 @@ suite "FloodSub":
await allFutures(awaiters) await allFutures(awaiters)
check: check:
waitFor(testBasicPubSub()) == true waitFor(runTests()) == true
test "FloodSub validation should succeed":
proc runTests(): Future[bool] {.async.} =
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
handlerFut.complete(true)
var nodes = generateNodes(2)
var awaiters: seq[Future[void]]
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)
await sleepAsync(1000.millis)
var validatorFut = newFuture[bool]()
proc validator(topic: string,
message: Message): Future[bool] {.async.} =
check topic == "foobar"
validatorFut.complete(true)
result = true
nodes[1].addValidator("foobar", validator)
await nodes[0].publish("foobar", cast[seq[byte]]("Hello!"))
await allFutures(handlerFut, handlerFut)
await allFutures(nodes[0].stop(), nodes[1].stop())
await allFutures(awaiters)
result = true
check:
waitFor(runTests()) == true
test "FloodSub validation should fail":
proc runTests(): Future[bool] {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check false # if we get here, it should fail
var nodes = generateNodes(2)
var awaiters: seq[Future[void]]
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)
await sleepAsync(100.millis)
var validatorFut = newFuture[bool]()
proc validator(topic: string,
message: Message): Future[bool] {.async.} =
validatorFut.complete(true)
result = false
nodes[1].addValidator("foobar", validator)
await nodes[0].publish("foobar", cast[seq[byte]]("Hello!"))
await allFutures(nodes[0].stop(), nodes[1].stop())
await allFutures(awaiters)
result = true
check:
waitFor(runTests()) == true
test "FloodSub validation one fails and one succeeds":
proc runTests(): Future[bool] {.async.} =
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foo"
handlerFut.complete(true)
var nodes = generateNodes(2)
var awaiters: seq[Future[void]]
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
await subscribeNodes(nodes)
await nodes[1].subscribe("foo", handler)
await nodes[1].subscribe("bar", handler)
await sleepAsync(1000.millis)
proc validator(topic: string,
message: Message): Future[bool] {.async.} =
if topic == "foo":
result = true
else:
result = false
nodes[1].addValidator("foo", "bar", validator)
await nodes[0].publish("foo", cast[seq[byte]]("Hello!"))
await nodes[0].publish("bar", cast[seq[byte]]("Hello!"))
await sleepAsync(100.millis)
await allFutures(nodes[0].stop(), nodes[1].stop())
await allFutures(awaiters)
result = true
check:
waitFor(runTests()) == true
test "FloodSub multiple peers, no self trigger": test "FloodSub multiple peers, no self trigger":
proc testBasicFloodSub(): Future[bool] {.async.} = proc runTests(): Future[bool] {.async.} =
var passed: int var passed: int
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar" check topic == "foobar"
@ -93,10 +194,10 @@ suite "FloodSub":
result = passed >= 10 # non deterministic, so at least 2 times result = passed >= 10 # non deterministic, so at least 2 times
check: check:
waitFor(testBasicFloodSub()) == true waitFor(runTests()) == true
test "FloodSub multiple peers, with self trigger": test "FloodSub multiple peers, with self trigger":
proc testBasicFloodSub(): Future[bool] {.async.} = proc runTests(): Future[bool] {.async.} =
var passed: int var passed: int
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar" check topic == "foobar"
@ -127,4 +228,4 @@ suite "FloodSub":
result = passed >= 10 # non deterministic, so at least 20 times result = passed >= 10 # non deterministic, so at least 20 times
check: check:
waitFor(testBasicFloodSub()) == true waitFor(runTests()) == true

View File

@ -9,22 +9,126 @@
import unittest, sequtils, options, tables, sets import unittest, sequtils, options, tables, sets
import chronos import chronos
import utils, ../../libp2p/[switch, import utils, ../../libp2p/[peer,
peer,
peerinfo, peerinfo,
connection, connection,
crypto/crypto, crypto/crypto,
stream/bufferstream, stream/bufferstream,
protocols/pubsub/pubsub, protocols/pubsub/pubsub,
protocols/pubsub/gossipsub] protocols/pubsub/gossipsub,
protocols/pubsub/rpc/messages]
proc createGossipSub(): GossipSub = proc createGossipSub(): GossipSub =
var peerInfo = PeerInfo.init(PrivateKey.random(RSA)) var peerInfo = PeerInfo.init(PrivateKey.random(RSA))
result = newPubSub(GossipSub, peerInfo) result = newPubSub(GossipSub, peerInfo)
suite "GossipSub": suite "GossipSub":
test "GossipSub validation should succeed":
proc runTests(): Future[bool] {.async.} =
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
handlerFut.complete(true)
var nodes = generateNodes(2, true)
var awaiters: seq[Future[void]]
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)
await sleepAsync(1000.millis)
var validatorFut = newFuture[bool]()
proc validator(topic: string,
message: Message):
Future[bool] {.async.} =
check topic == "foobar"
validatorFut.complete(true)
result = true
nodes[1].addValidator("foobar", validator)
await nodes[0].publish("foobar", cast[seq[byte]]("Hello!"))
await allFutures(handlerFut, handlerFut)
await allFutures(nodes[0].stop(), nodes[1].stop())
await allFutures(awaiters)
result = true
check:
waitFor(runTests()) == true
test "GossipSub validation should fail":
proc runTests(): Future[bool] {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check false # if we get here, it should fail
var nodes = generateNodes(2, true)
var awaiters: seq[Future[void]]
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)
await sleepAsync(100.millis)
var validatorFut = newFuture[bool]()
proc validator(topic: string,
message: Message):
Future[bool] {.async.} =
validatorFut.complete(true)
result = false
nodes[1].addValidator("foobar", validator)
await nodes[0].publish("foobar", cast[seq[byte]]("Hello!"))
await sleepAsync(100.millis)
discard await validatorFut
await allFutures(nodes[0].stop(), nodes[1].stop())
await allFutures(awaiters)
result = true
check:
waitFor(runTests()) == true
test "GossipSub validation one fails and one succeeds":
proc runTests(): Future[bool] {.async.} =
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foo"
handlerFut.complete(true)
var nodes = generateNodes(2, true)
var awaiters: seq[Future[void]]
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
await subscribeNodes(nodes)
await nodes[1].subscribe("foo", handler)
await nodes[1].subscribe("bar", handler)
await sleepAsync(1000.millis)
proc validator(topic: string,
message: Message):
Future[bool] {.async.} =
if topic == "foo":
result = true
else:
result = false
nodes[1].addValidator("foo", "bar", validator)
await nodes[0].publish("foo", cast[seq[byte]]("Hello!"))
await nodes[0].publish("bar", cast[seq[byte]]("Hello!"))
await sleepAsync(100.millis)
await allFutures(nodes[0].stop(), nodes[1].stop())
await allFutures(awaiters)
result = true
check:
waitFor(runTests()) == true
test "should add remote peer topic subscriptions": test "should add remote peer topic subscriptions":
proc testRun(): Future[bool] {.async.} = proc runTests(): Future[bool] {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
discard discard
@ -54,7 +158,7 @@ suite "GossipSub":
result = true result = true
check: check:
waitFor(testRun()) == true waitFor(runTests()) == true
test "e2e - should add remote peer topic subscriptions": test "e2e - should add remote peer topic subscriptions":
proc testBasicGossipSub(): Future[bool] {.async.} = proc testBasicGossipSub(): Future[bool] {.async.} =
@ -91,7 +195,7 @@ suite "GossipSub":
waitFor(testBasicGossipSub()) == true waitFor(testBasicGossipSub()) == true
test "should add remote peer topic subscriptions if both peers are subscribed": test "should add remote peer topic subscriptions if both peers are subscribed":
proc testRun(): Future[bool] {.async.} = proc runTests(): Future[bool] {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
discard discard
@ -134,7 +238,7 @@ suite "GossipSub":
result = true result = true
check: check:
waitFor(testRun()) == true waitFor(runTests()) == true
test "e2e - should add remote peer topic subscriptions if both peers are subscribed": test "e2e - should add remote peer topic subscriptions if both peers are subscribed":
proc testBasicGossipSub(): Future[bool] {.async.} = proc testBasicGossipSub(): Future[bool] {.async.} =
@ -179,7 +283,7 @@ suite "GossipSub":
waitFor(testBasicGossipSub()) == true waitFor(testBasicGossipSub()) == true
# test "send over fanout A -> B": # test "send over fanout A -> B":
# proc testRun(): Future[bool] {.async.} = # proc runTests(): Future[bool] {.async.} =
# var handlerFut = newFuture[bool]() # var handlerFut = newFuture[bool]()
# proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = # proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
# check: # check:
@ -216,10 +320,10 @@ suite "GossipSub":
# result = await handlerFut # result = await handlerFut
# check: # check:
# waitFor(testRun()) == true # waitFor(runTests()) == true
test "e2e - send over fanout A -> B": test "e2e - send over fanout A -> B":
proc testRun(): Future[bool] {.async.} = proc runTests(): Future[bool] {.async.} =
var passed: bool var passed: bool
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar" check topic == "foobar"
@ -250,10 +354,10 @@ suite "GossipSub":
result = passed result = passed
check: check:
waitFor(testRun()) == true waitFor(runTests()) == true
# test "send over mesh A -> B": # test "send over mesh A -> B":
# proc testRun(): Future[bool] {.async.} = # proc runTests(): Future[bool] {.async.} =
# var passed: bool # var passed: bool
# proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = # proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
# check: # check:
@ -289,10 +393,10 @@ suite "GossipSub":
# result = passed # result = passed
# check: # check:
# waitFor(testRun()) == true # waitFor(runTests()) == true
# test "e2e - send over mesh A -> B": # test "e2e - send over mesh A -> B":
# proc testRun(): Future[bool] {.async.} = # proc runTests(): Future[bool] {.async.} =
# var passed: bool # var passed: bool
# proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = # proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
# check topic == "foobar" # check topic == "foobar"
@ -318,10 +422,10 @@ suite "GossipSub":
# result = passed # result = passed
# check: # check:
# waitFor(testRun()) == true # waitFor(runTests()) == true
# test "with multiple peers": # test "with multiple peers":
# proc testRun(): Future[bool] {.async.} = # proc runTests(): Future[bool] {.async.} =
# var nodes: seq[GossipSub] # var nodes: seq[GossipSub]
# for i in 0..<10: # for i in 0..<10:
# nodes.add(createGossipSub()) # nodes.add(createGossipSub())
@ -376,10 +480,10 @@ suite "GossipSub":
# result = true # result = true
# check: # check:
# waitFor(testRun()) == true # waitFor(runTests()) == true
test "e2e - with multiple peers": test "e2e - with multiple peers":
proc testRun(): Future[bool] {.async.} = proc runTests(): Future[bool] {.async.} =
var nodes: seq[Switch] = newSeq[Switch]() var nodes: seq[Switch] = newSeq[Switch]()
var awaitters: seq[Future[void]] var awaitters: seq[Future[void]]
@ -419,4 +523,4 @@ suite "GossipSub":
result = true result = true
check: check:
waitFor(testRun()) == true waitFor(runTests()) == true