fix: rest get messages not returning more than 1 message (#898)

This commit is contained in:
Prem Chaitanya Prathi 2023-11-14 22:54:02 +05:30 committed by GitHub
parent 392558ec8e
commit 8122cf47a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 28 deletions

View File

@ -549,7 +549,7 @@ var (
}) })
RESTRelayCacheCapacity = altsrc.NewIntFlag(&cli.IntFlag{ RESTRelayCacheCapacity = altsrc.NewIntFlag(&cli.IntFlag{
Name: "rest-relay-cache-capacity", Name: "rest-relay-cache-capacity",
Value: 30, Value: 1000,
Usage: "Capacity of the Relay REST API message cache", Usage: "Capacity of the Relay REST API message cache",
Destination: &options.RESTServer.RelayCacheCapacity, Destination: &options.RESTServer.RelayCacheCapacity,
EnvVars: []string{"WAKUNODE2_REST_RELAY_CACHE_CAPACITY"}, EnvVars: []string{"WAKUNODE2_REST_RELAY_CACHE_CAPACITY"},

View File

@ -124,27 +124,34 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) {
return return
} }
var response []*RestWakuMessage var response []*RestWakuMessage
select { done := false
case envelope, open := <-sub.Ch: for {
if !open { if done || len(response) > int(r.cacheCapacity) {
r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", topic)) break
w.WriteHeader(http.StatusNotFound) }
_, err = w.Write([]byte("consume channel is closed for subscription")) select {
if err != nil { case envelope, open := <-sub.Ch:
r.log.Error("writing response", zap.Error(err)) if !open {
r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", topic))
w.WriteHeader(http.StatusNotFound)
_, err = w.Write([]byte("consume channel is closed for subscription"))
if err != nil {
r.log.Error("writing response", zap.Error(err))
}
return
} }
return
}
message := &RestWakuMessage{} message := &RestWakuMessage{}
if err := message.FromProto(envelope.Message()); err != nil { if err := message.FromProto(envelope.Message()); err != nil {
r.log.Error("converting protobuffer msg into rest msg", zap.Error(err)) r.log.Error("converting protobuffer msg into rest msg", zap.Error(err))
} else { } else {
response = append(response, message) response = append(response, message)
}
case <-req.Context().Done():
done = true
default:
done = true
} }
default:
break
} }
writeErrOrResponse(w, nil, response) writeErrOrResponse(w, nil, response)
@ -240,16 +247,24 @@ func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Reques
return return
} }
var response []*RestWakuMessage var response []*RestWakuMessage
select { done := false
case envelope := <-sub.Ch: for {
message := &RestWakuMessage{} if done || len(response) > int(r.cacheCapacity) {
if err := message.FromProto(envelope.Message()); err != nil { break
r.log.Error("converting protobuffer msg into rest msg", zap.Error(err)) }
} else { select {
response = append(response, message) case envelope := <-sub.Ch:
message := &RestWakuMessage{}
if err := message.FromProto(envelope.Message()); err != nil {
r.log.Error("converting protobuffer msg into rest msg", zap.Error(err))
} else {
response = append(response, message)
}
case <-req.Context().Done():
done = true
default:
done = true
} }
default:
break
} }
writeErrOrResponse(w, nil, response) writeErrOrResponse(w, nil, response)