diff --git a/contributing/README.md b/contributing/README.md index 440b5826be..b394d9755f 100644 --- a/contributing/README.md +++ b/contributing/README.md @@ -1,21 +1,12 @@ -# Contributing to Consul - -See [our contributing guide](../.github/CONTRIBUTING.md) to get started. - -This directory contains other helpful resources like checklists for making -certain common types of changes. - -## Checklists - -See the `checklist-*` files in this dir to see of there is a checklist that -applies to a change you wish to make. The most common one is a change to -Consul's configuration file which has a lot more steps and subtlety than might -first appear so please use the checklist! - -We recommend copying the raw Markdown lists into a local file or gist while you -work and checking tasks as you complete them (by filling in `[ ]` with `[x]`). -You can then paste the list in the PR description when you post it indicating -the steps you've already done. If you want to post an initial draft PR before -the list is complete, please still include the list as is so reviewers can see -progress. You can then check the list off on the PR description directly in -GitHub's UI after pushing the relevant fixes up. +# Contributing to Consul + +See [our contributing guide](../.github/CONTRIBUTING.md) to get started. + +This directory contains documentation intended for anyone interested in +understanding, and contributing changes to, the Consul codebase. + +## Contents + +1. [Overview](./INTERNALS.md) +2. [Configuration](./checklist-adding-config-fields.md) +3. [Streaming](./streaming) diff --git a/contributing/streaming/README.md b/contributing/streaming/README.md new file mode 100644 index 0000000000..400b0b680b --- /dev/null +++ b/contributing/streaming/README.md @@ -0,0 +1,99 @@ + +# Event Streaming + +Event streaming is a new asynchronous RPC mechanism that is being added to Consul. Instead +of synchronous blocking RPC calls (long polling) to fetch data when it changes, streaming +sends events as they occur, and the client maintains a materialized view of the events. + +At the time of writing only the service health endpoint uses streaming, but more endpoints +will be added in the future. + +## Overview + +The diagram below shows the components that are used in streaming, and how they fit into +the rest of Consul. + +![Streaming Overview](./overview.svg) + +[source](./overview.mmd) + +Read requests are received either from the HTTP API or from a DNS request. They use +[rpcclient/health.Health] +to query the cache. The [StreamingHealthServices cache-type] uses a [materialized view] +to manage subscriptions and store the aggregated events. On the server, the +[SubscribeEndpoint] subscribes and receives events from [EventPublisher]. + +Writes will likely enter the system through the client as well, but to make the diagram +less complicated the write flow starts when it is received by the RPC endpoint. The +endpoint calls raft.Apply, which if successful will save the new data in the state.Store. +When the [state.Store commits] it produces an event which is managed by the [EventPublisher] +and sent to any active subscriptions. + +[rpcclient/health.Health]: https://github.com/hashicorp/consul/blob/master/agent/rpcclient/health/health.go +[StreamingHealthServices cache-type]: https://github.com/hashicorp/consul/blob/master/agent/cache-types/streaming_health_services.go +[materialized view]: https://github.com/hashicorp/consul/blob/master/agent/submatview/materializer.go +[SubscribeEndpoint]: https://github.com/hashicorp/consul/blob/master/agent/rpc/subscribe/subscribe.go +[EventPublisher]: https://github.com/hashicorp/consul/blob/master/agent/consul/stream/event_publisher.go +[state.Store commits]: https://github.com/hashicorp/consul/blob/master/agent/consul/state/memdb.go + + +## Event Publisher + +The [EventPublisher] is at the core of streaming. It receives published events, and +subscription requests, and forwards events to the appropriate subscriptions. The diagram +below illustrates how events are stored by the [EventPublisher]. + +![Event Publisher layout](./event-publisher-layout.svg) + +[source](./event-publisher-layout.mmd) + +When a new subscription is created it will create a snapshot of the events required to +reflect the current state. This snapshot is cached by the [EventPublisher] so that other +subscriptions can re-use the snapshot without having to recreate it. + +The snapshot always points at the first item in the linked list of events. A subscription +will initially point at the first item, but the pointer advances each time +`Subscribe.Next` is called. The topic buffers in the EventPublisher always point at the +latest item in the linked list, so that new events can be appended to the buffer. + +When a snapshot cache TTL expires, the snapshot is removed. If there are no other +subscriptions holding a reference to those items, the items will be garbage collected by +the Go runtime. This setup allows EventPublisher to keep some events around for a short +period of time, without any hard coded limit on the number of events to cache. + + +## Subscription events + +A subscription provides a stream of events on a single topic. Most of the events contain +data for a change in state, but there are a few special "framing" events that are used to +communicate something to the client. The diagram below helps illustrate the logic in +`EventPublisher.Subscribe` and the [materialized view]. + + +![Framing events](./framing-events.svg) + +[source](./framing-events.mmd) + + +Events in the `Snapshot` contain the same data as those in the `EventStream`, the only +difference is that events in the `Snapshot` indicate the current state not a change in +state. + +`NewSnapshotToFollow` is a framing event that indicates to the client that their existing +view is out of date. They must reset their view and prepare to receive a new snapshot. + +`EndOfSnapshot` indicates to the client that the snapshot is complete. Any future events +will be changes in state. + + +## Event filtering + +As events pass through the system from the `state.Store` to the client they are grouped +and filtered along the way. The diagram below helps illustrate where each of the grouping +and filtering happens. + + +![event filtering](./event-filtering.svg) + +[source](./event-filtering.mmd) + diff --git a/contributing/streaming/event-filtering.mmd b/contributing/streaming/event-filtering.mmd new file mode 100644 index 0000000000..620698c733 --- /dev/null +++ b/contributing/streaming/event-filtering.mmd @@ -0,0 +1,11 @@ +graph TD + + state.Store -->|events in different topics| EventPublisher.Publish + EventPublisher.Publish -->|group by topic| EventPublisher.topicBuffer + + EventPublisher.topicBuffer --> Subscription + Subscription -->|filter by key and namespace| SubscribeEndpoint + SubscribeEndpoint -->|"filter by auth (acl token)"| ProtobufEvents[/ grpc /] + ProtobufEvents -->|filter with bexpr| MaterializedView + MaterializedView --> HTTPEndpoint + diff --git a/contributing/streaming/event-filtering.svg b/contributing/streaming/event-filtering.svg new file mode 100644 index 0000000000..ee6e5f1981 --- /dev/null +++ b/contributing/streaming/event-filtering.svg @@ -0,0 +1 @@ +
events in different topics
group by topic
filter by key and namespace
filter by auth (acl token)
filter with bexpr
state.Store
EventPublisher.Publish
EventPublisher.topicBuffer
Subscription
SubscribeEndpoint
grpc
MaterializedView
HTTPEndpoint
\ No newline at end of file diff --git a/contributing/streaming/event-publisher-layout.mmd b/contributing/streaming/event-publisher-layout.mmd new file mode 100644 index 0000000000..b2986aae74 --- /dev/null +++ b/contributing/streaming/event-publisher-layout.mmd @@ -0,0 +1,36 @@ +graph TB + + subgraph ep[ ] + EventPublisher + subscriptions + snapshots + topicBuffers + end + + EventPublisher --> snapshots & subscriptions & topicBuffers + + Subscription + Snapshot + Item0 + Item1 + Item2 + Item3 + Item4 + + topicBuffers ----->|head| Item4 + subscriptions --> Subscription + + snapshots --> Snapshot + + Subscription -->|next| Item0 + Item0 --> Item1 + Item1 --> Item2 + Item2 --> Item3 + Item3 --> Item4 + Snapshot -->|first| Item0 + + Subscription -..->|next| Item1 + Subscription -..->|next| Item2 + Subscription -..->|next| Item3 + Subscription -..->|next| Item4 + diff --git a/contributing/streaming/event-publisher-layout.svg b/contributing/streaming/event-publisher-layout.svg new file mode 100644 index 0000000000..2f16c26d00 --- /dev/null +++ b/contributing/streaming/event-publisher-layout.svg @@ -0,0 +1 @@ +
head
next
first
next
next
next
next
EventPublisher
subscriptions
snapshots
topicBuffers
Subscription
Snapshot
Item0
Item1
Item2
Item3
Item4
\ No newline at end of file diff --git a/contributing/streaming/framing-events.mmd b/contributing/streaming/framing-events.mmd new file mode 100644 index 0000000000..fd091cb045 --- /dev/null +++ b/contributing/streaming/framing-events.mmd @@ -0,0 +1,17 @@ +graph TD + + SubscribeIndex0[Subscribe, index = 0, no snapshot] + SubscribeIndexNot0[Subscribe, index > 0, with snapshot] + + SubscribeIndex0 --->|if events in topic| Snapshot + Snapshot --> EndOfSnapshot + SubscribeIndex0 ------->|no events in topic| EndOfSnapshot + EndOfSnapshot --> EventStream + + SubscribeIndexNot0 -->|if index != TopicBuffer.Head| NewSnapshotToFollow + NewSnapshotToFollow ---> Snapshot + + SubscribeIndexNot0 -->|if index == TopicBuffer.Head| EventStream + + class EndOfSnapshot,NewSnapshotToFollow framing + classDef framing fill:#FFD700,stroke:#333 diff --git a/contributing/streaming/framing-events.svg b/contributing/streaming/framing-events.svg new file mode 100644 index 0000000000..885b871946 --- /dev/null +++ b/contributing/streaming/framing-events.svg @@ -0,0 +1 @@ +
if events in topic
no events in topic
if index != TopicBuffer.Head
if index == TopicBuffer.Head
Subscribe, index = 0, no snapshot
Subscribe, index > 0, with snapshot
Snapshot
EndOfSnapshot
EventStream
NewSnapshotToFollow
\ No newline at end of file diff --git a/contributing/streaming/overview.mmd b/contributing/streaming/overview.mmd new file mode 100644 index 0000000000..a5d61548ae --- /dev/null +++ b/contributing/streaming/overview.mmd @@ -0,0 +1,36 @@ +graph TD + + subgraph ClientAgent[Client Agent] + HTTPEndpoint + DNSEndpoint + rpcClient.Health + AgentCache + MaterializedView + end + + subgraph ServerAgent[Server Agent] + RPCEndpoint + raft.Apply + FSM.applyRegistration + state.Store.Register + SubscribeEndpoint + EventPublisher + end + + Read --> HTTPEndpoint & DNSEndpoint + + HTTPEndpoint & DNSEndpoint --> rpcClient.Health + rpcClient.Health --> AgentCache + AgentCache --> MaterializedView + MaterializedView --> SubscribeEndpoint + SubscribeEndpoint -->|Subscribe to topic| EventPublisher + + Write --> RPCEndpoint + RPCEndpoint --> raft.Apply + raft.Apply --> FSM.applyRegistration + FSM.applyRegistration --> state.Store.Register + state.Store.Register -->|Publish event| EventPublisher + + class Read,Write start + classDef start fill:transparent,stroke:transparent + diff --git a/contributing/streaming/overview.svg b/contributing/streaming/overview.svg new file mode 100644 index 0000000000..f1a0b9874b --- /dev/null +++ b/contributing/streaming/overview.svg @@ -0,0 +1 @@ +
Server Agent
Client Agent
Subscribe to topic
Publish event
RPCEndpoint
raft.Apply
FSM.applyRegistration
state.Store.Register
SubscribeEndpoint
EventPublisher
HTTPEndpoint
DNSEndpoint
rpcClient.Health
AgentCache
MaterializedView
Read
Write
\ No newline at end of file