mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-04 05:43:06 +00:00
add api for dynamically setting and resetting topic score parameters
This commit is contained in:
parent
a3445b756f
commit
5d06aa2d4f
44
score.go
44
score.go
@ -182,6 +182,50 @@ func newPeerScore(params *PeerScoreParams) *peerScore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// update interface
|
||||||
|
func (ps *peerScore) SetTopicScoreParams(topic string, p *TopicScoreParams) error {
|
||||||
|
// Note: assumes that the topic score parameters have already been validated
|
||||||
|
ps.Lock()
|
||||||
|
defer ps.Unlock()
|
||||||
|
|
||||||
|
old, exist := ps.params.Topics[topic]
|
||||||
|
ps.params.Topics[topic] = p
|
||||||
|
|
||||||
|
if !exist {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// check to see if the counter Caps are being lowered; if that's the case we need to recap them
|
||||||
|
recap := false
|
||||||
|
if p.FirstMessageDeliveriesCap < old.FirstMessageDeliveriesCap {
|
||||||
|
recap = true
|
||||||
|
}
|
||||||
|
if p.MeshMessageDeliveriesCap < old.MeshMessageDeliveriesCap {
|
||||||
|
recap = true
|
||||||
|
}
|
||||||
|
if !recap {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// recap counters for topic
|
||||||
|
for _, pstats := range ps.peerStats {
|
||||||
|
tstats, ok := pstats.topics[topic]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if tstats.firstMessageDeliveries > p.FirstMessageDeliveriesCap {
|
||||||
|
tstats.firstMessageDeliveries = p.FirstMessageDeliveriesCap
|
||||||
|
}
|
||||||
|
|
||||||
|
if tstats.meshMessageDeliveries > p.MeshMessageDeliveriesCap {
|
||||||
|
tstats.meshMessageDeliveries = p.MeshMessageDeliveriesCap
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// router interface
|
// router interface
|
||||||
func (ps *peerScore) Start(gs *GossipSubRouter) {
|
func (ps *peerScore) Start(gs *GossipSubRouter) {
|
||||||
if ps == nil {
|
if ps == nil {
|
||||||
|
|||||||
39
topic.go
39
topic.go
@ -31,6 +31,45 @@ func (t *Topic) String() string {
|
|||||||
return t.topic
|
return t.topic
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetScoreParams sets the topic score parameters if the pubsub router supports peer
|
||||||
|
// scoring
|
||||||
|
func (t *Topic) SetScoreParams(p *TopicScoreParams) error {
|
||||||
|
t.mux.Lock()
|
||||||
|
defer t.mux.Unlock()
|
||||||
|
|
||||||
|
if t.closed {
|
||||||
|
return ErrTopicClosed
|
||||||
|
}
|
||||||
|
|
||||||
|
err := p.validate()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("invalid topic score parameters: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
result := make(chan error, 1)
|
||||||
|
select {
|
||||||
|
case t.p.eval <- func() {
|
||||||
|
gs, ok := t.p.rt.(*GossipSubRouter)
|
||||||
|
if !ok {
|
||||||
|
result <- fmt.Errorf("pubsub router is not gossipsub")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if gs.score == nil {
|
||||||
|
result <- fmt.Errorf("peer scoring is not enabled in router")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
result <- gs.score.SetTopicScoreParams(t.topic, p)
|
||||||
|
}:
|
||||||
|
err = <-result
|
||||||
|
return err
|
||||||
|
|
||||||
|
case <-t.p.ctx.Done():
|
||||||
|
return t.p.ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// EventHandler creates a handle for topic specific events
|
// EventHandler creates a handle for topic specific events
|
||||||
// Multiple event handlers may be created and will operate independently of each other
|
// Multiple event handlers may be created and will operate independently of each other
|
||||||
func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error) {
|
func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user