1. Don't hang marking a peer as dead if we're shutting down. 2. No need to "drain" the outgoing channel anymore. This may have been necessary to prevent a deadlock where the main loop blocked on sending on sending a message while we waited to tell the main loop that the peer was dead. However, this is no longer an issue (we never block on sending).
107 lines
2.0 KiB
Go
107 lines
2.0 KiB
Go
package floodsub
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"io"
|
|
|
|
pb "github.com/libp2p/go-floodsub/pb"
|
|
|
|
ggio "github.com/gogo/protobuf/io"
|
|
proto "github.com/gogo/protobuf/proto"
|
|
inet "github.com/libp2p/go-libp2p-net"
|
|
)
|
|
|
|
// get the initial RPC containing all of our subscriptions to send to new peers
|
|
func (p *PubSub) getHelloPacket() *RPC {
|
|
var rpc RPC
|
|
for t := range p.myTopics {
|
|
as := &pb.RPC_SubOpts{
|
|
Topicid: proto.String(t),
|
|
Subscribe: proto.Bool(true),
|
|
}
|
|
rpc.Subscriptions = append(rpc.Subscriptions, as)
|
|
}
|
|
return &rpc
|
|
}
|
|
|
|
func (p *PubSub) handleNewStream(s inet.Stream) {
|
|
r := ggio.NewDelimitedReader(s, 1<<20)
|
|
for {
|
|
rpc := new(RPC)
|
|
err := r.ReadMsg(&rpc.RPC)
|
|
if err != nil {
|
|
if err != io.EOF {
|
|
s.Reset()
|
|
log.Errorf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err)
|
|
} else {
|
|
// Just be nice. They probably won't read this
|
|
// but it doesn't hurt to send it.
|
|
s.Close()
|
|
}
|
|
select {
|
|
case p.peerDead <- s.Conn().RemotePeer():
|
|
case <-p.ctx.Done():
|
|
}
|
|
return
|
|
}
|
|
|
|
rpc.from = s.Conn().RemotePeer()
|
|
select {
|
|
case p.incoming <- rpc:
|
|
case <-p.ctx.Done():
|
|
// Close is useless because the other side isn't reading.
|
|
s.Reset()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgoing <-chan *RPC) {
|
|
bufw := bufio.NewWriter(s)
|
|
wc := ggio.NewDelimitedWriter(bufw)
|
|
|
|
writeMsg := func(msg proto.Message) error {
|
|
err := wc.WriteMsg(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return bufw.Flush()
|
|
}
|
|
|
|
defer s.Close()
|
|
for {
|
|
select {
|
|
case rpc, ok := <-outgoing:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
err := writeMsg(&rpc.RPC)
|
|
if err != nil {
|
|
s.Reset()
|
|
log.Warningf("writing message to %s: %s", s.Conn().RemotePeer(), err)
|
|
select {
|
|
case p.peerDead <- s.Conn().RemotePeer():
|
|
case <-ctx.Done():
|
|
}
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func rpcWithSubs(subs ...*pb.RPC_SubOpts) *RPC {
|
|
return &RPC{
|
|
RPC: pb.RPC{
|
|
Subscriptions: subs,
|
|
},
|
|
}
|
|
}
|
|
|
|
func rpcWithMessages(msgs ...*pb.Message) *RPC {
|
|
return &RPC{RPC: pb.RPC{Publish: msgs}}
|
|
}
|