move the go-libp2p-blankhost here
This commit is contained in:
commit
f0d63ef033
|
@ -0,0 +1,241 @@
|
||||||
|
package blankhost
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p-core/connmgr"
|
||||||
|
"github.com/libp2p/go-libp2p-core/event"
|
||||||
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
|
"github.com/libp2p/go-libp2p-core/network"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peerstore"
|
||||||
|
"github.com/libp2p/go-libp2p-core/protocol"
|
||||||
|
"github.com/libp2p/go-libp2p-core/record"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-eventbus"
|
||||||
|
|
||||||
|
logging "github.com/ipfs/go-log/v2"
|
||||||
|
|
||||||
|
ma "github.com/multiformats/go-multiaddr"
|
||||||
|
mstream "github.com/multiformats/go-multistream"
|
||||||
|
)
|
||||||
|
|
||||||
|
var log = logging.Logger("blankhost")
|
||||||
|
|
||||||
|
// BlankHost is the thinnest implementation of the host.Host interface
|
||||||
|
type BlankHost struct {
|
||||||
|
n network.Network
|
||||||
|
mux *mstream.MultistreamMuxer
|
||||||
|
cmgr connmgr.ConnManager
|
||||||
|
eventbus event.Bus
|
||||||
|
emitters struct {
|
||||||
|
evtLocalProtocolsUpdated event.Emitter
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type config struct {
|
||||||
|
cmgr connmgr.ConnManager
|
||||||
|
eventBus event.Bus
|
||||||
|
}
|
||||||
|
|
||||||
|
type Option = func(cfg *config)
|
||||||
|
|
||||||
|
func WithConnectionManager(cmgr connmgr.ConnManager) Option {
|
||||||
|
return func(cfg *config) {
|
||||||
|
cfg.cmgr = cmgr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithEventBus(eventBus event.Bus) Option {
|
||||||
|
return func(cfg *config) {
|
||||||
|
cfg.eventBus = eventBus
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBlankHost(n network.Network, options ...Option) *BlankHost {
|
||||||
|
cfg := config{
|
||||||
|
cmgr: &connmgr.NullConnMgr{},
|
||||||
|
}
|
||||||
|
for _, opt := range options {
|
||||||
|
opt(&cfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
bh := &BlankHost{
|
||||||
|
n: n,
|
||||||
|
cmgr: cfg.cmgr,
|
||||||
|
mux: mstream.NewMultistreamMuxer(),
|
||||||
|
}
|
||||||
|
if bh.eventbus == nil {
|
||||||
|
bh.eventbus = eventbus.NewBus()
|
||||||
|
}
|
||||||
|
|
||||||
|
// subscribe the connection manager to network notifications (has no effect with NullConnMgr)
|
||||||
|
n.Notify(bh.cmgr.Notifee())
|
||||||
|
|
||||||
|
var err error
|
||||||
|
if bh.emitters.evtLocalProtocolsUpdated, err = bh.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
evtPeerConnectednessChanged, err := bh.eventbus.Emitter(&event.EvtPeerConnectednessChanged{})
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
n.Notify(newPeerConnectWatcher(evtPeerConnectednessChanged))
|
||||||
|
|
||||||
|
n.SetStreamHandler(bh.newStreamHandler)
|
||||||
|
|
||||||
|
// persist a signed peer record for self to the peerstore.
|
||||||
|
if err := bh.initSignedRecord(); err != nil {
|
||||||
|
log.Errorf("error creating blank host, err=%s", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return bh
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bh *BlankHost) initSignedRecord() error {
|
||||||
|
cab, ok := peerstore.GetCertifiedAddrBook(bh.n.Peerstore())
|
||||||
|
if !ok {
|
||||||
|
log.Error("peerstore does not support signed records")
|
||||||
|
return errors.New("peerstore does not support signed records")
|
||||||
|
}
|
||||||
|
rec := peer.PeerRecordFromAddrInfo(peer.AddrInfo{ID: bh.ID(), Addrs: bh.Addrs()})
|
||||||
|
ev, err := record.Seal(rec, bh.Peerstore().PrivKey(bh.ID()))
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to create signed record for self, err=%s", err)
|
||||||
|
return fmt.Errorf("failed to create signed record for self, err=%s", err)
|
||||||
|
}
|
||||||
|
_, err = cab.ConsumePeerRecord(ev, peerstore.PermanentAddrTTL)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to persist signed record to peerstore,err=%s", err)
|
||||||
|
return fmt.Errorf("failed to persist signed record for self, err=%s", err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ host.Host = (*BlankHost)(nil)
|
||||||
|
|
||||||
|
func (bh *BlankHost) Addrs() []ma.Multiaddr {
|
||||||
|
addrs, err := bh.n.InterfaceListenAddresses()
|
||||||
|
if err != nil {
|
||||||
|
log.Debug("error retrieving network interface addrs: ", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return addrs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bh *BlankHost) Close() error {
|
||||||
|
return bh.n.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bh *BlankHost) Connect(ctx context.Context, ai peer.AddrInfo) error {
|
||||||
|
// absorb addresses into peerstore
|
||||||
|
bh.Peerstore().AddAddrs(ai.ID, ai.Addrs, peerstore.TempAddrTTL)
|
||||||
|
|
||||||
|
cs := bh.n.ConnsToPeer(ai.ID)
|
||||||
|
if len(cs) > 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := bh.Network().DialPeer(ctx, ai.ID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bh *BlankHost) Peerstore() peerstore.Peerstore {
|
||||||
|
return bh.n.Peerstore()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bh *BlankHost) ID() peer.ID {
|
||||||
|
return bh.n.LocalPeer()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bh *BlankHost) NewStream(ctx context.Context, p peer.ID, protos ...protocol.ID) (network.Stream, error) {
|
||||||
|
s, err := bh.n.NewStream(ctx, p)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var protoStrs []string
|
||||||
|
for _, pid := range protos {
|
||||||
|
protoStrs = append(protoStrs, string(pid))
|
||||||
|
}
|
||||||
|
|
||||||
|
selected, err := mstream.SelectOneOf(protoStrs, s)
|
||||||
|
if err != nil {
|
||||||
|
s.Reset()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
selpid := protocol.ID(selected)
|
||||||
|
s.SetProtocol(selpid)
|
||||||
|
bh.Peerstore().AddProtocols(p, selected)
|
||||||
|
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bh *BlankHost) RemoveStreamHandler(pid protocol.ID) {
|
||||||
|
bh.Mux().RemoveHandler(string(pid))
|
||||||
|
bh.emitters.evtLocalProtocolsUpdated.Emit(event.EvtLocalProtocolsUpdated{
|
||||||
|
Removed: []protocol.ID{pid},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bh *BlankHost) SetStreamHandler(pid protocol.ID, handler network.StreamHandler) {
|
||||||
|
bh.Mux().AddHandler(string(pid), func(p string, rwc io.ReadWriteCloser) error {
|
||||||
|
is := rwc.(network.Stream)
|
||||||
|
is.SetProtocol(protocol.ID(p))
|
||||||
|
handler(is)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
bh.emitters.evtLocalProtocolsUpdated.Emit(event.EvtLocalProtocolsUpdated{
|
||||||
|
Added: []protocol.ID{pid},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bh *BlankHost) SetStreamHandlerMatch(pid protocol.ID, m func(string) bool, handler network.StreamHandler) {
|
||||||
|
bh.Mux().AddHandlerWithFunc(string(pid), m, func(p string, rwc io.ReadWriteCloser) error {
|
||||||
|
is := rwc.(network.Stream)
|
||||||
|
is.SetProtocol(protocol.ID(p))
|
||||||
|
handler(is)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
bh.emitters.evtLocalProtocolsUpdated.Emit(event.EvtLocalProtocolsUpdated{
|
||||||
|
Added: []protocol.ID{pid},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// newStreamHandler is the remote-opened stream handler for network.Network
|
||||||
|
func (bh *BlankHost) newStreamHandler(s network.Stream) {
|
||||||
|
protoID, handle, err := bh.Mux().Negotiate(s)
|
||||||
|
if err != nil {
|
||||||
|
log.Infow("protocol negotiation failed", "error", err)
|
||||||
|
s.Reset()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.SetProtocol(protocol.ID(protoID))
|
||||||
|
|
||||||
|
go handle(protoID, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: i'm not sure this really needs to be here
|
||||||
|
func (bh *BlankHost) Mux() protocol.Switch {
|
||||||
|
return bh.mux
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: also not sure this fits... Might be better ways around this (leaky abstractions)
|
||||||
|
func (bh *BlankHost) Network() network.Network {
|
||||||
|
return bh.n
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bh *BlankHost) ConnManager() connmgr.ConnManager {
|
||||||
|
return bh.cmgr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bh *BlankHost) EventBus() event.Bus {
|
||||||
|
return bh.eventbus
|
||||||
|
}
|
|
@ -0,0 +1,73 @@
|
||||||
|
package blankhost
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p-core/event"
|
||||||
|
"github.com/libp2p/go-libp2p-core/network"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
|
|
||||||
|
ma "github.com/multiformats/go-multiaddr"
|
||||||
|
)
|
||||||
|
|
||||||
|
type peerConnectWatcher struct {
|
||||||
|
emitter event.Emitter
|
||||||
|
|
||||||
|
mutex sync.Mutex
|
||||||
|
connected map[peer.ID]struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ network.Notifiee = &peerConnectWatcher{}
|
||||||
|
|
||||||
|
func newPeerConnectWatcher(emitter event.Emitter) *peerConnectWatcher {
|
||||||
|
return &peerConnectWatcher{
|
||||||
|
emitter: emitter,
|
||||||
|
connected: make(map[peer.ID]struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *peerConnectWatcher) Listen(network.Network, ma.Multiaddr) {}
|
||||||
|
func (w *peerConnectWatcher) ListenClose(network.Network, ma.Multiaddr) {}
|
||||||
|
func (w *peerConnectWatcher) OpenedStream(network.Network, network.Stream) {}
|
||||||
|
func (w *peerConnectWatcher) ClosedStream(network.Network, network.Stream) {}
|
||||||
|
|
||||||
|
func (w *peerConnectWatcher) Connected(n network.Network, conn network.Conn) {
|
||||||
|
p := conn.RemotePeer()
|
||||||
|
w.handleTransition(p, n.Connectedness(p))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *peerConnectWatcher) Disconnected(n network.Network, conn network.Conn) {
|
||||||
|
p := conn.RemotePeer()
|
||||||
|
w.handleTransition(p, n.Connectedness(p))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *peerConnectWatcher) handleTransition(p peer.ID, state network.Connectedness) {
|
||||||
|
if changed := w.checkTransition(p, state); !changed {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.emitter.Emit(event.EvtPeerConnectednessChanged{
|
||||||
|
Peer: p,
|
||||||
|
Connectedness: state,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *peerConnectWatcher) checkTransition(p peer.ID, state network.Connectedness) bool {
|
||||||
|
w.mutex.Lock()
|
||||||
|
defer w.mutex.Unlock()
|
||||||
|
switch state {
|
||||||
|
case network.Connected:
|
||||||
|
if _, ok := w.connected[p]; ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
w.connected[p] = struct{}{}
|
||||||
|
return true
|
||||||
|
case network.NotConnected:
|
||||||
|
if _, ok := w.connected[p]; ok {
|
||||||
|
delete(w.connected, p)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,46 @@
|
||||||
|
package blankhost
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p-core/event"
|
||||||
|
"github.com/libp2p/go-libp2p-core/network"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
|
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPeerConnectedness(t *testing.T) {
|
||||||
|
h1 := NewBlankHost(swarmt.GenSwarm(t))
|
||||||
|
defer h1.Close()
|
||||||
|
h2 := NewBlankHost(swarmt.GenSwarm(t))
|
||||||
|
|
||||||
|
sub1, err := h1.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer sub1.Close()
|
||||||
|
sub2, err := h2.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer sub2.Close()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
require.NoError(t, h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}))
|
||||||
|
require.Equal(t, (<-sub1.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{
|
||||||
|
Peer: h2.ID(),
|
||||||
|
Connectedness: network.Connected,
|
||||||
|
})
|
||||||
|
require.Equal(t, (<-sub2.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{
|
||||||
|
Peer: h1.ID(),
|
||||||
|
Connectedness: network.Connected,
|
||||||
|
})
|
||||||
|
|
||||||
|
// now close h2. This will disconnect it from h1.
|
||||||
|
require.NoError(t, h2.Close())
|
||||||
|
require.Equal(t, (<-sub1.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{
|
||||||
|
Peer: h2.ID(),
|
||||||
|
Connectedness: network.NotConnected,
|
||||||
|
})
|
||||||
|
}
|
Loading…
Reference in New Issue