From 94015cee77772eac2473f9b41b993c8464d00d1b Mon Sep 17 00:00:00 2001 From: Yusef Napora Date: Wed, 6 May 2020 11:28:37 -0400 Subject: [PATCH] factor out doSendRPC and doDropRPC methods --- gossipsub.go | 45 ++++++++++++++++++++++----------------------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index cf6537d..a519712 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -899,41 +899,40 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) { return } - dropRPC := func(rpc *RPC, template string, args ...interface{}) { - log.Warnf(template, args...) - gs.tracer.DropRPC(rpc, p) - // push control messages that need to be retried - ctl := rpc.GetControl() - if ctl != nil { - gs.pushControl(p, ctl) - } - } - // If we're below the max message size, go ahead and send if out.Size() < gs.p.maxMessageSize { - select { - case mch <- out: - gs.tracer.SendRPC(out, p) - default: - dropRPC(out, "dropping message to peer %s: queue full", p) - } + gs.doSendRPC(out, p, mch) return } // If we're too big, fragment into multiple RPCs and send each sequentially outRPCs, err := fragmentRPC(out, gs.p.maxMessageSize) if err != nil { - dropRPC(out, "unable to fragment message to peer %s: %s", p, err) + gs.doDropRPC(out, p, fmt.Sprintf("unable to fragment RPC: %s", err)) return } for _, rpc := range outRPCs { - select { - case mch <- rpc: - gs.tracer.SendRPC(rpc, p) - default: - dropRPC(rpc, "dropping message to peer %s: queue full", p) - } + gs.doSendRPC(rpc, p, mch) + } +} + +func (gs *GossipSubRouter) doDropRPC(rpc *RPC, p peer.ID, reason string) { + log.Warnf("dropping message to peer %s: %s", p.Pretty(), reason) + gs.tracer.DropRPC(rpc, p) + // push control messages that need to be retried + ctl := rpc.GetControl() + if ctl != nil { + gs.pushControl(p, ctl) + } +} + +func (gs *GossipSubRouter) doSendRPC(rpc *RPC, p peer.ID, mch chan *RPC) { + select { + case mch <- rpc: + gs.tracer.SendRPC(rpc, p) + default: + gs.doDropRPC(rpc, p, "queue full") } }