mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-05-02 15:03:10 +00:00
feat: WithLocalPublication option to enable local only publishing on a topic (#481)
* feat: WithLocalPublication option to enable local only publishing on a topic * docs: improve comment on WithLocalPublication option
This commit is contained in:
parent
ca702289e6
commit
96efa27a1a
@ -221,6 +221,7 @@ type Message struct {
|
||||
ID string
|
||||
ReceivedFrom peer.ID
|
||||
ValidatorData interface{}
|
||||
Local bool
|
||||
}
|
||||
|
||||
func (m *Message) GetFrom() peer.ID {
|
||||
@ -1066,7 +1067,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
|
||||
continue
|
||||
}
|
||||
|
||||
p.pushMsg(&Message{pmsg, "", rpc.from, nil})
|
||||
p.pushMsg(&Message{pmsg, "", rpc.from, nil, false})
|
||||
}
|
||||
}
|
||||
|
||||
@ -1165,7 +1166,9 @@ func (p *PubSub) checkSigningPolicy(msg *Message) error {
|
||||
func (p *PubSub) publishMessage(msg *Message) {
|
||||
p.tracer.DeliverMessage(msg)
|
||||
p.notifySubs(msg)
|
||||
p.rt.Publish(msg)
|
||||
if !msg.Local {
|
||||
p.rt.Publish(msg)
|
||||
}
|
||||
}
|
||||
|
||||
type addTopicReq struct {
|
||||
|
||||
15
topic.go
15
topic.go
@ -215,6 +215,7 @@ type ProvideKey func() (crypto.PrivKey, peer.ID)
|
||||
type PublishOptions struct {
|
||||
ready RouterReady
|
||||
customKey ProvideKey
|
||||
local bool
|
||||
}
|
||||
|
||||
type PubOpt func(pub *PublishOptions) error
|
||||
@ -307,7 +308,7 @@ func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error
|
||||
}
|
||||
}
|
||||
|
||||
return t.p.val.PushLocal(&Message{m, "", t.p.host.ID(), nil})
|
||||
return t.p.val.PushLocal(&Message{m, "", t.p.host.ID(), nil, pub.local})
|
||||
}
|
||||
|
||||
// WithReadiness returns a publishing option for only publishing when the router is ready.
|
||||
@ -319,6 +320,18 @@ func WithReadiness(ready RouterReady) PubOpt {
|
||||
}
|
||||
}
|
||||
|
||||
// WithLocalPublication returns a publishing option to notify in-process subscribers only.
|
||||
// It prevents message publication to mesh peers.
|
||||
// Useful in edge cases where the msg needs to be only delivered to the in-process subscribers,
|
||||
// e.g. not to spam the network with outdated msgs.
|
||||
// Should not be used specifically for in-process pubsubing.
|
||||
func WithLocalPublication(local bool) PubOpt {
|
||||
return func(pub *PublishOptions) error {
|
||||
pub.local = local
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithSecretKeyAndPeerId returns a publishing option for providing a custom private key and its corresponding peer ID
|
||||
// This option is useful when we want to send messages from "virtual", never-connectable peers in the network
|
||||
func WithSecretKeyAndPeerId(key crypto.PrivKey, pid peer.ID) PubOpt {
|
||||
|
||||
@ -1019,3 +1019,48 @@ func TestTopicRelayPublishWithKey(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestWithLocalPublication(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
const topic = "test"
|
||||
|
||||
hosts := getNetHosts(t, ctx, 2)
|
||||
pubsubs := getPubsubs(ctx, hosts)
|
||||
topics := getTopics(pubsubs, topic)
|
||||
connectAll(t, hosts)
|
||||
|
||||
payload := []byte("pubsub smashes")
|
||||
|
||||
local, err := topics[0].Subscribe()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
remote, err := topics[1].Subscribe()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = topics[0].Publish(ctx, payload, WithLocalPublication(true))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
remoteCtx, cancel := context.WithTimeout(ctx, time.Millisecond*100)
|
||||
defer cancel()
|
||||
|
||||
msg, err := remote.Next(remoteCtx)
|
||||
if msg != nil || err == nil {
|
||||
t.Fatal("unexpected msg")
|
||||
}
|
||||
|
||||
msg, err = local.Next(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !msg.Local || !bytes.Equal(msg.Data, payload) {
|
||||
t.Fatal("wrong message")
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user