diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 2cc95d2cc..9e913587a 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -86,6 +86,139 @@ method init(g: GossipSub) = g.handler = handler g.codec = GossipSubCodec +proc replenishFanout(g: GossipSub, topic: string) {.async.} = + ## get fanout peers for a topic + trace "about to replenish fanout" + if topic notin g.fanout: + g.fanout[topic] = initHashSet[string]() + + if g.fanout[topic].len < GossipSubDLo: + trace "replenishing fanout", peers = g.fanout[topic].len + if topic in g.gossipsub: + for p in g.gossipsub[topic]: + if not g.fanout[topic].containsOrIncl(p): + if g.fanout[topic].len == GossipSubD: + break + + trace "fanout replenished with peers", peers = g.fanout[topic].len + +proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = + try: + trace "about to rebalance mesh" + # create a mesh topic that we're subscribing to + if topic notin g.mesh: + g.mesh[topic] = initHashSet[string]() + + if g.mesh[topic].len < GossipSubDlo: + trace "replenishing mesh" + # replenish the mesh if we're below GossipSubDlo + while g.mesh[topic].len < GossipSubD: + trace "gathering peers", peers = g.mesh[topic].len + var id: string + if topic in g.fanout and g.fanout[topic].len > 0: + id = sample(toSeq(g.fanout[topic])) + g.fanout[topic].excl(id) + trace "got fanout peer", peer = id + elif topic in g.gossipsub and g.gossipsub[topic].len > 0: + id = sample(toSeq(g.gossipsub[topic])) + g.gossipsub[topic].excl(id) + trace "got gossipsub peer", peer = id + else: + trace "no more peers" + break + + g.mesh[topic].incl(id) + if id in g.peers: + let p = g.peers[id] + # send a graft message to the peer + await p.sendGraft(@[topic]) + + # prune peers if we've gone over + if g.mesh[topic].len > GossipSubDhi: + trace "pruning mesh" + while g.mesh[topic].len > GossipSubD: + trace "pruning peers", peers = g.mesh[topic].len + let id = toSeq(g.mesh[topic])[rand(0.. val: + dropping.add(topic) + g.fanout.del(topic) + for topic in dropping: + g.lastFanoutPubSub.del(topic) + +proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} = + ## gossip iHave messages to peers + let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys)) + for topic in topics: + let mesh: HashSet[string] = + if topic in g.mesh: + g.mesh[topic] + else: + initHashSet[string]() + + let fanout: HashSet[string] = + if topic in g.fanout: + g.fanout[topic] + else: + initHashSet[string]() + + let gossipPeers = mesh + fanout + let mids = g.mcache.window(topic) + if mids.len > 0: + let ihave = ControlIHave(topicID: topic, + messageIDs: toSeq(mids)) + + if topic notin g.gossipsub: + trace "topic not in gossip array, skipping", topicID = topic + continue + + while result.len < GossipSubD: + if not (g.gossipsub[topic].len > 0): + trace "no peers for topic, skipping", topicID = topic + break + + let id = toSeq(g.gossipsub[topic]).sample() + g.gossipsub[topic].excl(id) + if id notin gossipPeers: + if id notin result: + result[id] = ControlMessage() + result[id].ihave.add(ihave) + +proc heartbeat(g: GossipSub) {.async.} = + try: + await g.heartbeatLock.acquire() + trace "running heartbeat" + + await sleepAsync(GossipSubHeartbeatInitialDelay) + + for t in g.mesh.keys: + await g.rebalanceMesh(t) + + await g.dropFanoutPeers() + let peers = g.getGossipPeers() + for peer in peers.keys: + await g.peers[peer].send(@[RPCMsg(control: some(peers[peer]))]) + + g.mcache.shift() # shift the cache + except CatchableError as exc: + trace "exception ocurred in gossipsub heartbeat", exc = exc.msg + finally: + g.heartbeatLock.release() + method handleDisconnect(g: GossipSub, peer: PubSubPeer) {.async.} = ## handle peer disconnects trace "peer disconnected", peer=peer.id @@ -148,19 +281,22 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = if prune.topicID in g.mesh: g.mesh[prune.topicID].excl(peer.id) -proc handleIHave(g: GossipSub, peer: PubSubPeer, ihaves: seq[ - ControlIHave]): ControlIWant = +proc handleIHave(g: GossipSub, + peer: PubSubPeer, + ihaves: seq[ControlIHave]): ControlIWant = for ihave in ihaves: trace "processing ihave message", peer = peer.id, - topicID = ihave.topicID + topicID = ihave.topicID, + msgs = ihave.messageIDs if ihave.topicID in g.mesh: for m in ihave.messageIDs: if m notin g.seen: result.messageIDs.add(m) -proc handleIWant(g: GossipSub, peer: PubSubPeer, iwants: seq[ - ControlIWant]): seq[Message] = +proc handleIWant(g: GossipSub, + peer: PubSubPeer, + iwants: seq[ControlIWant]): seq[Message] = for iwant in iwants: for mid in iwant.messageIDs: trace "processing iwant message", peer = peer.id, @@ -176,7 +312,7 @@ method rpcHandler(g: GossipSub, for m in rpcMsgs: # for all RPC messages if m.messages.len > 0: # if there are any messages - var toSendPeers: HashSet[string] = initHashSet[string]() + var toSendPeers: HashSet[string] for msg in m.messages: # for every message trace "processing message with id", msg = msg.msgId if msg.msgId in g.seen: @@ -199,7 +335,7 @@ method rpcHandler(g: GossipSub, continue for t in msg.topicIDs: # for every topic in the message - + await g.rebalanceMesh(t) # gather peers for each topic if t in g.floodsub: toSendPeers.incl(g.floodsub[t]) # get all floodsub peers for topic @@ -218,20 +354,23 @@ method rpcHandler(g: GossipSub, for p in toSendPeers: if p in g.peers: let id = g.peers[p].peerInfo.peerId - trace "about to forward message to peer", peerId = id + trace "about to forward message to peer", peerId = id, msgs = m.messages - if id != peer.peerInfo.peerId: - let msgs = m.messages.filterIt( - # don't forward to message originator - id != it.fromPeerId() - ) + if id == peer.peerInfo.peerId: + trace "not forwarding message to originator", peerId = id + continue - var sent: seq[Future[void]] - if msgs.len > 0: - trace "forwarding message to", peerId = id - sent.add(g.peers[p].send(@[RPCMsg(messages: msgs)])) - sent = await allFinished(sent) - checkFutures(sent) + let msgs = m.messages.filterIt( + # don't forward to message originator + id != it.fromPeerId() + ) + + var sent: seq[Future[void]] + if msgs.len > 0: + trace "forwarding message to", peerId = id + sent.add(g.peers[p].send(@[RPCMsg(messages: msgs)])) + sent = await allFinished(sent) + checkFutures(sent) var respControl: ControlMessage if m.control.isSome: @@ -248,135 +387,11 @@ method rpcHandler(g: GossipSub, respControl.ihave.len > 0 or respControl.iwant.len > 0: await peer.send(@[RPCMsg(control: some(respControl), messages: messages)]) -proc replenishFanout(g: GossipSub, topic: string) {.async.} = - ## get fanout peers for a topic - trace "about to replenish fanout" - if topic notin g.fanout: - g.fanout[topic] = initHashSet[string]() - - if g.fanout[topic].len < GossipSubDLo: - trace "replenishing fanout", peers = g.fanout[topic].len - if topic in g.gossipsub: - for p in g.gossipsub[topic]: - if not g.fanout[topic].containsOrIncl(p): - if g.fanout[topic].len == GossipSubD: - break - - trace "fanout replenished with peers", peers = g.fanout[topic].len - -proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = - trace "about to rebalance mesh" - # create a mesh topic that we're subscribing to - if topic notin g.mesh: - g.mesh[topic] = initHashSet[string]() - - if g.mesh[topic].len < GossipSubDlo: - trace "replenishing mesh" - # replenish the mesh if we're below GossipSubDlo - while g.mesh[topic].len < GossipSubD: - trace "gathering peers", peers = g.mesh[topic].len - var id: string - if topic in g.fanout and g.fanout[topic].len > 0: - id = g.fanout[topic].pop() - trace "got fanout peer", peer = id - elif topic in g.gossipsub and g.gossipsub[topic].len > 0: - id = g.gossipsub[topic].pop() - trace "got gossipsub peer", peer = id - else: - trace "no more peers" - break - - g.mesh[topic].incl(id) - if id in g.peers: - let p = g.peers[id] - # send a graft message to the peer - await p.sendGraft(@[topic]) - - # prune peers if we've gone over - if g.mesh[topic].len > GossipSubDhi: - trace "pruning mesh" - while g.mesh[topic].len > GossipSubD: - trace "pruning peers", peers = g.mesh[topic].len - let id = toSeq(g.mesh[topic])[rand(0.. val: - dropping.add(topic) - g.fanout.del(topic) - for topic in dropping: - g.lastFanoutPubSub.del(topic) - -proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} = - ## gossip iHave messages to peers - let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys)) - - for topic in topics: - let mesh: HashSet[string] = - if topic in g.mesh: - g.mesh[topic] - else: - initHashSet[string]() - - let fanout: HashSet[string] = - if topic in g.fanout: - g.fanout[topic] - else: - initHashSet[string]() - - let gossipPeers = mesh + fanout - let mids = g.mcache.window(topic) - let ihave = ControlIHave(topicID: topic, - messageIDs: toSeq(mids)) - - if topic notin g.gossipsub: - trace "topic not in gossip array, skipping", topicID = topic - continue - - while result.len < GossipSubD: - if not (g.gossipsub[topic].len > 0): - trace "no peers for topic, skipping", topicID = topic - break - - let id = toSeq(g.gossipsub[topic]).sample() - g.gossipsub[topic].excl(id) - if id notin gossipPeers: - if id notin result: - result[id] = ControlMessage() - result[id].ihave.add(ihave) - -proc heartbeat(g: GossipSub) {.async.} = - await g.heartbeatLock.acquire() - trace "running heartbeat" - - await sleepAsync(GossipSubHeartbeatInitialDelay) - - for t in g.mesh.keys: - await g.rebalanceMesh(t) - - await g.dropFanoutPeers() - let peers = g.getGossipPeers() - for peer in peers.keys: - await g.peers[peer].send(@[RPCMsg(control: some(peers[peer]))]) - - g.mcache.shift() # shift the cache - g.heartbeatLock.release() - method subscribe*(g: GossipSub, topic: string, handler: TopicHandler) {.async.} = await procCall PubSub(g).subscribe(topic, handler) - asyncCheck g.rebalanceMesh(topic) + await g.rebalanceMesh(topic) method unsubscribe*(g: GossipSub, topics: seq[TopicPair]) {.async.} = @@ -447,6 +462,7 @@ method stop*(g: GossipSub) {.async.} = method initPubSub(g: GossipSub) = procCall FloodSub(g).initPubSub() + randomize() g.mcache = newMCache(GossipSubHistoryGossip, GossipSubHistoryLength) g.mesh = initTable[string, HashSet[string]]() # meshes - topic to peer g.fanout = initTable[string, HashSet[string]]() # fanout - topic to peer diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index a18581b53..bba17f40b 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -199,14 +199,13 @@ method subscribe*(p: PubSub, method publish*(p: PubSub, topic: string, data: seq[byte]) {.base, async.} = + # TODO: Should return bool indicating success/failure ## publish to a ``topic`` if p.triggerSelf and topic in p.topics: for h in p.topics[topic].handler: trace "triggering handler", topicID = topic try: await h(topic, data) - except LPStreamEOFError: - trace "Ignoring EOF while writing" except CancelledError as exc: raise exc except CatchableError as exc: diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 565d1e5ef..713bf0772 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -80,7 +80,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = try: for m in msgs.items: - trace "sending msgs to peer", toPeer = p.id + trace "sending msgs to peer", toPeer = p.id, msgs = msgs let encoded = encodeRpcMsg(m) # trigger hooks if not(isNil(p.observers)) and p.observers[].len > 0: @@ -98,25 +98,32 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = continue proc sendToRemote() {.async.} = - trace "sending encoded msgs to peer", peer = p.id, - encoded = encoded.buffer.shortLog - await p.sendConn.writeLp(encoded.buffer) - p.sentRpcCache.put(digest) + trace "about send message", peer = p.id, + encoded = digest + await p.onConnect.wait() + try: + trace "sending encoded msgs to peer", peer = p.id, + encoded = encoded.buffer.shortLog + await p.sendConn.writeLp(encoded.buffer) + p.sentRpcCache.put(digest) + except CatchableError as exc: + trace "unable to send to remote", exc = exc.msg + if not(isNil(p.sendConn)): + await p.sendConn.close() + p.sendConn = nil + p.onConnect.clear() # if no connection has been set, # queue messages untill a connection # becomes available - if p.isConnected: - await sendToRemote() - return - - p.onConnect.wait().addCallback do (udata: pointer): - asyncCheck sendToRemote() - trace "enqueued message to send at a later time", peer = p.id, - encoded = digest + asyncCheck sendToRemote() except CatchableError as exc: trace "Exception occurred in PubSubPeer.send", exc = exc.msg + if not(isNil(p.sendConn)): + await p.sendConn.close() + p.sendConn = nil + p.onConnect.clear() proc sendMsg*(p: PubSubPeer, peerId: PeerID, diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim index 4e1e25c5d..17a68e0d4 100644 --- a/libp2p/protocols/pubsub/rpc/protobuf.nim +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -49,19 +49,18 @@ proc decodeIHave*(pb: var ProtoBuffer): seq[ControlIHave] {.gcsafe.} = while true: var control: ControlIHave - if pb.enterSubMessage() > 0: - if pb.getString(1, control.topicID) < 0: - trace "topic field missing from ihave msg" + if pb.getString(1, control.topicID) < 0: + trace "topic field missing from ihave msg" + break + + trace "read topic field", topicID = control.topicID + + while true: + var mid: string + if pb.getString(2, mid) < 0: break - - trace "read topic field", topicID = control.topicID - - while true: - var mid: string - if pb.getString(2, mid) < 0: - break - trace "read messageID field", mid = mid - control.messageIDs.add(mid) + trace "read messageID field", mid = mid + control.messageIDs.add(mid) result.add(control) @@ -70,15 +69,16 @@ proc encodeIWant*(iwant: ControlIWant, pb: var ProtoBuffer) {.gcsafe.} = pb.write(initProtoField(1, mid)) proc decodeIWant*(pb: var ProtoBuffer): seq[ControlIWant] {.gcsafe.} = - trace "decoding ihave msg" + trace "decoding iwant msg" - while pb.enterSubMessage() > 0: + var control: ControlIWant + while true: var mid: string - var iWant: ControlIWant - while pb.getString(1, mid) > 0: - trace "read messageID field", mid = mid - iWant.messageIDs.add(mid) - result.add(iWant) + if pb.getString(1, mid) < 0: + break + control.messageIDs.add(mid) + trace "read messageID field", mid = mid + result.add(control) proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} = if control.ihave.len > 0: @@ -128,13 +128,13 @@ proc decodeControl*(pb: var ProtoBuffer): Option[ControlMessage] {.gcsafe.} = trace "no submessage found in Control msg" break of 1: - control.ihave = pb.decodeIHave() + control.ihave &= pb.decodeIHave() of 2: - control.iwant = pb.decodeIWant() + control.iwant &= pb.decodeIWant() of 3: - control.graft = pb.decodeGraft() + control.graft &= pb.decodeGraft() of 4: - control.prune = pb.decodePrune() + control.prune &= pb.decodePrune() else: raise newException(CatchableError, "message type not recognized") diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index a93d04a42..39f3e6910 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -53,9 +53,11 @@ method init*(s: Secure) {.gcsafe.} = method secure*(s: Secure, conn: Connection, initiator: bool): Future[Connection] {.async, base, gcsafe.} = try: result = await s.handleConn(conn, initiator) + except CancelledError as exc: + raise exc except CatchableError as exc: warn "securing connection failed", msg = exc.msg - await conn.close() + return nil method readExactly*(s: SecureConn, pbytes: pointer, diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index 48a337d2d..9d7cc9406 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -207,10 +207,10 @@ suite "FloodSub": test "FloodSub multiple peers, no self trigger": proc runTests(): Future[bool] {.async.} = - var passed = 0 + var runs = 10 - var futs = newSeq[(Future[void], TopicHandler, ref int)](10) - for i in 0..<10: + var futs = newSeq[(Future[void], TopicHandler, ref int)](runs) + for i in 0..= runs: seenFut.complete() - subs.add(allFutures(dialer.subscribe("foobar", handler), waitSub(nodes[0], dialer, "foobar"))) + subs.add(allFutures(dialer.subscribe("foobar", handler), + waitSub(nodes[0], dialer, "foobar"))) await allFuturesThrowing(subs) @@ -350,7 +352,7 @@ suite "GossipSub": 1.minutes) await wait(seenFut, 2.minutes) - check: seen.len >= 10 + check: seen.len >= runs for k, v in seen.pairs: check: v == 1 diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index f46216809..4b2e3be2b 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -1,7 +1,10 @@ +import random import chronos import ../../libp2p/standard_setup export standard_setup +randomize() + proc generateNodes*(num: Natural, gossip: bool = false): seq[Switch] = for i in 0..