From 9f3b80b60ca2e31f151cfe10204ab9db47da3e59 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 11 Sep 2019 23:46:08 -0600 Subject: [PATCH] got pubsub working without signing --- libp2p/protocols/pubsub/floodsub.nim | 13 ++-- libp2p/protocols/pubsub/pubsubpeer.nim | 5 +- libp2p/protocols/pubsub/rpcmsg.nim | 93 +++++++++++++++----------- 3 files changed, 64 insertions(+), 47 deletions(-) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 2b92b63ce..16a3d1899 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import sequtils, tables, options, sets, sequtils, strutils +import sequtils, tables, options, sets, sequtils, strutils, sets import chronos, chronicles import pubsub, pubsubpeer, @@ -51,13 +51,14 @@ proc rpcHandler(f: FloodSub, debug "processing RPC message", peer = peer.id, msg = rpcMsgs for m in rpcMsgs: # for all RPC messages + debug "processing message", msg = rpcMsgs if m.subscriptions.len > 0: # if there are any subscriptions - if peer.peerInfo.peerId.isSome: - debug "no valid PeerId for peer" - return - for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic let id = peer.id + + if not f.peerTopics.contains(s.topic): + f.peerTopics[s.topic] = initSet[string]() + if s.subscribe: debug "subscribing to topic", peer = id, subscriptions = m.subscriptions, topic = s.topic # subscribe the peer to the topic @@ -72,7 +73,7 @@ proc rpcHandler(f: FloodSub, await p.send(@[RPCMsg(subscriptions: m.subscriptions)]) if m.messages.len > 0: # if there are any messages - var toSendPeers: HashSet[string] + var toSendPeers: HashSet[string] = initSet[string]() for msg in m.messages: # for every message for t in msg.topicIDs: # for every topic in the message toSendPeers.incl(f.peerTopics[t]) # get all the peers interested in this topic diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index db2c81e0f..419774c86 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -13,6 +13,7 @@ import ../../connection, ../../protobuf/minprotobuf, ../../peerinfo, ../../peer, + ../../crypto/crypto, rpcmsg logScope: @@ -29,11 +30,11 @@ type RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} proc handle*(p: PubSubPeer) {.async, gcsafe.} = - debug "pubsub rpc", peer = p.id + debug "handling pubsub rpc", peer = p.id try: while not p.conn.closed: let data = await p.conn.readLp() - debug "Read data from peer", peer = p.peerInfo, data = data + debug "Read data from peer", peer = p.peerInfo, data = data.toHex() let msg = decodeRpcMsg(data) debug "Decoded msg from peer", peer = p.peerInfo, msg = msg await p.handler(p, @[msg]) diff --git a/libp2p/protocols/pubsub/rpcmsg.nim b/libp2p/protocols/pubsub/rpcmsg.nim index 9c1153c3d..4f3a95c49 100644 --- a/libp2p/protocols/pubsub/rpcmsg.nim +++ b/libp2p/protocols/pubsub/rpcmsg.nim @@ -79,47 +79,62 @@ proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} = var pb = initProtoBuffer(msg) result.subscriptions = newSeq[SubOpts]() - var subscr = newSeq[byte](1) - - # decode SubOpts array - if pb.enterSubMessage() > 0: - while true: - var subOpt: SubOpts - if pb.getBytes(1, subscr) < 0: - break - subOpt.subscribe = cast[bool](subscr) - - if pb.getString(2, subOpt.topic) < 0: - break - result.subscriptions.add(subOpt) - - result.messages = newSeq[Message]() - # TODO: which of this fields are really optional? - # Decode Messages array - if pb.enterSubMessage() > 0: - while true: - var msg: Message - if pb.getBytes(1, msg.fromPeer) < 0: - break - - if pb.getBytes(2, msg.data) < 0: - break - - if pb.getBytes(3, msg.seqno) < 0: - break - - var topic: string + while true: + # decode SubOpts array + var field = pb.enterSubMessage() + debug "processing submessage", field = field + case field: + of 0: + break + of 1: while true: - if pb.getString(4, topic) < 0: - break - topic.add(topic) - topic = "" - - if pb.getBytes(5, msg.signature) < 0: - break + var subOpt: SubOpts + var subscr: int + discard pb.getVarintValue(1, subscr) + subOpt.subscribe = cast[bool](subscr) + debug "read subscribe field", subscribe = subOpt.subscribe - if pb.getBytes(6, msg.key) < 0: - break + if pb.getString(2, subOpt.topic) < 0: + break + debug "read subscribe field", topicName = subOpt.topic + + result.subscriptions.add(subOpt) + debug "got subscriptions", subscriptions = result.subscriptions + + of 2: + result.messages = newSeq[Message]() + # TODO: which of this fields are really optional? + while true: + var msg: Message + if pb.getBytes(1, msg.fromPeer) < 0: + break + debug "read message field", fromPeer = msg.fromPeer + + if pb.getBytes(2, msg.data) < 0: + break + debug "read message field", data = msg.data + + if pb.getBytes(3, msg.seqno) < 0: + break + debug "read message field", seqno = msg.seqno + + var topic: string + while true: + if pb.getString(4, topic) < 0: + break + msg.topicIDs.add(topic) + debug "read message field", topicName = topic + topic = "" + + discard pb.getBytes(5, msg.signature) + debug "read message field", signature = msg.signature + + discard pb.getBytes(6, msg.key) + debug "read message field", key = msg.key + + result.messages.add(msg) + else: + raise newException(CatchableError, "message type not recognizedd") var prefix {.threadvar.}: seq[byte] proc getPreix(): var seq[byte] =