mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-02 12:53:09 +00:00
first hack at it, kinda works for the most part
This commit is contained in:
commit
b69b777643
351
dumbsub.go
Normal file
351
dumbsub.go
Normal file
@ -0,0 +1,351 @@
|
||||
package dumbsub
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
peer "github.com/ipfs/go-libp2p-peer"
|
||||
logging "github.com/ipfs/go-log"
|
||||
host "github.com/libp2p/go-libp2p/p2p/host"
|
||||
inet "github.com/libp2p/go-libp2p/p2p/net"
|
||||
protocol "github.com/libp2p/go-libp2p/p2p/protocol"
|
||||
)
|
||||
|
||||
const ID = protocol.ID("/dumbsub/1.0.0")
|
||||
|
||||
var log = logging.Logger("dumbsub")
|
||||
|
||||
type PubSub struct {
|
||||
host host.Host
|
||||
|
||||
incoming chan *RPC
|
||||
outgoing chan *RPC
|
||||
newPeers chan inet.Stream
|
||||
peerDead chan peer.ID
|
||||
|
||||
myTopics map[string]chan *Message
|
||||
pubsubLk sync.Mutex
|
||||
|
||||
topics map[string]map[peer.ID]struct{}
|
||||
peers map[peer.ID]chan *RPC
|
||||
lastMsg map[peer.ID]uint64
|
||||
|
||||
addSub chan *addSub
|
||||
}
|
||||
|
||||
type Message struct {
|
||||
From peer.ID
|
||||
Data []byte
|
||||
Timestamp uint64
|
||||
Topic string
|
||||
}
|
||||
|
||||
func (m *Message) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(map[string]interface{}{
|
||||
"from": m.From.Pretty(),
|
||||
"data": m.Data,
|
||||
"timestamp": m.Timestamp,
|
||||
"topic": m.Topic,
|
||||
})
|
||||
}
|
||||
|
||||
func (m *Message) UnmarshalJSON(data []byte) error {
|
||||
mp := struct {
|
||||
Data []byte
|
||||
Timestamp uint64
|
||||
Topic string
|
||||
From string
|
||||
}{}
|
||||
err := json.Unmarshal(data, &mp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.Data = mp.Data
|
||||
m.Timestamp = mp.Timestamp
|
||||
m.Topic = mp.Topic
|
||||
from, err := peer.IDB58Decode(mp.From)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.From = from
|
||||
return nil
|
||||
}
|
||||
|
||||
type RPC struct {
|
||||
Type string
|
||||
Msg *Message
|
||||
Topics []string
|
||||
|
||||
// unexported on purpose, not sending this over the wire
|
||||
from peer.ID
|
||||
}
|
||||
|
||||
func NewDumbSub(h host.Host) *PubSub {
|
||||
ps := &PubSub{
|
||||
host: h,
|
||||
incoming: make(chan *RPC, 32),
|
||||
outgoing: make(chan *RPC),
|
||||
newPeers: make(chan inet.Stream),
|
||||
myTopics: make(map[string]chan *Message),
|
||||
topics: make(map[string]map[peer.ID]struct{}),
|
||||
peers: make(map[peer.ID]chan *RPC),
|
||||
lastMsg: make(map[peer.ID]uint64),
|
||||
peerDead: make(chan peer.ID),
|
||||
addSub: make(chan *addSub),
|
||||
}
|
||||
|
||||
h.SetStreamHandler(ID, ps.handleNewStream)
|
||||
|
||||
h.Network().Notify(ps)
|
||||
|
||||
go ps.processLoop()
|
||||
|
||||
return ps
|
||||
}
|
||||
|
||||
func (p *PubSub) getHelloPacket() *RPC {
|
||||
var rpc RPC
|
||||
for t, _ := range p.myTopics {
|
||||
rpc.Topics = append(rpc.Topics, t)
|
||||
}
|
||||
rpc.Type = "add"
|
||||
return &rpc
|
||||
}
|
||||
|
||||
func (p *PubSub) handleNewStream(s inet.Stream) {
|
||||
defer s.Close()
|
||||
|
||||
scan := bufio.NewScanner(s)
|
||||
for scan.Scan() {
|
||||
rpc := new(RPC)
|
||||
|
||||
err := json.Unmarshal(scan.Bytes(), rpc)
|
||||
if err != nil {
|
||||
log.Errorf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err)
|
||||
log.Error("data: ", scan.Text())
|
||||
// TODO: cleanup of some sort
|
||||
return
|
||||
}
|
||||
|
||||
rpc.from = s.Conn().RemotePeer()
|
||||
p.incoming <- rpc
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PubSub) handleSendingMessages(s inet.Stream, in <-chan *RPC) {
|
||||
var dead bool
|
||||
for rpc := range in {
|
||||
if dead {
|
||||
continue
|
||||
}
|
||||
|
||||
err := writeRPC(s, rpc)
|
||||
if err != nil {
|
||||
log.Errorf("writing message to %s: %s", s.Conn().RemotePeer(), err)
|
||||
dead = true
|
||||
go func() {
|
||||
p.peerDead <- s.Conn().RemotePeer()
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PubSub) processLoop() {
|
||||
|
||||
for {
|
||||
select {
|
||||
case s := <-p.newPeers:
|
||||
pid := s.Conn().RemotePeer()
|
||||
_, ok := p.peers[pid]
|
||||
if ok {
|
||||
log.Error("already have connection to peer: ", pid)
|
||||
s.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
messages := make(chan *RPC, 32)
|
||||
go p.handleSendingMessages(s, messages)
|
||||
messages <- p.getHelloPacket()
|
||||
|
||||
p.peers[pid] = messages
|
||||
|
||||
fmt.Println("added peer: ", pid)
|
||||
case pid := <-p.peerDead:
|
||||
delete(p.peers, pid)
|
||||
case sub := <-p.addSub:
|
||||
_, ok := p.myTopics[sub.topic]
|
||||
if ok {
|
||||
// we don't allow multiple subs per topic at this point
|
||||
sub.resp <- nil
|
||||
continue
|
||||
}
|
||||
|
||||
resp := make(chan *Message, 16)
|
||||
p.myTopics[sub.topic] = resp
|
||||
sub.resp <- resp
|
||||
|
||||
case rpc := <-p.incoming:
|
||||
err := p.handleIncomingRPC(rpc)
|
||||
if err != nil {
|
||||
log.Error("handling RPC: ", err)
|
||||
}
|
||||
case rpc := <-p.outgoing:
|
||||
switch rpc.Type {
|
||||
case "add", "del":
|
||||
for _, mch := range p.peers {
|
||||
mch <- rpc
|
||||
}
|
||||
case "pub":
|
||||
//fmt.Println("publishing outgoing message")
|
||||
err := p.recvMessage(rpc)
|
||||
if err != nil {
|
||||
log.Error("error receiving message: ", err)
|
||||
}
|
||||
|
||||
err = p.publishMessage(rpc)
|
||||
if err != nil {
|
||||
log.Error("publishing message: ", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PubSub) recvMessage(rpc *RPC) error {
|
||||
subch, ok := p.myTopics[rpc.Msg.Topic]
|
||||
if ok {
|
||||
//fmt.Println("writing out to subscriber!")
|
||||
subch <- rpc.Msg
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *PubSub) handleIncomingRPC(rpc *RPC) error {
|
||||
switch rpc.Type {
|
||||
case "add":
|
||||
for _, t := range rpc.Topics {
|
||||
tmap, ok := p.topics[t]
|
||||
if !ok {
|
||||
tmap = make(map[peer.ID]struct{})
|
||||
p.topics[t] = tmap
|
||||
}
|
||||
|
||||
tmap[rpc.from] = struct{}{}
|
||||
}
|
||||
case "del":
|
||||
for _, t := range rpc.Topics {
|
||||
tmap, ok := p.topics[t]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
delete(tmap, rpc.from)
|
||||
}
|
||||
case "pub":
|
||||
//fmt.Println("incoming message! ", rpc.from)
|
||||
if rpc.Msg == nil {
|
||||
return fmt.Errorf("nil pub message")
|
||||
}
|
||||
// Note: Obviously this is an incredibly insecure way of
|
||||
// filtering out "messages we've already seen". But it works for a
|
||||
// cool demo, so i'm not gonna waste time thinking about it any more
|
||||
if p.lastMsg[rpc.Msg.From] >= rpc.Msg.Timestamp {
|
||||
//log.Error("skipping 'old' message")
|
||||
return nil
|
||||
}
|
||||
|
||||
if rpc.Msg.From == p.host.ID() {
|
||||
return nil
|
||||
}
|
||||
|
||||
p.lastMsg[rpc.Msg.From] = rpc.Msg.Timestamp
|
||||
|
||||
if err := p.recvMessage(rpc); err != nil {
|
||||
log.Error("error receiving message: ", err)
|
||||
}
|
||||
|
||||
err := p.publishMessage(rpc)
|
||||
if err != nil {
|
||||
log.Error("publish message: ", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *PubSub) publishMessage(rpc *RPC) error {
|
||||
tmap, ok := p.topics[rpc.Msg.Topic]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
for pid, _ := range tmap {
|
||||
if pid == rpc.from {
|
||||
continue
|
||||
}
|
||||
|
||||
mch, ok := p.peers[pid]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
go func() { mch <- rpc }()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type addSub struct {
|
||||
topic string
|
||||
resp chan chan *Message
|
||||
}
|
||||
|
||||
func (p *PubSub) Subscribe(topic string) (<-chan *Message, error) {
|
||||
resp := make(chan chan *Message)
|
||||
p.addSub <- &addSub{
|
||||
topic: topic,
|
||||
resp: resp,
|
||||
}
|
||||
|
||||
outch := <-resp
|
||||
if outch == nil {
|
||||
return nil, fmt.Errorf("error, duplicate subscription")
|
||||
}
|
||||
|
||||
return outch, nil
|
||||
}
|
||||
|
||||
func (p *PubSub) Unsub(topic string) {
|
||||
panic("NYI")
|
||||
}
|
||||
|
||||
func (p *PubSub) Publish(topic string, data []byte) error {
|
||||
p.outgoing <- &RPC{
|
||||
Msg: &Message{
|
||||
Data: data,
|
||||
Topic: topic,
|
||||
From: p.host.ID(),
|
||||
Timestamp: uint64(time.Now().UnixNano()),
|
||||
},
|
||||
Type: "pub",
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *PubSub) sendHelloPacket(s inet.Stream) error {
|
||||
hello := p.getHelloPacket()
|
||||
return writeRPC(s, hello)
|
||||
}
|
||||
|
||||
func writeRPC(s inet.Stream, rpc *RPC) error {
|
||||
data, err := json.Marshal(rpc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
data = append(data, '\n')
|
||||
_, err = s.Write(data)
|
||||
return err
|
||||
}
|
||||
97
dumbsub_test.go
Normal file
97
dumbsub_test.go
Normal file
@ -0,0 +1,97 @@
|
||||
package dumbsub
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
host "github.com/libp2p/go-libp2p/p2p/host"
|
||||
netutil "github.com/libp2p/go-libp2p/p2p/test/util"
|
||||
)
|
||||
|
||||
func getNetHosts(t *testing.T, n int) []host.Host {
|
||||
var out []host.Host
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
h := netutil.GenHostSwarm(t, context.Background())
|
||||
out = append(out, h)
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
func connect(t *testing.T, a, b host.Host) {
|
||||
pinfo := a.Peerstore().PeerInfo(a.ID())
|
||||
err := b.Connect(context.Background(), pinfo)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func connectAll(t *testing.T, hosts []host.Host) {
|
||||
for i, a := range hosts {
|
||||
for j, b := range hosts {
|
||||
if i == j {
|
||||
continue
|
||||
}
|
||||
|
||||
connect(t, a, b)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBasicDumbsub(t *testing.T) {
|
||||
hosts := getNetHosts(t, 10)
|
||||
|
||||
var psubs []*PubSub
|
||||
for _, h := range hosts {
|
||||
psubs = append(psubs, NewDumbSub(h))
|
||||
}
|
||||
|
||||
var msgs []<-chan *Message
|
||||
for _, ps := range psubs {
|
||||
subch, err := ps.Subscribe("foobar")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
msgs = append(msgs, subch)
|
||||
}
|
||||
|
||||
connectAll(t, hosts)
|
||||
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
psubs[0].Publish("foobar", []byte("ipfs rocks"))
|
||||
|
||||
for i, resp := range msgs {
|
||||
fmt.Printf("reading message from peer %d\n", i)
|
||||
msg := <-resp
|
||||
fmt.Printf("%s - %d: topic %s, from %s: %s\n", time.Now(), i, msg.Topic, msg.From, string(msg.Data))
|
||||
}
|
||||
|
||||
psubs[2].Publish("foobar", []byte("libp2p is cool too"))
|
||||
for i, resp := range msgs {
|
||||
fmt.Printf("reading message from peer %d\n", i)
|
||||
msg := <-resp
|
||||
fmt.Printf("%s - %d: topic %s, from %s: %s\n", time.Now(), i, msg.Topic, msg.From, string(msg.Data))
|
||||
}
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
fmt.Println("loop: ", i)
|
||||
msg := []byte(fmt.Sprintf("%d the flooooooood %d", i, i))
|
||||
|
||||
owner := rand.Intn(len(psubs))
|
||||
|
||||
psubs[owner].Publish("foobar", msg)
|
||||
|
||||
for _, resp := range msgs {
|
||||
got := <-resp
|
||||
if !bytes.Equal(msg, got.Data) {
|
||||
t.Fatal("got wrong message!")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
42
notify.go
Normal file
42
notify.go
Normal file
@ -0,0 +1,42 @@
|
||||
package dumbsub
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
ma "github.com/jbenet/go-multiaddr"
|
||||
inet "github.com/libp2p/go-libp2p/p2p/net"
|
||||
)
|
||||
|
||||
var _ inet.Notifiee = (*PubSub)(nil)
|
||||
|
||||
func (p *PubSub) OpenedStream(n inet.Network, s inet.Stream) {
|
||||
|
||||
}
|
||||
|
||||
func (p *PubSub) ClosedStream(n inet.Network, s inet.Stream) {
|
||||
|
||||
}
|
||||
|
||||
func (p *PubSub) Connected(n inet.Network, c inet.Conn) {
|
||||
fmt.Printf("got connection! %s -> %s\n", c.LocalPeer(), c.RemotePeer())
|
||||
s, err := p.host.NewStream(context.Background(), c.RemotePeer(), ID)
|
||||
if err != nil {
|
||||
log.Error("opening new stream to peer: ", err)
|
||||
return
|
||||
}
|
||||
|
||||
p.newPeers <- s
|
||||
}
|
||||
|
||||
func (p *PubSub) Disconnected(n inet.Network, c inet.Conn) {
|
||||
|
||||
}
|
||||
|
||||
func (p *PubSub) Listen(n inet.Network, _ ma.Multiaddr) {
|
||||
|
||||
}
|
||||
|
||||
func (p *PubSub) ListenClose(n inet.Network, _ ma.Multiaddr) {
|
||||
|
||||
}
|
||||
22
package.json
Normal file
22
package.json
Normal file
@ -0,0 +1,22 @@
|
||||
{
|
||||
"author": "whyrusleeping",
|
||||
"bugs": {
|
||||
"url": "https://github.com/whyrusleeping/dumbsub"
|
||||
},
|
||||
"gx": {
|
||||
"dvcsimport": "github.com/whyrusleeping/dumbsub"
|
||||
},
|
||||
"gxDependencies": [
|
||||
{
|
||||
"author": "whyrusleeping",
|
||||
"hash": "QmXnaDLonE9YBTVDdWBM6Jb5YxxmW1MHMkXzgsnu1jTEmK",
|
||||
"name": "go-libp2p",
|
||||
"version": "3.4.3"
|
||||
}
|
||||
],
|
||||
"gxVersion": "0.9.0",
|
||||
"language": "go",
|
||||
"license": "",
|
||||
"name": "dumbsub",
|
||||
"version": "0.0.0"
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user