diff --git a/dumbsub.go b/floodsub.go similarity index 79% rename from dumbsub.go rename to floodsub.go index 8387a92..2bddef9 100644 --- a/dumbsub.go +++ b/floodsub.go @@ -1,4 +1,4 @@ -package dumbsub +package floodsub import ( "bufio" @@ -7,16 +7,22 @@ import ( "sync" "time" - peer "github.com/ipfs/go-libp2p-peer" - logging "github.com/ipfs/go-log" - host "github.com/libp2p/go-libp2p/p2p/host" - inet "github.com/libp2p/go-libp2p/p2p/net" - protocol "github.com/libp2p/go-libp2p/p2p/protocol" + logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" + peer "gx/ipfs/QmWtbQU15LaB5B1JC2F7TV9P4K88vD3PpA4AJrwfCjhML8/go-libp2p-peer" + host "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/host" + inet "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/net" + protocol "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/protocol" ) -const ID = protocol.ID("/dumbsub/1.0.0") +const ID = protocol.ID("/floodsub/1.0.0") -var log = logging.Logger("dumbsub") +const ( + AddSubMessageType = "sub" + UnsubMessageType = "unsub" + PubMessageType = "pub" +) + +var log = logging.Logger("floodsub") type PubSub struct { host host.Host @@ -84,7 +90,7 @@ type RPC struct { from peer.ID } -func NewDumbSub(h host.Host) *PubSub { +func NewFloodSub(h host.Host) *PubSub { ps := &PubSub{ host: h, incoming: make(chan *RPC, 32), @@ -112,7 +118,7 @@ func (p *PubSub) getHelloPacket() *RPC { for t, _ := range p.myTopics { rpc.Topics = append(rpc.Topics, t) } - rpc.Type = "add" + rpc.Type = AddSubMessageType return &rpc } @@ -177,17 +183,7 @@ func (p *PubSub) processLoop() { case pid := <-p.peerDead: delete(p.peers, pid) case sub := <-p.addSub: - _, ok := p.myTopics[sub.topic] - if ok { - // we don't allow multiple subs per topic at this point - sub.resp <- nil - continue - } - - resp := make(chan *Message, 16) - p.myTopics[sub.topic] = resp - sub.resp <- resp - + p.handleSubscriptionChange(sub) case rpc := <-p.incoming: err := p.handleIncomingRPC(rpc) if err != nil { @@ -195,11 +191,11 @@ func (p *PubSub) processLoop() { } case rpc := <-p.outgoing: switch rpc.Type { - case "add", "del": + case AddSubMessageType, UnsubMessageType: for _, mch := range p.peers { mch <- rpc } - case "pub": + case PubMessageType: //fmt.Println("publishing outgoing message") err := p.recvMessage(rpc) if err != nil { @@ -214,6 +210,37 @@ func (p *PubSub) processLoop() { } } } +func (p *PubSub) handleSubscriptionChange(sub *addSub) { + ch, ok := p.myTopics[sub.topic] + out := &RPC{ + Topics: []string{sub.topic}, + } + + if sub.cancel { + if !ok { + return + } + + close(ch) + delete(p.myTopics, sub.topic) + out.Type = UnsubMessageType + } else { + if ok { + // we don't allow multiple subs per topic at this point + sub.resp <- nil + return + } + + resp := make(chan *Message, 16) + p.myTopics[sub.topic] = resp + sub.resp <- resp + out.Type = AddSubMessageType + } + + go func() { + p.outgoing <- out + }() +} func (p *PubSub) recvMessage(rpc *RPC) error { subch, ok := p.myTopics[rpc.Msg.Topic] @@ -226,7 +253,7 @@ func (p *PubSub) recvMessage(rpc *RPC) error { func (p *PubSub) handleIncomingRPC(rpc *RPC) error { switch rpc.Type { - case "add": + case AddSubMessageType: for _, t := range rpc.Topics { tmap, ok := p.topics[t] if !ok { @@ -236,7 +263,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error { tmap[rpc.from] = struct{}{} } - case "del": + case UnsubMessageType: for _, t := range rpc.Topics { tmap, ok := p.topics[t] if !ok { @@ -244,11 +271,11 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error { } delete(tmap, rpc.from) } - case "pub": - //fmt.Println("incoming message! ", rpc.from) + case PubMessageType: if rpc.Msg == nil { return fmt.Errorf("nil pub message") } + // Note: Obviously this is an incredibly insecure way of // filtering out "messages we've already seen". But it works for a // cool demo, so i'm not gonna waste time thinking about it any more @@ -282,7 +309,7 @@ func (p *PubSub) publishMessage(rpc *RPC) error { } for pid, _ := range tmap { - if pid == rpc.from { + if pid == rpc.from || pid == rpc.Msg.From { continue } @@ -298,8 +325,9 @@ func (p *PubSub) publishMessage(rpc *RPC) error { } type addSub struct { - topic string - resp chan chan *Message + topic string + cancel bool + resp chan chan *Message } func (p *PubSub) Subscribe(topic string) (<-chan *Message, error) { @@ -318,7 +346,10 @@ func (p *PubSub) Subscribe(topic string) (<-chan *Message, error) { } func (p *PubSub) Unsub(topic string) { - panic("NYI") + p.addSub <- &addSub{ + topic: topic, + cancel: true, + } } func (p *PubSub) Publish(topic string, data []byte) error { @@ -329,16 +360,11 @@ func (p *PubSub) Publish(topic string, data []byte) error { From: p.host.ID(), Timestamp: uint64(time.Now().UnixNano()), }, - Type: "pub", + Type: PubMessageType, } return nil } -func (p *PubSub) sendHelloPacket(s inet.Stream) error { - hello := p.getHelloPacket() - return writeRPC(s, hello) -} - func writeRPC(s inet.Stream, rpc *RPC) error { data, err := json.Marshal(rpc) if err != nil { diff --git a/dumbsub_test.go b/floodsub_test.go similarity index 85% rename from dumbsub_test.go rename to floodsub_test.go index 6fce43a..e9d9720 100644 --- a/dumbsub_test.go +++ b/floodsub_test.go @@ -1,4 +1,4 @@ -package dumbsub +package floodsub import ( "bytes" @@ -8,8 +8,8 @@ import ( "testing" "time" - host "github.com/libp2p/go-libp2p/p2p/host" - netutil "github.com/libp2p/go-libp2p/p2p/test/util" + host "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/host" + netutil "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/test/util" ) func getNetHosts(t *testing.T, n int) []host.Host { @@ -43,12 +43,12 @@ func connectAll(t *testing.T, hosts []host.Host) { } } -func TestBasicDumbsub(t *testing.T) { - hosts := getNetHosts(t, 10) +func TestBasicFloodsub(t *testing.T) { + hosts := getNetHosts(t, 20) var psubs []*PubSub for _, h := range hosts { - psubs = append(psubs, NewDumbSub(h)) + psubs = append(psubs, NewFloodSub(h)) } var msgs []<-chan *Message diff --git a/notify.go b/notify.go index ed23797..6bcaab6 100644 --- a/notify.go +++ b/notify.go @@ -1,11 +1,10 @@ -package dumbsub +package floodsub import ( "context" - "fmt" - ma "github.com/jbenet/go-multiaddr" - inet "github.com/libp2p/go-libp2p/p2p/net" + ma "gx/ipfs/QmYzDkkgAEmrcNzFCiYo6L1dTX4EAG1gZkbtdbd9trL4vd/go-multiaddr" + inet "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/net" ) var _ inet.Notifiee = (*PubSub)(nil) @@ -19,7 +18,6 @@ func (p *PubSub) ClosedStream(n inet.Network, s inet.Stream) { } func (p *PubSub) Connected(n inet.Network, c inet.Conn) { - fmt.Printf("got connection! %s -> %s\n", c.LocalPeer(), c.RemotePeer()) s, err := p.host.NewStream(context.Background(), c.RemotePeer(), ID) if err != nil { log.Error("opening new stream to peer: ", err) diff --git a/package.json b/package.json index a834a62..328c6ff 100644 --- a/package.json +++ b/package.json @@ -9,9 +9,9 @@ "gxDependencies": [ { "author": "whyrusleeping", - "hash": "QmXnaDLonE9YBTVDdWBM6Jb5YxxmW1MHMkXzgsnu1jTEmK", + "hash": "Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS", "name": "go-libp2p", - "version": "3.4.3" + "version": "3.4.1" } ], "gxVersion": "0.9.0",