Merge pull request #149 from libp2p/feat/blacklist

Implement peer blacklist
This commit is contained in:
vyzo 2019-01-17 14:35:57 +02:00 committed by GitHub
commit 87e2f5619f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 246 additions and 3 deletions

53
blacklist.go Normal file
View File

@ -0,0 +1,53 @@
package pubsub
import (
lru "github.com/hashicorp/golang-lru"
peer "github.com/libp2p/go-libp2p-peer"
)
// Blacklist is an interface for peer blacklisting.
type Blacklist interface {
Add(peer.ID)
Contains(peer.ID) bool
}
// MapBlacklist is a blacklist implementation using a perfect map
type MapBlacklist map[peer.ID]struct{}
// NewMapBlacklist creates a new MapBlacklist
func NewMapBlacklist() Blacklist {
return MapBlacklist(make(map[peer.ID]struct{}))
}
func (b MapBlacklist) Add(p peer.ID) {
b[p] = struct{}{}
}
func (b MapBlacklist) Contains(p peer.ID) bool {
_, ok := b[p]
return ok
}
// LRUBlacklist is a blacklist implementation using an LRU cache
type LRUBlacklist struct {
lru *lru.Cache
}
// NewLRUBlacklist creates a new LRUBlacklist with capacity cap
func NewLRUBlacklist(cap int) (Blacklist, error) {
c, err := lru.New(cap)
if err != nil {
return nil, err
}
b := &LRUBlacklist{lru: c}
return b, nil
}
func (b LRUBlacklist) Add(p peer.ID) {
b.lru.Add(p, nil)
}
func (b LRUBlacklist) Contains(p peer.ID) bool {
return b.lru.Contains(p)
}

126
blacklist_test.go Normal file
View File

@ -0,0 +1,126 @@
package pubsub
import (
"context"
"testing"
"time"
peer "github.com/libp2p/go-libp2p-peer"
)
func TestMapBlacklist(t *testing.T) {
b := NewMapBlacklist()
p := peer.ID("test")
b.Add(p)
if !b.Contains(p) {
t.Fatal("peer not in the blacklist")
}
}
func TestLRUBlacklist(t *testing.T) {
b, err := NewLRUBlacklist(10)
if err != nil {
t.Fatal(err)
}
p := peer.ID("test")
b.Add(p)
if !b.Contains(p) {
t.Fatal("peer not in the blacklist")
}
}
func TestBlacklist(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
psubs := getPubsubs(ctx, hosts)
connect(t, hosts[0], hosts[1])
sub, err := psubs[1].Subscribe("test")
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 100)
psubs[1].BlacklistPeer(hosts[0].ID())
time.Sleep(time.Millisecond * 100)
psubs[0].Publish("test", []byte("message"))
wctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
_, err = sub.Next(wctx)
if err == nil {
t.Fatal("got message from blacklisted peer")
}
}
func TestBlacklist2(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
psubs := getPubsubs(ctx, hosts)
connect(t, hosts[0], hosts[1])
_, err := psubs[0].Subscribe("test")
if err != nil {
t.Fatal(err)
}
sub1, err := psubs[1].Subscribe("test")
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 100)
psubs[1].BlacklistPeer(hosts[0].ID())
time.Sleep(time.Millisecond * 100)
psubs[0].Publish("test", []byte("message"))
wctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
_, err = sub1.Next(wctx)
if err == nil {
t.Fatal("got message from blacklisted peer")
}
}
func TestBlacklist3(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
psubs := getPubsubs(ctx, hosts)
psubs[1].BlacklistPeer(hosts[0].ID())
time.Sleep(time.Millisecond * 100)
connect(t, hosts[0], hosts[1])
sub, err := psubs[1].Subscribe("test")
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 100)
psubs[0].Publish("test", []byte("message"))
wctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
_, err = sub.Next(wctx)
if err == nil {
t.Fatal("got message from blacklisted peer")
}
}

View File

@ -77,6 +77,12 @@
"hash": "QmabLh8TrJ3emfAoQk5AbqbLTbMyj7XqumMFmAFxa9epo8", "hash": "QmabLh8TrJ3emfAoQk5AbqbLTbMyj7XqumMFmAFxa9epo8",
"name": "go-multistream", "name": "go-multistream",
"version": "0.3.9" "version": "0.3.9"
},
{
"author": "hashicorp",
"hash": "QmQjMHF8ptRgx4E57UFMiT4YM6kqaJeYxZ1MCDX23aw4rK",
"name": "golang-lru",
"version": "2017.10.18"
} }
], ],
"gxVersion": "0.9.0", "gxVersion": "0.9.0",
@ -86,3 +92,4 @@
"releaseCmd": "git commit -a -m \"gx publish $VERSION\"", "releaseCmd": "git commit -a -m \"gx publish $VERSION\"",
"version": "0.11.10" "version": "0.11.10"
} }

View File

@ -98,6 +98,10 @@ type PubSub struct {
// eval thunk in event loop // eval thunk in event loop
eval chan func() eval chan func()
// peer blacklist
blacklist Blacklist
blacklistPeer chan peer.ID
peers map[peer.ID]chan *RPC peers map[peer.ID]chan *RPC
seenMessages *timecache.TimeCache seenMessages *timecache.TimeCache
@ -179,6 +183,8 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
topics: make(map[string]map[peer.ID]struct{}), topics: make(map[string]map[peer.ID]struct{}),
peers: make(map[peer.ID]chan *RPC), peers: make(map[peer.ID]chan *RPC),
topicVals: make(map[string]*topicVal), topicVals: make(map[string]*topicVal),
blacklist: NewMapBlacklist(),
blacklistPeer: make(chan peer.ID),
seenMessages: timecache.NewTimeCache(TimeCacheDuration), seenMessages: timecache.NewTimeCache(TimeCacheDuration),
counter: uint64(time.Now().UnixNano()), counter: uint64(time.Now().UnixNano()),
} }
@ -262,6 +268,15 @@ func WithStrictSignatureVerification(required bool) Option {
} }
} }
// WithBlacklist provides an implementation of the blacklist; the default is a
// MapBlacklist
func WithBlacklist(b Blacklist) Option {
return func(p *PubSub) error {
p.blacklist = b
return nil
}
}
// processLoop handles all inputs arriving on the channels // processLoop handles all inputs arriving on the channels
func (p *PubSub) processLoop(ctx context.Context) { func (p *PubSub) processLoop(ctx context.Context) {
defer func() { defer func() {
@ -276,12 +291,16 @@ func (p *PubSub) processLoop(ctx context.Context) {
for { for {
select { select {
case pid := <-p.newPeers: case pid := <-p.newPeers:
_, ok := p.peers[pid] if p.blacklist.Contains(pid) {
if ok {
log.Warning("already have connection to peer: ", pid) log.Warning("already have connection to peer: ", pid)
continue continue
} }
if p.blacklist.Contains(pid) {
log.Warning("ignoring connection from blacklisted peer: ", pid)
continue
}
messages := make(chan *RPC, 32) messages := make(chan *RPC, 32)
messages <- p.getHelloPacket() messages <- p.getHelloPacket()
go p.handleNewPeer(ctx, pid, messages) go p.handleNewPeer(ctx, pid, messages)
@ -290,13 +309,20 @@ func (p *PubSub) processLoop(ctx context.Context) {
case s := <-p.newPeerStream: case s := <-p.newPeerStream:
pid := s.Conn().RemotePeer() pid := s.Conn().RemotePeer()
_, ok := p.peers[pid] ch, ok := p.peers[pid]
if !ok { if !ok {
log.Warning("new stream for unknown peer: ", pid) log.Warning("new stream for unknown peer: ", pid)
s.Reset() s.Reset()
continue continue
} }
if p.blacklist.Contains(pid) {
log.Warning("closing stream for blacklisted peer: ", pid)
close(ch)
s.Reset()
continue
}
p.rt.AddPeer(pid, s.Protocol()) p.rt.AddPeer(pid, s.Protocol())
case pid := <-p.newPeerError: case pid := <-p.newPeerError:
@ -374,6 +400,20 @@ func (p *PubSub) processLoop(ctx context.Context) {
case thunk := <-p.eval: case thunk := <-p.eval:
thunk() thunk()
case pid := <-p.blacklistPeer:
log.Infof("Blacklisting peer %s", pid)
p.blacklist.Add(pid)
ch, ok := p.peers[pid]
if ok {
close(ch)
delete(p.peers, pid)
for _, t := range p.topics {
delete(t, pid)
}
p.rt.RemovePeer(pid)
}
case <-ctx.Done(): case <-ctx.Done():
log.Info("pubsub processloop shutting down") log.Info("pubsub processloop shutting down")
return return
@ -567,6 +607,18 @@ func msgID(pmsg *pb.Message) string {
// pushMsg pushes a message performing validation as necessary // pushMsg pushes a message performing validation as necessary
func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) { func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) {
// reject messages from blacklisted peers
if p.blacklist.Contains(src) {
log.Warningf("dropping message from blacklisted peer %s", src)
return
}
// even if they are forwarded by good peers
if p.blacklist.Contains(msg.GetFrom()) {
log.Warningf("dropping message from blacklisted source %s", src)
return
}
// reject unsigned messages when strict before we even process the id // reject unsigned messages when strict before we even process the id
if p.signStrict && msg.Signature == nil { if p.signStrict && msg.Signature == nil {
log.Debugf("dropping unsigned message from %s", src) log.Debugf("dropping unsigned message from %s", src)
@ -821,6 +873,11 @@ func (p *PubSub) ListPeers(topic string) []peer.ID {
return <-out return <-out
} }
// BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped.
func (p *PubSub) BlacklistPeer(pid peer.ID) {
p.blacklistPeer <- pid
}
// per topic validators // per topic validators
type addValReq struct { type addValReq struct {
topic string topic string