Fix race in ping test case for Go (#93)

* Fix race condition in go ping

* Fix import

* Pass host in

* Pass host in

* Fix import

* Remove events in v0.11
This commit is contained in:
Marco Munizaga 2022-12-30 20:30:15 -08:00 committed by GitHub
parent 34b6eaeffd
commit 43d50c2500
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 113 additions and 0 deletions

View File

@ -6,6 +6,7 @@ package compat
import ( import (
"context" "context"
"fmt" "fmt"
"time"
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
@ -36,3 +37,15 @@ func getSecurityByName(secureChannel string) libp2p.Option {
} }
panic(fmt.Sprintf("unknown secure channel: %s", secureChannel)) panic(fmt.Sprintf("unknown secure channel: %s", secureChannel))
} }
type ConnEventsSub struct {
}
func SubscribeToConnectedEvents(host host.Host) (ConnEventsSub, error) {
return ConnEventsSub{}, nil
}
func (s *ConnEventsSub) WaitForNConnectedEvents(n int) {
// We can't subscribe to events here. Let's just do a timeout.
time.Sleep(5 * time.Second)
}

View File

@ -8,7 +8,9 @@ import (
"fmt" "fmt"
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p/config" "github.com/libp2p/go-libp2p/config"
@ -35,3 +37,31 @@ func getSecurityByName(secureChannel string) libp2p.Option {
} }
panic(fmt.Sprintf("unknown secure channel: %s", secureChannel)) panic(fmt.Sprintf("unknown secure channel: %s", secureChannel))
} }
type ConnEventsSub struct {
sub event.Subscription
}
func SubscribeToConnectedEvents(host host.Host) (ConnEventsSub, error) {
sub, err := host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
if err != nil {
return ConnEventsSub{}, err
}
return ConnEventsSub{
sub: sub,
}, nil
}
func (s *ConnEventsSub) WaitForNConnectedEvents(n int) {
connectedPeers := 0
for e := range s.sub.Out() {
if e.(event.EvtPeerConnectednessChanged).Connectedness == network.Connected {
connectedPeers++
}
if connectedPeers == n {
return
}
}
}

View File

@ -8,7 +8,9 @@ import (
"fmt" "fmt"
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p/config" "github.com/libp2p/go-libp2p/config"
@ -35,3 +37,31 @@ func getSecurityByName(secureChannel string) libp2p.Option {
} }
panic(fmt.Sprintf("unknown secure channel: %s", secureChannel)) panic(fmt.Sprintf("unknown secure channel: %s", secureChannel))
} }
type ConnEventsSub struct {
sub event.Subscription
}
func SubscribeToConnectedEvents(host host.Host) (ConnEventsSub, error) {
sub, err := host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
if err != nil {
return ConnEventsSub{}, err
}
return ConnEventsSub{
sub: sub,
}, nil
}
func (s *ConnEventsSub) WaitForNConnectedEvents(n int) {
connectedPeers := 0
for e := range s.sub.Out() {
if e.(event.EvtPeerConnectednessChanged).Connectedness == network.Connected {
connectedPeers++
}
if connectedPeers == n {
return
}
}
}

View File

@ -9,7 +9,9 @@ import (
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/config" "github.com/libp2p/go-libp2p/config"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
noise "github.com/libp2p/go-libp2p/p2p/security/noise" noise "github.com/libp2p/go-libp2p/p2p/security/noise"
@ -26,6 +28,34 @@ func NewLibp2(ctx context.Context, secureChannel string, opts ...config.Option)
) )
} }
type ConnEventsSub struct {
sub event.Subscription
}
func SubscribeToConnectedEvents(host host.Host) (ConnEventsSub, error) {
sub, err := host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
if err != nil {
return ConnEventsSub{}, err
}
return ConnEventsSub{
sub: sub,
}, nil
}
func (s *ConnEventsSub) WaitForNConnectedEvents(n int) {
connectedPeers := 0
for e := range s.sub.Out() {
if e.(event.EvtPeerConnectednessChanged).Connectedness == network.Connected {
connectedPeers++
}
if connectedPeers == n {
return
}
}
}
func getSecurityByName(secureChannel string) libp2p.Option { func getSecurityByName(secureChannel string) libp2p.Option {
switch secureChannel { switch secureChannel {
case "noise": case "noise":

View File

@ -132,6 +132,12 @@ func runPing(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
// Record our listen addrs. // Record our listen addrs.
runenv.RecordMessage("my listen addrs: %v", host.Addrs()) runenv.RecordMessage("my listen addrs: %v", host.Addrs())
// Subscribe to connectedness events.
connectedEvents, err := compat.SubscribeToConnectedEvents(host)
if err != nil {
return err
}
// Obtain our own address info, and use the sync service to publish it to a // Obtain our own address info, and use the sync service to publish it to a
// 'peersTopic' topic, where others will read from. // 'peersTopic' topic, where others will read from.
var ( var (
@ -223,6 +229,10 @@ func runPing(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
runenv.RecordMessage("done dialling my peers") runenv.RecordMessage("done dialling my peers")
// Wait for a connection to all peers
connectedEvents.WaitForNConnectedEvents(runenv.TestInstanceCount - 1)
runenv.RecordMessage("Connected")
// Wait for all peers to signal that they're done with the connection phase. // Wait for all peers to signal that they're done with the connection phase.
initCtx.SyncClient.MustSignalAndWait(ctx, "connected", runenv.TestInstanceCount) initCtx.SyncClient.MustSignalAndWait(ctx, "connected", runenv.TestInstanceCount)