mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-02 12:53:09 +00:00
fix: code review
This commit is contained in:
parent
faffd2a0bd
commit
37d36d77b1
84
gossipsub_matchfn_test.go
Normal file
84
gossipsub_matchfn_test.go
Normal file
@ -0,0 +1,84 @@
|
||||
package pubsub
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/protocol"
|
||||
)
|
||||
|
||||
func TestGossipSubMatchingFn(t *testing.T) {
|
||||
customsubA100 := protocol.ID("/customsub_a/1.0.0")
|
||||
customsubA101Beta := protocol.ID("/customsub_a/1.0.1-beta")
|
||||
customsubB100 := protocol.ID("/customsub_b/1.0.0")
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
h := getNetHosts(t, ctx, 4)
|
||||
psubs := []*PubSub{
|
||||
getGossipsub(ctx, h[0], WithProtocolMatchFn(protocolNameMatch), WithGossipSubProtocols([]protocol.ID{customsubA100, GossipSubID_v11}, GossipSubDefaultFeatures)),
|
||||
getGossipsub(ctx, h[1], WithProtocolMatchFn(protocolNameMatch), WithGossipSubProtocols([]protocol.ID{customsubA101Beta}, GossipSubDefaultFeatures)),
|
||||
getGossipsub(ctx, h[2], WithProtocolMatchFn(protocolNameMatch), WithGossipSubProtocols([]protocol.ID{GossipSubID_v11}, GossipSubDefaultFeatures)),
|
||||
getGossipsub(ctx, h[3], WithProtocolMatchFn(protocolNameMatch), WithGossipSubProtocols([]protocol.ID{customsubB100}, GossipSubDefaultFeatures)),
|
||||
}
|
||||
|
||||
connect(t, h[0], h[1])
|
||||
connect(t, h[0], h[2])
|
||||
connect(t, h[0], h[3])
|
||||
|
||||
// verify that the peers are connected
|
||||
time.Sleep(2 * time.Second)
|
||||
for i := 1; i < len(h); i++ {
|
||||
if len(h[0].Network().ConnsToPeer(h[i].ID())) == 0 {
|
||||
t.Fatal("expected a connection between peers")
|
||||
}
|
||||
}
|
||||
|
||||
// build the mesh
|
||||
var subs []*Subscription
|
||||
for _, ps := range psubs {
|
||||
sub, err := ps.Subscribe("test")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
subs = append(subs, sub)
|
||||
}
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
// publish a message
|
||||
msg := []byte("message")
|
||||
psubs[0].Publish("test", msg)
|
||||
|
||||
assertReceive(t, subs[0], msg)
|
||||
assertReceive(t, subs[1], msg) // Should match via semver over CustomSub name, ignoring the version
|
||||
assertReceive(t, subs[2], msg) // Should match via GossipSubID_v11
|
||||
|
||||
// No message should be received because customsubA and customsubB have different names
|
||||
ctxTimeout, timeoutCancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||
defer timeoutCancel()
|
||||
received := false
|
||||
for {
|
||||
msg, err := subs[3].Next(ctxTimeout)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
if msg != nil {
|
||||
received = true
|
||||
}
|
||||
}
|
||||
if received {
|
||||
t.Fatal("Should not have received a message")
|
||||
}
|
||||
}
|
||||
|
||||
func protocolNameMatch(base string) func(string) bool {
|
||||
return func(check string) bool {
|
||||
baseName := strings.Split(string(base), "/")[1]
|
||||
checkName := strings.Split(string(check), "/")[1]
|
||||
return baseName == checkName
|
||||
}
|
||||
}
|
||||
17
pubsub.go
17
pubsub.go
@ -36,7 +36,7 @@ var (
|
||||
|
||||
var log = logging.Logger("pubsub")
|
||||
|
||||
type MatchingFunction func(string) func(string) bool
|
||||
type ProtocolMatchFn = func(string) func(string) bool
|
||||
|
||||
// PubSub is the implementation of the pubsub system.
|
||||
type PubSub struct {
|
||||
@ -160,7 +160,7 @@ type PubSub struct {
|
||||
subFilter SubscriptionFilter
|
||||
|
||||
// protoMatchFunc is a matching function for protocol selection.
|
||||
protoMatchFunc *MatchingFunction
|
||||
protoMatchFunc ProtocolMatchFn
|
||||
|
||||
ctx context.Context
|
||||
}
|
||||
@ -240,7 +240,6 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
|
||||
peerOutboundQueueSize: 32,
|
||||
signID: h.ID(),
|
||||
signKey: nil,
|
||||
protoMatchFunc: nil,
|
||||
signPolicy: StrictSign,
|
||||
incoming: make(chan *RPC, 32),
|
||||
newPeers: make(chan struct{}, 1),
|
||||
@ -299,7 +298,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
|
||||
|
||||
for _, id := range rt.Protocols() {
|
||||
if ps.protoMatchFunc != nil {
|
||||
h.SetStreamHandlerMatch(id, (*ps.protoMatchFunc)(string(id)), ps.handleNewStream)
|
||||
h.SetStreamHandlerMatch(id, ps.protoMatchFunc(string(id)), ps.handleNewStream)
|
||||
} else {
|
||||
h.SetStreamHandler(id, ps.handleNewStream)
|
||||
}
|
||||
@ -485,11 +484,13 @@ func WithMaxMessageSize(maxMessageSize int) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// WithProtocolMatchFunction sets a custom matching function for protocol
|
||||
// selection to be used by the protocol handler on the Host's Mux
|
||||
func WithProtocolMatchFunction(m MatchingFunction) Option {
|
||||
// WithProtocolMatchFn sets a custom matching function for protocol selection to
|
||||
// be used by the protocol handler on the Host's Mux. Should be combined with
|
||||
// WithGossipSubProtocols feature function for checking if certain protocol features
|
||||
// are supported
|
||||
func WithProtocolMatchFn(m ProtocolMatchFn) Option {
|
||||
return func(ps *PubSub) error {
|
||||
ps.protoMatchFunc = &m
|
||||
ps.protoMatchFunc = m
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user