From 21a4f8c0c82a0b980a8cb5fdf6460148cdcdb565 Mon Sep 17 00:00:00 2001 From: Yusef Napora Date: Wed, 6 May 2020 10:25:14 -0400 Subject: [PATCH] only call fragmentRPC if we're over the size limit --- gossipsub.go | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index 2e90b61..cf6537d 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -899,16 +899,31 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) { return } - // fragment into multiple RPCs if necessary - outRPCs, err := fragmentRPC(out, gs.p.maxMessageSize) - if err != nil { - log.Warnf("unable to fragment message to peer %s: %s", p, err) - gs.tracer.DropRPC(out, p) + dropRPC := func(rpc *RPC, template string, args ...interface{}) { + log.Warnf(template, args...) + gs.tracer.DropRPC(rpc, p) // push control messages that need to be retried - ctl := out.GetControl() + ctl := rpc.GetControl() if ctl != nil { gs.pushControl(p, ctl) } + } + + // If we're below the max message size, go ahead and send + if out.Size() < gs.p.maxMessageSize { + select { + case mch <- out: + gs.tracer.SendRPC(out, p) + default: + dropRPC(out, "dropping message to peer %s: queue full", p) + } + return + } + + // If we're too big, fragment into multiple RPCs and send each sequentially + outRPCs, err := fragmentRPC(out, gs.p.maxMessageSize) + if err != nil { + dropRPC(out, "unable to fragment message to peer %s: %s", p, err) return } @@ -917,13 +932,7 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) { case mch <- rpc: gs.tracer.SendRPC(rpc, p) default: - log.Infof("dropping message to peer %s: queue full", p) - gs.tracer.DropRPC(rpc, p) - // push control messages that need to be retried - ctl := rpc.GetControl() - if ctl != nil { - gs.pushControl(p, ctl) - } + dropRPC(rpc, "dropping message to peer %s: queue full", p) } } }