From 8920cd7d60c5b23e2e05bd484565447f54bde35a Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 11 Sep 2019 20:10:38 -0600 Subject: [PATCH] misc: pubsub/floodsub and logging --- libp2p/crypto/crypto.nim | 4 +-- libp2p/muxers/mplex/coder.nim | 2 +- libp2p/muxers/mplex/mplex.nim | 4 ++- libp2p/protocols/pubsub/floodsub.nim | 35 +++++++++++++++---- libp2p/protocols/pubsub/pubsub.nim | 11 ++++-- libp2p/protocols/pubsub/pubsubpeer.nim | 17 +++++++-- libp2p/protocols/pubsub/rpcmsg.nim | 48 +++++++++++++++++--------- libp2p/stream/bufferstream.nim | 2 +- libp2p/stream/chronosstream.nim | 6 +++- libp2p/switch.nim | 23 ++++++++---- tests/test.nim | 48 +++++++++++++++++--------- tests/testbufferstream.nim | 14 ++++---- tests/testidentify.nim | 46 +++++++++++++----------- tests/testmplex.nim | 10 +++--- tests/testtransport.nim | 8 ++--- 15 files changed, 183 insertions(+), 95 deletions(-) diff --git a/libp2p/crypto/crypto.nim b/libp2p/crypto/crypto.nim index 9ffdb70..736fa8b 100644 --- a/libp2p/crypto/crypto.nim +++ b/libp2p/crypto/crypto.nim @@ -47,7 +47,7 @@ type skkey*: SkPublicKey of ECDSA: eckey*: EcPublicKey - of NoSupport: + else: discard PrivateKey* = object @@ -60,7 +60,7 @@ type skkey*: SkPrivateKey of ECDSA: eckey*: EcPrivateKey - of NoSupport: + else: discard KeyPair* = object diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim index bd2827f..092e322 100644 --- a/libp2p/muxers/mplex/coder.nim +++ b/libp2p/muxers/mplex/coder.nim @@ -75,4 +75,4 @@ proc writeMsg*(conn: Connection, id: uint, msgType: MessageType, data: string) {.async, gcsafe.} = - result = conn.writeMsg(id, msgType, cast[seq[byte]](toSeq(data.items))) + result = conn.writeMsg(id, msgType, cast[seq[byte]](data)) diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index b5a5346..b0cd263 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -28,7 +28,7 @@ import coder, types, channel, ../../stream/lpstream logScope: - topic = "mplex" + topic = "Mplex" type Mplex* = ref object of Muxer @@ -106,6 +106,8 @@ method handle*(m: Mplex) {.async, gcsafe.} = await channel.resetByRemote() break else: raise newMplexUnknownMsgError() + except: + debug "exception occurred", exception = getCurrentExceptionMsg() finally: await m.connection.close() diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index e6777dc..2b92b63 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import sequtils, tables, options, sets +import sequtils, tables, options, sets, sequtils, strutils import chronos, chronicles import pubsub, pubsubpeer, @@ -30,8 +30,10 @@ proc sendSubs(f: FloodSub, subscribe: bool) {.async, gcsafe.} = ## send subscriptions to remote peer + debug "sending subscriptions", peer = peer.id, subscribe = subscribe var msg: RPCMsg for t in topics: + debug "sending topic", peer = peer.id, subscribe = subscribe, topicName = t msg.subscriptions.add(SubOpts(topic: t, subscribe: subscribe)) await peer.send(@[msg]) @@ -47,14 +49,21 @@ proc rpcHandler(f: FloodSub, ## or messages forwarded to this peer ## + debug "processing RPC message", peer = peer.id, msg = rpcMsgs for m in rpcMsgs: # for all RPC messages if m.subscriptions.len > 0: # if there are any subscriptions + if peer.peerInfo.peerId.isSome: + debug "no valid PeerId for peer" + return + for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic - let id = peer.conn.peerInfo.get().peerId.pretty + let id = peer.id if s.subscribe: + debug "subscribing to topic", peer = id, subscriptions = m.subscriptions, topic = s.topic # subscribe the peer to the topic f.peerTopics[s.topic].incl(id) else: + debug "unsubscribing to topic", peer = id, subscriptions = m.subscriptions, topic = s.topic # unsubscribe the peer to the topic f.peerTopics[s.topic].excl(id) @@ -70,6 +79,7 @@ proc rpcHandler(f: FloodSub, if f.topics.contains(t): # check that we're subscribed to it await f.topics[t].handler(t, msg.data) # trigger user provided handler + # forward the message to all peers interested it for p in toSendPeers: await f.peers[p].send(@[RPCMsg(messages: m.messages)]) @@ -89,7 +99,11 @@ proc handleConn(f: FloodSub, conn: Connection) {.async, gcsafe.} = await f.rpcHandler(peer, msgs) var peer = newPubSubPeer(conn, handleRpc) - f.peers[peer.conn.peerInfo.get().peerId.pretty()] = peer + if peer.peerInfo.peerId.isNone: + debug "no valid PeerInfo for peer" + return + + f.peers[peer.id] = peer let topics = toSeq(f.topics.keys) await f.sendSubs(peer, topics, true) asyncCheck peer.handle() @@ -106,16 +120,19 @@ method init(f: FloodSub) = f.handler = handler f.codec = FloodSubCodec -method subscribePeer*(f: FloodSub, conn: Connection) {.async, gcsafe.} = +method subscribeToPeer*(f: FloodSub, conn: Connection) {.async, gcsafe.} = await f.handleConn(conn) method publish*(f: FloodSub, topic: string, data: seq[byte]) {.async, gcsafe.} = - for p in f.peerTopics[topic]: - f.peers[p].send(Message(fromPeer: f.peerInfo.peerId.data, - data: data)) + debug "about to publish message on topic", topic = topic, data = data + let msg = makeMessage(f.peerInfo.peerId.get(), data, topic) + if topic in f.peerTopics: + for p in f.peerTopics[topic]: + debug "pubslishing message", topic = topic, peer = p, data = data + await f.peers[p].send(@[RPCMsg(messages: @[msg])]) method subscribe*(f: FloodSub, topic: string, @@ -133,3 +150,7 @@ method unsubscribe*(f: FloodSub, topics: seq[string]) {.async, gcsafe.} = proc newFloodSub*(peerInfo: PeerInfo): FloodSub = new result result.peerInfo = peerInfo + result.peers = initTable[string, PubSubPeer]() + result.topics = initTable[string, Topic]() + result.peerTopics = initTable[string, HashSet[string]]() + result.init() diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 52a3d5f..ece8d58 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -8,7 +8,7 @@ ## those terms. import tables, sets -import chronos +import chronos, chronicles import pubsubpeer, ../protocol, ../../connection, @@ -16,8 +16,13 @@ import pubsubpeer, export PubSubPeer +logScope: + topic = "PubSub" + type - TopicHandler* = proc(topic:string, data: seq[byte]): Future[void] {.gcsafe.} + TopicHandler* = proc (topic: string, + data: seq[byte]): Future[void] {.closure, gcsafe.} + Topic* = object name*: string handler*: TopicHandler @@ -28,7 +33,7 @@ type peers*: Table[string, PubSubPeer] # peerid to peer map peerTopics*: Table[string, HashSet[string]] # topic to remote peer map -method subscribePeer*(p: PubSub, conn: Connection) {.base, async, gcsafe.} = +method subscribeToPeer*(p: PubSub, conn: Connection) {.base, async, gcsafe.} = ## subscribe to a peer to send/receive pubsub messages discard diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 3844937..db2c81e 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -12,6 +12,7 @@ import chronos, chronicles import ../../connection, ../../protobuf/minprotobuf, ../../peerinfo, + ../../peer, rpcmsg logScope: @@ -19,28 +20,40 @@ logScope: type PubSubPeer* = ref object of RootObj + peerInfo*: PeerInfo conn*: Connection handler*: RPCHandler topics*: seq[string] + id*: string # base58 peer id string RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} proc handle*(p: PubSubPeer) {.async, gcsafe.} = + debug "pubsub rpc", peer = p.id try: while not p.conn.closed: - let msg = decodeRpcMsg(await p.conn.readLp()) + let data = await p.conn.readLp() + debug "Read data from peer", peer = p.peerInfo, data = data + let msg = decodeRpcMsg(data) + debug "Decoded msg from peer", peer = p.peerInfo, msg = msg await p.handler(p, @[msg]) except: debug "An exception occured while processing pubsub rpc requests", exc = getCurrentExceptionMsg() return finally: + debug "closing connection to pubsub peer", peer = p.id await p.conn.close() proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async, gcsafe.} = for m in msgs: - await p.conn.writeLp(encodeRpcMsg(m).buffer) + debug "sending msgs to peer", peer = p.id, msgs = msgs + let encoded = encodeRpcMsg(m) + if encoded.buffer.len > 0: + await p.conn.writeLp(encoded.buffer) proc newPubSubPeer*(conn: Connection, handler: RPCHandler): PubSubPeer = new result result.handler = handler result.conn = conn + result.peerInfo = conn.peerInfo + result.id = conn.peerInfo.peerId.get().pretty() diff --git a/libp2p/protocols/pubsub/rpcmsg.nim b/libp2p/protocols/pubsub/rpcmsg.nim index 5cb29a9..9c1153c 100644 --- a/libp2p/protocols/pubsub/rpcmsg.nim +++ b/libp2p/protocols/pubsub/rpcmsg.nim @@ -8,12 +8,15 @@ ## those terms. import sequtils -import chronos, nimcrypto/sysrand +import chronos, nimcrypto/sysrand, chronicles import ../../peerinfo, ../../peer, ../../crypto/crypto, ../../protobuf/minprotobuf +logScope: + topic = "RpcMsg" + const SignPrefix = "libp2p-pubsub:" type @@ -47,31 +50,34 @@ proc encodeMessage(msg: Message, buff: var ProtoBuffer) {.gcsafe.} = buff.finish() proc encodeSubs(subs: SubOpts, buff: var ProtoBuffer) {.gcsafe.} = - buff.write(initProtoField(1, ord(subs.subscribe))) + buff.write(initProtoField(1, subs.subscribe)) buff.write(initProtoField(2, subs.topic)) proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} = - result = initProtoBuffer({WithVarintLength}) + result = initProtoBuffer({WithVarintLength}) + debug "encoding msg: ", msg = msg - var subs = initProtoBuffer() - for s in msg.subscriptions: - encodeSubs(s, subs) + if msg.subscriptions.len > 0: + var subs = initProtoBuffer() + for s in msg.subscriptions: + encodeSubs(s, subs) - subs.finish() - result.write(initProtoField(1, subs)) + subs.finish() + result.write(initProtoField(1, subs)) - var messages = initProtoBuffer() - for m in msg.messages: - encodeMessage(m, messages) + if msg.messages.len > 0: + var messages = initProtoBuffer() + for m in msg.messages: + encodeMessage(m, messages) - messages.finish() - result.write(initProtoField(2, messages)) + messages.finish() + result.write(initProtoField(2, messages)) result.finish() proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} = var pb = initProtoBuffer(msg) - + result.subscriptions = newSeq[SubOpts]() var subscr = newSeq[byte](1) @@ -115,18 +121,26 @@ proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} = if pb.getBytes(6, msg.key) < 0: break +var prefix {.threadvar.}: seq[byte] +proc getPreix(): var seq[byte] = + if prefix.len == 0: + prefix = cast[seq[byte]](SignPrefix) + result = prefix + proc sign*(peerId: PeerID, msg: Message): Message = var buff = initProtoBuffer() - var prefix = cast[seq[byte]](toSeq(SignPrefix.items)) # TODO: can we cache this? encodeMessage(msg, buff) if buff.buffer.len > 0: result = msg result.signature = peerId. privateKey. - sign(prefix & buff.buffer). + sign(getPreix() & buff.buffer). getBytes() -proc makeMessage*(peerId: PeerID, data: seq[byte], name: string): Message = +proc makeMessage*(peerId: PeerID, + data: seq[byte], + name: string): + Message {.gcsafe.} = var seqno: seq[byte] = newSeq[byte](20) if randomBytes(addr seqno[0], 20) > 0: result = Message(fromPeer: peerId.getBytes(), diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index 07cd0d8..aa0dace 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -256,7 +256,7 @@ method write*(s: BufferStream, ## stream. var buf = "" shallowCopy(buf, if msglen > 0: msg[0..