diff --git a/comm.go b/comm.go index adda4d4..5d05e09 100644 --- a/comm.go +++ b/comm.go @@ -33,7 +33,7 @@ func (p *PubSub) handleNewStream(s inet.Stream) { if err != nil { if err != io.EOF { s.Reset() - log.Errorf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err) + log.Infof("error reading rpc from %s: %s", s.Conn().RemotePeer(), err) } else { // Just be nice. They probably won't read this // but it doesn't hurt to send it. @@ -81,7 +81,7 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgo err := writeMsg(&rpc.RPC) if err != nil { s.Reset() - log.Warningf("writing message to %s: %s", s.Conn().RemotePeer(), err) + log.Infof("writing message to %s: %s", s.Conn().RemotePeer(), err) select { case p.peerDead <- s.Conn().RemotePeer(): case <-ctx.Done(): diff --git a/floodsub.go b/floodsub.go index b1247d5..102e692 100644 --- a/floodsub.go +++ b/floodsub.go @@ -207,11 +207,7 @@ func (p *PubSub) processLoop(ctx context.Context) { } preq.resp <- peers case rpc := <-p.incoming: - err := p.handleIncomingRPC(rpc) - if err != nil { - log.Error("handling RPC: ", err) - continue - } + p.handleIncomingRPC(rpc) case msg := <-p.publish: vals := p.getValidators(msg) p.pushMsg(vals, p.host.ID(), msg) @@ -331,7 +327,7 @@ func (p *PubSub) subscribedToMsg(msg *pb.Message) bool { return false } -func (p *PubSub) handleIncomingRPC(rpc *RPC) error { +func (p *PubSub) handleIncomingRPC(rpc *RPC) { for _, subopt := range rpc.GetSubscriptions() { t := subopt.GetTopicid() if subopt.GetSubscribe() { @@ -361,8 +357,6 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error { vals := p.getValidators(msg) p.pushMsg(vals, rpc.from, msg) } - - return nil } // msgID returns a unique ID of the passed Message @@ -449,13 +443,10 @@ func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) { p.notifySubs(pmsg) - err := p.publishMessage(from, pmsg) - if err != nil { - log.Error("publish message: ", err) - } + p.publishMessage(from, pmsg) } -func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) error { +func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) { tosend := make(map[peer.ID]struct{}) for _, topic := range msg.GetTopicIDs() { tmap, ok := p.topics[topic] @@ -486,8 +477,6 @@ func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) error { // Drop it. The peer is too slow. } } - - return nil } // getValidators returns all validators that apply to a given message