From 5b3f93ba1c2684ad29837a4d0d4df505963dd525 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 24 Sep 2019 10:16:39 -0600 Subject: [PATCH] feat: allow multiple handlers per topic in pubsub --- libp2p/muxers/mplex/mplex.nim | 8 ++-- libp2p/protocols/pubsub/floodsub.nim | 15 +++---- libp2p/protocols/pubsub/pubsub.nim | 30 ++++++++++---- libp2p/protocols/pubsub/pubsubpeer.nim | 9 ++--- libp2p/protocols/pubsub/rpcmsg.nim | 54 +++++++++++++++----------- libp2p/protocols/secure/secure.nim | 2 +- libp2p/switch.nim | 2 +- libp2p/transports/tcptransport.nim | 21 ++++++---- tests/test.nim | 28 ++++++++----- tests/testpubsub.nim | 4 +- 10 files changed, 106 insertions(+), 67 deletions(-) diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 2aa2b37ea..06655d155 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -15,11 +15,11 @@ import tables, sequtils, options, strformat import chronos, chronicles import coder, types, lpchannel, ../muxer, - ../../varint, - ../../connection, - ../../vbuffer, + ../../varint, + ../../connection, + ../../vbuffer, ../../protocols/protocol, - ../../stream/bufferstream, + ../../stream/bufferstream, ../../stream/lpstream logScope: diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index aed04beda..b04812840 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -7,8 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import sequtils, tables, options, - sets, sequtils, strutils, sets +import sequtils, tables, options, sets, strutils import chronos, chronicles import pubsub, pubsubpeer, @@ -41,10 +40,10 @@ proc sendSubs(f: FloodSub, proc rpcHandler(f: FloodSub, peer: PubSubPeer, rpcMsgs: seq[RPCMsg]) {.async, gcsafe.} = - ## method called by a PubSubPeer every + ## method called by a PubSubPeer every ## time it receives an RPC message ## - ## The RPC message might contain subscriptions + ## The RPC message might contain subscriptions ## or messages forwarded to this peer ## @@ -77,7 +76,8 @@ proc rpcHandler(f: FloodSub, for t in msg.topicIDs: # for every topic in the message toSendPeers.incl(f.peerTopics[t]) # get all the peers interested in this topic if f.topics.contains(t): # check that we're subscribed to it - await f.topics[t].handler(t, msg.data) # trigger user provided handler + for h in f.topics[t].handler: + await h(t, msg.data) # trigger user provided handler # forward the message to all peers interested in it for p in toSendPeers: @@ -141,10 +141,11 @@ method subscribe*(f: FloodSub, for p in f.peers.values: await f.sendSubs(p, @[topic], true) -method unsubscribe*(f: FloodSub, topics: seq[string]) {.async, gcsafe.} = +method unsubscribe*(f: FloodSub, + topics: seq[TopicPair]) {.async, gcsafe.} = await procCall PubSub(f).unsubscribe(topics) for p in f.peers.values: - await f.sendSubs(p, topics, false) + await f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false) proc newFloodSub*(peerInfo: PeerInfo): FloodSub = new result diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index ece8d5807..e407a755b 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -21,11 +21,14 @@ logScope: type TopicHandler* = proc (topic: string, - data: seq[byte]): Future[void] {.closure, gcsafe.} + data: seq[byte]): + Future[void] {.closure, gcsafe.} + + TopicPair* = tuple[topic: string, handler: TopicHandler] Topic* = object name*: string - handler*: TopicHandler + handler*: seq[TopicHandler] PubSub* = ref object of LPProtocol peerInfo*: PeerInfo @@ -37,12 +40,22 @@ method subscribeToPeer*(p: PubSub, conn: Connection) {.base, async, gcsafe.} = ## subscribe to a peer to send/receive pubsub messages discard -method unsubscribe*(p: PubSub, topics: seq[string]) {.base, async, gcsafe.} = +method unsubscribe*(p: PubSub, + topics: seq[TopicPair]) {.base, async, gcsafe.} = ## unsubscribe from a list of ``topic`` strings - discard + for t in topics: + for i, h in p.topics[t.topic].handler: + if h == t.handler: + p.topics[t.topic].handler.del(i) -method subscribe*(p: PubSub, - topic: string, +method unsubscribe*(p: PubSub, + topic: string, + handler: TopicHandler): Future[void] {.base, gcsafe.} = + ## unsubscribe from a ``topic`` string + result = p.unsubscribe(@[(topic, handler)]) + +method subscribe*(p: PubSub, + topic: string, handler: TopicHandler) {.base, async, gcsafe.} = ## subscribe to a topic @@ -53,7 +66,10 @@ method subscribe*(p: PubSub, ## that will be triggered ## on every received message ## - p.topics[topic] = Topic(name: topic, handler: handler) + if not p.topics.contains(topic): + p.topics[topic] = Topic(name: topic) + + p.topics[topic].handler.add(handler) method publish*(p: PubSub, topic: string, data: seq[byte]) {.base, async, gcsafe.} = ## publish to a ``topic`` diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index d755a5fbb..39035c83d 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -9,12 +9,12 @@ import options import chronos, chronicles -import ../../connection, - ../../protobuf/minprotobuf, - ../../peerinfo, +import rpcmsg, ../../peer, + ../../peerinfo, + ../../connection, ../../crypto/crypto, - rpcmsg + ../../protobuf/minprotobuf logScope: topic = "PubSubPeer" @@ -40,7 +40,6 @@ proc handle*(p: PubSubPeer) {.async, gcsafe.} = await p.handler(p, @[msg]) except: error "An exception occured while processing pubsub rpc requests", exc = getCurrentExceptionMsg() - return finally: trace "closing connection to pubsub peer", peer = p.id await p.conn.close() diff --git a/libp2p/protocols/pubsub/rpcmsg.nim b/libp2p/protocols/pubsub/rpcmsg.nim index 0fb746b01..570d7a215 100644 --- a/libp2p/protocols/pubsub/rpcmsg.nim +++ b/libp2p/protocols/pubsub/rpcmsg.nim @@ -17,8 +17,6 @@ import ../../peerinfo, logScope: topic = "RpcMsg" -const SignPrefix = "libp2p-pubsub:" - type SubOpts* = object subscribe*: bool @@ -87,7 +85,7 @@ proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} = var field = pb.enterSubMessage() trace "processing submessage", field = field case field: - of 0: + of 0: break of 1: while true: @@ -139,27 +137,38 @@ proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} = else: raise newException(CatchableError, "message type not recognized") -var prefix {.threadvar.}: seq[byte] -proc getPreix(): var seq[byte] = - if prefix.len == 0: - prefix = cast[seq[byte]](SignPrefix) - result = prefix - proc sign*(peerId: PeerID, msg: Message): Message = var buff = initProtoBuffer() encodeMessage(msg, buff) + # NOTE: leave as is, moving out would imply making this .threadsafe., etc... + let prefix = cast[seq[byte]]("libp2p-pubsub:") if buff.buffer.len > 0: result = msg if peerId.privateKey.isSome: result.signature = peerId. - privateKey. - get(). - sign(getPreix() & buff.buffer). - getBytes() + privateKey. + get(). + sign(prefix & buff.buffer). + getBytes() -proc makeMessage*(peerId: PeerID, - data: seq[byte], - name: string): Message {.gcsafe.} = +proc verify*(peerId: PeerID, m: Message): bool = + if m.signature.len > 0 and m.key.len > 0: + var msg = m + msg.signature = @[] + msg.key = @[] + + var buff = initProtoBuffer() + encodeMessage(msg, buff) + + var remote: Signature + var key: PublicKey + if remote.init(m.signature) and key.init(m.key): + result = remote.verify(buff.buffer, key) + +proc makeMessage*(peerId: PeerID, + data: seq[byte], + name: string, + sign: bool = true): Message {.gcsafe.} = var seqno: seq[byte] = newSeq[byte](20) if randomBytes(addr seqno[0], 20) > 0: var key: seq[byte] = @[] @@ -168,9 +177,10 @@ proc makeMessage*(peerId: PeerID, key = peerId.publicKey.get().getRawBytes() result = Message(fromPeer: peerId.getBytes(), - data: data, - seqno: seqno, - topicIDs: @[name], - signature: @[], - key: key) - result = sign(peerId, result) + data: data, + seqno: seqno, + topicIDs: @[name]) + if sign: + result = sign(peerId, result) + + result.key = key diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index e61a6f3cd..5d78679c8 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -15,6 +15,6 @@ type Secure* = ref object of LPProtocol # base type for secure managers method secure*(p: Secure, conn: Connection): Future[Connection] - {.base, async, gcsafe.} = + {.base, async, gcsafe.} = ## default implementation matches plaintext result = conn diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 6ddc61987..f9a783503 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -266,7 +266,7 @@ proc subscribe*(s: Switch, topic: string, handler: TopicHandler): Future[void] { result = s.pubSub.get().subscribe(topic, handler) -proc unsubscribe*(s: Switch, topics: seq[string]): Future[void] {.gcsafe.} = +proc unsubscribe*(s: Switch, topics: seq[TopicPair]): Future[void] {.gcsafe.} = ## unsubscribe from topics if s.pubSub.isNone: raise newNoPubSubException() diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index 7ed9ca327..9986c3afe 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -8,9 +8,12 @@ ## those terms. import chronos, chronicles -import transport, ../wire, ../connection, - ../multiaddress, ../connection, - ../multicodec, ../stream/chronosstream +import transport, + ../wire, + ../connection, + ../multiaddress, + ../multicodec, + ../stream/chronosstream logScope: topic = "TcpTransport" @@ -20,8 +23,8 @@ type TcpTransport* = ref object of Transport proc connHandler*(t: Transport, server: StreamServer, - client: StreamTransport, - initiator: bool = false): + client: StreamTransport, + initiator: bool = false): Future[Connection] {.async, gcsafe.} = trace "handling connection for", address = $client.remoteAddress let conn: Connection = newConnection(newChronosStream(server, client)) @@ -53,7 +56,7 @@ method close*(t: TcpTransport): Future[void] {.async, gcsafe.} = method listen*(t: TcpTransport, ma: MultiAddress, handler: ConnHandler): - # TODO: need to check how this futures + # TODO: need to check how this futures # are being returned, it doesn't seem to be right Future[Future[void]] {.async, gcsafe.} = discard await procCall Transport(t).listen(ma, handler) # call base @@ -64,11 +67,13 @@ method listen*(t: TcpTransport, result = t.server.join() method dial*(t: TcpTransport, - address: MultiAddress): + address: MultiAddress): Future[Connection] {.async, gcsafe.} = trace "dialing remote peer", address = $address ## dial a peer let client: StreamTransport = await connect(address) result = await t.connHandler(t.server, client, true) -method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} = true +method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} = + ## TODO: implement logic to properly discriminat TCP multiaddrs + true diff --git a/tests/test.nim b/tests/test.nim index 5dcfc44cc..0e78bdb95 100644 --- a/tests/test.nim +++ b/tests/test.nim @@ -16,14 +16,17 @@ import ../libp2p/switch, ../libp2p/protocols/secure/secure, ../libp2p/protocols/secure/secio, ../libp2p/protocols/pubsub/pubsub, - ../libp2p/protocols/pubsub/floodsub + ../libp2p/protocols/pubsub/floodsub, + ../libp2p/base58 type TestProto = ref object of LPProtocol switch*: Switch method init(p: TestProto) {.gcsafe.} = - proc handle(stream: Connection, proto: string) {.async, gcsafe.} = discard + proc handle(stream: Connection, proto: string) {.async, gcsafe.} = + echo "IN PROTO HANDLER!!!!!!!!!!!!!!!!!!!!!!!!!!" + echo cast[string](await stream.readLp()) p.codec = "/test/proto/1.0.0" p.handler = handle @@ -34,7 +37,7 @@ proc newTestProto(switch: Switch): TestProto = result.init() proc main() {.async.} = - let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/30333") + let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/52521") let seckey = PrivateKey.random(RSA) var peerInfo: PeerInfo @@ -48,9 +51,14 @@ proc main() {.async.} = let transports = @[Transport(newTransport(TcpTransport))] let muxers = [(MplexCodec, mplexProvider)].toTable() let identify = newIdentify(peerInfo) - # let secureManagers = @[Secure(newSecIo(seckey.getKey()))] + let secureManagers = [(SecioCodec, Secure(newSecio(seckey)))].toTable() let pubSub = some(PubSub(newFloodSub(peerInfo))) - let switch = newSwitch(peerInfo, transports, identify, muxers, pubSub = pubSub) + let switch = newSwitch(peerInfo, + transports, + identify, + muxers, + secureManagers, + pubSub) var libp2pFuts = await switch.start() echo "Right after start" @@ -58,20 +66,22 @@ proc main() {.async.} = echo item.finished var remotePeer: PeerInfo - remotePeer.peerId = some(PeerID.init("QmUA1Ghihi5u3gDwEDxhbu49jU42QPbvHttZFwB6b4K5oC")) + remotePeer.peerId = some(PeerID.init("QmPT854SM2WqCAXm4KsYkJs1NPft64m7ubaa8mgV5Tvvqg")) remotePeer.addrs.add(ma) switch.mount(newTestProto(switch)) echo "PeerID: " & peerInfo.peerId.get().pretty # let conn = await switch.dial(remotePeer, "/test/proto/1.0.0") + # await conn.writeLp(cast[seq[byte]]("Hello from nim!!")) await switch.subscribeToPeer(remotePeer) proc handler(topic: string, data: seq[byte]): Future[void] {.closure, gcsafe.} = debug "IN HANDLER" - await switch.subscribe("mytesttopic", handler) - # let msg = cast[seq[byte]]("hello from nim") - # await switch.publish("mytesttopic", msg) + let topic = Base58.encode(cast[seq[byte]]("chat")) + await switch.subscribe(topic, handler) + let msg = cast[seq[byte]]("hello from nim") + await switch.publish(topic, msg) # debug "published message from test" # TODO: for some reason the connection closes unless I do a forever loop await allFutures(libp2pFuts) diff --git a/tests/testpubsub.nim b/tests/testpubsub.nim index 81ca2209c..46ae482a0 100644 --- a/tests/testpubsub.nim +++ b/tests/testpubsub.nim @@ -11,6 +11,4 @@ import unittest import chronos suite "PubSub": - test "PubSub subscribe": discard - test "PubSub unsubscribe": discard - + test "basic FloodSub": discard