diff --git a/chronos b/chronos new file mode 160000 index 000000000..7029f8bc1 --- /dev/null +++ b/chronos @@ -0,0 +1 @@ +Subproject commit 7029f8bc1e98e6b4360ffcfe37e1602cf25fd6c8 diff --git a/libp2p/peer.nim b/libp2p/peer.nim index db6d9c3b5..fbbc9c944 100644 --- a/libp2p/peer.nim +++ b/libp2p/peer.nim @@ -21,7 +21,7 @@ const type PeerID* = object data*: seq[byte] - privateKey: PrivateKey + privateKey*: PrivateKey publicKey: PublicKey PeerIDError* = object of CatchableError diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index f2df0d6b6..d4fd74401 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -68,7 +68,7 @@ proc decodeMsg*(buf: seq[byte]): IdentifyInfo = result.addrs = newSeq[MultiAddress]() var address = newSeq[byte]() - while pb.getBytes(2, address) != -1: + while pb.getBytes(2, address) > 0: if len(address) != 0: var copyaddr = address result.addrs.add(MultiAddress.init(copyaddr)) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 11b8ef914..e6777dc3f 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -7,47 +7,129 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +import sequtils, tables, options, sets import chronos, chronicles -import ../protocol, - ../../connection +import pubsub, + pubsubpeer, + rpcmsg, + ../../connection, + ../../peerinfo, + ../../peer logScope: - topic = "floodsub" + topic = "FloodSub" const FloodSubCodec* = "/floodsub/1.0.0" type - TopicHandler* = proc(topic:string, data: seq[byte]): Future[void] {.gcsafe.} - Topic* = object - name*: string - handler*: TopicHandler + FloodSub = ref object of PubSub - Peer* = object - conn: Connection - topics: string +proc sendSubs(f: FloodSub, + peer: PubSubPeer, + topics: seq[string], + subscribe: bool) + {.async, gcsafe.} = + ## send subscriptions to remote peer + var msg: RPCMsg + for t in topics: + msg.subscriptions.add(SubOpts(topic: t, subscribe: subscribe)) - FloodSub* = ref object of LPProtocol - topics: seq[Topic] - peers: seq[Peer] + await peer.send(@[msg]) -proc encodeRpcMsg() = discard -proc decodeRpcMsg() = discard +proc rpcHandler(f: FloodSub, + peer: PubSubPeer, + rpcMsgs: seq[RPCMsg]) + {.async, gcsafe.} = + ## method called by a PubSubPeer every + ## time it receives an RPC message + ## + ## The RPC message might contain subscriptions + ## or messages forwarded to this peer + ## -method init*(f: FloodSub) = + for m in rpcMsgs: # for all RPC messages + if m.subscriptions.len > 0: # if there are any subscriptions + for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic + let id = peer.conn.peerInfo.get().peerId.pretty + if s.subscribe: + # subscribe the peer to the topic + f.peerTopics[s.topic].incl(id) + else: + # unsubscribe the peer to the topic + f.peerTopics[s.topic].excl(id) + + # send subscriptions to every peer + for p in f.peers.values: + await p.send(@[RPCMsg(subscriptions: m.subscriptions)]) + + if m.messages.len > 0: # if there are any messages + var toSendPeers: HashSet[string] + for msg in m.messages: # for every message + for t in msg.topicIDs: # for every topic in the message + toSendPeers.incl(f.peerTopics[t]) # get all the peers interested in this topic + if f.topics.contains(t): # check that we're subscribed to it + await f.topics[t].handler(t, msg.data) # trigger user provided handler + + for p in toSendPeers: + await f.peers[p].send(@[RPCMsg(messages: m.messages)]) + +proc handleConn(f: FloodSub, conn: Connection) {.async, gcsafe.} = + ## handle incoming/outgoing connections + ## + ## this proc will: + ## 1) register a new PubSubPeer for the connection + ## 2) register a handler with the peer; + ## this handler gets called on every rpc message + ## that the peer receives + ## 3) ask the peer to subscribe us to every topic + ## that we're interested in + ## + + proc handleRpc(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async, gcsafe.} = + await f.rpcHandler(peer, msgs) + + var peer = newPubSubPeer(conn, handleRpc) + f.peers[peer.conn.peerInfo.get().peerId.pretty()] = peer + let topics = toSeq(f.topics.keys) + await f.sendSubs(peer, topics, true) + asyncCheck peer.handle() + +method init(f: FloodSub) = proc handler(conn: Connection, proto: string) {.async, gcsafe.} = - discard + ## main protocol handler that gets triggered on every + ## connection for a protocol string + ## e.g. ``/floodsub/1.0.0``, etc... + ## + + await f.handleConn(conn) - f.codec = FloodSubCodec f.handler = handler + f.codec = FloodSubCodec + +method subscribePeer*(f: FloodSub, conn: Connection) {.async, gcsafe.} = + await f.handleConn(conn) + +method publish*(f: FloodSub, + topic: string, + data: seq[byte]) + {.async, gcsafe.} = + for p in f.peerTopics[topic]: + f.peers[p].send(Message(fromPeer: f.peerInfo.peerId.data, + data: data)) method subscribe*(f: FloodSub, topic: string, - handler: TopicHandler) - {.base, async, gcsafe.} = - discard + handler: TopicHandler) + {.async, gcsafe.} = + await procCall PubSub(f).subscribe(topic, handler) + for p in f.peers.values: + await f.sendSubs(p, @[topic], true) -method publish*(f: FloodSub, topic: string) {.base, async, gcsafe.} = - discard +method unsubscribe*(f: FloodSub, topics: seq[string]) {.async, gcsafe.} = + await procCall PubSub(f).unsubscribe(topics) + for p in f.peers.values: + await f.sendSubs(p, topics, false) -proc newFloodSub*(): FloodSub = +proc newFloodSub*(peerInfo: PeerInfo): FloodSub = new result + result.peerInfo = peerInfo diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim new file mode 100644 index 000000000..52a3d5fc7 --- /dev/null +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -0,0 +1,55 @@ +## Nim-LibP2P +## Copyright (c) 2018 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import tables, sets +import chronos +import pubsubpeer, + ../protocol, + ../../connection, + ../../peerinfo + +export PubSubPeer + +type + TopicHandler* = proc(topic:string, data: seq[byte]): Future[void] {.gcsafe.} + Topic* = object + name*: string + handler*: TopicHandler + + PubSub* = ref object of LPProtocol + peerInfo*: PeerInfo + topics*: Table[string, Topic] # local topics + peers*: Table[string, PubSubPeer] # peerid to peer map + peerTopics*: Table[string, HashSet[string]] # topic to remote peer map + +method subscribePeer*(p: PubSub, conn: Connection) {.base, async, gcsafe.} = + ## subscribe to a peer to send/receive pubsub messages + discard + +method unsubscribe*(p: PubSub, topics: seq[string]) {.base, async, gcsafe.} = + ## unsubscribe from a list of ``topic`` strings + discard + +method subscribe*(p: PubSub, + topic: string, + handler: TopicHandler) + {.base, async, gcsafe.} = + ## subscribe to a topic + ## + ## ``topic`` - a string topic to subscribe to + ## + ## ``handler`` - is a user provided proc + ## that will be triggered + ## on every received message + ## + p.topics[topic] = Topic(name: topic, handler: handler) + +method publish*(p: PubSub, topic: string, data: seq[byte]) {.base, async, gcsafe.} = + ## publish to a ``topic`` + discard diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim new file mode 100644 index 000000000..384493756 --- /dev/null +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -0,0 +1,46 @@ +## Nim-LibP2P +## Copyright (c) 2018 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import options +import chronos, chronicles +import ../../connection, + ../../protobuf/minprotobuf, + ../../peerinfo, + rpcmsg + +logScope: + topic = "PubSubPeer" + +type + PubSubPeer* = ref object of RootObj + conn*: Connection + handler*: RPCHandler + topics*: seq[string] + + RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} + +proc handle*(p: PubSubPeer) {.async, gcsafe.} = + try: + while not p.conn.closed: + let msg = decodeRpcMsg(await p.conn.readLp()) + await p.handler(p, @[msg]) + except: + debug "An exception occured while processing pubsub rpc requests", exc = getCurrentExceptionMsg() + return + finally: + await p.conn.close() + +proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async, gcsafe.} = + for m in msgs: + await p.conn.writeLp(encodeRpcMsg(m).buffer) + +proc newPubSubPeer*(conn: Connection, handler: RPCHandler): PubSubPeer = + new result + result.handler = handler + result.conn = conn diff --git a/libp2p/protocols/pubsub/rpcmsg.nim b/libp2p/protocols/pubsub/rpcmsg.nim new file mode 100644 index 000000000..5cb29a969 --- /dev/null +++ b/libp2p/protocols/pubsub/rpcmsg.nim @@ -0,0 +1,138 @@ +## Nim-LibP2P +## Copyright (c) 2018 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import sequtils +import chronos, nimcrypto/sysrand +import ../../peerinfo, + ../../peer, + ../../crypto/crypto, + ../../protobuf/minprotobuf + +const SignPrefix = "libp2p-pubsub:" + +type + SubOpts* = object + subscribe*: bool + topic*: string + + Message* = object + fromPeer*: seq[byte] + data*: seq[byte] + seqno*: seq[byte] + topicIDs*: seq[string] + signature*: seq[byte] + key*: seq[byte] + + RPCMsg* = object + subscriptions*: seq[SubOpts] + messages*: seq[Message] + +proc encodeMessage(msg: Message, buff: var ProtoBuffer) {.gcsafe.} = + buff.write(initProtoField(1, msg.fromPeer)) + buff.write(initProtoField(2, msg.data)) + buff.write(initProtoField(3, msg.seqno)) + + for t in msg.topicIDs: + buff.write(initProtoField(4, t)) + + buff.write(initProtoField(5, msg.signature)) + buff.write(initProtoField(6, msg.key)) + + buff.finish() + +proc encodeSubs(subs: SubOpts, buff: var ProtoBuffer) {.gcsafe.} = + buff.write(initProtoField(1, ord(subs.subscribe))) + buff.write(initProtoField(2, subs.topic)) + +proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} = + result = initProtoBuffer({WithVarintLength}) + + var subs = initProtoBuffer() + for s in msg.subscriptions: + encodeSubs(s, subs) + + subs.finish() + result.write(initProtoField(1, subs)) + + var messages = initProtoBuffer() + for m in msg.messages: + encodeMessage(m, messages) + + messages.finish() + result.write(initProtoField(2, messages)) + + result.finish() + +proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} = + var pb = initProtoBuffer(msg) + + result.subscriptions = newSeq[SubOpts]() + var subscr = newSeq[byte](1) + + # decode SubOpts array + if pb.enterSubMessage() > 0: + while true: + var subOpt: SubOpts + if pb.getBytes(1, subscr) < 0: + break + subOpt.subscribe = cast[bool](subscr) + + if pb.getString(2, subOpt.topic) < 0: + break + result.subscriptions.add(subOpt) + + result.messages = newSeq[Message]() + # TODO: which of this fields are really optional? + # Decode Messages array + if pb.enterSubMessage() > 0: + while true: + var msg: Message + if pb.getBytes(1, msg.fromPeer) < 0: + break + + if pb.getBytes(2, msg.data) < 0: + break + + if pb.getBytes(3, msg.seqno) < 0: + break + + var topic: string + while true: + if pb.getString(4, topic) < 0: + break + topic.add(topic) + topic = "" + + if pb.getBytes(5, msg.signature) < 0: + break + + if pb.getBytes(6, msg.key) < 0: + break + +proc sign*(peerId: PeerID, msg: Message): Message = + var buff = initProtoBuffer() + var prefix = cast[seq[byte]](toSeq(SignPrefix.items)) # TODO: can we cache this? + encodeMessage(msg, buff) + if buff.buffer.len > 0: + result = msg + result.signature = peerId. + privateKey. + sign(prefix & buff.buffer). + getBytes() + +proc makeMessage*(peerId: PeerID, data: seq[byte], name: string): Message = + var seqno: seq[byte] = newSeq[byte](20) + if randomBytes(addr seqno[0], 20) > 0: + result = Message(fromPeer: peerId.getBytes(), + data: data, + seqno: seqno, + topicIDs: @[name], + signature: @[], + key: peerId.publicKey.getRawBytes()) + result = sign(peerId, result) diff --git a/tests/test.nim b/tests/test.nim new file mode 100644 index 000000000..b306502a9 --- /dev/null +++ b/tests/test.nim @@ -0,0 +1,65 @@ +import tables +import chronos, chronicles +import ../libp2p/switch, + ../libp2p/multistream, + ../libp2p/protocols/identify, + ../libp2p/connection, + ../libp2p/transports/[transport, tcptransport], + ../libp2p/multiaddress, + ../libp2p/peerinfo, + ../libp2p/crypto/crypto, + ../libp2p/peer, + ../libp2p/protocols/protocol, + ../libp2p/muxers/muxer, + ../libp2p/muxers/mplex/mplex, + ../libp2p/muxers/mplex/types, + ../libp2p/protocols/secure/secure, + ../libp2p/protocols/secure/secio + +type + TestProto = ref object of LPProtocol + +method init(p: TestProto) {.gcsafe.} = + proc handle(stream: Connection, proto: string) {.async, gcsafe.} = + await stream.writeLp("Hello from handler") + await stream.close() + + p.codec = "/test/proto/1.0.0" + p.handler = handle + +proc newTestProto(): TestProto = + new result + result.init() + +proc main() {.async.} = + let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/30333") + + let seckey = PrivateKey.random(RSA) + var peerInfo: PeerInfo + peerInfo.peerId = PeerID.init(seckey) + peerInfo.addrs.add(Multiaddress.init("/ip4/127.0.0.1/tcp/55055")) + + proc createMplex(conn: Connection): Muxer = + result = newMplex(conn) + + let mplexProvider = newMuxerProvider(createMplex, MplexCodec) + let transports = @[Transport(newTransport(TcpTransport))] + let muxers = [(MplexCodec, mplexProvider)].toTable() + let identify = newIdentify(peerInfo) + let switch = newSwitch(peerInfo, transports, identify, muxers, @[Secure(newSecIo(seckey.getKey()))]) + + await switch.start() + + var remotePeer: PeerInfo + remotePeer.peerId = PeerID.init("QmUA1Ghihi5u3gDwEDxhbu49jU42QPbvHttZFwB6b4K5oC") + remotePeer.addrs.add(ma) + + switch.mount(newTestProto()) + echo "PeerID: " & peerInfo.peerId.pretty + let conn = await switch.dial(remotePeer, "/test/proto/1.0.0") + await conn.writeLp("Hello from dialer!") + let msg = cast[string](await conn.readLp()) + echo msg + await conn.close() + +waitFor(main()) \ No newline at end of file diff --git a/tests/testpubsub.nim b/tests/testpubsub.nim new file mode 100644 index 000000000..81ca2209c --- /dev/null +++ b/tests/testpubsub.nim @@ -0,0 +1,16 @@ +## Nim-Libp2p +## Copyright (c) 2018 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import unittest +import chronos + +suite "PubSub": + test "PubSub subscribe": discard + test "PubSub unsubscribe": discard + diff --git a/tests/testsecio.nim b/tests/testsecio.nim new file mode 100644 index 000000000..c5c2eee96 --- /dev/null +++ b/tests/testsecio.nim @@ -0,0 +1,2 @@ +import unittest +