refactor publish messaging

This commit is contained in:
Jeromy 2016-09-11 13:56:07 -07:00
parent ae7f9622fb
commit ab2fef7c1b
3 changed files with 183 additions and 143 deletions

98
comm.go Normal file
View File

@ -0,0 +1,98 @@
package floodsub
import (
"bufio"
"context"
"io"
pb "github.com/whyrusleeping/go-floodsub/pb"
ggio "github.com/gogo/protobuf/io"
proto "github.com/gogo/protobuf/proto"
inet "github.com/libp2p/go-libp2p/p2p/net"
)
// get the initial RPC containing all of our subscriptions to send to new peers
func (p *PubSub) getHelloPacket() *RPC {
var rpc RPC
for t := range p.myTopics {
as := &pb.RPC_SubOpts{
Topicid: proto.String(t),
Subscribe: proto.Bool(true),
}
rpc.Subscriptions = append(rpc.Subscriptions, as)
}
return &rpc
}
func (p *PubSub) handleNewStream(s inet.Stream) {
defer s.Close()
r := ggio.NewDelimitedReader(s, 1<<20)
for {
rpc := new(RPC)
err := r.ReadMsg(&rpc.RPC)
if err != nil {
if err != io.EOF {
log.Errorf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err)
}
return
}
rpc.from = s.Conn().RemotePeer()
select {
case p.incoming <- rpc:
case <-p.ctx.Done():
return
}
}
}
func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgoing <-chan *RPC) {
var dead bool
bufw := bufio.NewWriter(s)
wc := ggio.NewDelimitedWriter(bufw)
writeMsg := func(msg proto.Message) error {
err := wc.WriteMsg(msg)
if err != nil {
return err
}
return bufw.Flush()
}
defer wc.Close()
for {
select {
case rpc, ok := <-outgoing:
if !ok {
return
}
if dead {
// continue in order to drain messages
continue
}
err := writeMsg(&rpc.RPC)
if err != nil {
log.Errorf("writing message to %s: %s", s.Conn().RemotePeer(), err)
dead = true
go func() {
p.peerDead <- s.Conn().RemotePeer()
}()
}
case <-ctx.Done():
return
}
}
}
func rpcWithSubs(subs ...*pb.RPC_SubOpts) *RPC {
return &RPC{
RPC: pb.RPC{
Subscriptions: subs,
},
}
}

View File

@ -1,17 +1,13 @@
package floodsub
import (
"bufio"
"context"
"encoding/binary"
"fmt"
"io"
"sync"
"time"
pb "github.com/whyrusleeping/go-floodsub/pb"
ggio "github.com/gogo/protobuf/io"
proto "github.com/gogo/protobuf/proto"
peer "github.com/ipfs/go-libp2p-peer"
logging "github.com/ipfs/go-log"
@ -23,31 +19,35 @@ import (
const ID = protocol.ID("/floodsub/1.0.0")
var (
AddSubMessageType = "sub"
UnsubMessageType = "unsub"
PubMessageType = "pub"
)
var log = logging.Logger("floodsub")
type PubSub struct {
host host.Host
// incoming messages from other peers
incoming chan *RPC
publish chan *Message
// messages we are publishing out to our peers
publish chan *Message
// addSub is a control channel for us to add and remove subscriptions
addSub chan *addSub
// a notification channel for incoming streams from other peers
newPeers chan inet.Stream
// a notification channel for when our peers die
peerDead chan peer.ID
// The set of topics we are subscribed to
myTopics map[string]chan *Message
pubsubLk sync.Mutex
topics map[string]map[peer.ID]struct{}
// topics tracks which topics each of our peers are subscribed to
topics map[string]map[peer.ID]struct{}
peers map[peer.ID]chan *RPC
seenMessages *timecache.TimeCache
addSub chan *addSub
ctx context.Context
}
@ -89,84 +89,7 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub {
return ps
}
func (p *PubSub) getHelloPacket() *RPC {
var rpc RPC
for t, _ := range p.myTopics {
as := &pb.RPC_SubOpts{
Topicid: proto.String(t),
Subscribe: proto.Bool(true),
}
rpc.Subscriptions = append(rpc.Subscriptions, as)
}
return &rpc
}
func (p *PubSub) handleNewStream(s inet.Stream) {
defer s.Close()
r := ggio.NewDelimitedReader(s, 1<<20)
for {
rpc := new(RPC)
err := r.ReadMsg(&rpc.RPC)
if err != nil {
if err != io.EOF {
log.Errorf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err)
}
return
}
rpc.from = s.Conn().RemotePeer()
select {
case p.incoming <- rpc:
case <-p.ctx.Done():
return
}
}
}
func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, in <-chan *RPC) {
var dead bool
bufw := bufio.NewWriter(s)
wc := ggio.NewDelimitedWriter(bufw)
writeMsg := func(msg proto.Message) error {
err := wc.WriteMsg(msg)
if err != nil {
return err
}
return bufw.Flush()
}
defer wc.Close()
for {
select {
case rpc, ok := <-in:
if !ok {
return
}
if dead {
// continue in order to drain messages
continue
}
err := writeMsg(&rpc.RPC)
if err != nil {
log.Errorf("writing message to %s: %s", s.Conn().RemotePeer(), err)
dead = true
go func() {
p.peerDead <- s.Conn().RemotePeer()
}()
}
case <-ctx.Done():
return
}
}
}
func (p *PubSub) processLoop(ctx context.Context) {
for {
select {
case s := <-p.newPeers:
@ -192,16 +115,15 @@ func (p *PubSub) processLoop(ctx context.Context) {
err := p.handleIncomingRPC(rpc)
if err != nil {
log.Error("handling RPC: ", err)
continue
}
case msg := <-p.publish:
err := p.recvMessage(msg.Message)
if err != nil {
log.Error("error receiving message: ", err)
}
p.notifySubs(msg.Message)
err = p.publishMessage(p.host.ID(), msg.Message)
err := p.publishMessage(p.host.ID(), msg.Message)
if err != nil {
log.Error("publishing message: ", err)
continue
}
case <-ctx.Done():
log.Info("pubsub processloop shutting down")
@ -209,9 +131,9 @@ func (p *PubSub) processLoop(ctx context.Context) {
}
}
}
func (p *PubSub) handleSubscriptionChange(sub *addSub) {
subopt := pb.RPC_SubOpts{
func (p *PubSub) handleSubscriptionChange(sub *addSub) {
subopt := &pb.RPC_SubOpts{
Topicid: &sub.topic,
Subscribe: &sub.sub,
}
@ -236,35 +158,19 @@ func (p *PubSub) handleSubscriptionChange(sub *addSub) {
delete(p.myTopics, sub.topic)
}
out := &RPC{
RPC: pb.RPC{
Subscriptions: []*pb.RPC_SubOpts{
&subopt,
},
},
}
out := rpcWithSubs(subopt)
for _, peer := range p.peers {
peer <- out
}
}
func (p *PubSub) recvMessage(msg *pb.Message) error {
if len(msg.GetTopicIDs()) > 1 {
return fmt.Errorf("Dont yet handle multiple topics per message")
func (p *PubSub) notifySubs(msg *pb.Message) {
for _, topic := range msg.GetTopicIDs() {
subch, ok := p.myTopics[topic]
if ok {
subch <- &Message{msg}
}
}
if len(msg.GetTopicIDs()) == 0 {
return fmt.Errorf("no topic on received message")
}
topic := msg.GetTopicIDs()[0]
subch, ok := p.myTopics[topic]
if ok {
subch <- &Message{msg}
} else {
log.Error("received message we we'rent subscribed to")
}
return nil
}
func (p *PubSub) seenMessage(id string) bool {
@ -275,6 +181,15 @@ func (p *PubSub) markSeen(id string) {
p.seenMessages.Add(id)
}
func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
for _, t := range msg.GetTopicIDs() {
if _, ok := p.myTopics[t]; ok {
return true
}
}
return false
}
func (p *PubSub) handleIncomingRPC(rpc *RPC) error {
for _, subopt := range rpc.GetSubscriptions() {
t := subopt.GetTopicid()
@ -296,33 +211,35 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error {
}
for _, pmsg := range rpc.GetPublish() {
msg := &Message{pmsg}
id := msg.Message.GetFrom() + string(msg.GetSeqno())
if p.seenMessage(id) {
if !p.subscribedToMsg(pmsg) {
log.Warning("received message we didn't subscribe to. Dropping.")
continue
}
if msg.GetFrom() == p.host.ID() {
log.Error("skipping message from self")
return nil
}
p.markSeen(id)
if err := p.recvMessage(pmsg); err != nil {
log.Error("error receiving message: ", err)
}
err := p.publishMessage(rpc.from, pmsg)
if err != nil {
log.Error("publish message: ", err)
}
p.maybePublishMessage(rpc.from, pmsg)
}
return nil
}
func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) {
msg := &Message{pmsg}
id := msg.Message.GetFrom() + string(msg.GetSeqno())
if p.seenMessage(id) {
return
}
p.markSeen(id)
p.notifySubs(pmsg)
err := p.publishMessage(from, pmsg)
if err != nil {
log.Error("publish message: ", err)
}
}
func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) error {
if len(msg.GetTopicIDs()) != 1 {
return fmt.Errorf("don't support publishing to multiple topics in a single message")
@ -335,7 +252,7 @@ func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) error {
out := &RPC{RPC: pb.RPC{Publish: []*pb.Message{msg}}}
for pid, _ := range tmap {
for pid := range tmap {
if pid == from || pid == peer.ID(msg.GetFrom()) {
continue
}

View File

@ -207,3 +207,28 @@ func assertReceive(t *testing.T, ch <-chan *Message, exp []byte) {
t.Fatal("timed out waiting for message of: ", exp)
}
}
func TestNoConnection(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 10)
psubs := getPubsubs(ctx, hosts)
ch, err := psubs[5].Subscribe("foobar")
if err != nil {
t.Fatal(err)
}
err = psubs[0].Publish("foobar", []byte("TESTING"))
if err != nil {
t.Fatal(err)
}
select {
case <-ch:
t.Fatal("shouldnt have gotten a message")
case <-time.After(time.Millisecond * 200):
}
}