2016-09-10 03:13:50 +00:00
|
|
|
package floodsub
|
2016-09-10 00:15:39 +00:00
|
|
|
|
|
|
|
import (
|
2016-09-10 15:28:29 +00:00
|
|
|
"bufio"
|
2016-09-10 23:03:53 +00:00
|
|
|
"context"
|
2016-09-11 03:47:12 +00:00
|
|
|
"encoding/binary"
|
2016-09-10 00:15:39 +00:00
|
|
|
"fmt"
|
2016-09-10 23:03:53 +00:00
|
|
|
"io"
|
2016-09-10 00:15:39 +00:00
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
2016-09-10 15:14:17 +00:00
|
|
|
pb "github.com/whyrusleeping/go-floodsub/pb"
|
|
|
|
|
|
|
|
ggio "github.com/gogo/protobuf/io"
|
|
|
|
proto "github.com/gogo/protobuf/proto"
|
2016-09-10 15:28:29 +00:00
|
|
|
peer "github.com/ipfs/go-libp2p-peer"
|
|
|
|
logging "github.com/ipfs/go-log"
|
|
|
|
host "github.com/libp2p/go-libp2p/p2p/host"
|
|
|
|
inet "github.com/libp2p/go-libp2p/p2p/net"
|
|
|
|
protocol "github.com/libp2p/go-libp2p/p2p/protocol"
|
2016-09-11 03:47:12 +00:00
|
|
|
timecache "github.com/whyrusleeping/timecache"
|
2016-09-10 00:15:39 +00:00
|
|
|
)
|
|
|
|
|
2016-09-10 03:13:50 +00:00
|
|
|
// ID is the floodsub protocol identifier. NewFloodSub registers the stream
// handler for incoming floodsub streams under this ID.
const ID = protocol.ID("/floodsub/1.0.0")
|
2016-09-10 00:15:39 +00:00
|
|
|
|
2016-09-10 15:14:17 +00:00
|
|
|
// String tags naming the three kinds of floodsub message.
// NOTE(review): none of these is referenced in the visible part of the file;
// confirm usage elsewhere in the package before changing them.
var (
	// AddSubMessageType tags a subscribe message.
	AddSubMessageType = "sub"
	// UnsubMessageType tags an unsubscribe message.
	UnsubMessageType = "unsub"
	// PubMessageType tags a publish message.
	PubMessageType = "pub"
)
|
|
|
|
|
|
|
|
// log is the package-level logger, created under the name "floodsub".
var log = logging.Logger("floodsub")
|
2016-09-10 00:15:39 +00:00
|
|
|
|
|
|
|
type PubSub struct {
|
|
|
|
host host.Host
|
|
|
|
|
|
|
|
incoming chan *RPC
|
2016-09-11 03:47:12 +00:00
|
|
|
publish chan *Message
|
2016-09-10 00:15:39 +00:00
|
|
|
newPeers chan inet.Stream
|
|
|
|
peerDead chan peer.ID
|
|
|
|
|
|
|
|
myTopics map[string]chan *Message
|
|
|
|
pubsubLk sync.Mutex
|
|
|
|
|
2016-09-11 03:47:12 +00:00
|
|
|
topics map[string]map[peer.ID]struct{}
|
|
|
|
peers map[peer.ID]chan *RPC
|
|
|
|
seenMessages *timecache.TimeCache
|
2016-09-10 00:15:39 +00:00
|
|
|
|
|
|
|
addSub chan *addSub
|
2016-09-10 23:03:53 +00:00
|
|
|
|
|
|
|
ctx context.Context
|
2016-09-10 00:15:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type Message struct {
|
2016-09-10 15:14:17 +00:00
|
|
|
*pb.Message
|
2016-09-10 00:15:39 +00:00
|
|
|
}
|
|
|
|
|
2016-09-10 15:14:17 +00:00
|
|
|
func (m *Message) GetFrom() peer.ID {
|
|
|
|
return peer.ID(m.Message.GetFrom())
|
2016-09-10 00:15:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type RPC struct {
|
2016-09-10 15:14:17 +00:00
|
|
|
pb.RPC
|
2016-09-10 00:15:39 +00:00
|
|
|
|
|
|
|
// unexported on purpose, not sending this over the wire
|
|
|
|
from peer.ID
|
|
|
|
}
|
|
|
|
|
2016-09-10 23:03:53 +00:00
|
|
|
func NewFloodSub(ctx context.Context, h host.Host) *PubSub {
|
2016-09-10 00:15:39 +00:00
|
|
|
ps := &PubSub{
|
2016-09-11 03:47:12 +00:00
|
|
|
host: h,
|
|
|
|
ctx: ctx,
|
|
|
|
incoming: make(chan *RPC, 32),
|
|
|
|
publish: make(chan *Message),
|
|
|
|
newPeers: make(chan inet.Stream),
|
|
|
|
peerDead: make(chan peer.ID),
|
|
|
|
addSub: make(chan *addSub),
|
|
|
|
myTopics: make(map[string]chan *Message),
|
|
|
|
topics: make(map[string]map[peer.ID]struct{}),
|
|
|
|
peers: make(map[peer.ID]chan *RPC),
|
|
|
|
seenMessages: timecache.NewTimeCache(time.Second * 30),
|
2016-09-10 00:15:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
h.SetStreamHandler(ID, ps.handleNewStream)
|
|
|
|
h.Network().Notify(ps)
|
|
|
|
|
2016-09-10 23:03:53 +00:00
|
|
|
go ps.processLoop(ctx)
|
2016-09-10 00:15:39 +00:00
|
|
|
|
|
|
|
return ps
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *PubSub) getHelloPacket() *RPC {
|
|
|
|
var rpc RPC
|
|
|
|
for t, _ := range p.myTopics {
|
2016-09-11 03:47:12 +00:00
|
|
|
as := &pb.RPC_SubOpts{
|
|
|
|
Topicid: proto.String(t),
|
|
|
|
Subscribe: proto.Bool(true),
|
|
|
|
}
|
|
|
|
rpc.Subscriptions = append(rpc.Subscriptions, as)
|
2016-09-10 00:15:39 +00:00
|
|
|
}
|
|
|
|
return &rpc
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *PubSub) handleNewStream(s inet.Stream) {
|
|
|
|
defer s.Close()
|
|
|
|
|
2016-09-10 15:14:17 +00:00
|
|
|
r := ggio.NewDelimitedReader(s, 1<<20)
|
|
|
|
for {
|
2016-09-10 00:15:39 +00:00
|
|
|
rpc := new(RPC)
|
2016-09-10 15:14:17 +00:00
|
|
|
err := r.ReadMsg(&rpc.RPC)
|
2016-09-10 00:15:39 +00:00
|
|
|
if err != nil {
|
2016-09-10 23:03:53 +00:00
|
|
|
if err != io.EOF {
|
|
|
|
log.Errorf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err)
|
|
|
|
}
|
2016-09-10 00:15:39 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
rpc.from = s.Conn().RemotePeer()
|
2016-09-10 23:03:53 +00:00
|
|
|
select {
|
|
|
|
case p.incoming <- rpc:
|
|
|
|
case <-p.ctx.Done():
|
|
|
|
return
|
|
|
|
}
|
2016-09-10 00:15:39 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-09-10 23:03:53 +00:00
|
|
|
func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, in <-chan *RPC) {
|
2016-09-10 00:15:39 +00:00
|
|
|
var dead bool
|
2016-09-10 15:28:29 +00:00
|
|
|
bufw := bufio.NewWriter(s)
|
|
|
|
wc := ggio.NewDelimitedWriter(bufw)
|
2016-09-10 00:15:39 +00:00
|
|
|
|
2016-09-10 23:03:53 +00:00
|
|
|
writeMsg := func(msg proto.Message) error {
|
|
|
|
err := wc.WriteMsg(msg)
|
2016-09-10 00:15:39 +00:00
|
|
|
if err != nil {
|
2016-09-10 23:03:53 +00:00
|
|
|
return err
|
2016-09-10 00:15:39 +00:00
|
|
|
}
|
2016-09-10 15:28:29 +00:00
|
|
|
|
2016-09-10 23:03:53 +00:00
|
|
|
return bufw.Flush()
|
|
|
|
}
|
|
|
|
|
|
|
|
defer wc.Close()
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case rpc, ok := <-in:
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if dead {
|
|
|
|
// continue in order to drain messages
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
err := writeMsg(&rpc.RPC)
|
|
|
|
if err != nil {
|
|
|
|
log.Errorf("writing message to %s: %s", s.Conn().RemotePeer(), err)
|
|
|
|
dead = true
|
|
|
|
go func() {
|
|
|
|
p.peerDead <- s.Conn().RemotePeer()
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
2016-09-10 15:28:29 +00:00
|
|
|
}
|
2016-09-10 00:15:39 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-09-10 23:03:53 +00:00
|
|
|
func (p *PubSub) processLoop(ctx context.Context) {
|
2016-09-10 00:15:39 +00:00
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case s := <-p.newPeers:
|
|
|
|
pid := s.Conn().RemotePeer()
|
|
|
|
_, ok := p.peers[pid]
|
|
|
|
if ok {
|
|
|
|
log.Error("already have connection to peer: ", pid)
|
|
|
|
s.Close()
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
messages := make(chan *RPC, 32)
|
2016-09-10 23:03:53 +00:00
|
|
|
go p.handleSendingMessages(ctx, s, messages)
|
2016-09-10 00:15:39 +00:00
|
|
|
messages <- p.getHelloPacket()
|
|
|
|
|
|
|
|
p.peers[pid] = messages
|
|
|
|
|
|
|
|
case pid := <-p.peerDead:
|
|
|
|
delete(p.peers, pid)
|
|
|
|
case sub := <-p.addSub:
|
2016-09-10 03:13:50 +00:00
|
|
|
p.handleSubscriptionChange(sub)
|
2016-09-10 00:15:39 +00:00
|
|
|
case rpc := <-p.incoming:
|
|
|
|
err := p.handleIncomingRPC(rpc)
|
|
|
|
if err != nil {
|
|
|
|
log.Error("handling RPC: ", err)
|
|
|
|
}
|
2016-09-11 03:47:12 +00:00
|
|
|
case msg := <-p.publish:
|
|
|
|
err := p.recvMessage(msg.Message)
|
|
|
|
if err != nil {
|
|
|
|
log.Error("error receiving message: ", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
err = p.publishMessage(p.host.ID(), msg.Message)
|
|
|
|
if err != nil {
|
|
|
|
log.Error("publishing message: ", err)
|
2016-09-10 00:15:39 +00:00
|
|
|
}
|
2016-09-10 23:03:53 +00:00
|
|
|
case <-ctx.Done():
|
|
|
|
log.Info("pubsub processloop shutting down")
|
|
|
|
return
|
2016-09-10 00:15:39 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2016-09-10 03:13:50 +00:00
|
|
|
func (p *PubSub) handleSubscriptionChange(sub *addSub) {
|
|
|
|
|
2016-09-11 03:47:12 +00:00
|
|
|
subopt := pb.RPC_SubOpts{
|
|
|
|
Topicid: &sub.topic,
|
|
|
|
Subscribe: &sub.sub,
|
|
|
|
}
|
2016-09-10 03:13:50 +00:00
|
|
|
|
2016-09-11 03:47:12 +00:00
|
|
|
ch, ok := p.myTopics[sub.topic]
|
|
|
|
if sub.sub {
|
2016-09-10 03:13:50 +00:00
|
|
|
if ok {
|
|
|
|
// we don't allow multiple subs per topic at this point
|
|
|
|
sub.resp <- nil
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
resp := make(chan *Message, 16)
|
|
|
|
p.myTopics[sub.topic] = resp
|
|
|
|
sub.resp <- resp
|
2016-09-11 03:47:12 +00:00
|
|
|
} else {
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
close(ch)
|
|
|
|
delete(p.myTopics, sub.topic)
|
|
|
|
}
|
|
|
|
|
|
|
|
out := &RPC{
|
|
|
|
RPC: pb.RPC{
|
|
|
|
Subscriptions: []*pb.RPC_SubOpts{
|
|
|
|
&subopt,
|
|
|
|
},
|
|
|
|
},
|
2016-09-10 03:13:50 +00:00
|
|
|
}
|
|
|
|
|
2016-09-11 03:47:12 +00:00
|
|
|
for _, peer := range p.peers {
|
|
|
|
peer <- out
|
|
|
|
}
|
2016-09-10 03:13:50 +00:00
|
|
|
}
|
2016-09-10 00:15:39 +00:00
|
|
|
|
2016-09-11 03:47:12 +00:00
|
|
|
func (p *PubSub) recvMessage(msg *pb.Message) error {
|
|
|
|
if len(msg.GetTopicIDs()) > 1 {
|
|
|
|
return fmt.Errorf("Dont yet handle multiple topics per message")
|
|
|
|
}
|
|
|
|
if len(msg.GetTopicIDs()) == 0 {
|
|
|
|
return fmt.Errorf("no topic on received message")
|
|
|
|
}
|
|
|
|
|
|
|
|
topic := msg.GetTopicIDs()[0]
|
|
|
|
subch, ok := p.myTopics[topic]
|
2016-09-10 00:15:39 +00:00
|
|
|
if ok {
|
2016-09-11 03:47:12 +00:00
|
|
|
subch <- &Message{msg}
|
|
|
|
} else {
|
|
|
|
log.Error("received message we we'rent subscribed to")
|
2016-09-10 00:15:39 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-09-11 03:47:12 +00:00
|
|
|
func (p *PubSub) seenMessage(id string) bool {
|
|
|
|
return p.seenMessages.Has(id)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *PubSub) markSeen(id string) {
|
|
|
|
p.seenMessages.Add(id)
|
|
|
|
}
|
|
|
|
|
2016-09-10 00:15:39 +00:00
|
|
|
func (p *PubSub) handleIncomingRPC(rpc *RPC) error {
|
2016-09-11 03:47:12 +00:00
|
|
|
for _, subopt := range rpc.GetSubscriptions() {
|
|
|
|
t := subopt.GetTopicid()
|
|
|
|
if subopt.GetSubscribe() {
|
2016-09-10 00:15:39 +00:00
|
|
|
tmap, ok := p.topics[t]
|
|
|
|
if !ok {
|
|
|
|
tmap = make(map[peer.ID]struct{})
|
|
|
|
p.topics[t] = tmap
|
|
|
|
}
|
|
|
|
|
|
|
|
tmap[rpc.from] = struct{}{}
|
2016-09-11 03:47:12 +00:00
|
|
|
} else {
|
2016-09-10 00:15:39 +00:00
|
|
|
tmap, ok := p.topics[t]
|
|
|
|
if !ok {
|
2016-09-11 03:47:12 +00:00
|
|
|
continue
|
2016-09-10 00:15:39 +00:00
|
|
|
}
|
|
|
|
delete(tmap, rpc.from)
|
|
|
|
}
|
2016-09-11 03:47:12 +00:00
|
|
|
}
|
2016-09-10 03:13:50 +00:00
|
|
|
|
2016-09-11 03:47:12 +00:00
|
|
|
for _, pmsg := range rpc.GetPublish() {
|
|
|
|
msg := &Message{pmsg}
|
2016-09-10 15:14:17 +00:00
|
|
|
|
2016-09-11 03:47:12 +00:00
|
|
|
id := msg.Message.GetFrom() + string(msg.GetSeqno())
|
|
|
|
|
|
|
|
if p.seenMessage(id) {
|
|
|
|
continue
|
2016-09-10 00:15:39 +00:00
|
|
|
}
|
|
|
|
|
2016-09-10 15:14:17 +00:00
|
|
|
if msg.GetFrom() == p.host.ID() {
|
|
|
|
log.Error("skipping message from self")
|
2016-09-10 00:15:39 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-09-11 03:47:12 +00:00
|
|
|
p.markSeen(id)
|
2016-09-10 00:15:39 +00:00
|
|
|
|
2016-09-11 03:47:12 +00:00
|
|
|
if err := p.recvMessage(pmsg); err != nil {
|
2016-09-10 00:15:39 +00:00
|
|
|
log.Error("error receiving message: ", err)
|
|
|
|
}
|
|
|
|
|
2016-09-11 03:47:12 +00:00
|
|
|
err := p.publishMessage(rpc.from, pmsg)
|
2016-09-10 00:15:39 +00:00
|
|
|
if err != nil {
|
|
|
|
log.Error("publish message: ", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-09-11 03:47:12 +00:00
|
|
|
func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) error {
|
|
|
|
if len(msg.GetTopicIDs()) != 1 {
|
|
|
|
return fmt.Errorf("don't support publishing to multiple topics in a single message")
|
|
|
|
}
|
|
|
|
|
|
|
|
tmap, ok := p.topics[msg.GetTopicIDs()[0]]
|
2016-09-10 00:15:39 +00:00
|
|
|
if !ok {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-09-11 03:47:12 +00:00
|
|
|
out := &RPC{RPC: pb.RPC{Publish: []*pb.Message{msg}}}
|
|
|
|
|
2016-09-10 00:15:39 +00:00
|
|
|
for pid, _ := range tmap {
|
2016-09-11 03:47:12 +00:00
|
|
|
if pid == from || pid == peer.ID(msg.GetFrom()) {
|
2016-09-10 00:15:39 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
mch, ok := p.peers[pid]
|
|
|
|
if !ok {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2016-09-11 03:47:12 +00:00
|
|
|
go func() { mch <- out }()
|
2016-09-10 00:15:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
type addSub struct {
|
2016-09-11 03:47:12 +00:00
|
|
|
topic string
|
|
|
|
sub bool
|
|
|
|
resp chan chan *Message
|
2016-09-10 00:15:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (p *PubSub) Subscribe(topic string) (<-chan *Message, error) {
|
2016-09-11 03:47:12 +00:00
|
|
|
return p.SubscribeComplicated(&pb.TopicDescriptor{
|
|
|
|
Name: proto.String(topic),
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *PubSub) SubscribeComplicated(td *pb.TopicDescriptor) (<-chan *Message, error) {
|
|
|
|
if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE {
|
|
|
|
return nil, fmt.Errorf("Auth method not yet supported")
|
|
|
|
}
|
|
|
|
|
|
|
|
if td.GetEnc().GetMode() != pb.TopicDescriptor_EncOpts_NONE {
|
|
|
|
return nil, fmt.Errorf("Encryption method not yet supported")
|
|
|
|
}
|
|
|
|
|
2016-09-10 00:15:39 +00:00
|
|
|
resp := make(chan chan *Message)
|
|
|
|
p.addSub <- &addSub{
|
2016-09-11 03:47:12 +00:00
|
|
|
topic: td.GetName(),
|
2016-09-10 00:15:39 +00:00
|
|
|
resp: resp,
|
2016-09-11 03:47:12 +00:00
|
|
|
sub: true,
|
2016-09-10 00:15:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
outch := <-resp
|
|
|
|
if outch == nil {
|
|
|
|
return nil, fmt.Errorf("error, duplicate subscription")
|
|
|
|
}
|
|
|
|
|
|
|
|
return outch, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *PubSub) Unsub(topic string) {
|
2016-09-10 03:13:50 +00:00
|
|
|
p.addSub <- &addSub{
|
2016-09-11 03:47:12 +00:00
|
|
|
topic: topic,
|
|
|
|
sub: false,
|
2016-09-10 03:13:50 +00:00
|
|
|
}
|
2016-09-10 00:15:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (p *PubSub) Publish(topic string, data []byte) error {
|
2016-09-11 03:47:12 +00:00
|
|
|
seqno := make([]byte, 8)
|
|
|
|
binary.BigEndian.PutUint64(seqno, uint64(time.Now().UnixNano()))
|
|
|
|
|
|
|
|
p.publish <- &Message{
|
|
|
|
&pb.Message{
|
|
|
|
Data: data,
|
|
|
|
TopicIDs: []string{topic},
|
|
|
|
From: proto.String(string(p.host.ID())),
|
|
|
|
Seqno: seqno,
|
2016-09-10 00:15:39 +00:00
|
|
|
},
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|