Merge pull request #10 from keks/feat/feed-refactor

Subscribe to a topic multiple times
This commit is contained in:
Jeromy Johnson 2016-11-21 08:45:27 -08:00 committed by GitHub
commit c3de971f06
5 changed files with 266 additions and 111 deletions

View File

@ -1 +1 @@
0.8.0: QmWiLbk7eE1jGePDAuS26E2A9bMK3e3PMH3dcSeRY3MEBR
0.8.1: QmRJs5veT3gnuYpLAagC3NbzixbkgwjSdUXTKfh3hMo6XM

View File

@ -31,14 +31,17 @@ type PubSub struct {
publish chan *Message
// addSub is a control channel for us to add and remove subscriptions
addSub chan *addSub
addSub chan *addSubReq
//
// get list of topics we are subscribed to
getTopics chan *topicReq
//
// get chan of peers we are connected to
getPeers chan *listPeerReq
// send subscription here to cancel it
cancelCh chan *Subscription
// a notification channel for incoming streams from other peers
newPeers chan inet.Stream
@ -46,7 +49,7 @@ type PubSub struct {
peerDead chan peer.ID
// The set of topics we are subscribed to
myTopics map[string]chan *Message
myTopics map[string]map[*Subscription]struct{}
// topics tracks which topics each of our peers are subscribed to
topics map[string]map[peer.ID]struct{}
@ -72,6 +75,7 @@ type RPC struct {
from peer.ID
}
// NewFloodSub returns a new FloodSub management object
func NewFloodSub(ctx context.Context, h host.Host) *PubSub {
ps := &PubSub{
host: h,
@ -80,10 +84,11 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub {
publish: make(chan *Message),
newPeers: make(chan inet.Stream),
peerDead: make(chan peer.ID),
cancelCh: make(chan *Subscription),
getPeers: make(chan *listPeerReq),
addSub: make(chan *addSub),
addSub: make(chan *addSubReq),
getTopics: make(chan *topicReq),
myTopics: make(map[string]chan *Message),
myTopics: make(map[string]map[*Subscription]struct{}),
topics: make(map[string]map[peer.ID]struct{}),
peers: make(map[peer.ID]chan *RPC),
seenMessages: timecache.NewTimeCache(time.Second * 30),
@ -97,6 +102,7 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub {
return ps
}
// processLoop handles all inputs arriving on the channels
func (p *PubSub) processLoop(ctx context.Context) {
for {
select {
@ -130,8 +136,10 @@ func (p *PubSub) processLoop(ctx context.Context) {
out = append(out, t)
}
treq.resp <- out
case sub := <-p.cancelCh:
p.handleRemoveSubscription(sub)
case sub := <-p.addSub:
p.handleSubscriptionChange(sub)
p.handleAddSubscription(sub)
case preq := <-p.getPeers:
tmap, ok := p.topics[preq.topic]
if preq.topic != "" && !ok {
@ -164,30 +172,62 @@ func (p *PubSub) processLoop(ctx context.Context) {
}
}
func (p *PubSub) handleSubscriptionChange(sub *addSub) {
subopt := &pb.RPC_SubOpts{
Topicid: &sub.topic,
Subscribe: &sub.sub,
// handleRemoveSubscription removes Subscription sub from bookeeping.
// If this was the last Subscription for a given topic, it will also announce
// that this node is not subscribing to this topic anymore.
// Only called from processLoop.
func (p *PubSub) handleRemoveSubscription(sub *Subscription) {
subs := p.myTopics[sub.topic]
if subs == nil {
return
}
ch, ok := p.myTopics[sub.topic]
if sub.sub {
if ok {
// we don't allow multiple subs per topic at this point
sub.resp <- nil
return
}
sub.err = fmt.Errorf("subscription cancelled by calling sub.Cancel()")
close(sub.ch)
delete(subs, sub)
resp := make(chan *Message, 16)
p.myTopics[sub.topic] = resp
sub.resp <- resp
} else {
if !ok {
return
}
close(ch)
if len(subs) == 0 {
delete(p.myTopics, sub.topic)
p.announce(sub.topic, false)
}
}
// handleAddSubscription adds a Subscription for a particular topic. If it is
// the first Subscription for the topic, it will announce that this node
// subscribes to the topic.
// Only called from processLoop.
func (p *PubSub) handleAddSubscription(req *addSubReq) {
subs := p.myTopics[req.topic]
// announce we want this topic
if len(subs) == 0 {
p.announce(req.topic, true)
}
// make new if not there
if subs == nil {
p.myTopics[req.topic] = make(map[*Subscription]struct{})
subs = p.myTopics[req.topic]
}
sub := &Subscription{
ch: make(chan *Message, 32),
topic: req.topic,
cancelCh: p.cancelCh,
}
p.myTopics[sub.topic][sub] = struct{}{}
req.resp <- sub
}
// announce announces whether or not this node is interested in a given topic
// Only called from processLoop.
func (p *PubSub) announce(topic string, sub bool) {
subopt := &pb.RPC_SubOpts{
Topicid: &topic,
Subscribe: &sub,
}
out := rpcWithSubs(subopt)
@ -196,23 +236,29 @@ func (p *PubSub) handleSubscriptionChange(sub *addSub) {
}
}
// notifySubs sends a given message to all corresponding subscribbers.
// Only called from processLoop.
func (p *PubSub) notifySubs(msg *pb.Message) {
for _, topic := range msg.GetTopicIDs() {
subch, ok := p.myTopics[topic]
if ok {
subch <- &Message{msg}
subs := p.myTopics[topic]
for f := range subs {
f.ch <- &Message{msg}
}
}
}
// seenMessage returns whether we already saw this message before
func (p *PubSub) seenMessage(id string) bool {
return p.seenMessages.Has(id)
}
// markSeen marks a message as seen such that seenMessage returns `true' for the given id
func (p *PubSub) markSeen(id string) {
p.seenMessages.Add(id)
}
// subscribedToMessage returns whether we are subscribed to one of the topics
// of a given message
func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
for _, t := range msg.GetTopicIDs() {
if _, ok := p.myTopics[t]; ok {
@ -253,6 +299,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error {
return nil
}
// msgID returns a unique ID of the passed Message
func msgID(pmsg *pb.Message) string {
return string(pmsg.GetFrom()) + string(pmsg.GetSeqno())
}
@ -303,59 +350,49 @@ func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) error {
return nil
}
type addSub struct {
type addSubReq struct {
topic string
sub bool
resp chan chan *Message
resp chan *Subscription
}
func (p *PubSub) Subscribe(ctx context.Context, topic string) (<-chan *Message, error) {
return p.SubscribeComplicated(&pb.TopicDescriptor{
Name: proto.String(topic),
})
// Subscribe returns a new Subscription for the given topic
func (p *PubSub) Subscribe(topic string) (*Subscription, error) {
td := pb.TopicDescriptor{Name: &topic}
return p.SubscribeByTopicDescriptor(&td)
}
// SubscribeByTopicDescriptor lets you subscribe a topic using a pb.TopicDescriptor
func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor) (*Subscription, error) {
if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE {
return nil, fmt.Errorf("auth mode not yet supported")
}
if td.GetEnc().GetMode() != pb.TopicDescriptor_EncOpts_NONE {
return nil, fmt.Errorf("encryption mode not yet supported")
}
out := make(chan *Subscription, 1)
p.addSub <- &addSubReq{
topic: td.GetName(),
resp: out,
}
return <-out, nil
}
type topicReq struct {
resp chan []string
}
// GetTopics returns the topics this node is subscribed to
func (p *PubSub) GetTopics() []string {
out := make(chan []string, 1)
p.getTopics <- &topicReq{resp: out}
return <-out
}
func (p *PubSub) SubscribeComplicated(td *pb.TopicDescriptor) (<-chan *Message, error) {
if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE {
return nil, fmt.Errorf("Auth method not yet supported")
}
if td.GetEnc().GetMode() != pb.TopicDescriptor_EncOpts_NONE {
return nil, fmt.Errorf("Encryption method not yet supported")
}
resp := make(chan chan *Message)
p.addSub <- &addSub{
topic: td.GetName(),
resp: resp,
sub: true,
}
outch := <-resp
if outch == nil {
return nil, fmt.Errorf("error, duplicate subscription")
}
return outch, nil
}
func (p *PubSub) Unsub(topic string) {
p.addSub <- &addSub{
topic: topic,
sub: false,
}
}
// Publish publishes data under the given topic
func (p *PubSub) Publish(topic string, data []byte) error {
seqno := make([]byte, 8)
binary.BigEndian.PutUint64(seqno, uint64(time.Now().UnixNano()))
@ -376,6 +413,7 @@ type listPeerReq struct {
topic string
}
// ListPeers returns a list of peers we are connected to.
func (p *PubSub) ListPeers(topic string) []peer.ID {
out := make(chan []peer.ID)
p.getPeers <- &listPeerReq{

View File

@ -10,11 +10,12 @@ import (
"time"
host "github.com/libp2p/go-libp2p-host"
netutil "github.com/libp2p/go-libp2p-netutil"
peer "github.com/libp2p/go-libp2p-peer"
netutil "github.com/libp2p/go-libp2p/p2p/test/util"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
)
func checkMessageRouting(t *testing.T, topic string, pubs []*PubSub, subs []<-chan *Message) {
func checkMessageRouting(t *testing.T, topic string, pubs []*PubSub, subs []*Subscription) {
data := make([]byte, 16)
rand.Read(data)
@ -34,7 +35,8 @@ func getNetHosts(t *testing.T, ctx context.Context, n int) []host.Host {
var out []host.Host
for i := 0; i < n; i++ {
h := netutil.GenHostSwarm(t, ctx)
netw := netutil.GenSwarmNetwork(t, ctx)
h := bhost.New(netw)
out = append(out, h)
}
@ -85,13 +87,14 @@ func getPubsubs(ctx context.Context, hs []host.Host) []*PubSub {
return psubs
}
func assertReceive(t *testing.T, ch <-chan *Message, exp []byte) {
func assertReceive(t *testing.T, ch *Subscription, exp []byte) {
select {
case msg := <-ch:
case msg := <-ch.ch:
if !bytes.Equal(msg.GetData(), exp) {
t.Fatalf("got wrong message, expected %s but got %s", string(exp), string(msg.GetData()))
}
case <-time.After(time.Second * 5):
t.Logf("%#v\n", ch)
t.Fatal("timed out waiting for message of: ", string(exp))
}
}
@ -103,9 +106,9 @@ func TestBasicFloodsub(t *testing.T) {
psubs := getPubsubs(ctx, hosts)
var msgs []<-chan *Message
var msgs []*Subscription
for _, ps := range psubs {
subch, err := ps.Subscribe(ctx, "foobar")
subch, err := ps.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}
@ -125,8 +128,11 @@ func TestBasicFloodsub(t *testing.T) {
psubs[owner].Publish("foobar", msg)
for _, resp := range msgs {
got := <-resp
for _, sub := range msgs {
got, err := sub.Next(ctx)
if err != nil {
t.Fatal(sub.err)
}
if !bytes.Equal(msg, got.Data) {
t.Fatal("got wrong message!")
}
@ -149,13 +155,13 @@ func TestMultihops(t *testing.T) {
connect(t, hosts[3], hosts[4])
connect(t, hosts[4], hosts[5])
var msgChs []<-chan *Message
var subs []*Subscription
for i := 1; i < 6; i++ {
ch, err := psubs[i].Subscribe(ctx, "foobar")
ch, err := psubs[i].Subscribe("foobar")
if err != nil {
t.Fatal(err)
}
msgChs = append(msgChs, ch)
subs = append(subs, ch)
}
time.Sleep(time.Millisecond * 100)
@ -168,7 +174,7 @@ func TestMultihops(t *testing.T) {
// last node in the chain should get the message
select {
case out := <-msgChs[4]:
case out := <-subs[4].ch:
if !bytes.Equal(out.GetData(), msg) {
t.Fatal("got wrong data")
}
@ -188,12 +194,12 @@ func TestReconnects(t *testing.T) {
connect(t, hosts[0], hosts[1])
connect(t, hosts[0], hosts[2])
A, err := psubs[1].Subscribe(ctx, "cats")
A, err := psubs[1].Subscribe("cats")
if err != nil {
t.Fatal(err)
}
B, err := psubs[2].Subscribe(ctx, "cats")
B, err := psubs[2].Subscribe("cats")
if err != nil {
t.Fatal(err)
}
@ -209,7 +215,7 @@ func TestReconnects(t *testing.T) {
assertReceive(t, A, msg)
assertReceive(t, B, msg)
psubs[2].Unsub("cats")
B.Cancel()
time.Sleep(time.Millisecond * 50)
@ -221,7 +227,7 @@ func TestReconnects(t *testing.T) {
assertReceive(t, A, msg2)
select {
case _, ok := <-B:
case _, ok := <-B.ch:
if ok {
t.Fatal("shouldnt have gotten data on this channel")
}
@ -229,12 +235,17 @@ func TestReconnects(t *testing.T) {
t.Fatal("timed out waiting for B chan to be closed")
}
ch2, err := psubs[2].Subscribe(ctx, "cats")
nSubs := len(psubs[2].myTopics["cats"])
if nSubs > 0 {
t.Fatal(`B should have 0 subscribers for channel "cats", has`, nSubs)
}
ch2, err := psubs[2].Subscribe("cats")
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 50)
time.Sleep(time.Millisecond * 100)
nextmsg := []byte("ifps is kul")
err = psubs[0].Publish("cats", nextmsg)
@ -254,7 +265,7 @@ func TestNoConnection(t *testing.T) {
psubs := getPubsubs(ctx, hosts)
ch, err := psubs[5].Subscribe(ctx, "foobar")
ch, err := psubs[5].Subscribe("foobar")
if err != nil {
t.Fatal(err)
}
@ -265,7 +276,7 @@ func TestNoConnection(t *testing.T) {
}
select {
case <-ch:
case <-ch.ch:
t.Fatal("shouldnt have gotten a message")
case <-time.After(time.Millisecond * 200):
}
@ -288,7 +299,7 @@ func TestSelfReceive(t *testing.T) {
time.Sleep(time.Millisecond * 10)
ch, err := psub.Subscribe(ctx, "foobar")
ch, err := psub.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}
@ -311,14 +322,14 @@ func TestOneToOne(t *testing.T) {
connect(t, hosts[0], hosts[1])
ch, err := psubs[1].Subscribe(ctx, "foobar")
ch, err := psubs[1].Subscribe("foobar")
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 50)
checkMessageRouting(t, "foobar", psubs, []<-chan *Message{ch})
checkMessageRouting(t, "foobar", psubs, []*Subscription{ch})
}
func assertPeerLists(t *testing.T, hosts []host.Host, ps *PubSub, has ...int) {
@ -362,9 +373,9 @@ func TestTreeTopology(t *testing.T) {
[8] -> [9]
*/
var chs []<-chan *Message
var chs []*Subscription
for _, ps := range psubs {
ch, err := ps.Subscribe(ctx, "fizzbuzz")
ch, err := ps.Subscribe("fizzbuzz")
if err != nil {
t.Fatal(err)
}
@ -404,31 +415,31 @@ func TestSubReporting(t *testing.T) {
host := getNetHosts(t, ctx, 1)[0]
psub := NewFloodSub(ctx, host)
_, err := psub.Subscribe(ctx, "foo")
fooSub, err := psub.Subscribe("foo")
if err != nil {
t.Fatal(err)
}
_, err = psub.Subscribe(ctx, "bar")
barSub, err := psub.Subscribe("bar")
if err != nil {
t.Fatal(err)
}
assertHasTopics(t, psub, "foo", "bar")
_, err = psub.Subscribe(ctx, "baz")
_, err = psub.Subscribe("baz")
if err != nil {
t.Fatal(err)
}
assertHasTopics(t, psub, "foo", "bar", "baz")
psub.Unsub("bar")
barSub.Cancel()
assertHasTopics(t, psub, "foo", "baz")
psub.Unsub("foo")
fooSub.Cancel()
assertHasTopics(t, psub, "baz")
_, err = psub.Subscribe(ctx, "fish")
_, err = psub.Subscribe("fish")
if err != nil {
t.Fatal(err)
}
@ -447,17 +458,39 @@ func TestPeerTopicReporting(t *testing.T) {
connect(t, hosts[0], hosts[2])
connect(t, hosts[0], hosts[3])
psubs[1].Subscribe(ctx, "foo")
psubs[1].Subscribe(ctx, "bar")
psubs[1].Subscribe(ctx, "baz")
_, err := psubs[1].Subscribe("foo")
if err != nil {
t.Fatal(err)
}
_, err = psubs[1].Subscribe("bar")
if err != nil {
t.Fatal(err)
}
_, err = psubs[1].Subscribe("baz")
if err != nil {
t.Fatal(err)
}
psubs[2].Subscribe(ctx, "foo")
psubs[2].Subscribe(ctx, "ipfs")
_, err = psubs[2].Subscribe("foo")
if err != nil {
t.Fatal(err)
}
_, err = psubs[2].Subscribe("ipfs")
if err != nil {
t.Fatal(err)
}
psubs[3].Subscribe(ctx, "baz")
psubs[3].Subscribe(ctx, "ipfs")
_, err = psubs[3].Subscribe("baz")
if err != nil {
t.Fatal(err)
}
_, err = psubs[3].Subscribe("ipfs")
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 10)
peers := psubs[0].ListPeers("ipfs")
assertPeerList(t, peers, hosts[2].ID(), hosts[3].ID())
@ -471,17 +504,62 @@ func TestPeerTopicReporting(t *testing.T) {
assertPeerList(t, peers, hosts[1].ID())
}
func TestSubscribeMultipleTimes(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
psubs := getPubsubs(ctx, hosts)
connect(t, hosts[0], hosts[1])
sub1, err := psubs[0].Subscribe("foo")
if err != nil {
t.Fatal(err)
}
sub2, err := psubs[0].Subscribe("foo")
if err != nil {
t.Fatal(err)
}
// make sure subscribing is finished by the time we publish
time.Sleep(1 * time.Millisecond)
psubs[1].Publish("foo", []byte("bar"))
msg, err := sub1.Next(ctx)
if err != nil {
t.Fatalf("unexpected error: %v.", err)
}
data := string(msg.GetData())
if data != "bar" {
t.Fatalf("data is %s, expected %s.", data, "bar")
}
msg, err = sub2.Next(ctx)
if err != nil {
t.Fatalf("unexpected error: %v.", err)
}
data = string(msg.GetData())
if data != "bar" {
t.Fatalf("data is %s, expected %s.", data, "bar")
}
}
func assertPeerList(t *testing.T, peers []peer.ID, expected ...peer.ID) {
sort.Sort(peer.IDSlice(peers))
sort.Sort(peer.IDSlice(expected))
if len(peers) != len(expected) {
t.Fatal("mismatch: %s != %s", peers, expected)
t.Fatalf("mismatch: %s != %s", peers, expected)
}
for i, p := range peers {
if expected[i] != p {
t.Fatal("mismatch: %s != %s", peers, expected)
t.Fatalf("mismatch: %s != %s", peers, expected)
}
}
}

View File

@ -30,6 +30,12 @@
"hash": "Qmb6UFbVu1grhv5o5KnouvtZ6cqdrjXj6zLejAHWunxgCt",
"name": "go-libp2p-host",
"version": "1.3.0"
},
{
"author": "whyrusleeping",
"hash": "QmcDTquYLTYirqj71RRWKUWEEw3nJt11Awzun5ep8kfY7W",
"name": "go-libp2p-netutil",
"version": "0.1.0"
}
],
"gxVersion": "0.9.0",
@ -37,6 +43,6 @@
"license": "",
"name": "floodsub",
"releaseCmd": "git commit -a -m \"gx publish $VERSION\"",
"version": "0.8.0"
"version": "0.8.1"
}

33
subscription.go Normal file
View File

@ -0,0 +1,33 @@
package floodsub
import (
"context"
)
type Subscription struct {
topic string
ch chan *Message
cancelCh chan<- *Subscription
err error
}
func (sub *Subscription) Topic() string {
return sub.topic
}
func (sub *Subscription) Next(ctx context.Context) (*Message, error) {
select {
case msg, ok := <-sub.ch:
if !ok {
return msg, sub.err
}
return msg, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (sub *Subscription) Cancel() {
sub.cancelCh <- sub
}