cache-types: Update Streaming health cache-type

To use latest protobuf types
This commit is contained in:
Daniel Nephin 2020-09-18 18:33:02 -04:00
parent 132b76acef
commit b576a2d3c7
6 changed files with 306 additions and 119 deletions

View File

@ -0,0 +1,174 @@
package cachetype
import (
fmt "fmt"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/types"
)
func newEndOfSnapshotEvent(topic pbsubscribe.Topic, index uint64) *pbsubscribe.Event {
return &pbsubscribe.Event{
Topic: topic,
Index: index,
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
}
}
func newEndOfEmptySnapshotEvent(topic pbsubscribe.Topic, index uint64) *pbsubscribe.Event {
return &pbsubscribe.Event{
Topic: topic,
Index: index,
Payload: &pbsubscribe.Event_EndOfEmptySnapshot{EndOfEmptySnapshot: true},
}
}
// newEventServiceHealthRegister returns a realistically populated service
// health registration event for tests. The nodeNum is a
// logical node and is used to create the node name ("node%d") but also change
// the node ID and IP address to make it a little more realistic for cases that
// need that. nodeNum should be less than 64k to make the IP address look
// realistic. Any other changes can be made on the returned event to avoid
// adding too many options to callers.
func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum)
nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))
addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)
return pbsubscribe.Event{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: svc,
Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Register,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
ID: nodeID,
Node: node,
Address: addr,
Datacenter: "dc1",
RaftIndex: pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
Service: &pbservice.NodeService{
ID: svc,
Service: svc,
Port: 8080,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
// Empty sadness
Proxy: pbservice.ConnectProxyConfig{
MeshGateway: pbservice.MeshGatewayConfig{},
Expose: pbservice.ExposeConfig{},
},
EnterpriseMeta: pbcommon.EnterpriseMeta{},
RaftIndex: pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
Checks: []*pbservice.HealthCheck{
{
Node: node,
CheckID: "serf-health",
Name: "serf-health",
Status: "passing",
EnterpriseMeta: pbcommon.EnterpriseMeta{},
RaftIndex: pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
{
Node: node,
CheckID: types.CheckID("service:" + svc),
Name: "service:" + svc,
ServiceID: svc,
ServiceName: svc,
Type: "ttl",
Status: "passing",
EnterpriseMeta: pbcommon.EnterpriseMeta{},
RaftIndex: pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
},
},
},
},
}
}
// TestEventServiceHealthDeregister returns a realistically populated service
// health deregistration event for tests. The nodeNum is a
// logical node and is used to create the node name ("node%d") but also change
// the node ID and IP address to make it a little more realistic for cases that
// need that. nodeNum should be less than 64k to make the IP address look
// realistic. Any other changes can be made on the returned event to avoid
// adding too many options to callers.
func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum)
return pbsubscribe.Event{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: svc,
Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Deregister,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
Node: node,
},
Service: &pbservice.NodeService{
ID: svc,
Service: svc,
Port: 8080,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
// Empty sadness
Proxy: pbservice.ConnectProxyConfig{
MeshGateway: pbservice.MeshGatewayConfig{},
Expose: pbservice.ExposeConfig{},
},
EnterpriseMeta: pbcommon.EnterpriseMeta{},
RaftIndex: pbcommon.RaftIndex{
// The original insertion index since a delete doesn't update
// this. This magic value came from state store tests where we
// setup at index 10 and then mutate at index 100. It can be
// modified by the caller later and makes it easier than having
// yet another argument in the common case.
CreateIndex: 10,
ModifyIndex: 10,
},
},
},
},
},
}
}
func newEventBatchWithEvents(first pbsubscribe.Event, evs ...pbsubscribe.Event) pbsubscribe.Event {
events := make([]*pbsubscribe.Event, len(evs)+1)
events[0] = &first
for i := range evs {
events[i+1] = &evs[i]
}
return pbsubscribe.Event{
Topic: first.Topic,
Index: first.Index,
Payload: &pbsubscribe.Event_EventBatch{
EventBatch: &pbsubscribe.EventBatch{Events: events},
},
}
}

View File

@ -3,11 +3,13 @@ package cachetype
import ( import (
"fmt" "fmt"
"github.com/hashicorp/consul/agent/agentpb"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-bexpr" "github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbsubscribe"
) )
const ( const (
@ -40,18 +42,20 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
"Internal cache failure: request wrong type: %T", req) "Internal cache failure: request wrong type: %T", req)
} }
r := agentpb.SubscribeRequest{ r := Request{
Topic: agentpb.Topic_ServiceHealth, SubscribeRequest: pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: reqReal.ServiceName, Key: reqReal.ServiceName,
Token: reqReal.Token, Token: reqReal.Token,
Index: reqReal.MinQueryIndex, Index: reqReal.MinQueryIndex,
Filter: reqReal.Filter,
Datacenter: reqReal.Datacenter, Datacenter: reqReal.Datacenter,
},
Filter: reqReal.Filter,
} }
// Connect requests need a different topic // Connect requests need a different topic
if reqReal.Connect { if reqReal.Connect {
r.Topic = agentpb.Topic_ServiceHealthConnect r.Topic = pbsubscribe.Topic_ServiceHealthConnect
} }
view := MaterializedViewFromFetch(c, opts, r) view := MaterializedViewFromFetch(c, opts, r)
@ -105,7 +109,7 @@ func (s *healthViewState) InitFilter(expression string) error {
} }
// Update implements MaterializedViewState // Update implements MaterializedViewState
func (s *healthViewState) Update(events []*agentpb.Event) error { func (s *healthViewState) Update(events []*pbsubscribe.Event) error {
for _, event := range events { for _, event := range events {
serviceHealth := event.GetServiceHealth() serviceHealth := event.GetServiceHealth()
if serviceHealth == nil { if serviceHealth == nil {
@ -116,13 +120,10 @@ func (s *healthViewState) Update(events []*agentpb.Event) error {
id := fmt.Sprintf("%s/%s", node.Node.Node, node.Service.ID) id := fmt.Sprintf("%s/%s", node.Node.Node, node.Service.ID)
switch serviceHealth.Op { switch serviceHealth.Op {
case agentpb.CatalogOp_Register: case pbsubscribe.CatalogOp_Register:
checkServiceNode, err := serviceHealth.CheckServiceNode.ToStructs() checkServiceNode := pbservice.CheckServiceNodeToStructs(serviceHealth.CheckServiceNode)
if err != nil {
return err
}
s.state[id] = *checkServiceNode s.state[id] = *checkServiceNode
case agentpb.CatalogOp_Deregister: case pbsubscribe.CatalogOp_Deregister:
delete(s.state, id) delete(s.state, id)
} }
} }

View File

@ -6,12 +6,14 @@ import (
"testing" "testing"
"time" "time"
"github.com/hashicorp/consul/agent/agentpb" "github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/stretchr/testify/require"
) )
func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
@ -23,8 +25,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
// Initially there are no services registered. Server should send an // Initially there are no services registered. Server should send an
// EndOfSnapshot message immediately with index of 1. // EndOfSnapshot message immediately with index of 1.
eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 1) client.QueueEvents(newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 1))
client.QueueEvents(&eosEv)
// This contains the view state so important we share it between calls. // This contains the view state so important we share it between calls.
opts := cache.FetchOptions{ opts := cache.FetchOptions{
@ -42,7 +43,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
}, },
} }
require.True(t, t.Run("empty snapshot returned", func(t *testing.T) { runStep(t, "empty snapshot returned", func(t *testing.T) {
// Fetch should return an empty // Fetch should return an empty
// result of the right type with a non-zero index, and in the background begin // result of the right type with a non-zero index, and in the background begin
// streaming updates. // streaming updates.
@ -54,9 +55,9 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
opts.MinIndex = result.Index opts.MinIndex = result.Index
opts.LastResult = &result opts.LastResult = &result
})) })
require.True(t, t.Run("blocks for timeout", func(t *testing.T) { runStep(t, "blocks for timeout", func(t *testing.T) {
// Subsequent fetch should block for the timeout // Subsequent fetch should block for the timeout
start := time.Now() start := time.Now()
opts.Timeout = 200 * time.Millisecond opts.Timeout = 200 * time.Millisecond
@ -71,9 +72,9 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
opts.MinIndex = result.Index opts.MinIndex = result.Index
opts.LastResult = &result opts.LastResult = &result
})) })
require.True(t, t.Run("blocks until update", func(t *testing.T) { runStep(t, "blocks until update", func(t *testing.T) {
// Make another blocking query with a longer timeout and trigger an update // Make another blocking query with a longer timeout and trigger an update
// event part way through. // event part way through.
start := time.Now() start := time.Now()
@ -81,7 +82,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
// Then a service registers // Then a service registers
healthEv := agentpb.TestEventServiceHealthRegister(t, 4, 1, "web") healthEv := newEventServiceHealthRegister(4, 1, "web")
client.QueueEvents(&healthEv) client.QueueEvents(&healthEv)
}() }()
@ -100,17 +101,16 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
opts.MinIndex = result.Index opts.MinIndex = result.Index
opts.LastResult = &result opts.LastResult = &result
})) })
require.True(t, t.Run("reconnects and resumes after transient stream error", func(t *testing.T) { runStep(t, "reconnects and resumes after transient stream error", func(t *testing.T) {
// Use resetErr just because it's "temporary" this is a stand in for any // Use resetErr just because it's "temporary" this is a stand in for any
// network error that uses that same interface though. // network error that uses that same interface though.
client.QueueErr(resetErr("broken pipe")) client.QueueErr(resetErr("broken pipe"))
// After the error the view should re-subscribe with same index so will get // After the error the view should re-subscribe with same index so will get
// a "resume stream". // a "resume stream".
resumeEv := agentpb.TestEventResumeStream(t, agentpb.Topic_ServiceHealth, opts.MinIndex) client.QueueEvents(newEndOfEmptySnapshotEvent(pbsubscribe.Topic_ServiceHealth, opts.MinIndex))
client.QueueEvents(&resumeEv)
// Next fetch will continue to block until timeout and receive the same // Next fetch will continue to block until timeout and receive the same
// result. // result.
@ -129,7 +129,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
opts.LastResult = &result opts.LastResult = &result
// But an update should still be noticed due to reconnection // But an update should still be noticed due to reconnection
healthEv := agentpb.TestEventServiceHealthRegister(t, 10, 2, "web") healthEv := newEventServiceHealthRegister(10, 2, "web")
client.QueueEvents(&healthEv) client.QueueEvents(&healthEv)
start = time.Now() start = time.Now()
@ -146,9 +146,9 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
opts.MinIndex = result.Index opts.MinIndex = result.Index
opts.LastResult = &result opts.LastResult = &result
})) })
require.True(t, t.Run("returns non-temporary error to watchers", func(t *testing.T) { runStep(t, "returns non-temporary error to watchers", func(t *testing.T) {
// Wait and send the error while fetcher is waiting // Wait and send the error while fetcher is waiting
go func() { go func() {
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
@ -156,8 +156,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
// After the error the view should re-subscribe with same index so will get // After the error the view should re-subscribe with same index so will get
// a "resume stream". // a "resume stream".
resumeEv := agentpb.TestEventResumeStream(t, agentpb.Topic_ServiceHealth, opts.MinIndex) client.QueueEvents(newEndOfEmptySnapshotEvent(pbsubscribe.Topic_ServiceHealth, opts.MinIndex))
client.QueueEvents(&resumeEv)
}() }()
// Next fetch should return the error // Next fetch should return the error
@ -183,7 +182,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
opts.LastResult = &result opts.LastResult = &result
// But an update should still be noticed due to reconnection // But an update should still be noticed due to reconnection
healthEv := agentpb.TestEventServiceHealthRegister(t, opts.MinIndex+5, 3, "web") healthEv := newEventServiceHealthRegister(opts.MinIndex+5, 3, "web")
client.QueueEvents(&healthEv) client.QueueEvents(&healthEv)
opts.Timeout = time.Second opts.Timeout = time.Second
@ -199,7 +198,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
opts.MinIndex = result.Index opts.MinIndex = result.Index
opts.LastResult = &result opts.LastResult = &result
})) })
} }
// requireResultsSame compares two IndexedCheckServiceNodes without requiring // requireResultsSame compares two IndexedCheckServiceNodes without requiring
@ -229,17 +228,15 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
} }
// Create an initial snapshot of 3 instances on different nodes // Create an initial snapshot of 3 instances on different nodes
makeReg := func(index uint64, nodeNum int) *agentpb.Event { makeReg := func(index uint64, nodeNum int) *pbsubscribe.Event {
e := agentpb.TestEventServiceHealthRegister(t, index, nodeNum, "web") e := newEventServiceHealthRegister(index, nodeNum, "web")
return &e return &e
} }
eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 5)
client.QueueEvents( client.QueueEvents(
makeReg(5, 1), makeReg(5, 1),
makeReg(5, 2), makeReg(5, 2),
makeReg(5, 3), makeReg(5, 3),
&eosEv, newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5))
)
// This contains the view state so important we share it between calls. // This contains the view state so important we share it between calls.
opts := cache.FetchOptions{ opts := cache.FetchOptions{
@ -260,7 +257,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
return nodes return nodes
} }
require.True(t, t.Run("full snapshot returned", func(t *testing.T) { runStep(t, "full snapshot returned", func(t *testing.T) {
result, err := typ.Fetch(opts, req) result, err := typ.Fetch(opts, req)
require.NoError(t, err) require.NoError(t, err)
@ -270,9 +267,9 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
opts.MinIndex = result.Index opts.MinIndex = result.Index
opts.LastResult = &result opts.LastResult = &result
})) })
require.True(t, t.Run("blocks until deregistration", func(t *testing.T) { runStep(t, "blocks until deregistration", func(t *testing.T) {
// Make another blocking query with a longer timeout and trigger an update // Make another blocking query with a longer timeout and trigger an update
// event part way through. // event part way through.
start := time.Now() start := time.Now()
@ -280,7 +277,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
// Deregister instance on node1 // Deregister instance on node1
healthEv := agentpb.TestEventServiceHealthDeregister(t, 20, 1, "web") healthEv := newEventServiceHealthDeregister(20, 1, "web")
client.QueueEvents(&healthEv) client.QueueEvents(&healthEv)
}() }()
@ -299,21 +296,19 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
opts.MinIndex = result.Index opts.MinIndex = result.Index
opts.LastResult = &result opts.LastResult = &result
})) })
require.True(t, t.Run("server reload is respected", func(t *testing.T) { runStep(t, "server reload is respected", func(t *testing.T) {
// Simulates the server noticing the request's ACL token privs changing. To // Simulates the server noticing the request's ACL token privs changing. To
// detect this we'll queue up the new snapshot as a different set of nodes // detect this we'll queue up the new snapshot as a different set of nodes
// to the first. // to the first.
resetEv := agentpb.TestEventResetStream(t, agentpb.Topic_ServiceHealth, 45) client.QueueErr(status.Error(codes.Aborted, "reset by server"))
eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 50)
client.QueueEvents( client.QueueEvents(
&resetEv,
makeReg(50, 3), // overlap existing node makeReg(50, 3), // overlap existing node
makeReg(50, 4), makeReg(50, 4),
makeReg(50, 5), makeReg(50, 5),
&eosEv, newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 50))
)
// Make another blocking query with THE SAME index. It should immediately // Make another blocking query with THE SAME index. It should immediately
// return the new snapshot. // return the new snapshot.
@ -331,7 +326,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
opts.MinIndex = result.Index opts.MinIndex = result.Index
opts.LastResult = &result opts.LastResult = &result
})) })
} }
func TestStreamingHealthServices_EventBatches(t *testing.T) { func TestStreamingHealthServices_EventBatches(t *testing.T) {
@ -342,17 +337,13 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) {
} }
// Create an initial snapshot of 3 instances but in a single event batch // Create an initial snapshot of 3 instances but in a single event batch
batchEv := agentpb.TestEventBatchWithEvents(t, batchEv := newEventBatchWithEvents(
agentpb.TestEventServiceHealthRegister(t, 5, 1, "web"), newEventServiceHealthRegister(5, 1, "web"),
agentpb.TestEventServiceHealthRegister(t, 5, 2, "web"), newEventServiceHealthRegister(5, 2, "web"),
agentpb.TestEventServiceHealthRegister(t, 5, 3, "web"), newEventServiceHealthRegister(5, 3, "web"))
)
eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 5)
client.QueueEvents( client.QueueEvents(
&batchEv, &batchEv,
&eosEv, newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5))
)
// This contains the view state so important we share it between calls. // This contains the view state so important we share it between calls.
opts := cache.FetchOptions{ opts := cache.FetchOptions{
@ -373,7 +364,7 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) {
return nodes return nodes
} }
require.True(t, t.Run("full snapshot returned", func(t *testing.T) { runStep(t, "full snapshot returned", func(t *testing.T) {
result, err := typ.Fetch(opts, req) result, err := typ.Fetch(opts, req)
require.NoError(t, err) require.NoError(t, err)
@ -383,16 +374,16 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) {
opts.MinIndex = result.Index opts.MinIndex = result.Index
opts.LastResult = &result opts.LastResult = &result
})) })
require.True(t, t.Run("batched updates work too", func(t *testing.T) { runStep(t, "batched updates work too", func(t *testing.T) {
// Simulate multiple registrations happening in one Txn (so all have same // Simulate multiple registrations happening in one Txn (so all have same
// index) // index)
batchEv := agentpb.TestEventBatchWithEvents(t, batchEv := newEventBatchWithEvents(
// Deregister an existing node // Deregister an existing node
agentpb.TestEventServiceHealthDeregister(t, 20, 1, "web"), newEventServiceHealthDeregister(20, 1, "web"),
// Register another // Register another
agentpb.TestEventServiceHealthRegister(t, 20, 4, "web"), newEventServiceHealthRegister(20, 4, "web"),
) )
client.QueueEvents(&batchEv) client.QueueEvents(&batchEv)
opts.Timeout = time.Second opts.Timeout = time.Second
@ -405,7 +396,7 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) {
opts.MinIndex = result.Index opts.MinIndex = result.Index
opts.LastResult = &result opts.LastResult = &result
})) })
} }
func TestStreamingHealthServices_Filtering(t *testing.T) { func TestStreamingHealthServices_Filtering(t *testing.T) {
@ -416,16 +407,13 @@ func TestStreamingHealthServices_Filtering(t *testing.T) {
} }
// Create an initial snapshot of 3 instances but in a single event batch // Create an initial snapshot of 3 instances but in a single event batch
batchEv := agentpb.TestEventBatchWithEvents(t, batchEv := newEventBatchWithEvents(
agentpb.TestEventServiceHealthRegister(t, 5, 1, "web"), newEventServiceHealthRegister(5, 1, "web"),
agentpb.TestEventServiceHealthRegister(t, 5, 2, "web"), newEventServiceHealthRegister(5, 2, "web"),
agentpb.TestEventServiceHealthRegister(t, 5, 3, "web"), newEventServiceHealthRegister(5, 3, "web"))
)
eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 5)
client.QueueEvents( client.QueueEvents(
&batchEv, &batchEv,
&eosEv, newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5))
)
// This contains the view state so important we share it between calls. // This contains the view state so important we share it between calls.
opts := cache.FetchOptions{ opts := cache.FetchOptions{
@ -449,7 +437,7 @@ func TestStreamingHealthServices_Filtering(t *testing.T) {
return nodes return nodes
} }
require.True(t, t.Run("filtered snapshot returned", func(t *testing.T) { runStep(t, "filtered snapshot returned", func(t *testing.T) {
result, err := typ.Fetch(opts, req) result, err := typ.Fetch(opts, req)
require.NoError(t, err) require.NoError(t, err)
@ -459,16 +447,16 @@ func TestStreamingHealthServices_Filtering(t *testing.T) {
opts.MinIndex = result.Index opts.MinIndex = result.Index
opts.LastResult = &result opts.LastResult = &result
})) })
require.True(t, t.Run("filtered updates work too", func(t *testing.T) { runStep(t, "filtered updates work too", func(t *testing.T) {
// Simulate multiple registrations happening in one Txn (so all have same // Simulate multiple registrations happening in one Txn (so all have same
// index) // index)
batchEv := agentpb.TestEventBatchWithEvents(t, batchEv := newEventBatchWithEvents(
// Deregister an existing node // Deregister an existing node
agentpb.TestEventServiceHealthDeregister(t, 20, 1, "web"), newEventServiceHealthDeregister(20, 1, "web"),
// Register another // Register another
agentpb.TestEventServiceHealthRegister(t, 20, 4, "web"), newEventServiceHealthRegister(20, 4, "web"),
) )
client.QueueEvents(&batchEv) client.QueueEvents(&batchEv)
opts.Timeout = time.Second opts.Timeout = time.Second
@ -481,5 +469,11 @@ func TestStreamingHealthServices_Filtering(t *testing.T) {
opts.MinIndex = result.Index opts.MinIndex = result.Index
opts.LastResult = &result opts.LastResult = &result
})) })
}
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
if !t.Run(name, fn) {
t.FailNow()
}
} }

View File

@ -6,11 +6,14 @@ import (
"sync" "sync"
"time" "time"
"github.com/hashicorp/consul/agent/agentpb"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/proto/pbsubscribe"
) )
const ( const (
@ -22,7 +25,7 @@ const (
// StreamingClient is the interface we need from the gRPC client stub. Separate // StreamingClient is the interface we need from the gRPC client stub. Separate
// interface simplifies testing. // interface simplifies testing.
type StreamingClient interface { type StreamingClient interface {
Subscribe(ctx context.Context, in *agentpb.SubscribeRequest, opts ...grpc.CallOption) (agentpb.Consul_SubscribeClient, error) Subscribe(ctx context.Context, in *pbsubscribe.SubscribeRequest, opts ...grpc.CallOption) (pbsubscribe.StateChangeSubscription_SubscribeClient, error)
} }
// MaterializedViewState is the interface used to manage they type-specific // MaterializedViewState is the interface used to manage they type-specific
@ -42,7 +45,7 @@ type MaterializedViewState interface {
// include _all_ events in the initial snapshot which may be an empty set. // include _all_ events in the initial snapshot which may be an empty set.
// Subsequent calls will contain one or more update events in the order they // Subsequent calls will contain one or more update events in the order they
// are received. // are received.
Update(events []*agentpb.Event) error Update(events []*pbsubscribe.Event) error
// Result returns the type-specific cache result based on the state. When no // Result returns the type-specific cache result based on the state. When no
// events have been delivered yet the result should be an empty value type // events have been delivered yet the result should be an empty value type
@ -83,6 +86,14 @@ func (e resetErr) Error() string {
return string(e) return string(e)
} }
type Request struct {
pbsubscribe.SubscribeRequest
// Filter is a bexpr filter expression that is used to filter events on the
// client side.
// TODO: is this used?
Filter string
}
// MaterializedView is a partial view of the state on servers, maintained via // MaterializedView is a partial view of the state on servers, maintained via
// streaming subscriptions. It is specialized for different cache types by // streaming subscriptions. It is specialized for different cache types by
// providing a MaterializedViewState that encapsulates the logic to update the // providing a MaterializedViewState that encapsulates the logic to update the
@ -97,7 +108,7 @@ type MaterializedView struct {
typ StreamingCacheType typ StreamingCacheType
client StreamingClient client StreamingClient
logger hclog.Logger logger hclog.Logger
req agentpb.SubscribeRequest req Request
ctx context.Context ctx context.Context
cancelFunc func() cancelFunc func()
@ -120,9 +131,11 @@ type MaterializedView struct {
// the cache evicts the result. If the view is not returned in a result state // the cache evicts the result. If the view is not returned in a result state
// though Close must be called some other way to avoid leaking the goroutine and // though Close must be called some other way to avoid leaking the goroutine and
// memory. // memory.
func MaterializedViewFromFetch(t StreamingCacheType, opts cache.FetchOptions, func MaterializedViewFromFetch(
subReq agentpb.SubscribeRequest) *MaterializedView { t StreamingCacheType,
opts cache.FetchOptions,
subReq Request,
) *MaterializedView {
if opts.LastResult == nil || opts.LastResult.State == nil { if opts.LastResult == nil || opts.LastResult.State == nil {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
v := &MaterializedView{ v := &MaterializedView{
@ -225,26 +238,21 @@ func (v *MaterializedView) runSubscription() error {
v.l.Unlock() v.l.Unlock()
s, err := v.client.Subscribe(ctx, &req) s, err := v.client.Subscribe(ctx, &req.SubscribeRequest)
if err != nil { if err != nil {
return err return err
} }
snapshotEvents := make([]*agentpb.Event, 0) snapshotEvents := make([]*pbsubscribe.Event, 0)
for { for {
event, err := s.Recv() event, err := s.Recv()
if err != nil { switch {
return err case isGrpcStatus(err, codes.Aborted):
}
if event.GetResetStream() {
// Server has requested we reset the view and start with a fresh snapshot
// - perhaps because our ACL policy changed. We reset the view state and
// then return an error to allow the `run` method to retry after a backoff
// if necessary.
v.reset() v.reset()
return resetErr("stream reset requested") return resetErr("stream reset requested")
case err != nil:
return err
} }
if event.GetEndOfSnapshot() { if event.GetEndOfSnapshot() {
@ -276,7 +284,7 @@ func (v *MaterializedView) runSubscription() error {
continue continue
} }
if event.GetResumeStream() { if event.GetEndOfEmptySnapshot() {
// We've opened a new subscribe with a non-zero index to resume a // We've opened a new subscribe with a non-zero index to resume a
// connection and the server confirms it's not sending a new snapshot. // connection and the server confirms it's not sending a new snapshot.
if !snapshotDone { if !snapshotDone {
@ -291,7 +299,7 @@ func (v *MaterializedView) runSubscription() error {
} }
// We have an event for the topic // We have an event for the topic
events := []*agentpb.Event{event} events := []*pbsubscribe.Event{event}
// If the event is a batch, unwrap and deliver the raw events // If the event is a batch, unwrap and deliver the raw events
if batch := event.GetEventBatch(); batch != nil { if batch := event.GetEventBatch(); batch != nil {
@ -322,6 +330,11 @@ func (v *MaterializedView) runSubscription() error {
} }
} }
func isGrpcStatus(err error, code codes.Code) bool {
s, ok := status.FromError(err)
return ok && s.Code() == code
}
// reset clears the state ready to start a new stream from scratch. // reset clears the state ready to start a new stream from scratch.
func (v *MaterializedView) reset() { func (v *MaterializedView) reset() {
v.l.Lock() v.l.Lock()

View File

@ -3,9 +3,10 @@ package cachetype
import ( import (
"context" "context"
"github.com/hashicorp/consul/agent/agentpb"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"github.com/hashicorp/consul/proto/pbsubscribe"
) )
// TestStreamingClient is a mock StreamingClient for testing that allows // TestStreamingClient is a mock StreamingClient for testing that allows
@ -17,7 +18,7 @@ type TestStreamingClient struct {
type eventOrErr struct { type eventOrErr struct {
Err error Err error
Event *agentpb.Event Event *pbsubscribe.Event
} }
func NewTestStreamingClient() *TestStreamingClient { func NewTestStreamingClient() *TestStreamingClient {
@ -26,13 +27,16 @@ func NewTestStreamingClient() *TestStreamingClient {
} }
} }
func (t *TestStreamingClient) Subscribe(ctx context.Context, in *agentpb.SubscribeRequest, opts ...grpc.CallOption) (agentpb.Consul_SubscribeClient, error) { func (t *TestStreamingClient) Subscribe(
ctx context.Context,
_ *pbsubscribe.SubscribeRequest,
_ ...grpc.CallOption,
) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) {
t.ctx = ctx t.ctx = ctx
return t, nil return t, nil
} }
func (t *TestStreamingClient) QueueEvents(events ...*agentpb.Event) { func (t *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) {
for _, e := range events { for _, e := range events {
t.events <- eventOrErr{Event: e} t.events <- eventOrErr{Event: e}
} }
@ -42,7 +46,7 @@ func (t *TestStreamingClient) QueueErr(err error) {
t.events <- eventOrErr{Err: err} t.events <- eventOrErr{Err: err}
} }
func (t *TestStreamingClient) Recv() (*agentpb.Event, error) { func (t *TestStreamingClient) Recv() (*pbsubscribe.Event, error) {
select { select {
case eoe := <-t.events: case eoe := <-t.events:
if eoe.Err != nil { if eoe.Err != nil {

View File

@ -856,10 +856,11 @@ func TestCacheGet_expireClose(t *testing.T) {
require := require.New(t) require := require.New(t)
typ := TestType(t) typ := &MockType{}
defer typ.AssertExpectations(t) defer typ.AssertExpectations(t)
c := New(Options{}) c := New(Options{})
typ.On("RegisterOptions").Return(RegisterOptions{ typ.On("RegisterOptions").Return(RegisterOptions{
SupportsBlocking: true,
LastGetTTL: 100 * time.Millisecond, LastGetTTL: 100 * time.Millisecond,
}) })