diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index c186408..32240bc 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -199,13 +199,12 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = proc dropFanoutPeers(g: GossipSub) = # drop peers that we haven't published to in # GossipSubFanoutTTL seconds - var dropping = newSeq[string]() let now = Moment.now() - - for topic, val in g.lastFanoutPubSub: + for topic in toSeq(g.lastFanoutPubSub.keys): + let val = g.lastFanoutPubSub[topic] if now > val: - dropping.add(topic) g.fanout.del(topic) + g.lastFanoutPubSub.del(topic) trace "dropping fanout topic", topic libp2p_gossipsub_peers_per_topic_fanout @@ -338,8 +337,7 @@ method subscribeTopic*(g: GossipSub, proc handleGraft(g: GossipSub, peer: PubSubPeer, - grafts: seq[ControlGraft], - respControl: var ControlMessage) = + grafts: seq[ControlGraft]): seq[ControlPrune] = let peerId = peer.id for graft in grafts: let topic = graft.topicID @@ -358,9 +356,9 @@ proc handleGraft(g: GossipSub, else: trace "Peer already in mesh", topic, peerId else: - respControl.prune.add(ControlPrune(topicID: topic)) + result.add(ControlPrune(topicID: topic)) else: - respControl.prune.add(ControlPrune(topicID: topic)) + result.add(ControlPrune(topicID: topic)) libp2p_gossipsub_peers_per_topic_mesh .set(g.mesh.peers(topic).int64, labelValues = [topic]) @@ -459,18 +457,17 @@ method rpcHandler*(g: GossipSub, var respControl: ControlMessage if m.control.isSome: - var control: ControlMessage = m.control.get() - let iWant: ControlIWant = g.handleIHave(peer, control.ihave) - if iWant.messageIDs.len > 0: - respControl.iwant.add(iWant) - let messages: seq[Message] = g.handleIWant(peer, control.iwant) - - g.handleGraft(peer, control.graft, respControl) + let control = m.control.get() g.handlePrune(peer, control.prune) + respControl.iwant.add(g.handleIHave(peer, control.ihave)) + respControl.prune.add(g.handleGraft(peer, control.graft)) + if respControl.graft.len > 0 or respControl.prune.len > 0 or respControl.ihave.len > 0 or respControl.iwant.len > 0: - await peer.send(@[RPCMsg(control: some(respControl), messages: messages)]) + await peer.send( + @[RPCMsg(control: some(respControl), + messages: g.handleIWant(peer, control.iwant))]) method subscribe*(g: GossipSub, topic: string, diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim index ce42275..922546b 100644 --- a/libp2p/protocols/pubsub/rpc/protobuf.nim +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -88,8 +88,9 @@ proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} = h.encodeIHave(ihave) # write messages to protobuf - ihave.finish() - pb.write(initProtoField(1, ihave)) + if ihave.buffer.len > 0: + ihave.finish() + pb.write(initProtoField(1, ihave)) if control.iwant.len > 0: var iwant = initProtoBuffer() @@ -97,8 +98,9 @@ proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} = w.encodeIWant(iwant) # write messages to protobuf - iwant.finish() - pb.write(initProtoField(2, iwant)) + if iwant.buffer.len > 0: + iwant.finish() + pb.write(initProtoField(2, iwant)) if control.graft.len > 0: var graft = initProtoBuffer() @@ -106,8 +108,9 @@ proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} = g.encodeGraft(graft) # write messages to protobuf - graft.finish() - pb.write(initProtoField(3, graft)) + if graft.buffer.len > 0: + graft.finish() + pb.write(initProtoField(3, graft)) if control.prune.len > 0: var prune = initProtoBuffer() @@ -115,8 +118,9 @@ proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} = p.encodePrune(prune) # write messages to protobuf - prune.finish() - pb.write(initProtoField(4, prune)) + if prune.buffer.len > 0: + prune.finish() + pb.write(initProtoField(4, prune)) proc decodeControl*(pb: var ProtoBuffer): Option[ControlMessage] {.gcsafe.} = trace "decoding control submessage" @@ -225,9 +229,11 @@ proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} = for s in msg.subscriptions: var subs = initProtoBuffer() encodeSubs(s, subs) + # write subscriptions to protobuf - subs.finish() - result.write(initProtoField(1, subs)) + if subs.buffer.len > 0: + subs.finish() + result.write(initProtoField(1, subs)) if msg.messages.len > 0: var messages = initProtoBuffer() @@ -235,16 +241,18 @@ proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} = encodeMessage(m, messages) # write messages to protobuf - messages.finish() - result.write(initProtoField(2, messages)) + if messages.buffer.len > 0: + messages.finish() + result.write(initProtoField(2, messages)) if msg.control.isSome: var control = initProtoBuffer() msg.control.get.encodeControl(control) # write messages to protobuf - control.finish() - result.write(initProtoField(3, control)) + if control.buffer.len > 0: + control.finish() + result.write(initProtoField(3, control)) if result.buffer.len > 0: result.finish()