demote errors/warnings to info
* dropped/killed connections are not errors. * handleIncomingRPC/publishMessage do not return errors.
This commit is contained in:
parent
c82e67dcd3
commit
098f1d5819
4
comm.go
4
comm.go
|
@ -33,7 +33,7 @@ func (p *PubSub) handleNewStream(s inet.Stream) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != io.EOF {
|
if err != io.EOF {
|
||||||
s.Reset()
|
s.Reset()
|
||||||
log.Errorf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err)
|
log.Infof("error reading rpc from %s: %s", s.Conn().RemotePeer(), err)
|
||||||
} else {
|
} else {
|
||||||
// Just be nice. They probably won't read this
|
// Just be nice. They probably won't read this
|
||||||
// but it doesn't hurt to send it.
|
// but it doesn't hurt to send it.
|
||||||
|
@ -81,7 +81,7 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgo
|
||||||
err := writeMsg(&rpc.RPC)
|
err := writeMsg(&rpc.RPC)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.Reset()
|
s.Reset()
|
||||||
log.Warningf("writing message to %s: %s", s.Conn().RemotePeer(), err)
|
log.Infof("writing message to %s: %s", s.Conn().RemotePeer(), err)
|
||||||
select {
|
select {
|
||||||
case p.peerDead <- s.Conn().RemotePeer():
|
case p.peerDead <- s.Conn().RemotePeer():
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|
19
floodsub.go
19
floodsub.go
|
@ -207,11 +207,7 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
||||||
}
|
}
|
||||||
preq.resp <- peers
|
preq.resp <- peers
|
||||||
case rpc := <-p.incoming:
|
case rpc := <-p.incoming:
|
||||||
err := p.handleIncomingRPC(rpc)
|
p.handleIncomingRPC(rpc)
|
||||||
if err != nil {
|
|
||||||
log.Error("handling RPC: ", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
case msg := <-p.publish:
|
case msg := <-p.publish:
|
||||||
vals := p.getValidators(msg)
|
vals := p.getValidators(msg)
|
||||||
p.pushMsg(vals, p.host.ID(), msg)
|
p.pushMsg(vals, p.host.ID(), msg)
|
||||||
|
@ -331,7 +327,7 @@ func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PubSub) handleIncomingRPC(rpc *RPC) error {
|
func (p *PubSub) handleIncomingRPC(rpc *RPC) {
|
||||||
for _, subopt := range rpc.GetSubscriptions() {
|
for _, subopt := range rpc.GetSubscriptions() {
|
||||||
t := subopt.GetTopicid()
|
t := subopt.GetTopicid()
|
||||||
if subopt.GetSubscribe() {
|
if subopt.GetSubscribe() {
|
||||||
|
@ -361,8 +357,6 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error {
|
||||||
vals := p.getValidators(msg)
|
vals := p.getValidators(msg)
|
||||||
p.pushMsg(vals, rpc.from, msg)
|
p.pushMsg(vals, rpc.from, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// msgID returns a unique ID of the passed Message
|
// msgID returns a unique ID of the passed Message
|
||||||
|
@ -449,13 +443,10 @@ func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) {
|
||||||
|
|
||||||
p.notifySubs(pmsg)
|
p.notifySubs(pmsg)
|
||||||
|
|
||||||
err := p.publishMessage(from, pmsg)
|
p.publishMessage(from, pmsg)
|
||||||
if err != nil {
|
|
||||||
log.Error("publish message: ", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) error {
|
func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) {
|
||||||
tosend := make(map[peer.ID]struct{})
|
tosend := make(map[peer.ID]struct{})
|
||||||
for _, topic := range msg.GetTopicIDs() {
|
for _, topic := range msg.GetTopicIDs() {
|
||||||
tmap, ok := p.topics[topic]
|
tmap, ok := p.topics[topic]
|
||||||
|
@ -486,8 +477,6 @@ func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) error {
|
||||||
// Drop it. The peer is too slow.
|
// Drop it. The peer is too slow.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// getValidators returns all validators that apply to a given message
|
// getValidators returns all validators that apply to a given message
|
||||||
|
|
Loading…
Reference in New Issue