go-libp2p-pubsub/floodsub.go

368 lines
7.2 KiB
Go
Raw Normal View History

2016-09-10 03:13:50 +00:00
package floodsub
import (
2016-09-10 23:03:53 +00:00
"context"
2016-09-11 03:47:12 +00:00
"encoding/binary"
"fmt"
"time"
2016-09-13 02:59:24 +00:00
pb "github.com/libp2p/go-floodsub/pb"
2016-09-10 15:14:17 +00:00
proto "github.com/gogo/protobuf/proto"
2016-09-10 15:28:29 +00:00
logging "github.com/ipfs/go-log"
2016-10-05 19:47:20 +00:00
host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
protocol "github.com/libp2p/go-libp2p-protocol"
2016-09-11 03:47:12 +00:00
timecache "github.com/whyrusleeping/timecache"
)
2016-09-10 03:13:50 +00:00
const ID = protocol.ID("/floodsub/1.0.0")
2016-09-10 03:13:50 +00:00
var log = logging.Logger("floodsub")
type PubSub struct {
host host.Host
2016-09-11 20:56:07 +00:00
// incoming messages from other peers
incoming chan *RPC
2016-09-11 20:56:07 +00:00
// messages we are publishing out to our peers
publish chan *Message
// addSub is a control channel for us to add and remove subscriptions
addSub chan *addSub
2016-09-14 22:11:41 +00:00
//
getTopics chan *topicReq
//
getPeers chan chan []peer.ID
2016-09-11 20:56:07 +00:00
// a notification channel for incoming streams from other peers
newPeers chan inet.Stream
2016-09-11 20:56:07 +00:00
// a notification channel for when our peers die
peerDead chan peer.ID
2016-09-11 20:56:07 +00:00
// The set of topics we are subscribed to
myTopics map[string]chan *Message
2016-09-11 20:56:07 +00:00
// topics tracks which topics each of our peers are subscribed to
topics map[string]map[peer.ID]struct{}
2016-09-11 03:47:12 +00:00
peers map[peer.ID]chan *RPC
seenMessages *timecache.TimeCache
2016-09-10 23:03:53 +00:00
ctx context.Context
}
type Message struct {
2016-09-10 15:14:17 +00:00
*pb.Message
}
2016-09-10 15:14:17 +00:00
func (m *Message) GetFrom() peer.ID {
return peer.ID(m.Message.GetFrom())
}
type RPC struct {
2016-09-10 15:14:17 +00:00
pb.RPC
// unexported on purpose, not sending this over the wire
from peer.ID
}
2016-09-10 23:03:53 +00:00
func NewFloodSub(ctx context.Context, h host.Host) *PubSub {
ps := &PubSub{
2016-09-11 03:47:12 +00:00
host: h,
ctx: ctx,
incoming: make(chan *RPC, 32),
publish: make(chan *Message),
newPeers: make(chan inet.Stream),
peerDead: make(chan peer.ID),
getPeers: make(chan chan []peer.ID),
2016-09-11 03:47:12 +00:00
addSub: make(chan *addSub),
2016-09-14 22:11:41 +00:00
getTopics: make(chan *topicReq),
2016-09-11 03:47:12 +00:00
myTopics: make(map[string]chan *Message),
topics: make(map[string]map[peer.ID]struct{}),
peers: make(map[peer.ID]chan *RPC),
seenMessages: timecache.NewTimeCache(time.Second * 30),
}
h.SetStreamHandler(ID, ps.handleNewStream)
h.Network().Notify((*PubSubNotif)(ps))
2016-09-10 23:03:53 +00:00
go ps.processLoop(ctx)
return ps
}
2016-09-10 23:03:53 +00:00
func (p *PubSub) processLoop(ctx context.Context) {
for {
select {
case s := <-p.newPeers:
pid := s.Conn().RemotePeer()
ch, ok := p.peers[pid]
if ok {
log.Error("already have connection to peer: ", pid)
close(ch)
}
messages := make(chan *RPC, 32)
2016-09-10 23:03:53 +00:00
go p.handleSendingMessages(ctx, s, messages)
messages <- p.getHelloPacket()
p.peers[pid] = messages
case pid := <-p.peerDead:
ch, ok := p.peers[pid]
if ok {
close(ch)
}
delete(p.peers, pid)
for _, t := range p.topics {
delete(t, pid)
}
2016-09-14 22:11:41 +00:00
case treq := <-p.getTopics:
var out []string
for t := range p.myTopics {
out = append(out, t)
}
treq.resp <- out
case sub := <-p.addSub:
2016-09-10 03:13:50 +00:00
p.handleSubscriptionChange(sub)
case pch := <-p.getPeers:
var peers []peer.ID
for p := range p.peers {
peers = append(peers, p)
}
pch <- peers
case rpc := <-p.incoming:
err := p.handleIncomingRPC(rpc)
if err != nil {
log.Error("handling RPC: ", err)
2016-09-11 20:56:07 +00:00
continue
}
2016-09-11 03:47:12 +00:00
case msg := <-p.publish:
p.maybePublishMessage(p.host.ID(), msg.Message)
2016-09-10 23:03:53 +00:00
case <-ctx.Done():
log.Info("pubsub processloop shutting down")
return
}
}
}
2016-09-10 03:13:50 +00:00
2016-09-11 20:56:07 +00:00
func (p *PubSub) handleSubscriptionChange(sub *addSub) {
subopt := &pb.RPC_SubOpts{
2016-09-11 03:47:12 +00:00
Topicid: &sub.topic,
Subscribe: &sub.sub,
}
2016-09-10 03:13:50 +00:00
2016-09-11 03:47:12 +00:00
ch, ok := p.myTopics[sub.topic]
if sub.sub {
2016-09-10 03:13:50 +00:00
if ok {
// we don't allow multiple subs per topic at this point
sub.resp <- nil
return
}
resp := make(chan *Message, 16)
p.myTopics[sub.topic] = resp
sub.resp <- resp
2016-09-11 03:47:12 +00:00
} else {
if !ok {
return
}
close(ch)
delete(p.myTopics, sub.topic)
}
2016-09-11 20:56:07 +00:00
out := rpcWithSubs(subopt)
2016-09-11 03:47:12 +00:00
for _, peer := range p.peers {
peer <- out
}
2016-09-10 03:13:50 +00:00
}
2016-09-11 20:56:07 +00:00
func (p *PubSub) notifySubs(msg *pb.Message) {
for _, topic := range msg.GetTopicIDs() {
subch, ok := p.myTopics[topic]
if ok {
subch <- &Message{msg}
}
}
}
2016-09-11 03:47:12 +00:00
func (p *PubSub) seenMessage(id string) bool {
return p.seenMessages.Has(id)
}
func (p *PubSub) markSeen(id string) {
p.seenMessages.Add(id)
}
2016-09-11 20:56:07 +00:00
func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
for _, t := range msg.GetTopicIDs() {
if _, ok := p.myTopics[t]; ok {
return true
}
}
return false
}
func (p *PubSub) handleIncomingRPC(rpc *RPC) error {
2016-09-11 03:47:12 +00:00
for _, subopt := range rpc.GetSubscriptions() {
t := subopt.GetTopicid()
if subopt.GetSubscribe() {
tmap, ok := p.topics[t]
if !ok {
tmap = make(map[peer.ID]struct{})
p.topics[t] = tmap
}
tmap[rpc.from] = struct{}{}
2016-09-11 03:47:12 +00:00
} else {
tmap, ok := p.topics[t]
if !ok {
2016-09-11 03:47:12 +00:00
continue
}
delete(tmap, rpc.from)
}
2016-09-11 03:47:12 +00:00
}
2016-09-10 03:13:50 +00:00
2016-09-11 03:47:12 +00:00
for _, pmsg := range rpc.GetPublish() {
2016-09-11 20:56:07 +00:00
if !p.subscribedToMsg(pmsg) {
log.Warning("received message we didn't subscribe to. Dropping.")
2016-09-11 03:47:12 +00:00
continue
}
2016-09-11 20:56:07 +00:00
p.maybePublishMessage(rpc.from, pmsg)
}
return nil
}
func msgID(pmsg *pb.Message) string {
return string(pmsg.GetFrom()) + string(pmsg.GetSeqno())
}
2016-09-11 20:56:07 +00:00
func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) {
id := msgID(pmsg)
2016-09-11 20:56:07 +00:00
if p.seenMessage(id) {
return
}
p.markSeen(id)
p.notifySubs(pmsg)
err := p.publishMessage(from, pmsg)
if err != nil {
log.Error("publish message: ", err)
}
}
2016-09-11 03:47:12 +00:00
func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) error {
tosend := make(map[peer.ID]struct{})
for _, topic := range msg.GetTopicIDs() {
tmap, ok := p.topics[topic]
if !ok {
continue
}
2016-09-11 03:47:12 +00:00
for p, _ := range tmap {
tosend[p] = struct{}{}
}
}
out := rpcWithMessages(msg)
for pid := range tosend {
2016-09-11 03:47:12 +00:00
if pid == from || pid == peer.ID(msg.GetFrom()) {
continue
}
mch, ok := p.peers[pid]
if !ok {
continue
}
2016-09-11 03:47:12 +00:00
go func() { mch <- out }()
}
return nil
}
type addSub struct {
2016-09-11 03:47:12 +00:00
topic string
sub bool
resp chan chan *Message
}
2016-09-14 22:11:41 +00:00
func (p *PubSub) Subscribe(ctx context.Context, topic string) (<-chan *Message, error) {
2016-09-11 03:47:12 +00:00
return p.SubscribeComplicated(&pb.TopicDescriptor{
Name: proto.String(topic),
})
}
2016-09-14 22:11:41 +00:00
type topicReq struct {
resp chan []string
}
func (p *PubSub) GetTopics() []string {
out := make(chan []string, 1)
p.getTopics <- &topicReq{resp: out}
return <-out
}
2016-09-11 03:47:12 +00:00
func (p *PubSub) SubscribeComplicated(td *pb.TopicDescriptor) (<-chan *Message, error) {
if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE {
return nil, fmt.Errorf("Auth method not yet supported")
}
if td.GetEnc().GetMode() != pb.TopicDescriptor_EncOpts_NONE {
return nil, fmt.Errorf("Encryption method not yet supported")
}
resp := make(chan chan *Message)
p.addSub <- &addSub{
2016-09-11 03:47:12 +00:00
topic: td.GetName(),
resp: resp,
2016-09-11 03:47:12 +00:00
sub: true,
}
outch := <-resp
if outch == nil {
return nil, fmt.Errorf("error, duplicate subscription")
}
return outch, nil
}
func (p *PubSub) Unsub(topic string) {
2016-09-10 03:13:50 +00:00
p.addSub <- &addSub{
2016-09-11 03:47:12 +00:00
topic: topic,
sub: false,
2016-09-10 03:13:50 +00:00
}
}
func (p *PubSub) Publish(topic string, data []byte) error {
2016-09-11 03:47:12 +00:00
seqno := make([]byte, 8)
binary.BigEndian.PutUint64(seqno, uint64(time.Now().UnixNano()))
p.publish <- &Message{
&pb.Message{
Data: data,
TopicIDs: []string{topic},
From: proto.String(string(p.host.ID())),
Seqno: seqno,
},
}
return nil
}
func (p *PubSub) ListPeers() []peer.ID {
out := make(chan []peer.ID)
p.getPeers <- out
return <-out
}