2018-08-28 03:01:08 +00:00
|
|
|
package pubsub
|
2018-10-10 14:17:27 +00:00
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
|
2018-08-28 03:01:08 +00:00
|
|
|
pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
2018-10-10 14:17:27 +00:00
|
|
|
|
2019-05-26 16:19:03 +00:00
|
|
|
"github.com/libp2p/go-libp2p-core/host"
|
|
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
|
|
"github.com/libp2p/go-libp2p-core/protocol"
|
2018-10-10 14:17:27 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
|
|
RandomSubID = protocol.ID("/randomsub/1.0.0")
|
|
|
|
)
|
|
|
|
|
|
|
|
// RandomSubD is the maximum number of randomsub peers a single message is
// forwarded to by RandomSubRouter.Publish. It is also the default peer count
// used by EnoughPeers when the caller passes no suggestion.
var RandomSubD = 6
|
|
|
|
|
2019-01-04 11:09:21 +00:00
|
|
|
// NewRandomSub returns a new PubSub object using RandomSubRouter as the router.
|
2018-10-10 14:17:27 +00:00
|
|
|
func NewRandomSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
|
|
|
|
rt := &RandomSubRouter{
|
|
|
|
peers: make(map[peer.ID]protocol.ID),
|
|
|
|
}
|
|
|
|
return NewPubSub(ctx, h, rt, opts...)
|
|
|
|
}
|
|
|
|
|
|
|
|
// RandomSubRouter is a router that implements a random propagation strategy.
|
|
|
|
// For each message, it selects RandomSubD peers and forwards the message to them.
|
|
|
|
type RandomSubRouter struct {
|
2019-11-04 18:43:48 +00:00
|
|
|
p *PubSub
|
|
|
|
peers map[peer.ID]protocol.ID
|
|
|
|
tracer *pubsubTracer
|
2018-10-10 14:17:27 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (rs *RandomSubRouter) Protocols() []protocol.ID {
|
2018-10-10 14:39:38 +00:00
|
|
|
return []protocol.ID{RandomSubID, FloodSubID}
|
2018-10-10 14:17:27 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (rs *RandomSubRouter) Attach(p *PubSub) {
|
|
|
|
rs.p = p
|
2019-11-04 18:47:19 +00:00
|
|
|
rs.tracer = p.tracer
|
2018-10-10 14:17:27 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (rs *RandomSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
|
2019-11-04 18:43:48 +00:00
|
|
|
rs.tracer.AddPeer(p, proto)
|
2018-10-10 14:17:27 +00:00
|
|
|
rs.peers[p] = proto
|
|
|
|
}
|
|
|
|
|
|
|
|
func (rs *RandomSubRouter) RemovePeer(p peer.ID) {
|
2019-11-04 18:43:48 +00:00
|
|
|
rs.tracer.RemovePeer(p)
|
2018-10-10 14:17:27 +00:00
|
|
|
delete(rs.peers, p)
|
|
|
|
}
|
|
|
|
|
2019-10-31 19:56:09 +00:00
|
|
|
func (rs *RandomSubRouter) EnoughPeers(topic string, suggested int) bool {
|
|
|
|
// check all peers in the topic
|
|
|
|
tmap, ok := rs.p.topics[topic]
|
|
|
|
if !ok {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
fsPeers := 0
|
|
|
|
rsPeers := 0
|
|
|
|
|
|
|
|
// count floodsub and randomsub peers
|
|
|
|
for p := range tmap {
|
|
|
|
switch rs.peers[p] {
|
|
|
|
case FloodSubID:
|
|
|
|
fsPeers++
|
|
|
|
case RandomSubID:
|
|
|
|
rsPeers++
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if suggested == 0 {
|
|
|
|
suggested = RandomSubD
|
|
|
|
}
|
|
|
|
|
|
|
|
if fsPeers+rsPeers >= suggested {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
if rsPeers >= RandomSubD {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
2018-10-10 14:17:27 +00:00
|
|
|
// HandleRPC is a no-op: this router takes no action on incoming RPCs.
func (rs *RandomSubRouter) HandleRPC(rpc *RPC) {
}
|
|
|
|
|
|
|
|
func (rs *RandomSubRouter) Publish(from peer.ID, msg *pb.Message) {
|
|
|
|
tosend := make(map[peer.ID]struct{})
|
|
|
|
rspeers := make(map[peer.ID]struct{})
|
|
|
|
src := peer.ID(msg.GetFrom())
|
|
|
|
|
|
|
|
for _, topic := range msg.GetTopicIDs() {
|
|
|
|
tmap, ok := rs.p.topics[topic]
|
|
|
|
if !ok {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
for p := range tmap {
|
|
|
|
if p == from || p == src {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
if rs.peers[p] == FloodSubID {
|
|
|
|
tosend[p] = struct{}{}
|
|
|
|
} else {
|
|
|
|
rspeers[p] = struct{}{}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(rspeers) > RandomSubD {
|
|
|
|
xpeers := peerMapToList(rspeers)
|
|
|
|
shufflePeers(xpeers)
|
|
|
|
xpeers = xpeers[:RandomSubD]
|
|
|
|
for _, p := range xpeers {
|
|
|
|
tosend[p] = struct{}{}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
for p := range rspeers {
|
|
|
|
tosend[p] = struct{}{}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
out := rpcWithMessages(msg)
|
|
|
|
for p := range tosend {
|
|
|
|
mch, ok := rs.p.peers[p]
|
|
|
|
if !ok {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
case mch <- out:
|
2019-11-04 18:43:48 +00:00
|
|
|
rs.tracer.SendRPC(out, p)
|
2018-10-10 14:17:27 +00:00
|
|
|
default:
|
|
|
|
log.Infof("dropping message to peer %s: queue full", p)
|
2019-11-04 18:43:48 +00:00
|
|
|
rs.tracer.DropRPC(out, p)
|
2018-10-10 14:17:27 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-11-04 18:43:48 +00:00
|
|
|
func (rs *RandomSubRouter) Join(topic string) {
|
|
|
|
rs.tracer.Join(topic)
|
|
|
|
}
|
2018-10-10 14:17:27 +00:00
|
|
|
|
2019-11-04 18:43:48 +00:00
|
|
|
func (rs *RandomSubRouter) Leave(topic string) {
|
|
|
|
rs.tracer.Join(topic)
|
|
|
|
}
|