feat(mock): reliable notifications

* Export StreamComplement/ConnComplement convenience functions.
* Make the TestNotifications test pass reliably, even when we have a bunch of
streams (identify, etc.).
* Make the mock net order disconnect events after connect events.
* Make closing one side of a connection actually close both sides.
* Make it possible to extract a mock stream's complement.
* Fire remote events at the same time as the local events.
This commit is contained in:
Steven Allen 2020-03-09 17:04:37 -07:00
parent 4bbf43e8b2
commit 5f75aa2068
6 changed files with 164 additions and 136 deletions

View File

@ -0,0 +1,17 @@
package mocknet
import (
"github.com/libp2p/go-libp2p-core/network"
)
// StreamComplement returns the other end of the given stream. This function
// panics when passed a non-mocknet stream.
func StreamComplement(s network.Stream) network.Stream {
return s.(*stream).rstream
}
// ConnComplement returns the other end of the given connection. This function
// panics when passed a non-mocknet connection.
func ConnComplement(c network.Conn) network.Conn {
return c.(*conn).rconn
}

View File

@ -15,6 +15,8 @@ import (
// live connection between two peers.
// it goes over a particular link.
type conn struct {
notifLk sync.Mutex
local peer.ID
remote peer.ID
@ -34,8 +36,8 @@ type conn struct {
sync.RWMutex
}
func newConn(ln, rn *peernet, l *link, dir network.Direction) *conn {
c := &conn{net: ln, link: l}
func newConn(p process.Process, ln, rn *peernet, l *link, dir network.Direction) *conn {
c := &conn{net: ln, link: l, proc: p}
c.local = ln.peer
c.remote = rn.peer
c.stat = network.Stat{Direction: dir}
@ -46,7 +48,7 @@ func newConn(ln, rn *peernet, l *link, dir network.Direction) *conn {
c.localPrivKey = ln.ps.PrivKey(ln.peer)
c.remotePubKey = rn.ps.PubKey(rn.peer)
c.proc = process.WithTeardown(c.teardown)
c.proc.AddChild(process.WithTeardown(c.teardown))
return c
}
@ -59,6 +61,9 @@ func (c *conn) teardown() error {
s.Reset()
}
c.net.removeConn(c)
c.notifLk.Lock()
defer c.notifLk.Unlock()
c.net.notifyAll(func(n network.Notifiee) {
n.Disconnected(c.net, c)
})
@ -69,18 +74,29 @@ func (c *conn) addStream(s *stream) {
c.Lock()
s.conn = c
c.streams.PushBack(s)
s.notifLk.Lock()
defer s.notifLk.Unlock()
c.Unlock()
c.net.notifyAll(func(n network.Notifiee) {
n.OpenedStream(c.net, s)
})
}
func (c *conn) removeStream(s *stream) {
c.Lock()
defer c.Unlock()
for e := c.streams.Front(); e != nil; e = e.Next() {
if s == e.Value {
c.streams.Remove(e)
return
break
}
}
c.Unlock()
s.notifLk.Lock()
defer s.notifLk.Unlock()
s.conn.net.notifyAll(func(n network.Notifiee) {
n.ClosedStream(s.conn.net, s)
})
}
func (c *conn) allStreams() []network.Stream {
@ -98,18 +114,12 @@ func (c *conn) allStreams() []network.Stream {
func (c *conn) remoteOpenedStream(s *stream) {
c.addStream(s)
c.net.handleNewStream(s)
c.net.notifyAll(func(n network.Notifiee) {
n.OpenedStream(c.net, s)
})
}
func (c *conn) openStream() *stream {
sl, sr := c.link.newStreamPair()
sl, sr := newStreamPair()
go c.rconn.remoteOpenedStream(sr)
c.addStream(sl)
c.net.notifyAll(func(n network.Notifiee) {
n.OpenedStream(c.net, sl)
})
c.rconn.remoteOpenedStream(sr)
return sl
}

View File

@ -1,13 +1,13 @@
package mocknet
import (
// "fmt"
"io"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
process "github.com/jbenet/goprocess"
)
// link implements mocknet.Link
@ -33,8 +33,9 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) {
l.RLock()
defer l.RUnlock()
c1 := newConn(l.nets[0], l.nets[1], l, network.DirOutbound)
c2 := newConn(l.nets[1], l.nets[0], l, network.DirInbound)
parent := process.WithTeardown(func() error { return nil })
c1 := newConn(parent, l.nets[0], l.nets[1], l, network.DirOutbound)
c2 := newConn(parent, l.nets[1], l.nets[0], l, network.DirInbound)
c1.rconn = c2
c2.rconn = c1
@ -44,15 +45,6 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) {
return c2, c1
}
func (l *link) newStreamPair() (*stream, *stream) {
ra, wb := io.Pipe()
rb, wa := io.Pipe()
sa := NewStream(wa, ra, network.DirOutbound)
sb := NewStream(wb, rb, network.DirInbound)
return sa, sb
}
func (l *link) Networks() []network.Network {
l.RLock()
defer l.RUnlock()

View File

@ -2,6 +2,7 @@ package mocknet
import (
"context"
"sync"
"testing"
"time"
@ -13,8 +14,10 @@ import (
func TestNotifications(t *testing.T) {
const swarmSize = 5
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mn, err := FullMeshLinked(context.Background(), swarmSize)
mn, err := FullMeshLinked(ctx, swarmSize)
if err != nil {
t.Fatal(err)
}
@ -23,11 +26,13 @@ func TestNotifications(t *testing.T) {
// signup notifs
nets := mn.Nets()
notifiees := make([]*netNotifiee, len(nets))
for i, pn := range nets {
n := newNetNotifiee(swarmSize)
notifiees := make(map[peer.ID]*netNotifiee, len(nets))
for _, pn := range nets {
defer pn.Close()
n := newNetNotifiee(t, swarmSize)
pn.Notify(n)
notifiees[i] = n
notifiees[pn.LocalPeer()] = n
}
// connect all but self
@ -36,16 +41,16 @@ func TestNotifications(t *testing.T) {
}
// test everyone got the correct connection opened calls
for i, s := range nets {
n := notifiees[i]
for _, s1 := range nets {
n := notifiees[s1.LocalPeer()]
notifs := make(map[peer.ID][]network.Conn)
for j, s2 := range nets {
if i == j {
for _, s2 := range nets {
if s2 == s1 {
continue
}
// this feels a little sketchy, but its probably okay
for len(s.ConnsToPeer(s2.LocalPeer())) != len(notifs[s2.LocalPeer()]) {
for len(s1.ConnsToPeer(s2.LocalPeer())) != len(notifs[s2.LocalPeer()]) {
select {
case c := <-n.connected:
nfp := notifs[c.RemotePeer()]
@ -57,7 +62,7 @@ func TestNotifications(t *testing.T) {
}
for p, cons := range notifs {
expect := s.ConnsToPeer(p)
expect := s1.ConnsToPeer(p)
if len(expect) != len(cons) {
t.Fatal("got different number of connections")
}
@ -78,39 +83,10 @@ func TestNotifications(t *testing.T) {
}
}
complement := func(c network.Conn) (network.Network, *netNotifiee, *conn) {
for i, s := range nets {
for _, c2 := range s.Conns() {
if c2.(*conn).rconn == c {
return s, notifiees[i], c2.(*conn)
}
}
}
t.Fatal("complementary conn not found", c)
return nil, nil, nil
}
testOCStream := func(n *netNotifiee, s network.Stream) {
var s2 network.Stream
select {
case s2 = <-n.openedStream:
t.Log("got notif for opened stream")
case <-time.After(timeout):
t.Fatal("timeout")
}
if s != nil && s != s2 {
t.Fatalf("got incorrect stream %p %p", s, s2)
}
select {
case s2 = <-n.closedStream:
t.Log("got notif for closed stream")
case <-time.After(timeout):
t.Fatal("timeout")
}
if s != nil && s != s2 {
t.Fatalf("got incorrect stream %p %p", s, s2)
}
for _, s := range nets {
s.SetStreamHandler(func(s network.Stream) {
helpers.FullClose(s)
})
}
for _, s := range nets {
@ -119,59 +95,37 @@ func TestNotifications(t *testing.T) {
})
}
// there's one stream per conn that we need to drain....
// unsure where these are coming from
for i := range nets {
n := notifiees[i]
for j := 0; j < len(nets)-1; j++ {
testOCStream(n, nil)
}
}
streams := make(chan network.Stream)
// Make sure we've received at last one stream per conn.
for _, s := range nets {
s.SetStreamHandler(func(s network.Stream) {
streams <- s
helpers.FullClose(s)
})
}
// open a streams in each conn
for i, s := range nets {
conns := s.Conns()
for _, c := range conns {
_, n2, c2 := complement(c)
st1, err := c.NewStream()
if err != nil {
t.Error(err)
} else {
t.Logf("%s %s <--%p--> %s %s", c.LocalPeer(), c.LocalMultiaddr(), st1, c.RemotePeer(), c.RemoteMultiaddr())
// st1.Write([]byte("hello"))
go helpers.FullClose(st1)
st2 := <-streams
t.Logf("%s %s <--%p--> %s %s", c2.LocalPeer(), c2.LocalMultiaddr(), st2, c2.RemotePeer(), c2.RemoteMultiaddr())
testOCStream(notifiees[i], st1)
testOCStream(n2, st2)
continue
}
t.Logf("%s %s <--%p--> %s %s", c.LocalPeer(), c.LocalMultiaddr(), st1, c.RemotePeer(), c.RemoteMultiaddr())
helpers.FullClose(st1)
}
}
// close conns
for i, s := range nets {
n := notifiees[i]
for _, c := range s.Conns() {
_, n2, c2 := complement(c)
c.(*conn).Close()
c2.Close()
for _, s1 := range nets {
n1 := notifiees[s1.LocalPeer()]
for _, c1 := range s1.Conns() {
c2 := ConnComplement(c1)
n2 := notifiees[c2.LocalPeer()]
c1.Close()
var c3, c4 network.Conn
select {
case c3 = <-n.disconnected:
case c3 = <-n1.disconnected:
case <-time.After(timeout):
t.Fatal("timeout")
}
if c != c3 {
t.Fatal("got incorrect conn", c, c3)
if c1 != c3 {
t.Fatal("got incorrect conn", c1, c3)
}
select {
@ -180,30 +134,53 @@ func TestNotifications(t *testing.T) {
t.Fatal("timeout")
}
if c2 != c4 {
t.Fatal("got incorrect conn", c, c2)
t.Fatal("got incorrect conn", c1, c2)
}
}
}
for _, n1 := range notifiees {
n1.streamState.Lock()
for str1, ch1 := range n1.streamState.m {
<-ch1
str2 := StreamComplement(str1)
n2 := notifiees[str1.Conn().RemotePeer()]
n2.streamState.Lock()
ch2 := n2.streamState.m[str2]
n2.streamState.Unlock()
<-ch2
}
n1.streamState.Unlock()
}
}
type netNotifiee struct {
t *testing.T
listen chan ma.Multiaddr
listenClose chan ma.Multiaddr
connected chan network.Conn
disconnected chan network.Conn
openedStream chan network.Stream
closedStream chan network.Stream
streamState struct {
sync.Mutex
m map[network.Stream]chan struct{}
}
}
func newNetNotifiee(buffer int) *netNotifiee {
return &netNotifiee{
listen: make(chan ma.Multiaddr, buffer),
listenClose: make(chan ma.Multiaddr, buffer),
connected: make(chan network.Conn, buffer),
disconnected: make(chan network.Conn, buffer),
openedStream: make(chan network.Stream, buffer),
closedStream: make(chan network.Stream, buffer),
func newNetNotifiee(t *testing.T, buffer int) *netNotifiee {
nn := &netNotifiee{
t: t,
listen: make(chan ma.Multiaddr, 1),
listenClose: make(chan ma.Multiaddr, 1),
connected: make(chan network.Conn, buffer*2),
disconnected: make(chan network.Conn, buffer*2),
}
nn.streamState.m = make(map[network.Stream]chan struct{})
return nn
}
func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {
@ -218,9 +195,28 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
func (nn *netNotifiee) Disconnected(n network.Network, v network.Conn) {
nn.disconnected <- v
}
func (nn *netNotifiee) OpenedStream(n network.Network, v network.Stream) {
nn.openedStream <- v
func (nn *netNotifiee) OpenedStream(n network.Network, s network.Stream) {
nn.streamState.Lock()
defer nn.streamState.Unlock()
_, ok := nn.streamState.m[s]
if ok {
nn.t.Error("duplicate stream open")
return
}
nn.streamState.m[s] = make(chan struct{})
}
func (nn *netNotifiee) ClosedStream(n network.Network, v network.Stream) {
nn.closedStream <- v
func (nn *netNotifiee) ClosedStream(n network.Network, s network.Stream) {
nn.streamState.Lock()
defer nn.streamState.Unlock()
ch, ok := nn.streamState.m[s]
if !ok {
nn.t.Error("saw close event but no open event")
return
}
select {
case <-ch:
nn.t.Error("duplicate close event")
default:
close(ch)
}
}

View File

@ -160,11 +160,8 @@ func (pn *peernet) connect(p peer.ID) (*conn, error) {
func (pn *peernet) openConn(r peer.ID, l *link) *conn {
lc, rc := l.newConnPair(pn)
log.Debugf("%s opening connection to %s", pn.LocalPeer(), lc.RemotePeer())
go rc.net.remoteOpenedConn(rc)
pn.addConn(lc)
pn.notifyAll(func(n network.Notifiee) {
n.Connected(pn, lc)
})
rc.net.remoteOpenedConn(rc)
return lc
}
@ -172,16 +169,12 @@ func (pn *peernet) remoteOpenedConn(c *conn) {
log.Debugf("%s accepting connection from %s", pn.LocalPeer(), c.RemotePeer())
pn.addConn(c)
pn.handleNewConn(c)
pn.notifyAll(func(n network.Notifiee) {
n.Connected(pn, c)
})
}
// addConn constructs and adds a connection
// to given remote peer over given link
func (pn *peernet) addConn(c *conn) {
pn.Lock()
defer pn.Unlock()
_, found := pn.connsByPeer[c.RemotePeer()]
if !found {
@ -194,6 +187,14 @@ func (pn *peernet) addConn(c *conn) {
pn.connsByLink[c.link] = map[*conn]struct{}{}
}
pn.connsByLink[c.link][c] = struct{}{}
c.notifLk.Lock()
defer c.notifLk.Unlock()
pn.Unlock()
pn.notifyAll(func(n network.Notifiee) {
n.Connected(pn, c)
})
}
// removeConn removes a given conn
@ -380,6 +381,6 @@ func (pn *peernet) notifyAll(notification func(f network.Notifiee)) {
notification(n)
}(n)
}
wg.Wait()
pn.notifmu.Unlock()
wg.Wait()
}

View File

@ -5,6 +5,7 @@ import (
"errors"
"io"
"net"
"sync"
"sync/atomic"
"time"
@ -15,9 +16,13 @@ import (
// stream implements network.Stream
type stream struct {
notifLk sync.Mutex
rstream *stream
conn *conn
write *io.PipeWriter
read *io.PipeReader
conn *conn
toDeliver chan *transportObject
reset chan struct{}
@ -37,7 +42,18 @@ type transportObject struct {
arrivalTime time.Time
}
func NewStream(w *io.PipeWriter, r *io.PipeReader, dir network.Direction) *stream {
func newStreamPair() (*stream, *stream) {
ra, wb := io.Pipe()
rb, wa := io.Pipe()
sa := newStream(wa, ra, network.DirOutbound)
sb := newStream(wb, rb, network.DirInbound)
sa.rstream = sb
sb.rstream = sa
return sa, sb
}
func newStream(w *io.PipeWriter, r *io.PipeReader, dir network.Direction) *stream {
s := &stream{
read: r,
write: w,
@ -117,10 +133,6 @@ func (s *stream) teardown() {
// Mark as closed.
close(s.closed)
s.conn.net.notifyAll(func(n network.Notifiee) {
n.ClosedStream(s.conn.net, s)
})
}
func (s *stream) Conn() network.Conn {