Added PubSub.Join(topic), which returns a Topic object. A Topic can be subscribed to and published to, and it can also provide a TopicEventHandler for topic events. As a result, the Subscription object no longer handles PeerEvents, and PubSub's Publish and Subscribe functions are deprecated.
175 lines
3.4 KiB
Go
175 lines
3.4 KiB
Go
package pubsub
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"io"
|
|
|
|
"github.com/libp2p/go-libp2p-core/helpers"
|
|
"github.com/libp2p/go-libp2p-core/network"
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
|
|
ggio "github.com/gogo/protobuf/io"
|
|
proto "github.com/gogo/protobuf/proto"
|
|
pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
|
|
|
ms "github.com/multiformats/go-multistream"
|
|
)
|
|
|
|
// get the initial RPC containing all of our subscriptions to send to new peers
|
|
func (p *PubSub) getHelloPacket() *RPC {
|
|
var rpc RPC
|
|
for t := range p.mySubs {
|
|
as := &pb.RPC_SubOpts{
|
|
Topicid: proto.String(t),
|
|
Subscribe: proto.Bool(true),
|
|
}
|
|
rpc.Subscriptions = append(rpc.Subscriptions, as)
|
|
}
|
|
return &rpc
|
|
}
|
|
|
|
func (p *PubSub) handleNewStream(s network.Stream) {
|
|
r := ggio.NewDelimitedReader(s, 1<<20)
|
|
for {
|
|
rpc := new(RPC)
|
|
err := r.ReadMsg(&rpc.RPC)
|
|
if err != nil {
|
|
if err != io.EOF {
|
|
s.Reset()
|
|
log.Infof("error reading rpc from %s: %s", s.Conn().RemotePeer(), err)
|
|
} else {
|
|
// Just be nice. They probably won't read this
|
|
// but it doesn't hurt to send it.
|
|
s.Close()
|
|
}
|
|
return
|
|
}
|
|
|
|
rpc.from = s.Conn().RemotePeer()
|
|
select {
|
|
case p.incoming <- rpc:
|
|
case <-p.ctx.Done():
|
|
// Close is useless because the other side isn't reading.
|
|
s.Reset()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) {
|
|
s, err := p.host.NewStream(p.ctx, pid, p.rt.Protocols()...)
|
|
if err != nil {
|
|
log.Warning("opening new stream to peer: ", err, pid)
|
|
|
|
var ch chan peer.ID
|
|
if err == ms.ErrNotSupported {
|
|
ch = p.newPeerError
|
|
} else {
|
|
ch = p.peerDead
|
|
}
|
|
|
|
select {
|
|
case ch <- pid:
|
|
case <-ctx.Done():
|
|
}
|
|
return
|
|
}
|
|
|
|
go p.handleSendingMessages(ctx, s, outgoing)
|
|
go p.handlePeerEOF(ctx, s)
|
|
select {
|
|
case p.newPeerStream <- s:
|
|
case <-ctx.Done():
|
|
}
|
|
}
|
|
|
|
func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) {
|
|
r := ggio.NewDelimitedReader(s, 1<<20)
|
|
rpc := new(RPC)
|
|
for {
|
|
err := r.ReadMsg(&rpc.RPC)
|
|
if err != nil {
|
|
select {
|
|
case p.peerDead <- s.Conn().RemotePeer():
|
|
case <-ctx.Done():
|
|
}
|
|
return
|
|
}
|
|
log.Warning("unexpected message from ", s.Conn().RemotePeer())
|
|
}
|
|
}
|
|
|
|
func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing <-chan *RPC) {
|
|
bufw := bufio.NewWriter(s)
|
|
wc := ggio.NewDelimitedWriter(bufw)
|
|
|
|
writeMsg := func(msg proto.Message) error {
|
|
err := wc.WriteMsg(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return bufw.Flush()
|
|
}
|
|
|
|
defer helpers.FullClose(s)
|
|
for {
|
|
select {
|
|
case rpc, ok := <-outgoing:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
err := writeMsg(&rpc.RPC)
|
|
if err != nil {
|
|
s.Reset()
|
|
log.Infof("writing message to %s: %s", s.Conn().RemotePeer(), err)
|
|
return
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func rpcWithSubs(subs ...*pb.RPC_SubOpts) *RPC {
|
|
return &RPC{
|
|
RPC: pb.RPC{
|
|
Subscriptions: subs,
|
|
},
|
|
}
|
|
}
|
|
|
|
func rpcWithMessages(msgs ...*pb.Message) *RPC {
|
|
return &RPC{RPC: pb.RPC{Publish: msgs}}
|
|
}
|
|
|
|
func rpcWithControl(msgs []*pb.Message,
|
|
ihave []*pb.ControlIHave,
|
|
iwant []*pb.ControlIWant,
|
|
graft []*pb.ControlGraft,
|
|
prune []*pb.ControlPrune) *RPC {
|
|
return &RPC{
|
|
RPC: pb.RPC{
|
|
Publish: msgs,
|
|
Control: &pb.ControlMessage{
|
|
Ihave: ihave,
|
|
Iwant: iwant,
|
|
Graft: graft,
|
|
Prune: prune,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func copyRPC(rpc *RPC) *RPC {
|
|
res := new(RPC)
|
|
*res = *rpc
|
|
if rpc.Control != nil {
|
|
res.Control = new(pb.ControlMessage)
|
|
*res.Control = *rpc.Control
|
|
}
|
|
return res
|
|
}
|