mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-03 13:23:07 +00:00
Merge pull request #70 from libp2p/feat/remove-validator
UnregisterTopicValidator for removing topic validators
This commit is contained in:
commit
ceff712aba
@ -341,6 +341,31 @@ func TestOneToOne(t *testing.T) {
|
||||
checkMessageRouting(t, "foobar", psubs, []*Subscription{sub})
|
||||
}
|
||||
|
||||
func TestRegisterUnregisterValidator(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
hosts := getNetHosts(t, ctx, 1)
|
||||
psubs := getPubsubs(ctx, hosts)
|
||||
|
||||
err := psubs[0].RegisterTopicValidator("foo", func(context.Context, *Message) bool {
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = psubs[0].UnregisterTopicValidator("foo")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = psubs[0].UnregisterTopicValidator("foo")
|
||||
if err == nil {
|
||||
t.Fatal("Unregistered bogus topic validator")
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidate(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
36
pubsub.go
36
pubsub.go
@ -66,6 +66,9 @@ type PubSub struct {
|
||||
// addVal handles validator registration requests
|
||||
addVal chan *addValReq
|
||||
|
||||
// rmVal handles validator unregistration requests
|
||||
rmVal chan *rmValReq
|
||||
|
||||
// topicVals tracks per topic validators
|
||||
topicVals map[string]*topicVal
|
||||
|
||||
@ -124,6 +127,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
|
||||
getTopics: make(chan *topicReq),
|
||||
sendMsg: make(chan *sendReq, 32),
|
||||
addVal: make(chan *addValReq),
|
||||
rmVal: make(chan *rmValReq),
|
||||
validateThrottle: make(chan struct{}, defaultValidateThrottle),
|
||||
myTopics: make(map[string]map[*Subscription]struct{}),
|
||||
topics: make(map[string]map[peer.ID]struct{}),
|
||||
@ -240,6 +244,9 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
||||
case req := <-p.addVal:
|
||||
p.addValidator(req)
|
||||
|
||||
case req := <-p.rmVal:
|
||||
p.rmValidator(req)
|
||||
|
||||
case <-ctx.Done():
|
||||
log.Info("pubsub processloop shutting down")
|
||||
return
|
||||
@ -586,6 +593,11 @@ type addValReq struct {
|
||||
resp chan error
|
||||
}
|
||||
|
||||
type rmValReq struct {
|
||||
topic string
|
||||
resp chan error
|
||||
}
|
||||
|
||||
type topicVal struct {
|
||||
topic string
|
||||
validate Validator
|
||||
@ -662,6 +674,30 @@ func (ps *PubSub) addValidator(req *addValReq) {
|
||||
req.resp <- nil
|
||||
}
|
||||
|
||||
// UnregisterTopicValidator removes a validator from a topic
|
||||
// returns an error if there was no validator registered with the topic
|
||||
func (p *PubSub) UnregisterTopicValidator(topic string) error {
|
||||
rmVal := &rmValReq{
|
||||
topic: topic,
|
||||
resp: make(chan error, 1),
|
||||
}
|
||||
|
||||
p.rmVal <- rmVal
|
||||
return <-rmVal.resp
|
||||
}
|
||||
|
||||
func (ps *PubSub) rmValidator(req *rmValReq) {
|
||||
topic := req.topic
|
||||
|
||||
_, ok := ps.topicVals[topic]
|
||||
if ok {
|
||||
delete(ps.topicVals, topic)
|
||||
req.resp <- nil
|
||||
} else {
|
||||
req.resp <- fmt.Errorf("No validator for topic %s", topic)
|
||||
}
|
||||
}
|
||||
|
||||
func (val *topicVal) validateMsg(ctx context.Context, msg *Message) bool {
|
||||
vctx, cancel := context.WithTimeout(ctx, val.validateTimeout)
|
||||
defer cancel()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user