feat: allow multiple handlers per topic in pubsub

This commit is contained in:
Dmitriy Ryajov 2019-09-24 10:16:39 -06:00
parent 2f31fc6940
commit 5b3f93ba1c
10 changed files with 106 additions and 67 deletions

View File

@ -15,11 +15,11 @@ import tables, sequtils, options, strformat
import chronos, chronicles import chronos, chronicles
import coder, types, lpchannel, import coder, types, lpchannel,
../muxer, ../muxer,
../../varint, ../../varint,
../../connection, ../../connection,
../../vbuffer, ../../vbuffer,
../../protocols/protocol, ../../protocols/protocol,
../../stream/bufferstream, ../../stream/bufferstream,
../../stream/lpstream ../../stream/lpstream
logScope: logScope:

View File

@ -7,8 +7,7 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import sequtils, tables, options, import sequtils, tables, options, sets, strutils
sets, sequtils, strutils, sets
import chronos, chronicles import chronos, chronicles
import pubsub, import pubsub,
pubsubpeer, pubsubpeer,
@ -41,10 +40,10 @@ proc sendSubs(f: FloodSub,
proc rpcHandler(f: FloodSub, proc rpcHandler(f: FloodSub,
peer: PubSubPeer, peer: PubSubPeer,
rpcMsgs: seq[RPCMsg]) {.async, gcsafe.} = rpcMsgs: seq[RPCMsg]) {.async, gcsafe.} =
## method called by a PubSubPeer every ## method called by a PubSubPeer every
## time it receives an RPC message ## time it receives an RPC message
## ##
## The RPC message might contain subscriptions ## The RPC message might contain subscriptions
## or messages forwarded to this peer ## or messages forwarded to this peer
## ##
@ -77,7 +76,8 @@ proc rpcHandler(f: FloodSub,
for t in msg.topicIDs: # for every topic in the message for t in msg.topicIDs: # for every topic in the message
toSendPeers.incl(f.peerTopics[t]) # get all the peers interested in this topic toSendPeers.incl(f.peerTopics[t]) # get all the peers interested in this topic
if f.topics.contains(t): # check that we're subscribed to it if f.topics.contains(t): # check that we're subscribed to it
await f.topics[t].handler(t, msg.data) # trigger user provided handler for h in f.topics[t].handler:
await h(t, msg.data) # trigger user provided handler
# forward the message to all peers interested in it # forward the message to all peers interested in it
for p in toSendPeers: for p in toSendPeers:
@ -141,10 +141,11 @@ method subscribe*(f: FloodSub,
for p in f.peers.values: for p in f.peers.values:
await f.sendSubs(p, @[topic], true) await f.sendSubs(p, @[topic], true)
method unsubscribe*(f: FloodSub, topics: seq[string]) {.async, gcsafe.} = method unsubscribe*(f: FloodSub,
topics: seq[TopicPair]) {.async, gcsafe.} =
await procCall PubSub(f).unsubscribe(topics) await procCall PubSub(f).unsubscribe(topics)
for p in f.peers.values: for p in f.peers.values:
await f.sendSubs(p, topics, false) await f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false)
proc newFloodSub*(peerInfo: PeerInfo): FloodSub = proc newFloodSub*(peerInfo: PeerInfo): FloodSub =
new result new result

View File

@ -21,11 +21,14 @@ logScope:
type type
TopicHandler* = proc (topic: string, TopicHandler* = proc (topic: string,
data: seq[byte]): Future[void] {.closure, gcsafe.} data: seq[byte]):
Future[void] {.closure, gcsafe.}
TopicPair* = tuple[topic: string, handler: TopicHandler]
Topic* = object Topic* = object
name*: string name*: string
handler*: TopicHandler handler*: seq[TopicHandler]
PubSub* = ref object of LPProtocol PubSub* = ref object of LPProtocol
peerInfo*: PeerInfo peerInfo*: PeerInfo
@ -37,12 +40,22 @@ method subscribeToPeer*(p: PubSub, conn: Connection) {.base, async, gcsafe.} =
## subscribe to a peer to send/receive pubsub messages ## subscribe to a peer to send/receive pubsub messages
discard discard
method unsubscribe*(p: PubSub, topics: seq[string]) {.base, async, gcsafe.} = method unsubscribe*(p: PubSub,
topics: seq[TopicPair]) {.base, async, gcsafe.} =
## unsubscribe from a list of ``topic`` strings ## unsubscribe from a list of ``topic`` strings
discard for t in topics:
for i, h in p.topics[t.topic].handler:
if h == t.handler:
p.topics[t.topic].handler.del(i)
method subscribe*(p: PubSub, method unsubscribe*(p: PubSub,
topic: string, topic: string,
handler: TopicHandler): Future[void] {.base, gcsafe.} =
## unsubscribe from a ``topic`` string
result = p.unsubscribe(@[(topic, handler)])
method subscribe*(p: PubSub,
topic: string,
handler: TopicHandler) handler: TopicHandler)
{.base, async, gcsafe.} = {.base, async, gcsafe.} =
## subscribe to a topic ## subscribe to a topic
@ -53,7 +66,10 @@ method subscribe*(p: PubSub,
## that will be triggered ## that will be triggered
## on every received message ## on every received message
## ##
p.topics[topic] = Topic(name: topic, handler: handler) if not p.topics.contains(topic):
p.topics[topic] = Topic(name: topic)
p.topics[topic].handler.add(handler)
method publish*(p: PubSub, topic: string, data: seq[byte]) {.base, async, gcsafe.} = method publish*(p: PubSub, topic: string, data: seq[byte]) {.base, async, gcsafe.} =
## publish to a ``topic`` ## publish to a ``topic``

View File

@ -9,12 +9,12 @@
import options import options
import chronos, chronicles import chronos, chronicles
import ../../connection, import rpcmsg,
../../protobuf/minprotobuf,
../../peerinfo,
../../peer, ../../peer,
../../peerinfo,
../../connection,
../../crypto/crypto, ../../crypto/crypto,
rpcmsg ../../protobuf/minprotobuf
logScope: logScope:
topic = "PubSubPeer" topic = "PubSubPeer"
@ -40,7 +40,6 @@ proc handle*(p: PubSubPeer) {.async, gcsafe.} =
await p.handler(p, @[msg]) await p.handler(p, @[msg])
except: except:
error "An exception occured while processing pubsub rpc requests", exc = getCurrentExceptionMsg() error "An exception occured while processing pubsub rpc requests", exc = getCurrentExceptionMsg()
return
finally: finally:
trace "closing connection to pubsub peer", peer = p.id trace "closing connection to pubsub peer", peer = p.id
await p.conn.close() await p.conn.close()

View File

@ -17,8 +17,6 @@ import ../../peerinfo,
logScope: logScope:
topic = "RpcMsg" topic = "RpcMsg"
const SignPrefix = "libp2p-pubsub:"
type type
SubOpts* = object SubOpts* = object
subscribe*: bool subscribe*: bool
@ -87,7 +85,7 @@ proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} =
var field = pb.enterSubMessage() var field = pb.enterSubMessage()
trace "processing submessage", field = field trace "processing submessage", field = field
case field: case field:
of 0: of 0:
break break
of 1: of 1:
while true: while true:
@ -139,27 +137,38 @@ proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} =
else: else:
raise newException(CatchableError, "message type not recognized") raise newException(CatchableError, "message type not recognized")
var prefix {.threadvar.}: seq[byte]
proc getPreix(): var seq[byte] =
if prefix.len == 0:
prefix = cast[seq[byte]](SignPrefix)
result = prefix
proc sign*(peerId: PeerID, msg: Message): Message = proc sign*(peerId: PeerID, msg: Message): Message =
var buff = initProtoBuffer() var buff = initProtoBuffer()
encodeMessage(msg, buff) encodeMessage(msg, buff)
# NOTE: leave as is, moving out would imply making this .threadsafe., etc...
let prefix = cast[seq[byte]]("libp2p-pubsub:")
if buff.buffer.len > 0: if buff.buffer.len > 0:
result = msg result = msg
if peerId.privateKey.isSome: if peerId.privateKey.isSome:
result.signature = peerId. result.signature = peerId.
privateKey. privateKey.
get(). get().
sign(getPreix() & buff.buffer). sign(prefix & buff.buffer).
getBytes() getBytes()
proc makeMessage*(peerId: PeerID, proc verify*(peerId: PeerID, m: Message): bool =
data: seq[byte], if m.signature.len > 0 and m.key.len > 0:
name: string): Message {.gcsafe.} = var msg = m
msg.signature = @[]
msg.key = @[]
var buff = initProtoBuffer()
encodeMessage(msg, buff)
var remote: Signature
var key: PublicKey
if remote.init(m.signature) and key.init(m.key):
result = remote.verify(buff.buffer, key)
proc makeMessage*(peerId: PeerID,
data: seq[byte],
name: string,
sign: bool = true): Message {.gcsafe.} =
var seqno: seq[byte] = newSeq[byte](20) var seqno: seq[byte] = newSeq[byte](20)
if randomBytes(addr seqno[0], 20) > 0: if randomBytes(addr seqno[0], 20) > 0:
var key: seq[byte] = @[] var key: seq[byte] = @[]
@ -168,9 +177,10 @@ proc makeMessage*(peerId: PeerID,
key = peerId.publicKey.get().getRawBytes() key = peerId.publicKey.get().getRawBytes()
result = Message(fromPeer: peerId.getBytes(), result = Message(fromPeer: peerId.getBytes(),
data: data, data: data,
seqno: seqno, seqno: seqno,
topicIDs: @[name], topicIDs: @[name])
signature: @[], if sign:
key: key) result = sign(peerId, result)
result = sign(peerId, result)
result.key = key

View File

@ -15,6 +15,6 @@ type
Secure* = ref object of LPProtocol # base type for secure managers Secure* = ref object of LPProtocol # base type for secure managers
method secure*(p: Secure, conn: Connection): Future[Connection] method secure*(p: Secure, conn: Connection): Future[Connection]
{.base, async, gcsafe.} = {.base, async, gcsafe.} =
## default implementation matches plaintext ## default implementation matches plaintext
result = conn result = conn

View File

@ -266,7 +266,7 @@ proc subscribe*(s: Switch, topic: string, handler: TopicHandler): Future[void] {
result = s.pubSub.get().subscribe(topic, handler) result = s.pubSub.get().subscribe(topic, handler)
proc unsubscribe*(s: Switch, topics: seq[string]): Future[void] {.gcsafe.} = proc unsubscribe*(s: Switch, topics: seq[TopicPair]): Future[void] {.gcsafe.} =
## unsubscribe from topics ## unsubscribe from topics
if s.pubSub.isNone: if s.pubSub.isNone:
raise newNoPubSubException() raise newNoPubSubException()

View File

@ -8,9 +8,12 @@
## those terms. ## those terms.
import chronos, chronicles import chronos, chronicles
import transport, ../wire, ../connection, import transport,
../multiaddress, ../connection, ../wire,
../multicodec, ../stream/chronosstream ../connection,
../multiaddress,
../multicodec,
../stream/chronosstream
logScope: logScope:
topic = "TcpTransport" topic = "TcpTransport"
@ -20,8 +23,8 @@ type TcpTransport* = ref object of Transport
proc connHandler*(t: Transport, proc connHandler*(t: Transport,
server: StreamServer, server: StreamServer,
client: StreamTransport, client: StreamTransport,
initiator: bool = false): initiator: bool = false):
Future[Connection] {.async, gcsafe.} = Future[Connection] {.async, gcsafe.} =
trace "handling connection for", address = $client.remoteAddress trace "handling connection for", address = $client.remoteAddress
let conn: Connection = newConnection(newChronosStream(server, client)) let conn: Connection = newConnection(newChronosStream(server, client))
@ -53,7 +56,7 @@ method close*(t: TcpTransport): Future[void] {.async, gcsafe.} =
method listen*(t: TcpTransport, method listen*(t: TcpTransport,
ma: MultiAddress, ma: MultiAddress,
handler: ConnHandler): handler: ConnHandler):
# TODO: need to check how this futures # TODO: need to check how this futures
# are being returned, it doesn't seem to be right # are being returned, it doesn't seem to be right
Future[Future[void]] {.async, gcsafe.} = Future[Future[void]] {.async, gcsafe.} =
discard await procCall Transport(t).listen(ma, handler) # call base discard await procCall Transport(t).listen(ma, handler) # call base
@ -64,11 +67,13 @@ method listen*(t: TcpTransport,
result = t.server.join() result = t.server.join()
method dial*(t: TcpTransport, method dial*(t: TcpTransport,
address: MultiAddress): address: MultiAddress):
Future[Connection] {.async, gcsafe.} = Future[Connection] {.async, gcsafe.} =
trace "dialing remote peer", address = $address trace "dialing remote peer", address = $address
## dial a peer ## dial a peer
let client: StreamTransport = await connect(address) let client: StreamTransport = await connect(address)
result = await t.connHandler(t.server, client, true) result = await t.connHandler(t.server, client, true)
method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} = true method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} =
## TODO: implement logic to properly discriminat TCP multiaddrs
true

View File

@ -16,14 +16,17 @@ import ../libp2p/switch,
../libp2p/protocols/secure/secure, ../libp2p/protocols/secure/secure,
../libp2p/protocols/secure/secio, ../libp2p/protocols/secure/secio,
../libp2p/protocols/pubsub/pubsub, ../libp2p/protocols/pubsub/pubsub,
../libp2p/protocols/pubsub/floodsub ../libp2p/protocols/pubsub/floodsub,
../libp2p/base58
type type
TestProto = ref object of LPProtocol TestProto = ref object of LPProtocol
switch*: Switch switch*: Switch
method init(p: TestProto) {.gcsafe.} = method init(p: TestProto) {.gcsafe.} =
proc handle(stream: Connection, proto: string) {.async, gcsafe.} = discard proc handle(stream: Connection, proto: string) {.async, gcsafe.} =
echo "IN PROTO HANDLER!!!!!!!!!!!!!!!!!!!!!!!!!!"
echo cast[string](await stream.readLp())
p.codec = "/test/proto/1.0.0" p.codec = "/test/proto/1.0.0"
p.handler = handle p.handler = handle
@ -34,7 +37,7 @@ proc newTestProto(switch: Switch): TestProto =
result.init() result.init()
proc main() {.async.} = proc main() {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/30333") let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/52521")
let seckey = PrivateKey.random(RSA) let seckey = PrivateKey.random(RSA)
var peerInfo: PeerInfo var peerInfo: PeerInfo
@ -48,9 +51,14 @@ proc main() {.async.} =
let transports = @[Transport(newTransport(TcpTransport))] let transports = @[Transport(newTransport(TcpTransport))]
let muxers = [(MplexCodec, mplexProvider)].toTable() let muxers = [(MplexCodec, mplexProvider)].toTable()
let identify = newIdentify(peerInfo) let identify = newIdentify(peerInfo)
# let secureManagers = @[Secure(newSecIo(seckey.getKey()))] let secureManagers = [(SecioCodec, Secure(newSecio(seckey)))].toTable()
let pubSub = some(PubSub(newFloodSub(peerInfo))) let pubSub = some(PubSub(newFloodSub(peerInfo)))
let switch = newSwitch(peerInfo, transports, identify, muxers, pubSub = pubSub) let switch = newSwitch(peerInfo,
transports,
identify,
muxers,
secureManagers,
pubSub)
var libp2pFuts = await switch.start() var libp2pFuts = await switch.start()
echo "Right after start" echo "Right after start"
@ -58,20 +66,22 @@ proc main() {.async.} =
echo item.finished echo item.finished
var remotePeer: PeerInfo var remotePeer: PeerInfo
remotePeer.peerId = some(PeerID.init("QmUA1Ghihi5u3gDwEDxhbu49jU42QPbvHttZFwB6b4K5oC")) remotePeer.peerId = some(PeerID.init("QmPT854SM2WqCAXm4KsYkJs1NPft64m7ubaa8mgV5Tvvqg"))
remotePeer.addrs.add(ma) remotePeer.addrs.add(ma)
switch.mount(newTestProto(switch)) switch.mount(newTestProto(switch))
echo "PeerID: " & peerInfo.peerId.get().pretty echo "PeerID: " & peerInfo.peerId.get().pretty
# let conn = await switch.dial(remotePeer, "/test/proto/1.0.0") # let conn = await switch.dial(remotePeer, "/test/proto/1.0.0")
# await conn.writeLp(cast[seq[byte]]("Hello from nim!!"))
await switch.subscribeToPeer(remotePeer) await switch.subscribeToPeer(remotePeer)
proc handler(topic: string, data: seq[byte]): Future[void] {.closure, gcsafe.} = proc handler(topic: string, data: seq[byte]): Future[void] {.closure, gcsafe.} =
debug "IN HANDLER" debug "IN HANDLER"
await switch.subscribe("mytesttopic", handler) let topic = Base58.encode(cast[seq[byte]]("chat"))
# let msg = cast[seq[byte]]("hello from nim") await switch.subscribe(topic, handler)
# await switch.publish("mytesttopic", msg) let msg = cast[seq[byte]]("hello from nim")
await switch.publish(topic, msg)
# debug "published message from test" # debug "published message from test"
# TODO: for some reason the connection closes unless I do a forever loop # TODO: for some reason the connection closes unless I do a forever loop
await allFutures(libp2pFuts) await allFutures(libp2pFuts)

View File

@ -11,6 +11,4 @@ import unittest
import chronos import chronos
suite "PubSub": suite "PubSub":
test "PubSub subscribe": discard test "basic FloodSub": discard
test "PubSub unsubscribe": discard