commit b69b777643aa0e53c6a3b3344992ce17249afcdb Author: Jeromy Date: Fri Sep 9 17:15:39 2016 -0700 first hack at it, kinda works for the most part diff --git a/dumbsub.go b/dumbsub.go new file mode 100644 index 0000000..8387a92 --- /dev/null +++ b/dumbsub.go @@ -0,0 +1,351 @@ +package dumbsub + +import ( + "bufio" + "encoding/json" + "fmt" + "sync" + "time" + + peer "github.com/ipfs/go-libp2p-peer" + logging "github.com/ipfs/go-log" + host "github.com/libp2p/go-libp2p/p2p/host" + inet "github.com/libp2p/go-libp2p/p2p/net" + protocol "github.com/libp2p/go-libp2p/p2p/protocol" +) + +const ID = protocol.ID("/dumbsub/1.0.0") + +var log = logging.Logger("dumbsub") + +type PubSub struct { + host host.Host + + incoming chan *RPC + outgoing chan *RPC + newPeers chan inet.Stream + peerDead chan peer.ID + + myTopics map[string]chan *Message + pubsubLk sync.Mutex + + topics map[string]map[peer.ID]struct{} + peers map[peer.ID]chan *RPC + lastMsg map[peer.ID]uint64 + + addSub chan *addSub +} + +type Message struct { + From peer.ID + Data []byte + Timestamp uint64 + Topic string +} + +func (m *Message) MarshalJSON() ([]byte, error) { + return json.Marshal(map[string]interface{}{ + "from": m.From.Pretty(), + "data": m.Data, + "timestamp": m.Timestamp, + "topic": m.Topic, + }) +} + +func (m *Message) UnmarshalJSON(data []byte) error { + mp := struct { + Data []byte + Timestamp uint64 + Topic string + From string + }{} + err := json.Unmarshal(data, &mp) + if err != nil { + return err + } + + m.Data = mp.Data + m.Timestamp = mp.Timestamp + m.Topic = mp.Topic + from, err := peer.IDB58Decode(mp.From) + if err != nil { + return err + } + m.From = from + return nil +} + +type RPC struct { + Type string + Msg *Message + Topics []string + + // unexported on purpose, not sending this over the wire + from peer.ID +} + +func NewDumbSub(h host.Host) *PubSub { + ps := &PubSub{ + host: h, + incoming: make(chan *RPC, 32), + outgoing: make(chan *RPC), + newPeers: make(chan inet.Stream), + myTopics: make(map[string]chan *Message), + topics: make(map[string]map[peer.ID]struct{}), + peers: make(map[peer.ID]chan *RPC), + lastMsg: make(map[peer.ID]uint64), + peerDead: make(chan peer.ID), + addSub: make(chan *addSub), + } + + h.SetStreamHandler(ID, ps.handleNewStream) + + h.Network().Notify(ps) + + go ps.processLoop() + + return ps +} + +func (p *PubSub) getHelloPacket() *RPC { + var rpc RPC + for t, _ := range p.myTopics { + rpc.Topics = append(rpc.Topics, t) + } + rpc.Type = "add" + return &rpc +} + +func (p *PubSub) handleNewStream(s inet.Stream) { + defer s.Close() + + scan := bufio.NewScanner(s) + for scan.Scan() { + rpc := new(RPC) + + err := json.Unmarshal(scan.Bytes(), rpc) + if err != nil { + log.Errorf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err) + log.Error("data: ", scan.Text()) + // TODO: cleanup of some sort + return + } + + rpc.from = s.Conn().RemotePeer() + p.incoming <- rpc + } +} + +func (p *PubSub) handleSendingMessages(s inet.Stream, in <-chan *RPC) { + var dead bool + for rpc := range in { + if dead { + continue + } + + err := writeRPC(s, rpc) + if err != nil { + log.Errorf("writing message to %s: %s", s.Conn().RemotePeer(), err) + dead = true + go func() { + p.peerDead <- s.Conn().RemotePeer() + }() + } + } +} + +func (p *PubSub) processLoop() { + + for { + select { + case s := <-p.newPeers: + pid := s.Conn().RemotePeer() + _, ok := p.peers[pid] + if ok { + log.Error("already have connection to peer: ", pid) + s.Close() + continue + } + + messages := make(chan *RPC, 32) + go p.handleSendingMessages(s, messages) + messages <- p.getHelloPacket() + + p.peers[pid] = messages + + fmt.Println("added peer: ", pid) + case pid := <-p.peerDead: + delete(p.peers, pid) + case sub := <-p.addSub: + _, ok := p.myTopics[sub.topic] + if ok { + // we don't allow multiple subs per topic at this point + sub.resp <- nil + continue + } + + resp := make(chan *Message, 16) + p.myTopics[sub.topic] = resp + sub.resp <- resp + + case rpc := <-p.incoming: + err := p.handleIncomingRPC(rpc) + if err != nil { + log.Error("handling RPC: ", err) + } + case rpc := <-p.outgoing: + switch rpc.Type { + case "add", "del": + for _, mch := range p.peers { + mch <- rpc + } + case "pub": + //fmt.Println("publishing outgoing message") + err := p.recvMessage(rpc) + if err != nil { + log.Error("error receiving message: ", err) + } + + err = p.publishMessage(rpc) + if err != nil { + log.Error("publishing message: ", err) + } + } + } + } +} + +func (p *PubSub) recvMessage(rpc *RPC) error { + subch, ok := p.myTopics[rpc.Msg.Topic] + if ok { + //fmt.Println("writing out to subscriber!") + subch <- rpc.Msg + } + return nil +} + +func (p *PubSub) handleIncomingRPC(rpc *RPC) error { + switch rpc.Type { + case "add": + for _, t := range rpc.Topics { + tmap, ok := p.topics[t] + if !ok { + tmap = make(map[peer.ID]struct{}) + p.topics[t] = tmap + } + + tmap[rpc.from] = struct{}{} + } + case "del": + for _, t := range rpc.Topics { + tmap, ok := p.topics[t] + if !ok { + return nil + } + delete(tmap, rpc.from) + } + case "pub": + //fmt.Println("incoming message! ", rpc.from) + if rpc.Msg == nil { + return fmt.Errorf("nil pub message") + } + // Note: Obviously this is an incredibly insecure way of + // filtering out "messages we've already seen". But it works for a + // cool demo, so i'm not gonna waste time thinking about it any more + if p.lastMsg[rpc.Msg.From] >= rpc.Msg.Timestamp { + //log.Error("skipping 'old' message") + return nil + } + + if rpc.Msg.From == p.host.ID() { + return nil + } + + p.lastMsg[rpc.Msg.From] = rpc.Msg.Timestamp + + if err := p.recvMessage(rpc); err != nil { + log.Error("error receiving message: ", err) + } + + err := p.publishMessage(rpc) + if err != nil { + log.Error("publish message: ", err) + } + } + return nil +} + +func (p *PubSub) publishMessage(rpc *RPC) error { + tmap, ok := p.topics[rpc.Msg.Topic] + if !ok { + return nil + } + + for pid, _ := range tmap { + if pid == rpc.from { + continue + } + + mch, ok := p.peers[pid] + if !ok { + continue + } + + go func() { mch <- rpc }() + } + + return nil +} + +type addSub struct { + topic string + resp chan chan *Message +} + +func (p *PubSub) Subscribe(topic string) (<-chan *Message, error) { + resp := make(chan chan *Message) + p.addSub <- &addSub{ + topic: topic, + resp: resp, + } + + outch := <-resp + if outch == nil { + return nil, fmt.Errorf("error, duplicate subscription") + } + + return outch, nil +} + +func (p *PubSub) Unsub(topic string) { + panic("NYI") +} + +func (p *PubSub) Publish(topic string, data []byte) error { + p.outgoing <- &RPC{ + Msg: &Message{ + Data: data, + Topic: topic, + From: p.host.ID(), + Timestamp: uint64(time.Now().UnixNano()), + }, + Type: "pub", + } + return nil +} + +func (p *PubSub) sendHelloPacket(s inet.Stream) error { + hello := p.getHelloPacket() + return writeRPC(s, hello) +} + +func writeRPC(s inet.Stream, rpc *RPC) error { + data, err := json.Marshal(rpc) + if err != nil { + return err + } + + data = append(data, '\n') + _, err = s.Write(data) + return err +} diff --git a/dumbsub_test.go b/dumbsub_test.go new file mode 100644 index 0000000..6fce43a --- /dev/null +++ b/dumbsub_test.go @@ -0,0 +1,97 @@ +package dumbsub + +import ( + "bytes" + "context" + "fmt" + "math/rand" + "testing" + "time" + + host "github.com/libp2p/go-libp2p/p2p/host" + netutil "github.com/libp2p/go-libp2p/p2p/test/util" +) + +func getNetHosts(t *testing.T, n int) []host.Host { + var out []host.Host + + for i := 0; i < n; i++ { + h := netutil.GenHostSwarm(t, context.Background()) + out = append(out, h) + } + + return out +} + +func connect(t *testing.T, a, b host.Host) { + pinfo := a.Peerstore().PeerInfo(a.ID()) + err := b.Connect(context.Background(), pinfo) + if err != nil { + t.Fatal(err) + } +} + +func connectAll(t *testing.T, hosts []host.Host) { + for i, a := range hosts { + for j, b := range hosts { + if i == j { + continue + } + + connect(t, a, b) + } + } +} + +func TestBasicDumbsub(t *testing.T) { + hosts := getNetHosts(t, 10) + + var psubs []*PubSub + for _, h := range hosts { + psubs = append(psubs, NewDumbSub(h)) + } + + var msgs []<-chan *Message + for _, ps := range psubs { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + connectAll(t, hosts) + + time.Sleep(time.Millisecond * 100) + psubs[0].Publish("foobar", []byte("ipfs rocks")) + + for i, resp := range msgs { + fmt.Printf("reading message from peer %d\n", i) + msg := <-resp + fmt.Printf("%s - %d: topic %s, from %s: %s\n", time.Now(), i, msg.Topic, msg.From, string(msg.Data)) + } + + psubs[2].Publish("foobar", []byte("libp2p is cool too")) + for i, resp := range msgs { + fmt.Printf("reading message from peer %d\n", i) + msg := <-resp + fmt.Printf("%s - %d: topic %s, from %s: %s\n", time.Now(), i, msg.Topic, msg.From, string(msg.Data)) + } + + for i := 0; i < 100; i++ { + fmt.Println("loop: ", i) + msg := []byte(fmt.Sprintf("%d the flooooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + psubs[owner].Publish("foobar", msg) + + for _, resp := range msgs { + got := <-resp + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} diff --git a/notify.go b/notify.go new file mode 100644 index 0000000..ed23797 --- /dev/null +++ b/notify.go @@ -0,0 +1,42 @@ +package dumbsub + +import ( + "context" + "fmt" + + ma "github.com/jbenet/go-multiaddr" + inet "github.com/libp2p/go-libp2p/p2p/net" +) + +var _ inet.Notifiee = (*PubSub)(nil) + +func (p *PubSub) OpenedStream(n inet.Network, s inet.Stream) { + +} + +func (p *PubSub) ClosedStream(n inet.Network, s inet.Stream) { + +} + +func (p *PubSub) Connected(n inet.Network, c inet.Conn) { + fmt.Printf("got connection! %s -> %s\n", c.LocalPeer(), c.RemotePeer()) + s, err := p.host.NewStream(context.Background(), c.RemotePeer(), ID) + if err != nil { + log.Error("opening new stream to peer: ", err) + return + } + + p.newPeers <- s +} + +func (p *PubSub) Disconnected(n inet.Network, c inet.Conn) { + +} + +func (p *PubSub) Listen(n inet.Network, _ ma.Multiaddr) { + +} + +func (p *PubSub) ListenClose(n inet.Network, _ ma.Multiaddr) { + +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..a834a62 --- /dev/null +++ b/package.json @@ -0,0 +1,22 @@ +{ + "author": "whyrusleeping", + "bugs": { + "url": "https://github.com/whyrusleeping/dumbsub" + }, + "gx": { + "dvcsimport": "github.com/whyrusleeping/dumbsub" + }, + "gxDependencies": [ + { + "author": "whyrusleeping", + "hash": "QmXnaDLonE9YBTVDdWBM6Jb5YxxmW1MHMkXzgsnu1jTEmK", + "name": "go-libp2p", + "version": "3.4.3" + } + ], + "gxVersion": "0.9.0", + "language": "go", + "license": "", + "name": "dumbsub", + "version": "0.0.0" +}