From e623e70e7bedaa31f2b9686035f34b3c00db6501 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Thu, 5 Dec 2019 20:16:18 -0600 Subject: [PATCH] PubSub (Gossip & Flood) Implementation (#36) This adds gossipsub and floodsub, as well as basic interop testing with the go libp2p daemon. * add close event * wip: gossipsub * splitting rpc message * making message handling more consistent * initial gossipsub implementation * feat: nim 1.0 cleanup * wip: gossipsub protobuf * adding encoding/decoding of gossipsub messages * add disconnect handler * add proper gossipsub msg handling * misc: cleanup for nim 1.0 * splitting floodsub and gossipsub tests * feat: add mesh rebalansing * test pubsub * add mesh rebalansing tests * testing mesh maintenance * finishing mcache implementatin * wip: commenting out broken tests * wip: don't run heartbeat for now * switchout debug for trace logging * testing gossip peer selection algorithm * test stream piping * more work around message amplification * get the peerid from message * use timed cache as backing store * allow setting timeout in constructor * several changes to improve performance * more through testing of msg amplification * prevent gc issues * allow piping to self and prevent deadlocks * improove floodsub * allow running hook on cache eviction * prevent race conditions * prevent race conditions and improove tests * use hashes as cache keys * removing useless file * don't create a new seq * re-enable pubsub tests * fix imports * reduce number of runs to speed up tests * break out control message processing * normalize sleeps between steps * implement proper transport filtering * initial interop testing * clean up floodsub publish logic * allow dialing without a protocol * adding multiple reads/writes * use protobuf varint in mplex * don't loose conn's peerInfo * initial interop pubsub tests * don't duplicate connections/peers * bring back interop tests * wip: interop * re-enable interop and daemon tests * add multiple read write tests from handlers * don't cleanup channel prematurely * use correct channel to send/receive msgs * adjust tests with latest changes * include interop tests * remove temp logging output * fix ci * use correct public key serialization * additional tests for pubsub interop --- examples/directchat.nim | 1 - libp2p.nimble | 1 + libp2p/multistream.nim | 3 +- libp2p/muxers/mplex/coder.nim | 9 +- libp2p/muxers/mplex/mplex.nim | 3 +- libp2p/protocols/pubsub/floodsub.nim | 150 ++--- libp2p/protocols/pubsub/gossipsub.nim | 772 +++++++++++++++++++++++ libp2p/protocols/pubsub/mcache.nim | 71 +++ libp2p/protocols/pubsub/pubsub.nim | 158 ++++- libp2p/protocols/pubsub/pubsubpeer.nim | 122 ++-- libp2p/protocols/pubsub/rpc/message.nim | 73 +++ libp2p/protocols/pubsub/rpc/messages.nim | 47 ++ libp2p/protocols/pubsub/rpc/protobuf.nim | 266 ++++++++ libp2p/protocols/pubsub/rpcmsg.nim | 189 ------ libp2p/protocols/pubsub/timedcache.nim | 50 +- libp2p/switch.nim | 6 + tests/pubsub/testfloodsub.nim | 130 ++++ tests/pubsub/testgossipsub.nim | 425 +++++++++++++ tests/pubsub/testmcache.nim | 93 +++ tests/pubsub/testpubsub.nim | 4 + tests/pubsub/utils.nim | 64 ++ tests/test.nim | 89 --- tests/testinterop.nim | 389 ++++++++++++ tests/testmplex.nim | 9 +- tests/testnative.nim | 5 +- tests/testpubsub.nim | 181 ------ tests/testswitch.nim | 4 +- 27 files changed, 2668 insertions(+), 646 deletions(-) create mode 100644 libp2p/protocols/pubsub/gossipsub.nim create mode 100644 libp2p/protocols/pubsub/mcache.nim create mode 100644 libp2p/protocols/pubsub/rpc/message.nim create mode 100644 libp2p/protocols/pubsub/rpc/messages.nim create mode 100644 libp2p/protocols/pubsub/rpc/protobuf.nim delete mode 100644 libp2p/protocols/pubsub/rpcmsg.nim create mode 100644 tests/pubsub/testfloodsub.nim create mode 100644 tests/pubsub/testgossipsub.nim create mode 100644 tests/pubsub/testmcache.nim create mode 100644 tests/pubsub/testpubsub.nim create mode 100644 tests/pubsub/utils.nim delete mode 100644 tests/test.nim create mode 100644 tests/testinterop.nim delete mode 100644 tests/testpubsub.nim diff --git a/examples/directchat.nim b/examples/directchat.nim index a1af4a81e..1aac8b578 100644 --- a/examples/directchat.nim +++ b/examples/directchat.nim @@ -25,7 +25,6 @@ import ../libp2p/[switch, const ChatCodec = "/nim-libp2p/chat/1.0.0" const DefaultAddr = "/ip4/127.0.0.1/tcp/55505" - const Help = """ Commands: /[?|hep|connect|disconnect|exit] help: Prints this help diff --git a/libp2p.nimble b/libp2p.nimble index 2a48e41e0..5dce11a63 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -21,3 +21,4 @@ proc runTest(filename: string) = task test, "Runs the test suite": runTest "testnative" runTest "testdaemon" + runTest "testinterop" \ No newline at end of file diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index 13087f143..cd239d340 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -11,8 +11,7 @@ import strutils import chronos, chronicles import connection, vbuffer, - protocols/protocol, - stream/lpstream + protocols/protocol logScope: topic = "Multistream" diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim index b005d4c69..c7ad10f00 100644 --- a/libp2p/muxers/mplex/coder.nim +++ b/libp2p/muxers/mplex/coder.nim @@ -34,11 +34,10 @@ proc readMplexVarint(conn: Connection): Future[Option[uint]] {.async, gcsafe.} = result = none(uint) try: for i in 0.. 0: # if there are any subscriptions for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic - let id = peer.id + f.subscribeTopic(s.topic, s.subscribe, peer.id) - f.subscribeTopic(s.topic, s.subscribe, id) - - # send subscriptions to every peer - for p in f.peers.values: - if p.id != peer.id: - await p.send(@[RPCMsg(subscriptions: m.subscriptions)]) - - var toSendPeers: HashSet[string] = initSet[string]() - if m.messages.len > 0: # if there are any messages - for msg in m.messages: # for every message - for t in msg.topicIDs: # for every topic in the message - toSendPeers.incl(f.peerTopics[t]) # get all the peers interested in this topic - if f.topics.contains(t): # check that we're subscribed to it - for h in f.topics[t].handler: - await h(t, msg.data) # trigger user provided handler + if m.messages.len > 0: # if there are any messages + var toSendPeers: HashSet[string] = initHashSet[string]() + for msg in m.messages: # for every message + if msg.msgId notin f.seen: + f.seen.put(msg.msgId) # add the message to the seen cache + for t in msg.topicIDs: # for every topic in the message + if t in f.floodsub: + toSendPeers.incl(f.floodsub[t]) # get all the peers interested in this topic + if t in f.topics: # check that we're subscribed to it + for h in f.topics[t].handler: + await h(t, msg.data) # trigger user provided handler # forward the message to all peers interested in it for p in toSendPeers: if p in f.peers and f.peers[p].id != peer.id: await f.peers[p].send(@[RPCMsg(messages: m.messages)]) -proc handleConn(f: FloodSub, - conn: Connection) {.async, gcsafe.} = - ## handle incoming/outgoing connections - ## - ## this proc will: - ## 1) register a new PubSubPeer for the connection - ## 2) register a handler with the peer; - ## this handler gets called on every rpc message - ## that the peer receives - ## 3) ask the peer to subscribe us to every topic - ## that we're interested in - ## - - if conn.peerInfo.peerId.isNone: - trace "no valid PeerId for peer" - return - - # create new pubsub peer - var peer = newPubSubPeer(conn, proc (peer: PubSubPeer, - msgs: seq[RPCMsg]) {.async, gcsafe.} = - # call floodsub rpc handler - await f.rpcHandler(peer, msgs)) - - trace "created new pubsub peer", id = peer.id - - f.peers[peer.id] = peer - let topics = toSeq(f.topics.keys) - await f.sendSubs(peer, topics, true) - let handlerFut = peer.handle() # spawn peer read loop - handlerFut.addCallback( - proc(udata: pointer = nil) {.gcsafe.} = - trace "pubsub peer handler ended, cleaning up", - peer = conn.peerInfo.peerId.get().pretty - f.peers.del(peer.id) - ) - method init(f: FloodSub) = proc handler(conn: Connection, proto: string) {.async, gcsafe.} = ## main protocol handler that gets triggered on every @@ -134,45 +85,40 @@ method init(f: FloodSub) = ## e.g. ``/floodsub/1.0.0``, etc... ## - await f.handleConn(conn) + await f.handleConn(conn, proto) f.handler = handler f.codec = FloodSubCodec -method subscribeToPeer*(f: FloodSub, conn: Connection) {.async, gcsafe.} = - await f.handleConn(conn) - method publish*(f: FloodSub, topic: string, data: seq[byte]) {.async, gcsafe.} = await procCall PubSub(f).publish(topic, data) - trace "about to publish message on topic", name = topic, data = data.toHex() - if data.len > 0 and topic.len > 0: - let msg = makeMessage(f.peerInfo.peerId.get(), data, topic) - if topic in f.peerTopics: - trace "publishing on topic", name = topic - for p in f.peerTopics[topic]: - trace "publishing message", name = topic, peer = p, data = data - await f.peers[p].send(@[RPCMsg(messages: @[msg])]) + if data.len <= 0 or topic.len <= 0: + trace "topic or data missing, skipping publish" + return -method subscribe*(f: FloodSub, - topic: string, - handler: TopicHandler) {.async, gcsafe.} = - await procCall PubSub(f).subscribe(topic, handler) + if topic notin f.floodsub: + trace "missing peers for topic, skipping publish" + return - f.subscribeTopic(topic, true, f.peerInfo.peerId.get().pretty) - for p in f.peers.values: - await f.sendSubs(p, @[topic], true) + trace "publishing on topic", name = topic + let msg = newMessage(f.peerInfo.peerId.get(), data, topic) + for p in f.floodsub[topic]: + trace "publishing message", name = topic, peer = p, data = data + await f.peers[p].send(@[RPCMsg(messages: @[msg])]) method unsubscribe*(f: FloodSub, topics: seq[TopicPair]) {.async, gcsafe.} = await procCall PubSub(f).unsubscribe(topics) + for p in f.peers.values: await f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false) -method initPubSub*(f: FloodSub) = +method initPubSub*(f: FloodSub) = f.peers = initTable[string, PubSubPeer]() f.topics = initTable[string, Topic]() - f.peerTopics = initTable[string, HashSet[string]]() + f.floodsub = initTable[string, HashSet[string]]() + f.seen = newTimedCache[string](2.minutes) f.init() diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim new file mode 100644 index 000000000..c2c036932 --- /dev/null +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -0,0 +1,772 @@ +## Nim-LibP2P +## Copyright (c) 2019 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import tables, sets, options, sequtils, random +import chronos, chronicles +import pubsub, + floodsub, + pubsubpeer, + mcache, + timedcache, + rpc/[messages, message], + ../../crypto/crypto, + ../protocol, + ../../peerinfo, + ../../connection, + ../../peer + +logScope: + topic = "GossipSub" + +const GossipSubCodec* = "/meshsub/1.0.0" + +# overlay parameters +const GossipSubD* = 6 +const GossipSubDlo* = 4 +const GossipSubDhi* = 12 + +# gossip parameters +const GossipSubHistoryLength* = 5 +const GossipSubHistoryGossip* = 3 + +# heartbeat interval +const GossipSubHeartbeatInitialDelay* = 100.millis +const GossipSubHeartbeatInterval* = 1.seconds + +# fanout ttl +const GossipSubFanoutTTL* = 60.seconds + +type + GossipSub* = ref object of FloodSub + mesh*: Table[string, HashSet[string]] # meshes - topic to peer + fanout*: Table[string, HashSet[string]] # fanout - topic to peer + gossipsub*: Table[string, HashSet[string]] # topic to peer map of all gossipsub peers + lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics + gossip*: Table[string, seq[ControlIHave]] # pending gossip + control*: Table[string, ControlMessage] # pending control messages + mcache*: MCache # messages cache + heartbeatCancel*: Future[void] # cancelation future for heartbeat interval + heartbeatLock: AsyncLock + +# TODO: This belong in chronos, temporary left here until chronos is updated +proc addInterval(every: Duration, cb: CallbackFunc, udata: pointer = nil): Future[void] = + ## Arrange the callback ``cb`` to be called on every ``Duration`` window + + var retFuture = newFuture[void]("chronos.addInterval(Duration)") + proc interval(arg: pointer = nil) {.gcsafe.} + proc scheduleNext() = + if not retFuture.finished(): + addTimer(Moment.fromNow(every), interval) + + proc interval(arg: pointer = nil) {.gcsafe.} = + cb(udata) + scheduleNext() + + scheduleNext() + return retFuture + +method init(g: GossipSub) = + proc handler(conn: Connection, proto: string) {.async, gcsafe.} = + ## main protocol handler that gets triggered on every + ## connection for a protocol string + ## e.g. ``/floodsub/1.0.0``, etc... + ## + + await g.handleConn(conn, proto) + + g.handler = handler + g.codec = GossipSubCodec + +method handleDisconnect(g: GossipSub, peer: PubSubPeer) {.async, gcsafe.} = + ## handle peer disconnects + await procCall FloodSub(g).handleDisconnect(peer) + for t in g.gossipsub.keys: + g.gossipsub[t].excl(peer.id) + + for t in g.mesh.keys: + g.mesh[t].excl(peer.id) + + for t in g.fanout.keys: + g.fanout[t].excl(peer.id) + +method subscribeTopic*(g: GossipSub, + topic: string, + subscribe: bool, + peerId: string) {.gcsafe.} = + procCall PubSub(g).subscribeTopic(topic, subscribe, peerId) + + if topic notin g.gossipsub: + g.gossipsub[topic] = initHashSet[string]() + + if subscribe: + trace "adding subscription for topic", peer = peerId, name = topic + # subscribe the peer to the topic + g.gossipsub[topic].incl(peerId) + else: + trace "removing subscription for topic", peer = peerId, name = topic + # unsubscribe the peer from the topic + g.gossipsub[topic].excl(peerId) + +proc handleGraft(g: GossipSub, + peer: PubSubPeer, + grafts: seq[ControlGraft], + respControl: var ControlMessage) = + for graft in grafts: + trace "processing graft message", peer = peer.id, + topicID = graft.topicID + + if graft.topicID in g.topics: + if g.mesh.len < GossipSubD: + g.mesh[graft.topicID].incl(peer.id) + else: + g.gossipsub[graft.topicID].incl(peer.id) + else: + respControl.prune.add(ControlPrune(topicID: graft.topicID)) + +proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = + for prune in prunes: + trace "processing prune message", peer = peer.id, + topicID = prune.topicID + + if prune.topicID in g.mesh: + g.mesh[prune.topicID].excl(peer.id) + +proc handleIHave(g: GossipSub, peer: PubSubPeer, ihaves: seq[ControlIHave]): ControlIWant = + for ihave in ihaves: + trace "processing ihave message", peer = peer.id, + topicID = ihave.topicID + + if ihave.topicID in g.mesh: + for m in ihave.messageIDs: + if m notin g.seen: + result.messageIDs.add(m) + +proc handleIWant(g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant]): seq[Message] = + for iwant in iwants: + for mid in iwant.messageIDs: + trace "processing iwant message", peer = peer.id, + messageID = mid + let msg = g.mcache.get(mid) + if msg.isSome: + result.add(msg.get()) + +method rpcHandler(g: GossipSub, + peer: PubSubPeer, + rpcMsgs: seq[RPCMsg]) {.async, gcsafe.} = + await procCall PubSub(g).rpcHandler(peer, rpcMsgs) + + trace "processing RPC message", peer = peer.id, msg = rpcMsgs + for m in rpcMsgs: # for all RPC messages + trace "processing messages", msg = rpcMsgs + if m.subscriptions.len > 0: # if there are any subscriptions + for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic + g.subscribeTopic(s.topic, s.subscribe, peer.id) + + if m.messages.len > 0: # if there are any messages + var toSendPeers: HashSet[string] = initHashSet[string]() + for msg in m.messages: # for every message + trace "processing message with id", msg = msg.msgId + if msg.msgId in g.seen: + trace "message already processed, skipping", msg = msg.msgId + continue + + g.seen.put(msg.msgId) # add the message to the seen cache + + # this shouldn't happen + if g.peerInfo.peerId.get() == msg.fromPeerId(): + trace "skipping messages from self", msg = msg.msgId + continue + + for t in msg.topicIDs: # for every topic in the message + + if t in g.floodsub: + toSendPeers.incl(g.floodsub[t]) # get all floodsub peers for topic + + if t in g.mesh: + toSendPeers.incl(g.mesh[t]) # get all mesh peers for topic + + if t in g.topics: # if we're subscribed to the topic + for h in g.topics[t].handler: + trace "calling handler for message", msg = msg.msgId, + topicId = t, + localPeer = g.peerInfo.peerId.get().pretty, + fromPeer = msg.fromPeerId().pretty + await h(t, msg.data) # trigger user provided handler + + # forward the message to all peers interested in it + for p in toSendPeers: + if p in g.peers and + g.peers[p].peerInfo.peerId != peer.peerInfo.peerId: + let id = g.peers[p].peerInfo.peerId.get() + let msgs = m.messages.filterIt( + # don't forward to message originator + id != it.fromPeerId() + ) + if msgs.len > 0: + await g.peers[p].send(@[RPCMsg(messages: msgs)]) + + var respControl: ControlMessage + if m.control.isSome: + var control: ControlMessage = m.control.get() + let iWant: ControlIWant = g.handleIHave(peer, control.ihave) + if iWant.messageIDs.len > 0: + respControl.iwant.add(iWant) + let messages: seq[Message] = g.handleIWant(peer, control.iwant) + + g.handleGraft(peer, control.graft, respControl) + g.handlePrune(peer, control.prune) + + if respControl.graft.len > 0 or respControl.prune.len > 0 or + respControl.ihave.len > 0 or respControl.iwant.len > 0: + await peer.send(@[RPCMsg(control: some(respControl), messages: messages)]) + +proc replenishFanout(g: GossipSub, topic: string) {.async, gcsafe.} = + ## get fanout peers for a topic + trace "about to replenish fanout" + if topic notin g.fanout: + g.fanout[topic] = initHashSet[string]() + + if g.fanout[topic].len < GossipSubDLo: + trace "replenishing fanout", peers = g.fanout[topic].len + if topic in g.gossipsub: + for p in g.gossipsub[topic]: + if not g.fanout[topic].containsOrIncl(p): + if g.fanout[topic].len == GossipSubD: + break + + trace "fanout replenished with peers", peers = g.fanout[topic].len + +proc rebalanceMesh(g: GossipSub, topic: string) {.async, gcsafe.} = + trace "about to rebalance mesh" + # create a mesh topic that we're subscribing to + if topic notin g.mesh: + g.mesh[topic] = initHashSet[string]() + + if g.mesh[topic].len < GossipSubDlo: + trace "replenishing mesh" + # replenish the mesh if we're bellow GossipSubDlo + while g.mesh[topic].len < GossipSubD: + trace "gattering peers", peers = g.mesh[topic].len + var id: string + if topic in g.fanout and g.fanout[topic].len > 0: + id = g.fanout[topic].pop() + trace "got fanout peer", peer = id + elif topic in g.gossipsub and g.gossipsub[topic].len > 0: + id = g.gossipsub[topic].pop() + trace "got gossipsub peer", peer = id + else: + trace "no more peers" + break + + g.mesh[topic].incl(id) + if id in g.peers: + let p = g.peers[id] + # send a graft message to the peer + await p.sendGraft(@[topic]) + + # prune peers if we've gone over + if g.mesh[topic].len > GossipSubDhi: + trace "pruning mesh" + while g.mesh[topic].len > GossipSubD: + trace "pruning peers", peers = g.mesh[topic].len + let id = toSeq(g.mesh[topic])[rand(0.. g.lastFanoutPubSub[topic]: + g.lastFanoutPubSub.del(topic) + g.fanout.del(topic) + +proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} = + ## gossip iHave messages to peers + let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys)) + + for topic in topics: + let mesh: HashSet[string] = + if topic in g.mesh: + g.mesh[topic] + else: + initHashSet[string]() + + let fanout: HashSet[string] = + if topic in g.fanout: + g.fanout[topic] + else: + initHashSet[string]() + + let gossipPeers = mesh + fanout + let mids = g.mcache.window(topic) + let ihave = ControlIHave(topicID: topic, + messageIDs: toSeq(mids)) + + if topic notin g.gossipsub: + trace "topic not in gossip array, skipping", topicID = topic + continue + + while result.len < GossipSubD: + if not (g.gossipsub[topic].len > 0): + trace "no peers for topic, skipping", topicID = topic + break + + let id = toSeq(g.gossipsub[topic]).sample() + g.gossipsub[topic].excl(id) + if id notin gossipPeers: + if id notin result: + result[id] = ControlMessage() + result[id].ihave.add(ihave) + +proc heartbeat(g: GossipSub) {.async, gcsafe.} = + trace "running heartbeat" + + await g.heartbeatLock.acquire() + await sleepAsync(GossipSubHeartbeatInitialDelay) + + for t in g.mesh.keys: + await g.rebalanceMesh(t) + + await g.dropFanoutPeers() + let peers = g.getGossipPeers() + for peer in peers.keys: + await g.peers[peer].send(@[RPCMsg(control: some(peers[peer]))]) + + g.mcache.shift() # shift the cache + g.heartbeatLock.release() + +method subscribe*(g: GossipSub, + topic: string, + handler: TopicHandler) {.async, gcsafe.} = + await procCall PubSub(g).subscribe(topic, handler) + asyncCheck g.rebalanceMesh(topic) + +method unsubscribe*(g: GossipSub, + topics: seq[TopicPair]) {.async, gcsafe.} = + await procCall PubSub(g).unsubscribe(topics) + + for pair in topics: + let topic = pair.topic + if topic in g.mesh: + let peers = g.mesh[topic] + g.mesh.del(topic) + for id in peers: + let p = g.peers[id] + await p.sendPrune(@[topic]) + +method publish*(g: GossipSub, + topic: string, + data: seq[byte]) {.async, gcsafe.} = + await procCall PubSub(g).publish(topic, data) + + trace "about to publish message on topic", name = topic, data = data.toHex() + if data.len > 0 and topic.len > 0: + var peers: HashSet[string] + if topic in g.topics: # if we're subscribed to the topic attempt to build a mesh + await g.rebalanceMesh(topic) + peers = g.mesh[topic] + else: # send to fanout peers + await g.replenishFanout(topic) + if topic in g.fanout: + peers = g.fanout[topic] + # set the fanout expiery time + g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) + + let msg = newMessage(g.peerInfo.peerId.get(), data, topic) + for p in peers: + if p == g.peerInfo.peerId.get().pretty: + continue + + trace "publishing on topic", name = topic + g.mcache.put(msg) + await g.peers[p].send(@[RPCMsg(messages: @[msg])]) + +method start*(g: GossipSub) {.async.} = + ## start pubsub + ## start long running/repeating procedures + + # setup the heartbeat interval + g.heartbeatCancel = addInterval(GossipSubHeartbeatInterval, + proc (arg: pointer = nil) {.gcsafe, locks: 0.} = + asyncCheck g.heartbeat) + +method stop*(g: GossipSub) {.async.} = + ## stopt pubsub + ## stop long running tasks + + await g.heartbeatLock.acquire() + + # stop heartbeat interval + if not g.heartbeatCancel.finished: + g.heartbeatCancel.complete() + + g.heartbeatLock.release() + +method initPubSub(g: GossipSub) = + procCall FloodSub(g).initPubSub() + + g.mcache = newMCache(GossipSubHistoryGossip, GossipSubHistoryLength) + g.mesh = initTable[string, HashSet[string]]() # meshes - topic to peer + g.fanout = initTable[string, HashSet[string]]() # fanout - topic to peer + g.gossipsub = initTable[string, HashSet[string]]() # topic to peer map of all gossipsub peers + g.lastFanoutPubSub = initTable[string, Moment]() # last publish time for fanout topics + g.gossip = initTable[string, seq[ControlIHave]]() # pending gossip + g.control = initTable[string, ControlMessage]() # pending control messages + g.heartbeatLock = newAsyncLock() + +## Unit tests +when isMainModule and not defined(release): + ## Test internal (private) methods for gossip, + ## mesh and fanout maintenance. + ## Usually I wouldn't test private behaviour, + ## but the maintenance methods are quite involved, + ## hence these tests are here. + ## + + import unittest + import ../../stream/bufferstream + + type + TestGossipSub = ref object of GossipSub + + suite "GossipSub": + test "`rebalanceMesh` Degree Lo": + proc testRun(): Future[bool] {.async.} = + var peerInfo: PeerInfo + var seckey = some(PrivateKey.random(RSA)) + + peerInfo.peerId = some(PeerID.init(seckey.get())) + let gossipSub = newPubSub(TestGossipSub, peerInfo) + + let topic = "foobar" + gossipSub.mesh[topic] = initHashSet[string]() + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = + discard + + for i in 0..<15: + let conn = newConnection(newBufferStream(writeHandler)) + let peerId = PeerID.init(PrivateKey.random(RSA)) + conn.peerInfo.peerId = some(peerId) + gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) + gossipSub.peers[peerId.pretty].conn = conn + gossipSub.mesh[topic].incl(peerId.pretty) + + check gossipSub.peers.len == 15 + await gossipSub.rebalanceMesh(topic) + check gossipSub.mesh[topic].len == GossipSubD + + result = true + + check: + waitFor(testRun()) == true + + test "`rebalanceMesh` Degree Hi": + proc testRun(): Future[bool] {.async.} = + var peerInfo: PeerInfo + var seckey = some(PrivateKey.random(RSA)) + + peerInfo.peerId = some(PeerID.init(seckey.get())) + let gossipSub = newPubSub(TestGossipSub, peerInfo) + + let topic = "foobar" + gossipSub.gossipsub[topic] = initHashSet[string]() + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = + discard + + for i in 0..<15: + let conn = newConnection(newBufferStream(writeHandler)) + let peerId = PeerID.init(PrivateKey.random(RSA)) + conn.peerInfo.peerId = some(peerId) + gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) + gossipSub.peers[peerId.pretty].conn = conn + gossipSub.gossipsub[topic].incl(peerId.pretty) + + check gossipSub.gossipsub[topic].len == 15 + await gossipSub.rebalanceMesh(topic) + check gossipSub.mesh[topic].len == GossipSubD + + result = true + + check: + waitFor(testRun()) == true + + test "`replenishFanout` Degree Lo": + proc testRun(): Future[bool] {.async.} = + var peerInfo: PeerInfo + var seckey = some(PrivateKey.random(RSA)) + + peerInfo.peerId = some(PeerID.init(seckey.get())) + let gossipSub = newPubSub(TestGossipSub, peerInfo) + + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = + discard + + let topic = "foobar" + gossipSub.gossipsub[topic] = initHashSet[string]() + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = + discard + + for i in 0..<15: + let conn = newConnection(newBufferStream(writeHandler)) + let peerId = PeerID.init(PrivateKey.random(RSA)) + conn.peerInfo.peerId = some(peerId) + gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) + gossipSub.peers[peerId.pretty].handler = handler + gossipSub.gossipsub[topic].incl(peerId.pretty) + + check gossipSub.gossipsub[topic].len == 15 + await gossipSub.replenishFanout(topic) + check gossipSub.fanout[topic].len == GossipSubD + + result = true + + check: + waitFor(testRun()) == true + + test "`dropFanoutPeers` drop expired fanout topics": + proc testRun(): Future[bool] {.async.} = + var peerInfo: PeerInfo + var seckey = some(PrivateKey.random(RSA)) + + peerInfo.peerId = some(PeerID.init(seckey.get())) + let gossipSub = newPubSub(TestGossipSub, peerInfo) + + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = + discard + + let topic = "foobar" + gossipSub.fanout[topic] = initHashSet[string]() + gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(100.millis) + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = + discard + + for i in 0..<6: + let conn = newConnection(newBufferStream(writeHandler)) + let peerId = PeerID.init(PrivateKey.random(RSA)) + conn.peerInfo.peerId = some(peerId) + gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) + gossipSub.peers[peerId.pretty].handler = handler + gossipSub.fanout[topic].incl(peerId.pretty) + + check gossipSub.fanout[topic].len == GossipSubD + + await sleepAsync(101.millis) + await gossipSub.dropFanoutPeers() + check topic notin gossipSub.fanout + + result = true + + check: + waitFor(testRun()) == true + + test "`dropFanoutPeers` leave unexpired fanout topics": + proc testRun(): Future[bool] {.async.} = + var peerInfo: PeerInfo + var seckey = some(PrivateKey.random(RSA)) + + peerInfo.peerId = some(PeerID.init(seckey.get())) + let gossipSub = newPubSub(TestGossipSub, peerInfo) + + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = + discard + + let topic1 = "foobar1" + let topic2 = "foobar2" + gossipSub.fanout[topic1] = initHashSet[string]() + gossipSub.fanout[topic2] = initHashSet[string]() + gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(100.millis) + gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(500.millis) + + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = + discard + + for i in 0..<6: + let conn = newConnection(newBufferStream(writeHandler)) + let peerId = PeerID.init(PrivateKey.random(RSA)) + conn.peerInfo.peerId = some(peerId) + gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) + gossipSub.peers[peerId.pretty].handler = handler + gossipSub.fanout[topic1].incl(peerId.pretty) + gossipSub.fanout[topic2].incl(peerId.pretty) + + check gossipSub.fanout[topic1].len == GossipSubD + check gossipSub.fanout[topic2].len == GossipSubD + + await sleepAsync(101.millis) + await gossipSub.dropFanoutPeers() + check topic1 notin gossipSub.fanout + check topic2 in gossipSub.fanout + + result = true + + check: + waitFor(testRun()) == true + + test "`getGossipPeers` - should gather up to degree D non intersecting peers": + proc testRun(): Future[bool] {.async.} = + var peerInfo: PeerInfo + var seckey = some(PrivateKey.random(RSA)) + + peerInfo.peerId = some(PeerID.init(seckey.get())) + let gossipSub = newPubSub(TestGossipSub, peerInfo) + + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = + discard + + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = + discard + + let topic = "foobar" + gossipSub.mesh[topic] = initHashSet[string]() + gossipSub.fanout[topic] = initHashSet[string]() + gossipSub.gossipsub[topic] = initHashSet[string]() + for i in 0..<30: + let conn = newConnection(newBufferStream(writeHandler)) + let peerId = PeerID.init(PrivateKey.random(RSA)) + conn.peerInfo.peerId = some(peerId) + gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) + gossipSub.peers[peerId.pretty].handler = handler + if i mod 2 == 0: + gossipSub.fanout[topic].incl(peerId.pretty) + else: + gossipSub.mesh[topic].incl(peerId.pretty) + + for i in 0..<15: + let conn = newConnection(newBufferStream(writeHandler)) + let peerId = PeerID.init(PrivateKey.random(RSA)) + conn.peerInfo.peerId = some(peerId) + gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) + gossipSub.peers[peerId.pretty].handler = handler + gossipSub.gossipsub[topic].incl(peerId.pretty) + + check gossipSub.fanout[topic].len == 15 + check gossipSub.fanout[topic].len == 15 + check gossipSub.gossipsub[topic].len == 15 + + let peers = gossipSub.getGossipPeers() + check peers.len == GossipSubD + for p in peers.keys: + check p notin gossipSub.fanout[topic] + check p notin gossipSub.mesh[topic] + + result = true + + check: + waitFor(testRun()) == true + + test "`getGossipPeers` - should not crash on missing topics in mesh": + proc testRun(): Future[bool] {.async.} = + var peerInfo: PeerInfo + var seckey = some(PrivateKey.random(RSA)) + + peerInfo.peerId = some(PeerID.init(seckey.get())) + let gossipSub = newPubSub(TestGossipSub, peerInfo) + + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = + discard + + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = + discard + + let topic = "foobar" + gossipSub.fanout[topic] = initHashSet[string]() + gossipSub.gossipsub[topic] = initHashSet[string]() + for i in 0..<30: + let conn = newConnection(newBufferStream(writeHandler)) + let peerId = PeerID.init(PrivateKey.random(RSA)) + conn.peerInfo.peerId = some(peerId) + gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) + gossipSub.peers[peerId.pretty].handler = handler + if i mod 2 == 0: + gossipSub.fanout[topic].incl(peerId.pretty) + else: + gossipSub.gossipsub[topic].incl(peerId.pretty) + + let peers = gossipSub.getGossipPeers() + check peers.len == GossipSubD + result = true + + check: + waitFor(testRun()) == true + + test "`getGossipPeers` - should not crash on missing topics in gossip": + proc testRun(): Future[bool] {.async.} = + var peerInfo: PeerInfo + var seckey = some(PrivateKey.random(RSA)) + + peerInfo.peerId = some(PeerID.init(seckey.get())) + let gossipSub = newPubSub(TestGossipSub, peerInfo) + + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = + discard + + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = + discard + + let topic = "foobar" + gossipSub.mesh[topic] = initHashSet[string]() + gossipSub.gossipsub[topic] = initHashSet[string]() + for i in 0..<30: + let conn = newConnection(newBufferStream(writeHandler)) + let peerId = PeerID.init(PrivateKey.random(RSA)) + conn.peerInfo.peerId = some(peerId) + gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) + gossipSub.peers[peerId.pretty].handler = handler + if i mod 2 == 0: + gossipSub.mesh[topic].incl(peerId.pretty) + else: + gossipSub.gossipsub[topic].incl(peerId.pretty) + + let peers = gossipSub.getGossipPeers() + check peers.len == GossipSubD + result = true + + check: + waitFor(testRun()) == true + + test "`getGossipPeers` - should not crash on missing topics in gossip": + proc testRun(): Future[bool] {.async.} = + var peerInfo: PeerInfo + var seckey = some(PrivateKey.random(RSA)) + + peerInfo.peerId = some(PeerID.init(seckey.get())) + let gossipSub = newPubSub(TestGossipSub, peerInfo) + + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = + discard + + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = + discard + + let topic = "foobar" + gossipSub.mesh[topic] = initHashSet[string]() + gossipSub.fanout[topic] = initHashSet[string]() + for i in 0..<30: + let conn = newConnection(newBufferStream(writeHandler)) + let peerId = PeerID.init(PrivateKey.random(RSA)) + conn.peerInfo.peerId = some(peerId) + gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) + gossipSub.peers[peerId.pretty].handler = handler + if i mod 2 == 0: + gossipSub.mesh[topic].incl(peerId.pretty) + else: + gossipSub.fanout[topic].incl(peerId.pretty) + + let peers = gossipSub.getGossipPeers() + check peers.len == 0 + result = true + + check: + waitFor(testRun()) == true diff --git a/libp2p/protocols/pubsub/mcache.nim b/libp2p/protocols/pubsub/mcache.nim new file mode 100644 index 000000000..a7e92176d --- /dev/null +++ b/libp2p/protocols/pubsub/mcache.nim @@ -0,0 +1,71 @@ +## Nim-LibP2P +## Copyright (c) 2019 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import chronos, chronicles +import tables, options, sets, sequtils +import rpc/[messages, message], timedcache + +type + CacheEntry* = object + mid*: string + msg*: Message + + MCache* = ref object of RootObj + msgs*: TimedCache[Message] + history*: seq[seq[CacheEntry]] + historySize*: Natural + windowSize*: Natural + +proc put*(c: MCache, msg: Message) = + proc handler(key: string, val: Message) {.gcsafe.} = + ## make sure we remove the message from history + ## to keep things consisten + c.history.applyIt( + it.filterIt(it.mid != msg.msgId) + ) + + c.msgs.put(msg.msgId, msg, handler = handler) + c.history[0].add(CacheEntry(mid: msg.msgId, msg: msg)) + +proc get*(c: MCache, mid: string): Option[Message] = + result = none(Message) + if mid in c.msgs: + result = some(c.msgs[mid]) + +proc window*(c: MCache, topic: string): HashSet[string] = + result = initHashSet[string]() + + let len = + if c.windowSize > c.history.len: + c.history.len + else: + c.windowSize + + if c.history.len > 0: + for slot in c.history[0.. c.historySize: + for entry in c.history.pop(): + c.msgs.del(entry.mid) + + c.history.insert(@[]) + +proc newMCache*(window: Natural, history: Natural): MCache = + new result + result.historySize = history + result.windowSize = window + result.history = newSeq[seq[CacheEntry]]() + result.history.add(@[]) # initialize with empty slot + result.msgs = newTimedCache[Message](2.minutes) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 58023d404..022d30ece 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -7,12 +7,14 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import tables, sets +import tables, options, sequtils import chronos, chronicles import pubsubpeer, + rpc/messages, ../protocol, ../../connection, - ../../peerinfo + ../../peerinfo, + ../../peer export PubSubPeer @@ -20,9 +22,8 @@ logScope: topic = "PubSub" type - TopicHandler* = proc (topic: string, - data: seq[byte]): - Future[void] {.closure, gcsafe.} + TopicHandler* = proc (topic: string, + data: seq[byte]): Future[void] {.gcsafe.} TopicPair* = tuple[topic: string, handler: TopicHandler] @@ -31,14 +32,113 @@ type handler*: seq[TopicHandler] PubSub* = ref object of LPProtocol - peerInfo*: PeerInfo + peerInfo*: PeerInfo # this peer's info topics*: Table[string, Topic] # local topics - triggerSelf*: bool # flag indicating if the local handler should be triggered on publish + peers*: Table[string, PubSubPeer] # peerid to peer map + triggerSelf*: bool # trigger own local handler on publish + cleanupLock: AsyncLock -method subscribeToPeer*(p: PubSub, conn: Connection) {.base, async, gcsafe.} = - ## subscribe to a peer to send/receive pubsub messages +proc sendSubs*(p: PubSub, + peer: PubSubPeer, + topics: seq[string], + subscribe: bool) {.async, gcsafe.} = + ## send subscriptions to remote peer + trace "sending subscriptions", peer = peer.id, + subscribe = subscribe, + topicIDs = topics + + var msg: RPCMsg + for t in topics: + trace "sending topic", peer = peer.id, + subscribe = subscribe, + topicName = t + msg.subscriptions.add(SubOpts(topic: t, subscribe: subscribe)) + + await peer.send(@[msg]) + +method rpcHandler*(p: PubSub, + peer: PubSubPeer, + rpcMsgs: seq[RPCMsg]) {.async, base, gcsafe.} = + ## handle rpc messages discard +method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.async, base, gcsafe.} = + ## handle peer disconnects + if peer.id in p.peers: + p.peers.del(peer.id) + +proc cleanUpHelper(p: PubSub, peer: PubSubPeer) {.async.} = + await p.cleanupLock.acquire() + if peer.refs == 0: + await p.handleDisconnect(peer) + + peer.refs.dec() # decrement refcount + p.cleanupLock.release() + +proc getPeer(p: PubSub, peerInfo: PeerInfo, proto: string): PubSubPeer = + if peerInfo.id in p.peers: + result = p.peers[peerInfo.id] + return + + # create new pubsub peer + let peer = newPubSubPeer(peerInfo, proto) + trace "created new pubsub peer", peerId = peer.id + + p.peers[peer.id] = peer + peer.refs.inc # increment reference cound + result = peer + +method handleConn*(p: PubSub, + conn: Connection, + proto: string) {.base, async, gcsafe.} = + ## handle incoming connections + ## + ## this proc will: + ## 1) register a new PubSubPeer for the connection + ## 2) register a handler with the peer; + ## this handler gets called on every rpc message + ## that the peer receives + ## 3) ask the peer to subscribe us to every topic + ## that we're interested in + ## + + if conn.peerInfo.peerId.isNone: + trace "no valid PeerId for peer" + await conn.close() + return + + proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async, gcsafe.} = + # call floodsub rpc handler + await p.rpcHandler(peer, msgs) + + let peer = p.getPeer(conn.peerInfo, proto) + let topics = toSeq(p.topics.keys) + if topics.len > 0: + await p.sendSubs(peer, topics, true) + + peer.handler = handler + await peer.handle(conn) # spawn peer read loop + trace "pubsub peer handler ended, cleaning up" + await p.cleanUpHelper(peer) + +method subscribeToPeer*(p: PubSub, + conn: Connection) {.base, async, gcsafe.} = + var peer = p.getPeer(conn.peerInfo, p.codec) + trace "setting connection for peer", peerId = conn.peerInfo.id + if not peer.isConnected: + peer.conn = conn + + # handle connection close + conn.closeEvent.wait() + .addCallback( + proc(udata: pointer = nil) {.gcsafe.} = + trace "connection closed, cleaning up peer", + peer = conn.peerInfo.id + + # TODO: figureout how to handle properly without dicarding + asyncCheck p.cleanUpHelper(peer) + ) + method unsubscribe*(p: PubSub, topics: seq[TopicPair]) {.base, async, gcsafe.} = ## unsubscribe from a list of ``topic`` strings @@ -47,16 +147,21 @@ method unsubscribe*(p: PubSub, if h == t.handler: p.topics[t.topic].handler.del(i) -method unsubscribe*(p: PubSub, - topic: string, +method unsubscribe*(p: PubSub, + topic: string, handler: TopicHandler): Future[void] {.base, gcsafe.} = ## unsubscribe from a ``topic`` string result = p.unsubscribe(@[(topic, handler)]) +method subscribeTopic*(p: PubSub, + topic: string, + subscribe: bool, + peerId: string) {.base, gcsafe.} = + discard + method subscribe*(p: PubSub, topic: string, - handler: TopicHandler) - {.base, async, gcsafe.} = + handler: TopicHandler) {.base, async, gcsafe.} = ## subscribe to a topic ## ## ``topic`` - a string topic to subscribe to @@ -65,23 +170,42 @@ method subscribe*(p: PubSub, ## that will be triggered ## on every received message ## - if not p.topics.contains(topic): + if topic notin p.topics: trace "subscribing to topic", name = topic p.topics[topic] = Topic(name: topic) p.topics[topic].handler.add(handler) -method publish*(p: PubSub, topic: string, data: seq[byte]) {.base, async, gcsafe.} = + for peer in p.peers.values: + await p.sendSubs(peer, @[topic], true) + +method publish*(p: PubSub, + topic: string, + data: seq[byte]) {.base, async, gcsafe.} = ## publish to a ``topic`` if p.triggerSelf and topic in p.topics: for h in p.topics[topic].handler: await h(topic, data) -method initPubSub*(p: PubSub) {.base.} = +method initPubSub*(p: PubSub) {.base.} = + ## perform pubsub initializaion discard -proc newPubSub*(p: typedesc[PubSub], peerInfo: PeerInfo, triggerSelf: bool = false): p = +method start*(p: PubSub) {.async, base.} = + ## start pubsub + ## start long running/repeating procedures + discard + +method stop*(p: PubSub) {.async, base.} = + ## stopt pubsub + ## stop long running/repeating procedures + discard + +proc newPubSub*(p: typedesc[PubSub], + peerInfo: PeerInfo, + triggerSelf: bool = false): p = new result result.peerInfo = peerInfo result.triggerSelf = triggerSelf + result.cleanupLock = newAsyncLock() result.initPubSub() diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index a834ec624..5ea7b7798 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -7,9 +7,9 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import options, sets, hashes, strutils +import options, hashes, strutils, tables, hashes import chronos, chronicles -import rpcmsg, +import rpc/[messages, message, protobuf], timedcache, ../../peer, ../../peerinfo, @@ -23,55 +23,105 @@ logScope: type PubSubPeer* = ref object of RootObj - id*: string # base58 peer id string + proto: string # the protocol that this peer joined from + sendConn: Connection peerInfo*: PeerInfo - conn*: Connection handler*: RPCHandler topics*: seq[string] - seen: TimedCache[string] # list of messages forwarded to peers + sentRpcCache: TimedCache[string] # a cache of already sent messages + recvdRpcCache: TimedCache[string] # a cache of already sent messages + refs*: int # refcount of the connections this peer is handling + onConnect: AsyncEvent RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} -proc handle*(p: PubSubPeer) {.async, gcsafe.} = - trace "handling pubsub rpc", peer = p.id +proc id*(p: PubSubPeer): string = p.peerInfo.id + +proc isConnected*(p: PubSubPeer): bool = + (not isNil(p.sendConn)) + +proc `conn=`*(p: PubSubPeer, conn: Connection) = + trace "attaching send connection for peer", peer = p.id + p.sendConn = conn + p.onConnect.fire() + +proc handle*(p: PubSubPeer, conn: Connection) {.async, gcsafe.} = + trace "handling pubsub rpc", peer = p.id, closed = conn.closed try: - while not p.conn.closed: - let data = await p.conn.readLp() - trace "Read data from peer", peer = p.id, data = data.toHex() - if data.toHex() in p.seen: - trace "Message already received, skipping", peer = p.id + while not conn.closed: + trace "waiting for data", peer = p.id, closed = conn.closed + let data = await conn.readLp() + let hexData = data.toHex() + trace "read data from peer", peer = p.id, data = hexData + if $hexData.hash in p.recvdRpcCache: + trace "message already received, skipping", peer = p.id continue let msg = decodeRpcMsg(data) - trace "Decoded msg from peer", peer = p.id, msg = msg + trace "decoded msg from peer", peer = p.id, msg = msg await p.handler(p, @[msg]) - except: - trace "An exception occured while processing pubsub rpc requests", exc = getCurrentExceptionMsg() + p.recvdRpcCache.put($hexData.hash) + except CatchableError as exc: + error "an exception occured while processing pubsub rpc requests", exc = exc.msg finally: - trace "closing connection to pubsub peer", peer = p.id - await p.conn.close() + trace "exiting pubsub peer read loop", peer = p.id proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async, gcsafe.} = - for m in msgs: - trace "sending msgs to peer", peer = p.id, msgs = msgs - let encoded = encodeRpcMsg(m) - if encoded.buffer.len <= 0: - trace "empty message, skipping", peer = p.id - return + try: + for m in msgs: + trace "sending msgs to peer", toPeer = p.id + let encoded = encodeRpcMsg(m) + let encodedHex = encoded.buffer.toHex() + if encoded.buffer.len <= 0: + trace "empty message, skipping", peer = p.id + return - let encodedHex = encoded.buffer.toHex() - if encodedHex in p.seen: - trace "message already sent to peer, skipping", peer = p.id - continue + if $encodedHex.hash in p.sentRpcCache: + trace "message already sent to peer, skipping", peer = p.id + continue + + proc sendToRemote() {.async.} = + trace "sending encoded msgs to peer", peer = p.id, encoded = encodedHex + await p.sendConn.writeLp(encoded.buffer) + p.sentRpcCache.put($encodedHex.hash) - trace "sending encoded msgs to peer", peer = p.id, encoded = encodedHex - await p.conn.writeLp(encoded.buffer) - p.seen.put(encodedHex) + # if no connection has been set, + # queue messages untill a connection + # becomes available + if p.isConnected: + await sendToRemote() + return -proc newPubSubPeer*(conn: Connection, handler: RPCHandler): PubSubPeer = + p.onConnect.wait().addCallback( + proc(udata: pointer) = + asyncCheck sendToRemote() + ) + trace "enqueued message to send at a later time" + + except CatchableError as exc: + trace "exception occured", exc = exc.msg + +proc sendMsg*(p: PubSubPeer, + peerId: PeerID, + topic: string, + data: seq[byte]): Future[void] {.gcsafe.} = + p.send(@[RPCMsg(messages: @[newMessage(p.peerInfo.peerId.get(), data, topic)])]) + +proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async, gcsafe.} = + for topic in topics: + trace "sending graft msg to peer", peer = p.id, topicID = topic + await p.send(@[RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))]) + +proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async, gcsafe.} = + for topic in topics: + trace "sending prune msg to peer", peer = p.id, topicID = topic + await p.send(@[RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))]) + +proc newPubSubPeer*(peerInfo: PeerInfo, + proto: string): PubSubPeer = new result - result.handler = handler - result.conn = conn - result.peerInfo = conn.peerInfo - result.id = conn.peerInfo.peerId.get().pretty() - result.seen = newTimedCache[string]() + result.proto = proto + result.peerInfo = peerInfo + result.sentRpcCache = newTimedCache[string](2.minutes) + result.recvdRpcCache = newTimedCache[string](2.minutes) + result.onConnect = newAsyncEvent() \ No newline at end of file diff --git a/libp2p/protocols/pubsub/rpc/message.nim b/libp2p/protocols/pubsub/rpc/message.nim new file mode 100644 index 000000000..55eae6a86 --- /dev/null +++ b/libp2p/protocols/pubsub/rpc/message.nim @@ -0,0 +1,73 @@ +## Nim-LibP2P +## Copyright (c) 2019 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import options +import chronicles +import nimcrypto/sysrand +import messages, protobuf, + ../../../peer, + ../../../crypto/crypto, + ../../../protobuf/minprotobuf + +logScope: + topic = "PubSubMessage" + +proc msgId*(m: Message): string = + m.seqno.toHex() & PeerID.init(m.fromPeer).pretty + +proc fromPeerId*(m: Message): PeerId = + PeerID.init(m.fromPeer) + +proc sign*(peerId: PeerID, msg: Message): Message {.gcsafe.} = + var buff = initProtoBuffer() + encodeMessage(msg, buff) + # NOTE: leave as is, moving out would imply making this .threadsafe., etc... + let prefix = cast[seq[byte]]("libp2p-pubsub:") + if buff.buffer.len > 0: + result = msg + if peerId.privateKey.isSome: + result.signature = peerId. + privateKey. + get(). + sign(prefix & buff.buffer). + getBytes() + +proc verify*(peerId: PeerID, m: Message): bool = + if m.signature.len > 0 and m.key.len > 0: + var msg = m + msg.signature = @[] + msg.key = @[] + + var buff = initProtoBuffer() + encodeMessage(msg, buff) + + var remote: Signature + var key: PublicKey + if remote.init(m.signature) and key.init(m.key): + result = remote.verify(buff.buffer, key) + +proc newMessage*(peerId: PeerID, + data: seq[byte], + name: string, + sign: bool = true): Message {.gcsafe.} = + var seqno: seq[byte] = newSeq[byte](20) + if randomBytes(addr seqno[0], 20) > 0: + var key: seq[byte] = @[] + + if peerId.publicKey.isSome: + key = peerId.publicKey.get().getBytes() + + result = Message(fromPeer: peerId.getBytes(), + data: data, + seqno: seqno, + topicIDs: @[name]) + if sign: + result = sign(peerId, result) + + result.key = key diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim new file mode 100644 index 000000000..abeae671f --- /dev/null +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -0,0 +1,47 @@ +## Nim-LibP2P +## Copyright (c) 2019 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import options + +type + SubOpts* = object + subscribe*: bool + topic*: string + + Message* = object + fromPeer*: seq[byte] + data*: seq[byte] + seqno*: seq[byte] + topicIDs*: seq[string] + signature*: seq[byte] + key*: seq[byte] + + ControlMessage* = object + ihave*: seq[ControlIHave] + iwant*: seq[ControlIWant] + graft*: seq[ControlGraft] + prune*: seq[ControlPrune] + + ControlIHave* = object + topicID*: string + messageIDs*: seq[string] + + ControlIWant* = object + messageIDs*: seq[string] + + ControlGraft* = object + topicID*: string + + ControlPrune* = object + topicID*: string + + RPCMsg* = object + subscriptions*: seq[SubOpts] + messages*: seq[Message] + control*: Option[ControlMessage] diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim new file mode 100644 index 000000000..37710be06 --- /dev/null +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -0,0 +1,266 @@ +## Nim-LibP2P +## Copyright (c) 2019 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import options +import chronicles +import messages, + ../../../protobuf/minprotobuf, + ../../../crypto/crypto, + ../../../peer + +proc encodeGraft*(graft: ControlGraft, pb: var ProtoBuffer) {.gcsafe.} = + pb.write(initProtoField(1, graft.topicID)) + +proc decodeGraft*(pb: var ProtoBuffer): seq[ControlGraft] {.gcsafe.} = + trace "decoding graft msg", buffer = pb.buffer.toHex() + while true: + var topic: string + if pb.getString(1, topic) < 0: + trace "unable to read topic field from graft msg, breaking" + break + + trace "read topic field from graft msg", topicID = topic + result.add(ControlGraft(topicID: topic)) + +proc encodePrune*(prune: ControlPrune, pb: var ProtoBuffer) {.gcsafe.} = + pb.write(initProtoField(1, prune.topicID)) + +proc decodePrune*(pb: var ProtoBuffer): seq[ControlPrune] {.gcsafe.} = + trace "decoding prune msg" + while true: + var topic: string + if pb.getString(1, topic) < 0: + break + trace "read topic field", topicID = topic + + result.add(ControlPrune(topicID: topic)) + +proc encodeIHave*(ihave: ControlIHave, pb: var ProtoBuffer) {.gcsafe.} = + pb.write(initProtoField(1, ihave.topicID)) + for mid in ihave.messageIDs: + pb.write(initProtoField(2, mid)) + +proc decodeIHave*(pb: var ProtoBuffer): seq[ControlIHave] {.gcsafe.} = + trace "decoding ihave msg" + + while true: + var control: ControlIHave + if pb.enterSubMessage() > 0: + if pb.getString(1, control.topicID) < 0: + trace "topic field missing from ihave msg" + break + + trace "read topic field", topicID = control.topicID + + while true: + var mid: string + if pb.getString(2, mid) < 0: + break + trace "read messageID field", mid = mid + control.messageIDs.add(mid) + + result.add(control) + +proc encodeIWant*(iwant: ControlIWant, pb: var ProtoBuffer) {.gcsafe.} = + for mid in iwant.messageIDs: + pb.write(initProtoField(1, mid)) + +proc decodeIWant*(pb: var ProtoBuffer): seq[ControlIWant] {.gcsafe.} = + trace "decoding ihave msg" + + while pb.enterSubMessage() > 0: + var mid: string + var iWant: ControlIWant + while pb.getString(1, mid) > 0: + trace "read messageID field", mid = mid + iWant.messageIDs.add(mid) + result.add(iWant) + +proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} = + if control.ihave.len > 0: + var ihave = initProtoBuffer() + for h in control.ihave: + h.encodeIHave(ihave) + + # write messages to protobuf + ihave.finish() + pb.write(initProtoField(1, ihave)) + + if control.iwant.len > 0: + var iwant = initProtoBuffer() + for w in control.iwant: + w.encodeIWant(iwant) + + # write messages to protobuf + iwant.finish() + pb.write(initProtoField(2, iwant)) + + if control.graft.len > 0: + var graft = initProtoBuffer() + for g in control.graft: + g.encodeGraft(graft) + + # write messages to protobuf + graft.finish() + pb.write(initProtoField(3, graft)) + + if control.prune.len > 0: + var prune = initProtoBuffer() + for p in control.prune: + p.encodePrune(prune) + + # write messages to protobuf + prune.finish() + pb.write(initProtoField(4, prune)) + +proc decodeControl*(pb: var ProtoBuffer): Option[ControlMessage] {.gcsafe.} = + trace "decoding control submessage" + var control: ControlMessage + while true: + var field = pb.enterSubMessage() + trace "processing submessage", field = field + case field: + of 0: + trace "no submessage found in Control msg" + break + of 1: + control.ihave = pb.decodeIHave() + of 2: + control.iwant = pb.decodeIWant() + of 3: + control.graft = pb.decodeGraft() + of 4: + control.prune = pb.decodePrune() + else: + raise newException(CatchableError, "message type not recognized") + + if result.isNone: + result = some(control) + +proc encodeSubs*(subs: SubOpts, pb: var ProtoBuffer) {.gcsafe.} = + pb.write(initProtoField(1, subs.subscribe)) + pb.write(initProtoField(2, subs.topic)) + +proc decodeSubs*(pb: var ProtoBuffer): seq[SubOpts] {.gcsafe.} = + while true: + var subOpt: SubOpts + var subscr: int + discard pb.getVarintValue(1, subscr) + subOpt.subscribe = cast[bool](subscr) + trace "read subscribe field", subscribe = subOpt.subscribe + + if pb.getString(2, subOpt.topic) < 0: + break + trace "read subscribe field", topicName = subOpt.topic + + result.add(subOpt) + + trace "got subscriptions", subscriptions = result + +proc encodeMessage*(msg: Message, pb: var ProtoBuffer) {.gcsafe.} = + pb.write(initProtoField(1, msg.fromPeer)) + pb.write(initProtoField(2, msg.data)) + pb.write(initProtoField(3, msg.seqno)) + + for t in msg.topicIDs: + pb.write(initProtoField(4, t)) + + if msg.signature.len > 0: + pb.write(initProtoField(5, msg.signature)) + + if msg.key.len > 0: + pb.write(initProtoField(6, msg.key)) + + pb.finish() + +proc decodeMessages*(pb: var ProtoBuffer): seq[Message] {.gcsafe.} = + # TODO: which of this fields are really optional? + while true: + var msg: Message + if pb.getBytes(1, msg.fromPeer) < 0: + break + trace "read message field", fromPeer = msg.fromPeer + + if pb.getBytes(2, msg.data) < 0: + break + trace "read message field", data = msg.data + + if pb.getBytes(3, msg.seqno) < 0: + break + trace "read message field", seqno = msg.seqno + + var topic: string + while true: + if pb.getString(4, topic) < 0: + break + msg.topicIDs.add(topic) + trace "read message field", topicName = topic + topic = "" + + discard pb.getBytes(5, msg.signature) + trace "read message field", signature = msg.signature + + discard pb.getBytes(6, msg.key) + trace "read message field", key = msg.key + + result.add(msg) + +proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} = + result = initProtoBuffer() + trace "encoding msg: ", msg = msg + + if msg.subscriptions.len > 0: + var subs = initProtoBuffer() + for s in msg.subscriptions: + encodeSubs(s, subs) + + # write subscriptions to protobuf + subs.finish() + result.write(initProtoField(1, subs)) + + if msg.messages.len > 0: + var messages = initProtoBuffer() + for m in msg.messages: + encodeMessage(m, messages) + + # write messages to protobuf + messages.finish() + result.write(initProtoField(2, messages)) + + if msg.control.isSome: + var control = initProtoBuffer() + msg.control.get.encodeControl(control) + + # write messages to protobuf + control.finish() + result.write(initProtoField(3, control)) + + if result.buffer.len > 0: + result.finish() + +proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} = + var pb = initProtoBuffer(msg) + + result.subscriptions = newSeq[SubOpts]() + while true: + # decode SubOpts array + var field = pb.enterSubMessage() + trace "processing submessage", field = field + case field: + of 0: + trace "no submessage found in RPC msg" + break + of 1: + result.subscriptions = pb.decodeSubs() + of 2: + result.messages = pb.decodeMessages() + of 3: + result.control = pb.decodeControl() + else: + raise newException(CatchableError, "message type not recognized") diff --git a/libp2p/protocols/pubsub/rpcmsg.nim b/libp2p/protocols/pubsub/rpcmsg.nim deleted file mode 100644 index 9ca5f3ab7..000000000 --- a/libp2p/protocols/pubsub/rpcmsg.nim +++ /dev/null @@ -1,189 +0,0 @@ -## Nim-LibP2P -## Copyright (c) 2019 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. - -import sequtils, options -import chronos, nimcrypto/sysrand, chronicles -import ../../peerinfo, - ../../peer, - ../../crypto/crypto, - ../../protobuf/minprotobuf - -logScope: - topic = "RpcMsg" - -type - SubOpts* = object - subscribe*: bool - topic*: string - - Message* = object - fromPeer*: seq[byte] - data*: seq[byte] - seqno*: seq[byte] - topicIDs*: seq[string] - signature*: seq[byte] - key*: seq[byte] - - RPCMsg* = object - subscriptions*: seq[SubOpts] - messages*: seq[Message] - -proc encodeMessage(msg: Message, buff: var ProtoBuffer) {.gcsafe.} = - buff.write(initProtoField(1, msg.fromPeer)) - buff.write(initProtoField(2, msg.data)) - buff.write(initProtoField(3, msg.seqno)) - - for t in msg.topicIDs: - buff.write(initProtoField(4, t)) - - if msg.signature.len > 0: - buff.write(initProtoField(5, msg.signature)) - - if msg.key.len > 0: - buff.write(initProtoField(6, msg.key)) - - buff.finish() - -proc encodeSubs(subs: SubOpts, buff: var ProtoBuffer) {.gcsafe.} = - buff.write(initProtoField(1, subs.subscribe)) - buff.write(initProtoField(2, subs.topic)) - -proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} = - result = initProtoBuffer() - trace "encoding msg: ", msg = msg - - if msg.subscriptions.len > 0: - var subs = initProtoBuffer() - for s in msg.subscriptions: - encodeSubs(s, subs) - - # write subscriptions to protobuf - subs.finish() - result.write(initProtoField(1, subs)) - - if msg.messages.len > 0: - var messages = initProtoBuffer() - for m in msg.messages: - encodeMessage(m, messages) - - # write messages to protobuf - messages.finish() - result.write(initProtoField(2, messages)) - - if result.buffer.len > 0: - result.finish() - -proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} = - var pb = initProtoBuffer(msg) - - result.subscriptions = newSeq[SubOpts]() - while true: - # decode SubOpts array - var field = pb.enterSubMessage() - trace "processing submessage", field = field - case field: - of 0: - break - of 1: - while true: - var subOpt: SubOpts - var subscr: int - discard pb.getVarintValue(1, subscr) - subOpt.subscribe = cast[bool](subscr) - trace "read subscribe field", subscribe = subOpt.subscribe - - if pb.getString(2, subOpt.topic) < 0: - break - trace "read subscribe field", topicName = subOpt.topic - - result.subscriptions.add(subOpt) - trace "got subscriptions", subscriptions = result.subscriptions - - of 2: - result.messages = newSeq[Message]() - # TODO: which of this fields are really optional? - while true: - var msg: Message - if pb.getBytes(1, msg.fromPeer) < 0: - break - trace "read message field", fromPeer = msg.fromPeer - - if pb.getBytes(2, msg.data) < 0: - break - trace "read message field", data = msg.data - - if pb.getBytes(3, msg.seqno) < 0: - break - trace "read message field", seqno = msg.seqno - - var topic: string - while true: - if pb.getString(4, topic) < 0: - break - msg.topicIDs.add(topic) - trace "read message field", topicName = topic - topic = "" - - discard pb.getBytes(5, msg.signature) - trace "read message field", signature = msg.signature - - discard pb.getBytes(6, msg.key) - trace "read message field", key = msg.key - - result.messages.add(msg) - else: - raise newException(CatchableError, "message type not recognized") - -proc sign*(peerId: PeerID, msg: Message): Message = - var buff = initProtoBuffer() - encodeMessage(msg, buff) - # NOTE: leave as is, moving out would imply making this .threadsafe., etc... - let prefix = cast[seq[byte]]("libp2p-pubsub:") - if buff.buffer.len > 0: - result = msg - if peerId.privateKey.isSome: - result.signature = peerId. - privateKey. - get(). - sign(prefix & buff.buffer). - getBytes() - -proc verify*(peerId: PeerID, m: Message): bool = - if m.signature.len > 0 and m.key.len > 0: - var msg = m - msg.signature = @[] - msg.key = @[] - - var buff = initProtoBuffer() - encodeMessage(msg, buff) - - var remote: Signature - var key: PublicKey - if remote.init(m.signature) and key.init(m.key): - result = remote.verify(buff.buffer, key) - -proc makeMessage*(peerId: PeerID, - data: seq[byte], - name: string, - sign: bool = true): Message {.gcsafe.} = - var seqno: seq[byte] = newSeq[byte](20) - if randomBytes(addr seqno[0], 20) > 0: - var key: seq[byte] = @[] - - if peerId.publicKey.isSome: - key = peerId.publicKey.get().getRawBytes() - - result = Message(fromPeer: peerId.getBytes(), - data: data, - seqno: seqno, - topicIDs: @[name]) - if sign: - result = sign(peerId, result) - - result.key = key diff --git a/libp2p/protocols/pubsub/timedcache.nim b/libp2p/protocols/pubsub/timedcache.nim index 0966fc6e3..a0bb09007 100644 --- a/libp2p/protocols/pubsub/timedcache.nim +++ b/libp2p/protocols/pubsub/timedcache.nim @@ -7,16 +7,16 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import tables, hashes +import tables import chronos, chronicles logScope: topic = "TimedCache" -const Timeout* = 10 * 1000 # default timeout in ms +const Timeout* = 10.seconds # default timeout in ms type - ExpireHandler*[V] = proc(val: V) {.gcsafe.} + ExpireHandler*[V] = proc(key: string, val: V) {.gcsafe.} TimedEntry*[V] = object of RootObj val: V handler: ExpireHandler[V] @@ -24,33 +24,57 @@ type TimedCache*[V] = ref object of RootObj cache*: Table[string, TimedEntry[V]] onExpire*: ExpireHandler[V] + timeout*: Duration -proc newTimedCache*[V](): TimedCache[V] = - new result - result.cache = initTable[string, TimedEntry[V]]() +# TODO: This belong in chronos, temporary left here until chronos is updated +proc addTimer*(at: Duration, cb: CallbackFunc, udata: pointer = nil) = + ## Arrange for the callback ``cb`` to be called at the given absolute + ## timestamp ``at``. You can also pass ``udata`` to callback. + addTimer(Moment.fromNow(at), cb, udata) proc put*[V](t: TimedCache[V], key: string, val: V = "", - timeout: uint64 = Timeout, + timeout: Duration, handler: ExpireHandler[V] = nil) = - trace "adding entry to timed cache", key = key, val = val + trace "adding entry to timed cache", key = key t.cache[key] = TimedEntry[V](val: val, handler: handler) # TODO: addTimer with param Duration is missing from chronos, needs to be added addTimer( timeout, proc (arg: pointer = nil) {.gcsafe.} = - trace "deleting expired entry from timed cache", key = key, val = val - var entry = t.cache[key] - t.cache.del(key) - if not isNil(entry.handler): - entry.handler(entry.val) + trace "deleting expired entry from timed cache", key = key + if key in t.cache: + let entry = t.cache[key] + t.cache.del(key) + if not isNil(entry.handler): + entry.handler(key, entry.val) ) +proc put*[V](t: TimedCache[V], + key: string, + val: V = "", + handler: ExpireHandler[V] = nil) = + t.put(key, val, t.timeout, handler) + proc contains*[V](t: TimedCache[V], key: string): bool = t.cache.contains(key) proc del*[V](t: TimedCache[V], key: string) = trace "deleting entry from timed cache", key = key t.cache.del(key) + +proc get*[V](t: TimedCache[V], key: string): V = + t.cache[key].val + +proc `[]`*[V](t: TimedCache[V], key: string): V = + t.get(key) + +proc `[]=`*[V](t: TimedCache[V], key: string, val: V): V = + t.put(key, val) + +proc newTimedCache*[V](timeout: Duration = Timeout): TimedCache[V] = + new result + result.cache = initTable[string, TimedEntry[V]]() + result.timeout = timeout diff --git a/libp2p/switch.nim b/libp2p/switch.nim index a320e0e8a..3e33664b3 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -256,12 +256,18 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = var server = await t.listen(a, handle) s.peerInfo.addrs[i] = t.ma # update peer's address startFuts.add(server) + + if s.pubSub.isSome: + await s.pubSub.get().start() result = startFuts # listen for incoming connections proc stop*(s: Switch) {.async.} = trace "stopping switch" + if s.pubSub.isSome: + await s.pubSub.get().stop() + await allFutures(toSeq(s.connections.values).mapIt(s.cleanupConn(it))) await allFutures(s.transports.mapIt(it.close())) diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim new file mode 100644 index 000000000..7e98d8076 --- /dev/null +++ b/tests/pubsub/testfloodsub.nim @@ -0,0 +1,130 @@ +## Nim-Libp2p +## Copyright (c) 2018 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import unittest, sequtils, options +import chronos +import utils, + ../../libp2p/[switch, crypto/crypto] + +suite "FloodSub": + test "FloodSub basic publish/subscribe A -> B": + proc testBasicPubSub(): Future[bool] {.async.} = + var completionFut = newFuture[bool]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + completionFut.complete(true) + + var nodes = generateNodes(2) + var awaiters: seq[Future[void]] + awaiters.add((await nodes[0].start())) + awaiters.add((await nodes[1].start())) + + await subscribeNodes(nodes) + await nodes[1].subscribe("foobar", handler) + await sleepAsync(1000.millis) + + await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) + + result = await completionFut + await allFutures(nodes[0].stop(), nodes[1].stop()) + await allFutures(awaiters) + + check: + waitFor(testBasicPubSub()) == true + + test "FloodSub basic publish/subscribe B -> A": + proc testBasicPubSub(): Future[bool] {.async.} = + var completionFut = newFuture[bool]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + completionFut.complete(true) + + var nodes = generateNodes(2) + var awaiters: seq[Future[void]] + awaiters.add((await nodes[0].start())) + awaiters.add((await nodes[1].start())) + + await subscribeNodes(nodes) + await nodes[0].subscribe("foobar", handler) + await sleepAsync(1000.millis) + + await nodes[1].publish("foobar", cast[seq[byte]]("Hello!")) + + result = await completionFut + await allFutures(nodes[0].stop(), nodes[1].stop()) + await allFutures(awaiters) + + check: + waitFor(testBasicPubSub()) == true + + test "FloodSub multiple peers, no self trigger": + proc testBasicFloodSub(): Future[bool] {.async.} = + var passed: int + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + passed.inc() + + var nodes: seq[Switch] = newSeq[Switch]() + for i in 0..<10: + nodes.add(createNode()) + + var awaitters: seq[Future[void]] + for node in nodes: + awaitters.add(await node.start()) + await node.subscribe("foobar", handler) + await sleepAsync(10.millis) + + await subscribeNodes(nodes) + await sleepAsync(10.millis) + + for node in nodes: + await node.publish("foobar", cast[seq[byte]]("Hello!")) + await sleepAsync(10.millis) + + await allFutures(nodes.mapIt(it.stop())) + await allFutures(awaitters) + + result = passed >= 10 # non deterministic, so at least 2 times + + check: + waitFor(testBasicFloodSub()) == true + + test "FloodSub multiple peers, with self trigger": + proc testBasicFloodSub(): Future[bool] {.async.} = + var passed: int + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + passed.inc() + + var nodes: seq[Switch] = newSeq[Switch]() + for i in 0..<10: + nodes.add(createNode(none(PrivateKey), "/ip4/127.0.0.1/tcp/0", true)) + + var awaitters: seq[Future[void]] + for node in nodes: + awaitters.add((await node.start())) + await node.subscribe("foobar", handler) + await sleepAsync(10.millis) + + await subscribeNodes(nodes) + await sleepAsync(500.millis) + + for node in nodes: + await node.publish("foobar", cast[seq[byte]]("Hello!")) + await sleepAsync(10.millis) + + await sleepAsync(100.millis) + + await allFutures(nodes.mapIt(it.stop())) + await allFutures(awaitters) + + result = passed >= 10 # non deterministic, so at least 20 times + + check: + waitFor(testBasicFloodSub()) == true diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim new file mode 100644 index 000000000..d9d146b60 --- /dev/null +++ b/tests/pubsub/testgossipsub.nim @@ -0,0 +1,425 @@ +## Nim-Libp2p +## Copyright (c) 2018 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import unittest, sequtils, options, tables, sets +import chronos +import utils, ../../libp2p/[switch, + peer, + peerinfo, + connection, + crypto/crypto, + stream/bufferstream, + protocols/pubsub/pubsub, + protocols/pubsub/gossipsub] + +proc createGossipSub(): GossipSub = + var peerInfo: PeerInfo + var seckey = some(PrivateKey.random(RSA)) + + peerInfo.peerId = some(PeerID.init(seckey.get())) + result = newPubSub(GossipSub, peerInfo) + +suite "GossipSub": + test "should add remote peer topic subscriptions": + proc testRun(): Future[bool] {.async.} = + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + discard + + let gossip1 = createGossipSub() + let gossip2 = createGossipSub() + + var buf1 = newBufferStream() + var conn1 = newConnection(buf1) + conn1.peerInfo = gossip1.peerInfo + + var buf2 = newBufferStream() + var conn2 = newConnection(buf2) + conn2.peerInfo = gossip2.peerInfo + + buf1 = buf1 | buf2 | buf1 + + await gossip1.subscribeToPeer(conn2) + asyncCheck gossip2.handleConn(conn1, GossipSubCodec) + + await gossip1.subscribe("foobar", handler) + await sleepAsync(1.seconds) + + check: + "foobar" in gossip2.gossipsub + gossip1.peerInfo.peerId.get().pretty in gossip2.gossipsub["foobar"] + + result = true + + check: + waitFor(testRun()) == true + + test "e2e - should add remote peer topic subscriptions": + proc testBasicGossipSub(): Future[bool] {.async.} = + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + discard + + var nodes: seq[Switch] = newSeq[Switch]() + for i in 0..<2: + nodes.add(createNode(gossip = true)) + + var awaitters: seq[Future[void]] + for node in nodes: + awaitters.add(await node.start()) + + await nodes[1].subscribe("foobar", handler) + await sleepAsync(100.millis) + + await subscribeNodes(nodes) + + let gossip1 = GossipSub(nodes[0].pubSub.get()) + let gossip2 = GossipSub(nodes[1].pubSub.get()) + + check: + "foobar" in gossip2.topics + "foobar" in gossip1.gossipsub + gossip2.peerInfo.peerId.get().pretty in gossip1.gossipsub["foobar"] + + await allFutures(nodes.mapIt(it.stop())) + await allFutures(awaitters) + + result = true + + check: + waitFor(testBasicGossipSub()) == true + + test "should add remote peer topic subscriptions if both peers are subscribed": + proc testRun(): Future[bool] {.async.} = + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + discard + + let gossip1 = createGossipSub() + let gossip2 = createGossipSub() + + var buf1 = newBufferStream() + var conn1 = newConnection(buf1) + conn1.peerInfo = gossip1.peerInfo + + var buf2 = newBufferStream() + var conn2 = newConnection(buf2) + conn2.peerInfo = gossip2.peerInfo + + buf1 = buf1 | buf2 | buf1 + + await gossip1.subscribeToPeer(conn2) + asyncCheck gossip1.handleConn(conn1, GossipSubCodec) + + await gossip2.subscribeToPeer(conn1) + asyncCheck gossip2.handleConn(conn2, GossipSubCodec) + + await gossip1.subscribe("foobar", handler) + await gossip2.subscribe("foobar", handler) + await sleepAsync(1.seconds) + + check: + "foobar" in gossip1.topics + "foobar" in gossip2.topics + + "foobar" in gossip1.gossipsub + "foobar" in gossip2.gossipsub + + # TODO: in a real setting, we would be checking for the peerId from + # gossip1 in gossip2 and vice versa, but since we're doing some mockery + # with connection piping and such, this is fine - do not change! + gossip1.peerInfo.peerId.get().pretty in gossip1.gossipsub["foobar"] + gossip2.peerInfo.peerId.get().pretty in gossip2.gossipsub["foobar"] + + result = true + + check: + waitFor(testRun()) == true + + test "e2e - should add remote peer topic subscriptions if both peers are subscribed": + proc testBasicGossipSub(): Future[bool] {.async.} = + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + discard + + var nodes: seq[Switch] = newSeq[Switch]() + for i in 0..<2: + nodes.add(createNode(gossip = true)) + + var awaitters: seq[Future[void]] + for node in nodes: + awaitters.add(await node.start()) + + await nodes[0].subscribe("foobar", handler) + await sleepAsync(100.millis) + + await nodes[1].subscribe("foobar", handler) + await sleepAsync(100.millis) + + await subscribeNodes(nodes) + + let gossip1 = GossipSub(nodes[0].pubSub.get()) + let gossip2 = GossipSub(nodes[1].pubSub.get()) + + check: + "foobar" in gossip1.topics + "foobar" in gossip2.topics + + "foobar" in gossip1.gossipsub + "foobar" in gossip2.gossipsub + + gossip1.peerInfo.peerId.get().pretty in gossip2.gossipsub["foobar"] + gossip2.peerInfo.peerId.get().pretty in gossip1.gossipsub["foobar"] + + await allFutures(nodes.mapIt(it.stop())) + await allFutures(awaitters) + + result = true + + check: + waitFor(testBasicGossipSub()) == true + + # test "send over fanout A -> B": + # proc testRun(): Future[bool] {.async.} = + # var handlerFut = newFuture[bool]() + # proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + # check: + # topic == "foobar" + # cast[string](data) == "Hello!" + + # handlerFut.complete(true) + + # let gossip1 = createGossipSub() + # let gossip2 = createGossipSub() + + # var buf1 = newBufferStream() + # var conn1 = newConnection(buf1) + + # var buf2 = newBufferStream() + # var conn2 = newConnection(buf2) + + # conn1.peerInfo = gossip2.peerInfo + # conn2.peerInfo = gossip1.peerInfo + + # buf1 = buf1 | buf2 | buf1 + + # await gossip1.subscribeToPeer(conn2) + # asyncCheck gossip1.handleConn(conn1, GossipSubCodec) + + # await gossip2.subscribeToPeer(conn1) + # asyncCheck gossip2.handleConn(conn2, GossipSubCodec) + + # await gossip1.subscribe("foobar", handler) + # await sleepAsync(1.seconds) + # await gossip2.publish("foobar", cast[seq[byte]]("Hello!")) + # await sleepAsync(1.seconds) + + # result = await handlerFut + + # check: + # waitFor(testRun()) == true + + test "e2e - send over fanout A -> B": + proc testRun(): Future[bool] {.async.} = + var passed: bool + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + passed = true + + var nodes = generateNodes(2, true) + var wait = newSeq[Future[void]]() + wait.add(await nodes[0].start()) + wait.add(await nodes[1].start()) + + await subscribeNodes(nodes) + + await nodes[1].subscribe("foobar", handler) + await sleepAsync(3.seconds) + + await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) + await sleepAsync(3.seconds) + + var gossipSub1: GossipSub = GossipSub(nodes[0].pubSub.get()) + + check: + "foobar" in gossipSub1.gossipsub + + await nodes[1].stop() + await nodes[0].stop() + + await allFutures(wait) + result = passed + + check: + waitFor(testRun()) == true + + # test "send over mesh A -> B": + # proc testRun(): Future[bool] {.async.} = + # var passed: bool + # proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + # check: + # topic == "foobar" + # cast[string](data) == "Hello!" + + # passed = true + + # let gossip1 = createGossipSub() + # let gossip2 = createGossipSub() + + # var buf1 = newBufferStream() + # var conn1 = newConnection(buf1) + # conn1.peerInfo = gossip1.peerInfo + + # var buf2 = newBufferStream() + # var conn2 = newConnection(buf2) + # conn2.peerInfo = gossip2.peerInfo + + # buf1 = buf1 | buf2 | buf1 + + # await gossip1.subscribeToPeer(conn2) + # await gossip2.subscribeToPeer(conn1) + + # await gossip1.subscribe("foobar", handler) + # await sleepAsync(1.seconds) + + # await gossip2.subscribe("foobar", handler) + # await sleepAsync(1.seconds) + + # await gossip2.publish("foobar", cast[seq[byte]]("Hello!")) + # await sleepAsync(1.seconds) + # result = passed + + # check: + # waitFor(testRun()) == true + + # test "e2e - send over mesh A -> B": + # proc testRun(): Future[bool] {.async.} = + # var passed: bool + # proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + # check topic == "foobar" + # passed = true + + # var nodes = generateNodes(2, true) + # var wait = await nodes[1].start() + + # await nodes[0].subscribeToPeer(nodes[1].peerInfo) + # await sleepAsync(100.millis) + + # await nodes[0].subscribe("foobar", handler) + # await sleepAsync(100.millis) + + # await nodes[1].subscribe("foobar", handler) + # await sleepAsync(100.millis) + + # await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) + # await sleepAsync(1000.millis) + + # await nodes[1].stop() + # await allFutures(wait) + # result = passed + + # check: + # waitFor(testRun()) == true + + # test "with multiple peers": + # proc testRun(): Future[bool] {.async.} = + # var nodes: seq[GossipSub] + # for i in 0..<10: + # nodes.add(createGossipSub()) + + # var pending: seq[Future[void]] + # var awaitters: seq[Future[void]] + # var seen: Table[string, int] + # for dialer in nodes: + # var handler: TopicHandler + # closureScope: + # var dialerNode = dialer + # handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} = + # if dialerNode.peerInfo.peerId.get().pretty notin seen: + # seen[dialerNode.peerInfo.peerId.get().pretty] = 0 + # seen[dialerNode.peerInfo.peerId.get().pretty].inc + # check topic == "foobar" + + # await dialer.subscribe("foobar", handler) + # await sleepAsync(20.millis) + + # for i, node in nodes: + # if dialer.peerInfo.peerId != node.peerInfo.peerId: + # var buf1 = newBufferStream() + # var conn1 = newConnection(buf1) + # conn1.peerInfo = dialer.peerInfo + + # var buf2 = newBufferStream() + # var conn2 = newConnection(buf2) + # conn2.peerInfo = node.peerInfo + + # buf1 = buf2 | buf1 + # buf2 = buf1 | buf2 + + # pending.add(dialer.subscribeToPeer(conn2)) + # pending.add(node.subscribeToPeer(conn1)) + # await sleepAsync(10.millis) + + # awaitters.add(dialer.start()) + + # await nodes[0].publish("foobar", + # cast[seq[byte]]("from node " & + # nodes[1].peerInfo.peerId.get().pretty)) + + # await sleepAsync(1000.millis) + # await allFutures(nodes.mapIt(it.stop())) + # await allFutures(awaitters) + + # check: seen.len == 9 + # for k, v in seen.pairs: + # check: v == 1 + + # result = true + + # check: + # waitFor(testRun()) == true + + test "e2e - with multiple peers": + proc testRun(): Future[bool] {.async.} = + var nodes: seq[Switch] = newSeq[Switch]() + var awaitters: seq[Future[void]] + + for i in 0..<10: + nodes.add(createNode(none(PrivateKey), "/ip4/127.0.0.1/tcp/0", true, true)) + awaitters.add((await nodes[i].start())) + + var seen: Table[string, int] + for dialer in nodes: + var handler: TopicHandler + closureScope: + var dialerNode = dialer + handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} = + if dialerNode.peerInfo.peerId.get().pretty notin seen: + seen[dialerNode.peerInfo.peerId.get().pretty] = 0 + seen[dialerNode.peerInfo.peerId.get().pretty].inc + check topic == "foobar" + + await dialer.subscribe("foobar", handler) + await sleepAsync(20.millis) + + await subscribeNodes(nodes) + await sleepAsync(10.millis) + + await nodes[0].publish("foobar", + cast[seq[byte]]("from node " & + nodes[1].peerInfo.peerId.get().pretty)) + + await sleepAsync(1000.millis) + await allFutures(nodes.mapIt(it.stop())) + await allFutures(awaitters) + + check: seen.len == 10 + for k, v in seen.pairs: + check: v == 1 + + result = true + + check: + waitFor(testRun()) == true diff --git a/tests/pubsub/testmcache.nim b/tests/pubsub/testmcache.nim new file mode 100644 index 000000000..226063aad --- /dev/null +++ b/tests/pubsub/testmcache.nim @@ -0,0 +1,93 @@ +import options, sets, sequtils +import unittest +import ../../libp2p/[peer, + crypto/crypto, + protocols/pubsub/mcache, + protocols/pubsub/rpc/message, + protocols/pubsub/rpc/messages] + +suite "MCache": + test "put/get": + var mCache = newMCache(3, 5) + var msg = Message(fromPeer: PeerID.init(PrivateKey.random(RSA)).data, + seqno: cast[seq[byte]]("12345")) + mCache.put(msg) + check mCache.get(msg.msgId).isSome and mCache.get(msg.msgId).get() == msg + + test "window": + var mCache = newMCache(3, 5) + + for i in 0..<3: + var msg = Message(fromPeer: PeerID.init(PrivateKey.random(RSA)).data, + seqno: cast[seq[byte]]("12345"), + topicIDs: @["foo"]) + mCache.put(msg) + + for i in 0..<5: + var msg = Message(fromPeer: PeerID.init(PrivateKey.random(RSA)).data, + seqno: cast[seq[byte]]("12345"), + topicIDs: @["bar"]) + mCache.put(msg) + + var mids = mCache.window("foo") + check mids.len == 3 + + var id = toSeq(mids)[0] + check mCache.get(id).get().topicIDs[0] == "foo" + + test "shift - shift 1 window at a time": + var mCache = newMCache(1, 5) + + for i in 0..<3: + var msg = Message(fromPeer: PeerID.init(PrivateKey.random(RSA)).data, + seqno: cast[seq[byte]]("12345"), + topicIDs: @["foo"]) + mCache.put(msg) + + mCache.shift() + check mCache.window("foo").len == 0 + + for i in 0..<3: + var msg = Message(fromPeer: PeerID.init(PrivateKey.random(RSA)).data, + seqno: cast[seq[byte]]("12345"), + topicIDs: @["bar"]) + mCache.put(msg) + + mCache.shift() + check mCache.window("bar").len == 0 + + for i in 0..<3: + var msg = Message(fromPeer: PeerID.init(PrivateKey.random(RSA)).data, + seqno: cast[seq[byte]]("12345"), + topicIDs: @["baz"]) + mCache.put(msg) + + mCache.shift() + check mCache.window("baz").len == 0 + + test "shift - 2 windows at a time": + var mCache = newMCache(1, 5) + + for i in 0..<3: + var msg = Message(fromPeer: PeerID.init(PrivateKey.random(RSA)).data, + seqno: cast[seq[byte]]("12345"), + topicIDs: @["foo"]) + mCache.put(msg) + + for i in 0..<3: + var msg = Message(fromPeer: PeerID.init(PrivateKey.random(RSA)).data, + seqno: cast[seq[byte]]("12345"), + topicIDs: @["bar"]) + mCache.put(msg) + + for i in 0..<3: + var msg = Message(fromPeer: PeerID.init(PrivateKey.random(RSA)).data, + seqno: cast[seq[byte]]("12345"), + topicIDs: @["baz"]) + mCache.put(msg) + + mCache.shift() + check mCache.window("foo").len == 0 + + mCache.shift() + check mCache.window("bar").len == 0 diff --git a/tests/pubsub/testpubsub.nim b/tests/pubsub/testpubsub.nim new file mode 100644 index 000000000..1401b3168 --- /dev/null +++ b/tests/pubsub/testpubsub.nim @@ -0,0 +1,4 @@ +include ../../libp2p/protocols/pubsub/gossipsub +import testfloodsub, + testgossipsub, + testmcache diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim new file mode 100644 index 000000000..951e07b84 --- /dev/null +++ b/tests/pubsub/utils.nim @@ -0,0 +1,64 @@ +import options, tables +import chronos +import ../../libp2p/[switch, + peer, + connection, + multiaddress, + peerinfo, + muxers/muxer, + crypto/crypto, + muxers/mplex/mplex, + muxers/mplex/types, + protocols/identify, + transports/transport, + transports/tcptransport, + protocols/secure/secure, + protocols/secure/secio, + protocols/pubsub/pubsub, + protocols/pubsub/gossipsub, + protocols/pubsub/floodsub] + +proc createMplex(conn: Connection): Muxer = + result = newMplex(conn) + +proc createNode*(privKey: Option[PrivateKey] = none(PrivateKey), + address: string = "/ip4/127.0.0.1/tcp/0", + triggerSelf: bool = false, + gossip: bool = false): Switch = + var peerInfo: PeerInfo + var seckey = privKey + if privKey.isNone: + seckey = some(PrivateKey.random(RSA)) + + peerInfo.peerId = some(PeerID.init(seckey.get())) + peerInfo.addrs.add(Multiaddress.init(address)) + + let mplexProvider = newMuxerProvider(createMplex, MplexCodec) + let transports = @[Transport(newTransport(TcpTransport))] + let muxers = [(MplexCodec, mplexProvider)].toTable() + let identify = newIdentify(peerInfo) + let secureManagers = [(SecioCodec, Secure(newSecio(seckey.get())))].toTable() + + var pubSub: Option[PubSub] + if gossip: + pubSub = some(PubSub(newPubSub(GossipSub, peerInfo, triggerSelf))) + else: + pubSub = some(PubSub(newPubSub(FloodSub, peerInfo, triggerSelf))) + + result = newSwitch(peerInfo, + transports, + identify, + muxers, + secureManagers = secureManagers, + pubSub = pubSub) + +proc generateNodes*(num: Natural, gossip: bool = false): seq[Switch] = + for i in 0.. DefaultReadSize: + raise newInvalidVarintException() + result.setLen(size) + if size > 0.uint: + await s.readExactly(addr result[0], int(size)) + except LPStreamIncompleteError, LPStreamReadError: + trace "remote connection ended unexpectedly", exc = getCurrentExceptionMsg() + +proc createNode*(privKey: Option[PrivateKey] = none(PrivateKey), + address: string = "/ip4/127.0.0.1/tcp/0", + triggerSelf: bool = false, + gossip: bool = false): Switch = + var peerInfo: NativePeerInfo + var seckey = privKey + if privKey.isNone: + seckey = some(PrivateKey.random(RSA)) + + peerInfo.peerId = some(PeerID.init(seckey.get())) + peerInfo.addrs.add(Multiaddress.init(address)) + + proc createMplex(conn: Connection): Muxer = newMplex(conn) + let mplexProvider = newMuxerProvider(createMplex, MplexCodec) + let transports = @[Transport(newTransport(TcpTransport))] + let muxers = [(MplexCodec, mplexProvider)].toTable() + let identify = newIdentify(peerInfo) + let secureManagers = [(SecioCodec, Secure(newSecio(seckey.get())))].toTable() + + var pubSub: Option[PubSub] + if gossip: + pubSub = some(PubSub(newPubSub(GossipSub, peerInfo, triggerSelf))) + else: + pubSub = some(PubSub(newPubSub(FloodSub, peerInfo, triggerSelf))) + + result = newSwitch(peerInfo, + transports, + identify, + muxers, + secureManagers = secureManagers, + pubSub = pubSub) + +proc testPubSubDaemonPublish(gossip: bool = false): Future[bool] {.async.} = + var pubsubData = "TEST MESSAGE" + var testTopic = "test-topic" + var msgData = cast[seq[byte]](pubsubData) + + var flags = {PSFloodSub} + if gossip: + flags = {PSGossipSub} + + let daemonNode = await newDaemonApi(flags) + let daemonPeer = await daemonNode.identity() + let nativeNode = createNode(gossip = gossip) + let awaiters = nativeNode.start() + let nativePeer = nativeNode.peerInfo + + var handlerFuture = newFuture[bool]() + proc nativeHandler(topic: string, data: seq[byte]) {.async.} = + let smsg = cast[string](data) + check smsg == pubsubData + handlerFuture.complete(true) + + await nativeNode.subscribeToPeer(NativePeerInfo(peerId: some(daemonPeer.peer), + addrs: daemonPeer.addresses)) + + await sleepAsync(1.seconds) + await daemonNode.connect(nativePeer.peerId.get(), nativePeer.addrs) + + proc pubsubHandler(api: DaemonAPI, + ticket: PubsubTicket, + message: PubSubMessage): Future[bool] {.async.} = + result = true # don't cancel subscription + + asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler) + await nativeNode.subscribe(testTopic, nativeHandler) + await sleepAsync(1.seconds) + await daemonNode.pubsubPublish(testTopic, msgData) + + result = await handlerFuture + await nativeNode.stop() + await allFutures(awaiters) + await daemonNode.close() + +proc testPubSubNodePublish(gossip: bool = false): Future[bool] {.async.} = + var pubsubData = "TEST MESSAGE" + var testTopic = "test-topic" + var msgData = cast[seq[byte]](pubsubData) + + var flags = {PSFloodSub} + if gossip: + flags = {PSGossipSub} + + let daemonNode = await newDaemonApi(flags) + let daemonPeer = await daemonNode.identity() + let nativeNode = createNode(gossip = gossip) + let awaiters = nativeNode.start() + let nativePeer = nativeNode.peerInfo + + var handlerFuture = newFuture[bool]() + await nativeNode.subscribeToPeer(NativePeerInfo(peerId: some(daemonPeer.peer), + addrs: daemonPeer.addresses)) + + await sleepAsync(1.seconds) + await daemonNode.connect(nativePeer.peerId.get(), nativePeer.addrs) + + proc pubsubHandler(api: DaemonAPI, + ticket: PubsubTicket, + message: PubSubMessage): Future[bool] {.async.} = + let smsg = cast[string](message.data) + check smsg == pubsubData + handlerFuture.complete(true) + result = true # don't cancel subscription + + asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler) + await sleepAsync(1.seconds) + await nativeNode.publish(testTopic, msgData) + + result = await handlerFuture + await nativeNode.stop() + await allFutures(awaiters) + await daemonNode.close() + +suite "Interop": + test "native -> daemon multiple reads and writes": + proc runTests(): Future[bool] {.async.} = + var protos = @["/test-stream"] + + let nativeNode = createNode() + let awaiters = await nativeNode.start() + let daemonNode = await newDaemonApi() + let daemonPeer = await daemonNode.identity() + + var testFuture = newFuture[void]("test.future") + proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = + check cast[string](await stream.transp.readLp()) == "test 1" + asyncDiscard stream.transp.writeLp("test 2") + + await sleepAsync(10.millis) + check cast[string](await stream.transp.readLp()) == "test 3" + asyncDiscard stream.transp.writeLp("test 4") + testFuture.complete() + + await daemonNode.addHandler(protos, daemonHandler) + let conn = await nativeNode.dial(NativePeerInfo(peerId: some(daemonPeer.peer), + addrs: daemonPeer.addresses), + protos[0]) + await conn.writeLp("test 1") + check "test 2" == cast[string]((await conn.readLp())) + await sleepAsync(10.millis) + + await conn.writeLp("test 3") + check "test 4" == cast[string]((await conn.readLp())) + + await wait(testFuture, 10.secs) + await nativeNode.stop() + await allFutures(awaiters) + await daemonNode.close() + result = true + + check: + waitFor(runTests()) == true + + test "native -> daemon connection": + proc runTests(): Future[bool] {.async.} = + var protos = @["/test-stream"] + var test = "TEST STRING" + + let nativeNode = createNode() + let awaiters = await nativeNode.start() + + let daemonNode = await newDaemonApi() + let daemonPeer = await daemonNode.identity() + + var testFuture = newFuture[string]("test.future") + proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = + var line = await stream.transp.readLine() + check line == test + testFuture.complete(line) + + await daemonNode.addHandler(protos, daemonHandler) + let conn = await nativeNode.dial(NativePeerInfo(peerId: some(daemonPeer.peer), + addrs: daemonPeer.addresses), + protos[0]) + await conn.writeLp(test & "\r\n") + result = test == (await wait(testFuture, 10.secs)) + await nativeNode.stop() + await allFutures(awaiters) + await daemonNode.close() + + check: + waitFor(runTests()) == true + + test "daemon -> native connection": + proc runTests(): Future[bool] {.async.} = + var protos = @["/test-stream"] + var test = "TEST STRING" + + var testFuture = newFuture[string]("test.future") + proc nativeHandler(conn: Connection, proto: string) {.async.} = + var line = cast[string](await conn.readLp()) + check line == test + testFuture.complete(line) + await conn.close() + + # custom proto + var proto = new LPProtocol + proto.handler = nativeHandler + proto.codec = protos[0] # codec + + let nativeNode = createNode() + nativeNode.mount(proto) + + let awaiters = await nativeNode.start() + let nativePeer = nativeNode.peerInfo + + let daemonNode = await newDaemonApi() + await daemonNode.connect(nativePeer.peerId.get(), nativePeer.addrs) + var stream = await daemonNode.openStream(nativePeer.peerId.get(), protos) + discard await stream.transp.writeLp(test) + + result = test == (await wait(testFuture, 10.secs)) + await nativeNode.stop() + await allFutures(awaiters) + await daemonNode.close() + + check: + waitFor(runTests()) == true + + test "daemon -> multiple reads and writes": + proc runTests(): Future[bool] {.async.} = + var protos = @["/test-stream"] + + var testFuture = newFuture[void]("test.future") + proc nativeHandler(conn: Connection, proto: string) {.async.} = + check "test 1" == cast[string](await conn.readLp()) + await conn.writeLp(cast[seq[byte]]("test 2")) + + check "test 3" == cast[string](await conn.readLp()) + await conn.writeLp(cast[seq[byte]]("test 4")) + + testFuture.complete() + await conn.close() + + # custom proto + var proto = new LPProtocol + proto.handler = nativeHandler + proto.codec = protos[0] # codec + + let nativeNode = createNode() + nativeNode.mount(proto) + + let awaiters = await nativeNode.start() + let nativePeer = nativeNode.peerInfo + + let daemonNode = await newDaemonApi() + await daemonNode.connect(nativePeer.peerId.get(), nativePeer.addrs) + var stream = await daemonNode.openStream(nativePeer.peerId.get(), protos) + + asyncDiscard stream.transp.writeLp("test 1") + check "test 2" == cast[string](await stream.transp.readLp()) + + asyncDiscard stream.transp.writeLp("test 3") + check "test 4" == cast[string](await stream.transp.readLp()) + + await wait(testFuture, 10.secs) + + result = true + await nativeNode.stop() + await allFutures(awaiters) + await daemonNode.close() + + check: + waitFor(runTests()) == true + + test "read write multiple": + proc runTests(): Future[bool] {.async.} = + var protos = @["/test-stream"] + var test = "TEST STRING" + + var count = 0 + var testFuture = newFuture[int]("test.future") + proc nativeHandler(conn: Connection, proto: string) {.async.} = + while count < 10: + var line = cast[string](await conn.readLp()) + check line == test + await conn.writeLp(cast[seq[byte]](test)) + count.inc() + + testFuture.complete(count) + await conn.close() + + # custom proto + var proto = new LPProtocol + proto.handler = nativeHandler + proto.codec = protos[0] # codec + + let nativeNode = createNode() + nativeNode.mount(proto) + + let awaiters = await nativeNode.start() + let nativePeer = nativeNode.peerInfo + + let daemonNode = await newDaemonApi() + await daemonNode.connect(nativePeer.peerId.get(), nativePeer.addrs) + var stream = await daemonNode.openStream(nativePeer.peerId.get(), protos) + + while count < 10: + discard await stream.transp.writeLp(test) + let line = await stream.transp.readLp() + check test == cast[string](line) + + result = 10 == (await wait(testFuture, 10.secs)) + await nativeNode.stop() + await allFutures(awaiters) + await daemonNode.close() + + check: + waitFor(runTests()) == true + + test "floodsub: daemon publish": + check: + waitFor(testPubSubDaemonPublish()) == true + + test "gossipsub: daemon publish": + check: + waitFor(testPubSubDaemonPublish(true)) == true + + test "floodsub: node publish": + check: + waitFor(testPubSubNodePublish()) == true + + test "gossipsub: node publish": + check: + waitFor(testPubSubNodePublish(true)) == true diff --git a/tests/testmplex.nim b/tests/testmplex.nim index 09975f89d..90a4a777a 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -72,8 +72,7 @@ suite "Mplex": test "decode header with channel id 0": proc testDecodeHeader(): Future[bool] {.async.} = - proc encHandler(msg: seq[byte]) {.async.} = discard - let stream = newBufferStream(encHandler) + let stream = newBufferStream() let conn = newConnection(stream) await stream.pushTo(fromHex("000873747265616d2031")) let msg = await conn.readMsg() @@ -88,8 +87,7 @@ suite "Mplex": test "decode header and body with channel id 0": proc testDecodeHeader(): Future[bool] {.async.} = - proc encHandler(msg: seq[byte]) {.async.} = discard - let stream = newBufferStream(encHandler) + let stream = newBufferStream() let conn = newConnection(stream) await stream.pushTo(fromHex("021668656C6C6F2066726F6D206368616E6E656C20302121")) let msg = await conn.readMsg() @@ -105,8 +103,7 @@ suite "Mplex": test "decode header and body with channel id other than 0": proc testDecodeHeader(): Future[bool] {.async.} = - proc encHandler(msg: seq[byte]) {.async.} = discard - let stream = newBufferStream(encHandler) + let stream = newBufferStream() let conn = newConnection(stream) await stream.pushTo(fromHex("8a011668656C6C6F2066726F6D206368616E6E656C20302121")) let msg = await conn.readMsg() diff --git a/tests/testnative.nim b/tests/testnative.nim index b5e3e598c..552f15520 100644 --- a/tests/testnative.nim +++ b/tests/testnative.nim @@ -1,4 +1,3 @@ -import unittest import testvarint, testbase32, testbase58, testbase64 import testrsa, testecnist, tested25519, testsecp256k1, testcrypto import testmultibase, testmultihash, testmultiaddress, testcid, testpeer @@ -8,5 +7,7 @@ import testtransport, testbufferstream, testidentify, testswitch, - testpubsub, + pubsub/testpubsub, + # TODO: placing this before pubsub tests, + # breaks some flood and gossip tests - no idea why testmplex diff --git a/tests/testpubsub.nim b/tests/testpubsub.nim deleted file mode 100644 index 3aeb279ec..000000000 --- a/tests/testpubsub.nim +++ /dev/null @@ -1,181 +0,0 @@ -## Nim-Libp2p -## Copyright (c) 2018 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. -import unittest, options, tables, sugar, sequtils -import chronos, chronicles -import ../libp2p/[switch, - multistream, - protocols/identify, - connection, - transports/transport, - transports/tcptransport, - multiaddress, - peerinfo, - crypto/crypto, - peer, - protocols/protocol, - muxers/muxer, - muxers/mplex/mplex, - muxers/mplex/types, - protocols/secure/secure, - protocols/secure/secio, - protocols/pubsub/pubsub, - protocols/pubsub/floodsub] - -when defined(nimHasUsed): {.used.} - -proc createMplex(conn: Connection): Muxer = - result = newMplex(conn) - -proc createNode(privKey: Option[PrivateKey] = none(PrivateKey), - address: string = "/ip4/127.0.0.1/tcp/0", - triggerSelf: bool = false): Switch = - var peerInfo: PeerInfo - var seckey = privKey - if privKey.isNone: - seckey = some(PrivateKey.random(RSA)) - - peerInfo.peerId = some(PeerID.init(seckey.get())) - peerInfo.addrs.add(Multiaddress.init(address)) - - let mplexProvider = newMuxerProvider(createMplex, MplexCodec) - let transports = @[Transport(newTransport(TcpTransport))] - let muxers = [(MplexCodec, mplexProvider)].toTable() - let identify = newIdentify(peerInfo) - let secureManagers = [(SecioCodec, Secure(newSecio(seckey.get())))].toTable() - let pubSub = some(PubSub(newPubSub(FloodSub, peerInfo, triggerSelf))) - result = newSwitch(peerInfo, - transports, - identify, - muxers, - secureManagers = secureManagers, - pubSub = pubSub) - -proc generateNodes*(num: Natural): seq[Switch] = - for i in 0.. B": - proc testBasicPubSub(): Future[bool] {.async.} = - var passed: bool - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - check topic == "foobar" - passed = true - - var nodes = generateNodes(2) - var wait = await nodes[1].start() - - await nodes[0].subscribeToPeer(nodes[1].peerInfo) - - await nodes[1].subscribe("foobar", handler) - await sleepAsync(100.millis) - - await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) - await sleepAsync(100.millis) - - await nodes[1].stop() - await allFutures(wait) - result = passed - - check: - waitFor(testBasicPubSub()) == true - - test "FloodSub basic publish/subscribe B -> A": - proc testBasicPubSub(): Future[bool] {.async.} = - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - check topic == "foobar" - - var nodes = generateNodes(2) - var wait = await nodes[1].start() - - await nodes[0].subscribeToPeer(nodes[1].peerInfo) - - await nodes[0].subscribe("foobar", handler) - await sleepAsync(10.millis) - - await nodes[1].publish("foobar", cast[seq[byte]]("Hello!")) - await sleepAsync(10.millis) - - await nodes[1].stop() - await allFutures(wait) - result = true - - check: - waitFor(testBasicPubSub()) == true - - test "FloodSub multiple peers, no self trigger": - proc testBasicFloodSub(): Future[bool] {.async.} = - var passed: int - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - check topic == "foobar" - passed.inc() - - var nodes: seq[Switch] = newSeq[Switch]() - for i in 0..<10: - nodes.add(createNode()) - - var awaitters: seq[Future[void]] - for node in nodes: - awaitters.add(await node.start()) - await node.subscribe("foobar", handler) - await sleepAsync(10.millis) - - await subscribeNodes(nodes) - await sleepAsync(50.millis) - - for node in nodes: - await node.publish("foobar", cast[seq[byte]]("Hello!")) - await sleepAsync(100.millis) - - await allFutures(nodes.mapIt(it.stop())) - await allFutures(awaitters) - - result = passed >= 0 # non deterministic, so at least 10 times - - check: - waitFor(testBasicFloodSub()) == true - - test "FloodSub multiple peers, with self trigger": - proc testBasicFloodSub(): Future[bool] {.async.} = - var passed: int - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - check topic == "foobar" - passed.inc() - - var nodes: seq[Switch] = newSeq[Switch]() - for i in 0..<10: - nodes.add(createNode(none(PrivateKey), "/ip4/127.0.0.1/tcp/0", true)) - - var awaitters: seq[Future[void]] - for node in nodes: - awaitters.add(await node.start()) - await node.subscribe("foobar", handler) - await sleepAsync(10.millis) - - await subscribeNodes(nodes) - await sleepAsync(50.millis) - - for node in nodes: - await node.publish("foobar", cast[seq[byte]]("Hello!")) - await sleepAsync(100.millis) - - await allFutures(nodes.mapIt(it.stop())) - await allFutures(awaitters) - - result = passed >= 0 # non deterministic, so at least 20 times - - check: - waitFor(testBasicFloodSub()) == true diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 5b71fe5fd..0f3d0b1f6 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -69,10 +69,10 @@ suite "Switch": testProto.init() testProto.codec = TestCodec switch1.mount(testProto) - var switch1Fut = await switch1.start() + asyncCheck switch1.start() (switch2, peerInfo2) = createSwitch(ma2) - var switch2Fut = await switch2.start() + asyncCheck switch2.start() let conn = await switch2.dial(switch1.peerInfo, TestCodec) await conn.writeLp("Hello!") let msg = cast[string](await conn.readLp())