mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-01-09 01:13:09 +00:00
chore:address review comments
This commit is contained in:
parent
555f06cfcb
commit
9764801702
@ -14,7 +14,6 @@ import (
|
||||
"github.com/waku-org/go-waku/waku/v2/node"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
)
|
||||
|
||||
type StoreService struct {
|
||||
@ -56,7 +55,7 @@ func NewStoreService(node *node.WakuNode, m *chi.Mux) *StoreService {
|
||||
return s
|
||||
}
|
||||
|
||||
func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store.HistoryRequestOption, error) {
|
||||
func getStoreParams(r *http.Request) (*store.Query, []store.HistoryRequestOption, error) {
|
||||
query := &store.Query{}
|
||||
var options []store.HistoryRequestOption
|
||||
var err error
|
||||
@ -65,15 +64,9 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store
|
||||
if peerAddrStr != "" {
|
||||
m, err = multiaddr.NewMultiaddr(peerAddrStr)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
peerID, err := utils.GetPeerID(m)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
options = append(options, store.WithPeer(peerID))
|
||||
options = append(options, store.WithPeerAddr(m))
|
||||
}
|
||||
query.PubsubTopic = r.URL.Query().Get("pubsubTopic")
|
||||
|
||||
@ -86,7 +79,7 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store
|
||||
if startTimeStr != "" {
|
||||
startTime, err := strconv.ParseInt(startTimeStr, 10, 64)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
query.StartTime = &startTime
|
||||
}
|
||||
@ -95,7 +88,7 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store
|
||||
if endTimeStr != "" {
|
||||
endTime, err := strconv.ParseInt(endTimeStr, 10, 64)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
query.EndTime = &endTime
|
||||
}
|
||||
@ -112,21 +105,21 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store
|
||||
if senderTimeStr != "" {
|
||||
cursor.SenderTime, err = strconv.ParseInt(senderTimeStr, 10, 64)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if storeTimeStr != "" {
|
||||
cursor.ReceiverTime, err = strconv.ParseInt(storeTimeStr, 10, 64)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if digestStr != "" {
|
||||
cursor.Digest, err = base64.URLEncoding.DecodeString(digestStr)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
@ -143,21 +136,21 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store
|
||||
if ascendingStr != "" {
|
||||
ascending, err = strconv.ParseBool(ascendingStr)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if pageSizeStr != "" {
|
||||
pageSize, err = strconv.ParseUint(pageSizeStr, 10, 64)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
options = append(options, store.WithPaging(ascending, pageSize))
|
||||
}
|
||||
|
||||
return m, query, options, nil
|
||||
return query, options, nil
|
||||
}
|
||||
|
||||
func writeStoreError(w http.ResponseWriter, code int, err error) {
|
||||
@ -191,7 +184,7 @@ func toStoreResponse(result *store.Result) StoreResponse {
|
||||
}
|
||||
|
||||
func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) {
|
||||
peerAddr, query, options, err := getStoreParams(r)
|
||||
query, options, err := getStoreParams(r)
|
||||
if err != nil {
|
||||
writeStoreError(w, http.StatusBadRequest, err)
|
||||
return
|
||||
@ -199,9 +192,7 @@ func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
|
||||
defer cancel()
|
||||
if peerAddr != nil {
|
||||
options = append(options, store.WithPeerAddr(peerAddr))
|
||||
}
|
||||
|
||||
result, err := d.node.Store().Query(ctx, *query, options...)
|
||||
if err != nil {
|
||||
writeStoreError(w, http.StatusInternalServerError, err)
|
||||
|
||||
@ -713,7 +713,11 @@ func (w *WakuNode) startStore(ctx context.Context, sub *relay.Subscription) erro
|
||||
// AddPeer is used to add a peer and the protocols it support to the node peerstore
|
||||
// TODO: Need to update this for autosharding, to only take contentTopics and optional pubSubTopics or provide an alternate API only for contentTopics.
|
||||
func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) {
|
||||
return w.peermanager.AddPeer(address, origin, pubSubTopics, false, protocols...)
|
||||
pData, err := w.peermanager.AddPeer(address, origin, pubSubTopics, protocols...)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return pData.AddrInfo.ID, nil
|
||||
}
|
||||
|
||||
// AddDiscoveredPeer to add a discovered peer to the node peerStore
|
||||
@ -724,7 +728,7 @@ func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wp
|
||||
ID: ID,
|
||||
Addrs: addrs,
|
||||
},
|
||||
PubSubTopics: pubsubTopics,
|
||||
PubsubTopics: pubsubTopics,
|
||||
}
|
||||
w.peermanager.AddDiscoveredPeer(p, connectNow)
|
||||
}
|
||||
|
||||
@ -321,11 +321,11 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID {
|
||||
logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
|
||||
} else {
|
||||
if shards != nil {
|
||||
p.PubSubTopics = make([]string, 0)
|
||||
p.PubsubTopics = make([]string, 0)
|
||||
topics := shards.Topics()
|
||||
for _, topic := range topics {
|
||||
topicStr := topic.String()
|
||||
p.PubSubTopics = append(p.PubSubTopics, topicStr)
|
||||
p.PubsubTopics = append(p.PubsubTopics, topicStr)
|
||||
}
|
||||
} else {
|
||||
pm.logger.Debug("ENR doesn't have relay shards", logging.HostID("peer", p.AddrInfo.ID))
|
||||
@ -361,12 +361,12 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
|
||||
return
|
||||
}
|
||||
supportedProtos := []protocol.ID{}
|
||||
if len(p.PubSubTopics) == 0 && p.ENR != nil {
|
||||
if len(p.PubsubTopics) == 0 && p.ENR != nil {
|
||||
// Try to fetch shard info and supported protocols from ENR to arrive at pubSub topics.
|
||||
supportedProtos = pm.processPeerENR(&p)
|
||||
}
|
||||
|
||||
_ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubSubTopics, supportedProtos...)
|
||||
_ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubsubTopics, supportedProtos...)
|
||||
|
||||
if p.ENR != nil {
|
||||
err := pm.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR)
|
||||
@ -419,12 +419,29 @@ func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Orig
|
||||
return nil
|
||||
}
|
||||
|
||||
func AddrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host, pubsubTopics ...string) *service.PeerData {
|
||||
addrs := host.Peerstore().Addrs(peerID)
|
||||
if len(addrs) == 0 {
|
||||
//Addresses expired, remove peer from peerStore
|
||||
host.Peerstore().RemovePeer(peerID)
|
||||
return nil
|
||||
}
|
||||
return &service.PeerData{
|
||||
Origin: origin,
|
||||
AddrInfo: peer.AddrInfo{
|
||||
ID: peerID,
|
||||
Addrs: addrs,
|
||||
},
|
||||
PubsubTopics: pubsubTopics,
|
||||
}
|
||||
}
|
||||
|
||||
// AddPeer adds peer to the peerStore and also to service slots
|
||||
func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, connectNow bool, protocols ...protocol.ID) (peer.ID, error) {
|
||||
func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubsubTopics []string, protocols ...protocol.ID) (*service.PeerData, error) {
|
||||
//Assuming all addresses have peerId
|
||||
info, err := peer.AddrInfoFromP2pAddr(address)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
//Add Service peers to serviceSlots.
|
||||
@ -433,20 +450,26 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTo
|
||||
}
|
||||
|
||||
//Add to the peer-store
|
||||
err = pm.addPeer(info.ID, info.Addrs, origin, pubSubTopics, protocols...)
|
||||
err = pm.addPeer(info.ID, info.Addrs, origin, pubsubTopics, protocols...)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if connectNow {
|
||||
go pm.peerConnector.PushToChan(service.PeerData{
|
||||
Origin: origin,
|
||||
AddrInfo: *info,
|
||||
PubSubTopics: pubSubTopics,
|
||||
})
|
||||
pData := &service.PeerData{
|
||||
Origin: origin,
|
||||
AddrInfo: peer.AddrInfo{
|
||||
ID: info.ID,
|
||||
Addrs: info.Addrs,
|
||||
},
|
||||
PubsubTopics: pubsubTopics,
|
||||
}
|
||||
|
||||
return info.ID, nil
|
||||
return pData, nil
|
||||
}
|
||||
|
||||
// Connect establishes a connection to a peer.
|
||||
func (pm *PeerManager) Connect(pData *service.PeerData) {
|
||||
go pm.peerConnector.PushToChan(*pData)
|
||||
}
|
||||
|
||||
// RemovePeer deletes peer from the peerStore after disconnecting it.
|
||||
|
||||
@ -71,7 +71,7 @@ func TestServiceSlots(t *testing.T) {
|
||||
|
||||
// add h2 peer to peer manager
|
||||
t.Log(h2.ID())
|
||||
_, err = pm.AddPeer(getAddr(h2), wps.Static, []string{""}, false, libp2pProtocol.ID(protocol))
|
||||
_, err = pm.AddPeer(getAddr(h2), wps.Static, []string{""}, libp2pProtocol.ID(protocol))
|
||||
require.NoError(t, err)
|
||||
|
||||
///////////////
|
||||
@ -84,7 +84,7 @@ func TestServiceSlots(t *testing.T) {
|
||||
require.Equal(t, peerID, h2.ID())
|
||||
|
||||
// add h3 peer to peer manager
|
||||
_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{""}, false, libp2pProtocol.ID(protocol))
|
||||
_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{""}, libp2pProtocol.ID(protocol))
|
||||
require.NoError(t, err)
|
||||
|
||||
// check that returned peer is h2 or h3 peer
|
||||
@ -108,7 +108,7 @@ func TestServiceSlots(t *testing.T) {
|
||||
require.Error(t, err, ErrNoPeersAvailable)
|
||||
|
||||
// add h4 peer for protocol1
|
||||
_, err = pm.AddPeer(getAddr(h4), wps.Static, []string{""}, false, libp2pProtocol.ID(protocol1))
|
||||
_, err = pm.AddPeer(getAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1))
|
||||
require.NoError(t, err)
|
||||
|
||||
//Test peer selection for protocol1
|
||||
@ -134,10 +134,10 @@ func TestPeerSelection(t *testing.T) {
|
||||
defer h3.Close()
|
||||
|
||||
protocol := libp2pProtocol.ID("test/protocol")
|
||||
_, err = pm.AddPeer(getAddr(h2), wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, false, libp2pProtocol.ID(protocol))
|
||||
_, err = pm.AddPeer(getAddr(h2), wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, libp2pProtocol.ID(protocol))
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/2/rs/2/1"}, false, libp2pProtocol.ID(protocol))
|
||||
_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol))
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
|
||||
@ -176,7 +176,7 @@ func TestDefaultProtocol(t *testing.T) {
|
||||
defer h5.Close()
|
||||
|
||||
//Test peer selection for relay protocol from peer store
|
||||
_, err = pm.AddPeer(getAddr(h5), wps.Static, []string{""}, false, relay.WakuRelayID_v200)
|
||||
_, err = pm.AddPeer(getAddr(h5), wps.Static, []string{""}, relay.WakuRelayID_v200)
|
||||
require.NoError(t, err)
|
||||
|
||||
// since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol.
|
||||
@ -197,7 +197,7 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
defer h6.Close()
|
||||
|
||||
_, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, false, protocol2)
|
||||
_, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, protocol2)
|
||||
require.NoError(t, err)
|
||||
|
||||
peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2})
|
||||
|
||||
@ -96,18 +96,31 @@ type HistoryRequestParameters struct {
|
||||
s *WakuStore
|
||||
}
|
||||
|
||||
type HistoryRequestOption func(*HistoryRequestParameters)
|
||||
type HistoryRequestOption func(*HistoryRequestParameters) error
|
||||
|
||||
// WithPeer is an option used to specify the peerID to request the message history
|
||||
// WithPeer is an option used to specify the peerID to request the message history.
|
||||
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
|
||||
func WithPeer(p peer.ID) HistoryRequestOption {
|
||||
return func(params *HistoryRequestParameters) {
|
||||
return func(params *HistoryRequestParameters) error {
|
||||
params.selectedPeer = p
|
||||
if params.peerAddr != nil {
|
||||
return errors.New("peerId and peerAddr options are mutually exclusive")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
//WithPeerAddr is an option used to specify a peerAddress to request the message history.
|
||||
// This new peer will be added to peerStore.
|
||||
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
|
||||
|
||||
func WithPeerAddr(pAddr multiaddr.Multiaddr) HistoryRequestOption {
|
||||
return func(params *HistoryRequestParameters) {
|
||||
return func(params *HistoryRequestParameters) error {
|
||||
params.peerAddr = pAddr
|
||||
if params.selectedPeer != "" {
|
||||
return errors.New("peerAddr and peerId options are mutually exclusive")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -117,9 +130,10 @@ func WithPeerAddr(pAddr multiaddr.Multiaddr) HistoryRequestOption {
|
||||
// from the node peerstore
|
||||
// Note: This option is avaiable only with peerManager
|
||||
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption {
|
||||
return func(params *HistoryRequestParameters) {
|
||||
return func(params *HistoryRequestParameters) error {
|
||||
params.peerSelectionType = peermanager.Automatic
|
||||
params.preferredPeers = fromThesePeers
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -129,44 +143,50 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption
|
||||
// from the node peerstore
|
||||
// Note: This option is avaiable only with peerManager
|
||||
func WithFastestPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption {
|
||||
return func(params *HistoryRequestParameters) {
|
||||
return func(params *HistoryRequestParameters) error {
|
||||
params.peerSelectionType = peermanager.LowestRTT
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithRequestID is an option to set a specific request ID to be used when
|
||||
// creating a store request
|
||||
func WithRequestID(requestID []byte) HistoryRequestOption {
|
||||
return func(params *HistoryRequestParameters) {
|
||||
return func(params *HistoryRequestParameters) error {
|
||||
params.requestID = requestID
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithAutomaticRequestID is an option to automatically generate a request ID
|
||||
// when creating a store request
|
||||
func WithAutomaticRequestID() HistoryRequestOption {
|
||||
return func(params *HistoryRequestParameters) {
|
||||
return func(params *HistoryRequestParameters) error {
|
||||
params.requestID = protocol.GenerateRequestID()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithCursor(c *pb.Index) HistoryRequestOption {
|
||||
return func(params *HistoryRequestParameters) {
|
||||
return func(params *HistoryRequestParameters) error {
|
||||
params.cursor = c
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithPaging is an option used to specify the order and maximum number of records to return
|
||||
func WithPaging(asc bool, pageSize uint64) HistoryRequestOption {
|
||||
return func(params *HistoryRequestParameters) {
|
||||
return func(params *HistoryRequestParameters) error {
|
||||
params.asc = asc
|
||||
params.pageSize = pageSize
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithLocalQuery() HistoryRequestOption {
|
||||
return func(params *HistoryRequestParameters) {
|
||||
return func(params *HistoryRequestParameters) error {
|
||||
params.localQuery = true
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -262,7 +282,10 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
|
||||
optList := DefaultOptions()
|
||||
optList = append(optList, opts...)
|
||||
for _, opt := range optList {
|
||||
opt(params)
|
||||
err := opt(params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if !params.localQuery {
|
||||
@ -281,11 +304,12 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
|
||||
|
||||
//Add Peer to peerstore.
|
||||
if store.pm != nil && params.peerAddr != nil {
|
||||
peerId, err := store.pm.AddPeer(params.peerAddr, peerstore.Static, pubsubTopics, true, StoreID_v20beta4)
|
||||
pData, err := store.pm.AddPeer(params.peerAddr, peerstore.Static, pubsubTopics, StoreID_v20beta4)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
params.selectedPeer = peerId
|
||||
store.pm.Connect(pData)
|
||||
params.selectedPeer = pData.AddrInfo.ID
|
||||
}
|
||||
if store.pm != nil && params.selectedPeer == "" {
|
||||
var err error
|
||||
|
||||
@ -107,7 +107,7 @@ func (r *Rendezvous) DiscoverWithNamespace(ctx context.Context, namespace string
|
||||
peer := service.PeerData{
|
||||
Origin: peerstore.Rendezvous,
|
||||
AddrInfo: p,
|
||||
PubSubTopics: []string{namespace},
|
||||
PubsubTopics: []string{namespace},
|
||||
}
|
||||
if !r.PushToChan(peer) {
|
||||
r.log.Error("could push to closed channel/context completed")
|
||||
|
||||
@ -14,7 +14,7 @@ type PeerData struct {
|
||||
Origin wps.Origin
|
||||
AddrInfo peer.AddrInfo
|
||||
ENR *enode.Node
|
||||
PubSubTopics []string
|
||||
PubsubTopics []string
|
||||
}
|
||||
|
||||
type CommonDiscoveryService struct {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user