From cfb9a1dc96cfdeb72d5262ad3a73677be30837f1 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 15 Jan 2019 16:07:58 +0200 Subject: [PATCH 1/7] implement peer blacklist --- pubsub.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/pubsub.go b/pubsub.go index 11824e8..8b4a8a7 100644 --- a/pubsub.go +++ b/pubsub.go @@ -98,6 +98,10 @@ type PubSub struct { // eval thunk in event loop eval chan func() + // peer blacklist + blacklist map[peer.ID]struct{} + blacklistPeer chan peer.ID + peers map[peer.ID]chan *RPC seenMessages *timecache.TimeCache @@ -179,6 +183,8 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option topics: make(map[string]map[peer.ID]struct{}), peers: make(map[peer.ID]chan *RPC), topicVals: make(map[string]*topicVal), + blacklist: make(map[peer.ID]struct{}), + blacklistPeer: make(chan peer.ID), seenMessages: timecache.NewTimeCache(TimeCacheDuration), counter: uint64(time.Now().UnixNano()), } @@ -374,6 +380,10 @@ func (p *PubSub) processLoop(ctx context.Context) { case thunk := <-p.eval: thunk() + case pid := <-p.blacklistPeer: + log.Infof("Blacklisting peer %s", pid) + p.blacklist[pid] = struct{}{} + case <-ctx.Done(): log.Info("pubsub processloop shutting down") return @@ -567,6 +577,12 @@ func msgID(pmsg *pb.Message) string { // pushMsg pushes a message performing validation as necessary func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) { + // reject messages from blacklisted peers + if _, ok := p.blacklist[src]; ok { + log.Warningf("dropping message from blacklisted peer %s", src) + return + } + // reject unsigned messages when strict before we even process the id if p.signStrict && msg.Signature == nil { log.Debugf("dropping unsigned message from %s", src) @@ -821,6 +837,11 @@ func (p *PubSub) ListPeers(topic string) []peer.ID { return <-out } +// BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped. +func (p *PubSub) BlacklistPeer(pid peer.ID) { + p.blacklistPeer <- pid +} + // per topic validators type addValReq struct { topic string From 654b4e9bf365e1500c4d0762b4bce100a972fb2e Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 15 Jan 2019 18:31:21 +0200 Subject: [PATCH 2/7] close streams and ignore blacklisted peers --- pubsub.go | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/pubsub.go b/pubsub.go index 8b4a8a7..1a85642 100644 --- a/pubsub.go +++ b/pubsub.go @@ -288,6 +288,12 @@ func (p *PubSub) processLoop(ctx context.Context) { continue } + _, ok = p.blacklist[pid] + if ok { + log.Warning("ignoring connection from blacklisted peer: ", pid) + continue + } + messages := make(chan *RPC, 32) messages <- p.getHelloPacket() go p.handleNewPeer(ctx, pid, messages) @@ -296,13 +302,21 @@ func (p *PubSub) processLoop(ctx context.Context) { case s := <-p.newPeerStream: pid := s.Conn().RemotePeer() - _, ok := p.peers[pid] + ch, ok := p.peers[pid] if !ok { log.Warning("new stream for unknown peer: ", pid) s.Reset() continue } + _, ok = p.blacklist[pid] + if ok { + log.Warning("closing stream for blacklisted peer: ", pid) + close(ch) + s.Reset() + continue + } + p.rt.AddPeer(pid, s.Protocol()) case pid := <-p.newPeerError: @@ -384,6 +398,16 @@ func (p *PubSub) processLoop(ctx context.Context) { log.Infof("Blacklisting peer %s", pid) p.blacklist[pid] = struct{}{} + ch, ok := p.peers[pid] + if ok { + close(ch) + delete(p.peers, pid) + for _, t := range p.topics { + delete(t, pid) + } + p.rt.RemovePeer(pid) + } + case <-ctx.Done(): log.Info("pubsub processloop shutting down") return From 1a05f132512fea963dc214d3f6a80cf5d9cfa17d Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 15 Jan 2019 16:33:06 +0200 Subject: [PATCH 3/7] blacklist test --- blacklist_test.go | 97 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 blacklist_test.go diff --git a/blacklist_test.go b/blacklist_test.go new file mode 100644 index 0000000..67f8882 --- /dev/null +++ b/blacklist_test.go @@ -0,0 +1,97 @@ +package pubsub + +import ( + "context" + "testing" + "time" +) + +func TestBlacklist(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 2) + psubs := getPubsubs(ctx, hosts) + connect(t, hosts[0], hosts[1]) + + sub, err := psubs[1].Subscribe("test") + if err != nil { + t.Fatal(err) + } + + time.Sleep(time.Millisecond * 100) + psubs[1].BlacklistPeer(hosts[0].ID()) + time.Sleep(time.Millisecond * 100) + + psubs[0].Publish("test", []byte("message")) + + wctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + _, err = sub.Next(wctx) + + if err == nil { + t.Fatal("got message from blacklisted peer") + } +} + +func TestBlacklist2(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 2) + psubs := getPubsubs(ctx, hosts) + connect(t, hosts[0], hosts[1]) + + _, err := psubs[0].Subscribe("test") + if err != nil { + t.Fatal(err) + } + + sub1, err := psubs[1].Subscribe("test") + if err != nil { + t.Fatal(err) + } + + time.Sleep(time.Millisecond * 100) + psubs[1].BlacklistPeer(hosts[0].ID()) + time.Sleep(time.Millisecond * 100) + + psubs[0].Publish("test", []byte("message")) + + wctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + _, err = sub1.Next(wctx) + + if err == nil { + t.Fatal("got message from blacklisted peer") + } +} + +func TestBlacklist3(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 2) + psubs := getPubsubs(ctx, hosts) + + psubs[1].BlacklistPeer(hosts[0].ID()) + time.Sleep(time.Millisecond * 100) + connect(t, hosts[0], hosts[1]) + + sub, err := psubs[1].Subscribe("test") + if err != nil { + t.Fatal(err) + } + + time.Sleep(time.Millisecond * 100) + + psubs[0].Publish("test", []byte("message")) + + wctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + _, err = sub.Next(wctx) + + if err == nil { + t.Fatal("got message from blacklisted peer") + } +} From e0e995d889ad5d69a6e20e04027413269c5bc1a8 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 16 Jan 2019 12:26:26 +0200 Subject: [PATCH 4/7] add check for blacklisted sources --- pubsub.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pubsub.go b/pubsub.go index 1a85642..790db65 100644 --- a/pubsub.go +++ b/pubsub.go @@ -607,6 +607,12 @@ func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) { return } + // even if they are forwarded by good peers + if _, ok := p.blacklist[msg.GetFrom()]; ok { + log.Warningf("dropping message from blacklisted source %s", src) + return + } + // reject unsigned messages when strict before we even process the id if p.signStrict && msg.Signature == nil { log.Debugf("dropping unsigned message from %s", src) From f0840875745075875af985ccdf88880393102003 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 17 Jan 2019 14:05:04 +0200 Subject: [PATCH 5/7] blacklist type and option --- blacklist.go | 28 ++++++++++++++++++++++++++++ pubsub.go | 28 +++++++++++++++++----------- 2 files changed, 45 insertions(+), 11 deletions(-) create mode 100644 blacklist.go diff --git a/blacklist.go b/blacklist.go new file mode 100644 index 0000000..17bb241 --- /dev/null +++ b/blacklist.go @@ -0,0 +1,28 @@ +package pubsub + +import ( + peer "github.com/libp2p/go-libp2p-peer" +) + +// Blacklist is an interface for peer blacklisting. +type Blacklist interface { + Add(peer.ID) + Contains(peer.ID) bool +} + +// MapBlacklist is a blacklist implementation using a perfect map +type MapBlacklist map[peer.ID]struct{} + +// NewMapBlacklist creates a new MapBlacklist +func NewMapBlacklist() Blacklist { + return MapBlacklist(make(map[peer.ID]struct{})) +} + +func (b MapBlacklist) Add(p peer.ID) { + b[p] = struct{}{} +} + +func (b MapBlacklist) Contains(p peer.ID) bool { + _, ok := b[p] + return ok +} diff --git a/pubsub.go b/pubsub.go index 790db65..3c5cebe 100644 --- a/pubsub.go +++ b/pubsub.go @@ -99,7 +99,7 @@ type PubSub struct { eval chan func() // peer blacklist - blacklist map[peer.ID]struct{} + blacklist Blacklist blacklistPeer chan peer.ID peers map[peer.ID]chan *RPC @@ -183,7 +183,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option topics: make(map[string]map[peer.ID]struct{}), peers: make(map[peer.ID]chan *RPC), topicVals: make(map[string]*topicVal), - blacklist: make(map[peer.ID]struct{}), + blacklist: NewMapBlacklist(), blacklistPeer: make(chan peer.ID), seenMessages: timecache.NewTimeCache(TimeCacheDuration), counter: uint64(time.Now().UnixNano()), @@ -268,6 +268,15 @@ func WithStrictSignatureVerification(required bool) Option { } } +// WithBlacklist provides an implementation of the blacklist; the default is a +// MapBlacklist +func WithBlacklist(b Blacklist) Option { + return func(p *PubSub) error { + p.blacklist = b + return nil + } +} + // processLoop handles all inputs arriving on the channels func (p *PubSub) processLoop(ctx context.Context) { defer func() { @@ -282,14 +291,12 @@ func (p *PubSub) processLoop(ctx context.Context) { for { select { case pid := <-p.newPeers: - _, ok := p.peers[pid] - if ok { + if p.blacklist.Contains(pid) { log.Warning("already have connection to peer: ", pid) continue } - _, ok = p.blacklist[pid] - if ok { + if p.blacklist.Contains(pid) { log.Warning("ignoring connection from blacklisted peer: ", pid) continue } @@ -309,8 +316,7 @@ func (p *PubSub) processLoop(ctx context.Context) { continue } - _, ok = p.blacklist[pid] - if ok { + if p.blacklist.Contains(pid) { log.Warning("closing stream for blacklisted peer: ", pid) close(ch) s.Reset() @@ -396,7 +402,7 @@ func (p *PubSub) processLoop(ctx context.Context) { case pid := <-p.blacklistPeer: log.Infof("Blacklisting peer %s", pid) - p.blacklist[pid] = struct{}{} + p.blacklist.Add(pid) ch, ok := p.peers[pid] if ok { @@ -602,13 +608,13 @@ func msgID(pmsg *pb.Message) string { // pushMsg pushes a message performing validation as necessary func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) { // reject messages from blacklisted peers - if _, ok := p.blacklist[src]; ok { + if p.blacklist.Contains(src) { log.Warningf("dropping message from blacklisted peer %s", src) return } // even if they are forwarded by good peers - if _, ok := p.blacklist[msg.GetFrom()]; ok { + if p.blacklist.Contains(msg.GetFrom()) { log.Warningf("dropping message from blacklisted source %s", src) return } From f6ac3aab4faeaec37731c88b7520a9de15c8af20 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 17 Jan 2019 14:20:34 +0200 Subject: [PATCH 6/7] lru blackclist --- blacklist.go | 25 +++++++++++++++++++++++++ package.json | 7 +++++++ 2 files changed, 32 insertions(+) diff --git a/blacklist.go b/blacklist.go index 17bb241..84f2efb 100644 --- a/blacklist.go +++ b/blacklist.go @@ -1,6 +1,7 @@ package pubsub import ( + lru "github.com/hashicorp/golang-lru" peer "github.com/libp2p/go-libp2p-peer" ) @@ -26,3 +27,27 @@ func (b MapBlacklist) Contains(p peer.ID) bool { _, ok := b[p] return ok } + +// LRUBlacklist is a blacklist implementation using an LRU cache +type LRUBlacklist struct { + lru *lru.Cache +} + +// NewLRUBlacklist creates a new LRUBlacklist with capacity cap +func NewLRUBlacklist(cap int) (Blacklist, error) { + c, err := lru.New(cap) + if err != nil { + return nil, err + } + + b := &LRUBlacklist{lru: c} + return b, nil +} + +func (b LRUBlacklist) Add(p peer.ID) { + b.lru.Add(p, nil) +} + +func (b LRUBlacklist) Contains(p peer.ID) bool { + return b.lru.Contains(p) +} diff --git a/package.json b/package.json index dcd396c..92678ea 100644 --- a/package.json +++ b/package.json @@ -77,6 +77,12 @@ "hash": "QmabLh8TrJ3emfAoQk5AbqbLTbMyj7XqumMFmAFxa9epo8", "name": "go-multistream", "version": "0.3.9" + }, + { + "author": "hashicorp", + "hash": "QmQjMHF8ptRgx4E57UFMiT4YM6kqaJeYxZ1MCDX23aw4rK", + "name": "golang-lru", + "version": "2017.10.18" } ], "gxVersion": "0.9.0", @@ -86,3 +92,4 @@ "releaseCmd": "git commit -a -m \"gx publish $VERSION\"", "version": "0.11.10" } + From 440c8b7331c7eac14007e924875b501083679d99 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 17 Jan 2019 14:26:06 +0200 Subject: [PATCH 7/7] test blacklist implementations --- blacklist_test.go | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/blacklist_test.go b/blacklist_test.go index 67f8882..514d9fa 100644 --- a/blacklist_test.go +++ b/blacklist_test.go @@ -4,8 +4,37 @@ import ( "context" "testing" "time" + + peer "github.com/libp2p/go-libp2p-peer" ) +func TestMapBlacklist(t *testing.T) { + b := NewMapBlacklist() + + p := peer.ID("test") + + b.Add(p) + if !b.Contains(p) { + t.Fatal("peer not in the blacklist") + } + +} + +func TestLRUBlacklist(t *testing.T) { + b, err := NewLRUBlacklist(10) + if err != nil { + t.Fatal(err) + } + + p := peer.ID("test") + + b.Add(p) + if !b.Contains(p) { + t.Fatal("peer not in the blacklist") + } + +} + func TestBlacklist(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel()