demote erros per #62
This commit is contained in:
parent
e7b1fe6e75
commit
327e72d4fd
|
@ -36,11 +36,9 @@ func (fs *FloodSubRouter) AddPeer(peer.ID, protocol.ID) {}
|
|||
|
||||
func (fs *FloodSubRouter) RemovePeer(peer.ID) {}
|
||||
|
||||
func (fs *FloodSubRouter) HandleRPC(rpc *RPC) error {
|
||||
return nil
|
||||
}
|
||||
func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {}
|
||||
|
||||
func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) error {
|
||||
func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) {
|
||||
tosend := make(map[peer.ID]struct{})
|
||||
for _, topic := range msg.GetTopicIDs() {
|
||||
tmap, ok := fs.p.topics[topic]
|
||||
|
|
22
pubsub.go
22
pubsub.go
|
@ -87,8 +87,8 @@ type PubSubRouter interface {
|
|||
Attach(*PubSub)
|
||||
AddPeer(peer.ID, protocol.ID)
|
||||
RemovePeer(peer.ID)
|
||||
HandleRPC(*RPC) error
|
||||
Publish(peer.ID, *pb.Message) error
|
||||
HandleRPC(*RPC)
|
||||
Publish(peer.ID, *pb.Message)
|
||||
}
|
||||
|
||||
type Message struct {
|
||||
|
@ -228,11 +228,8 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
|||
}
|
||||
preq.resp <- peers
|
||||
case rpc := <-p.incoming:
|
||||
err := p.handleIncomingRPC(rpc)
|
||||
if err != nil {
|
||||
log.Error("handling RPC: ", err)
|
||||
continue
|
||||
}
|
||||
p.handleIncomingRPC(rpc)
|
||||
|
||||
case msg := <-p.publish:
|
||||
vals := p.getValidators(msg)
|
||||
p.pushMsg(vals, p.host.ID(), msg)
|
||||
|
@ -352,7 +349,7 @@ func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (p *PubSub) handleIncomingRPC(rpc *RPC) error {
|
||||
func (p *PubSub) handleIncomingRPC(rpc *RPC) {
|
||||
for _, subopt := range rpc.GetSubscriptions() {
|
||||
t := subopt.GetTopicid()
|
||||
if subopt.GetSubscribe() {
|
||||
|
@ -383,7 +380,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error {
|
|||
p.pushMsg(vals, rpc.from, msg)
|
||||
}
|
||||
|
||||
return p.rt.HandleRPC(rpc)
|
||||
p.rt.HandleRPC(rpc)
|
||||
}
|
||||
|
||||
// msgID returns a unique ID of the passed Message
|
||||
|
@ -467,13 +464,8 @@ func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) {
|
|||
}
|
||||
|
||||
p.markSeen(id)
|
||||
|
||||
p.notifySubs(pmsg)
|
||||
|
||||
err := p.rt.Publish(from, pmsg)
|
||||
if err != nil {
|
||||
log.Error("publish message: ", err)
|
||||
}
|
||||
p.rt.Publish(from, pmsg)
|
||||
}
|
||||
|
||||
// getValidators returns all validators that apply to a given message
|
||||
|
|
Loading…
Reference in New Issue