mirror: query and process connection updates (#878)

Summary:
This commit adds internal functions to (a) emit a GraphQL query to fetch
data for a particular connection, and (b) ingest the results of said
query back into the database.

This commit makes progress toward #622.

Test Plan:
Unit tests included, with full coverage. While these tests check that
the GraphQL queries are as expected, they cannot check that they are
actually valid in production. To check this, follow the instructions in
the added snapshot test.

wchargin-branch: mirror-connection-updates
This commit is contained in:
William Chargin 2018-09-20 15:46:03 -07:00 committed by GitHub
parent 5348fe68bf
commit 6ae5c56624
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 680 additions and 0 deletions

View File

@ -0,0 +1,45 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP
exports[`graphql/mirror Mirror _queryConnection snapshot test for actual GitHub queries 1`] = `
"query TestQuery {
initialQuery: node(id: \\"MDEwOlJlcG9zaXRvcnkxMjMyNTUwMDY=\\") {
... on Repository {
issues(first: 2) {
totalCount
pageInfo {
endCursor
hasNextPage
}
nodes {
__typename
id
}
}
}
}
updateQuery: node(id: \\"MDEwOlJlcG9zaXRvcnkxMjMyNTUwMDY=\\") {
... on Repository {
issues(first: 2 after: \\"Y3Vyc29yOnYyOpHOEe_nRA==\\") {
totalCount
pageInfo {
endCursor
hasNextPage
}
nodes {
__typename
id
}
}
}
}
expectedIds: node(id: \\"MDEwOlJlcG9zaXRvcnkxMjMyNTUwMDY=\\") {
... on Repository {
issues(first: 4) {
nodes {
id
}
}
}
}
}"
`;

View File

@ -5,10 +5,18 @@ import stringify from "json-stable-stringify";
import dedent from "../util/dedent";
import * as Schema from "./schema";
import * as Queries from "./queries";
/**
* A local mirror of a subset of a GraphQL database.
*/
/*
* NOTE(perf): The implementation of this class is not particularly
* optimized. In particular, when we interact with SQLite, we compile
* our prepared statements many times over the lifespan of an
* instance. It may be beneficial to precompile them at instance
* construction time.
*/
export class Mirror {
+_db: Database;
+_schema: Schema.Schema;
@ -405,6 +413,228 @@ export class Mirror {
return {objects, connections};
});
}
/**
* Create a GraphQL selection set required to identify the typename
* and ID for an object. This is the minimal information required to
* register an object in our database, so we query this information
* whenever we find a reference to an object that we want to traverse
* later.
*
* The resulting GraphQL should be embedded in any node context. For
* instance, it might replace the `?` in any of the following queries:
*
* repository(owner: "foo", name: "bar") { ? }
*
* repository(owner: "foo", name: "bar") {
* issues(first: 1) {
* nodes { ? }
* }
* }
*
* nodes(ids: ["baz", "quux"]) { ? }
*
* The result of this query has type `NodeFieldResult`.
*/
_queryShallow(): Queries.Selection[] {
const b = Queries.build;
return [b.field("__typename"), b.field("id")];
}
/**
* Get the current value of the end cursor on a connection, or
* `undefined` if the connection has never been fetched. If no object
* by the given ID is known, or the object does not have a connection
* of the given name, then an error is thrown.
*
* Note that `null` is a valid end cursor and is distinct from
* `undefined`.
*/
_getEndCursor(
objectId: Schema.ObjectId,
fieldname: Schema.Fieldname
): EndCursor | void {
const result: {|
+initialized: 0 | 1,
+endCursor: string | null,
|} | void = this._db
.prepare(
dedent`\
SELECT
last_update IS NOT NULL AS initialized,
end_cursor AS endCursor
FROM connections
WHERE object_id = :objectId AND fieldname = :fieldname
`
)
// No need to worry about corruption in the form of multiple
// matches: there is a UNIQUE(object_id, fieldname) constraint.
.get({objectId, fieldname});
if (result === undefined) {
const s = JSON.stringify;
throw new Error(`No such connection: ${s(objectId)}.${s(fieldname)}`);
}
return result.initialized ? result.endCursor : undefined;
}
/**
* Create a GraphQL selection set to fetch elements from a connection,
* requesting at most `connectionPageSize` elements. If the connection
* has been queried before and you wish to fetch new elements, use an
* appropriate end cursor. Use `undefined` otherwise. Note that `null`
* is a valid end cursor and is distinct from `undefined`. These
* semantics are compatible with the return value of `_getEndCursor`.
*
* If an end cursor for a particular node's connection was specified,
* then the resulting GraphQL should be embedded in the context of
* that node. For instance, if repository "foo/bar" has ID "baz" and
* an end cursor of "c000" on its "issues" connection, then the
* GraphQL emitted by `_queryConnection("issues", "c000", 100)` might
* replace the `?` in the following query:
*
* node(id: "baz") { ? }
*
* If no end cursor was specified, then the resulting GraphQL may be
* embedded in the context of _any_ node with a connection of the
* appropriate fieldname. For instance,
* `_queryConnection("issues", undefined, 100)` emits GraphQL that may
* replace the `?` in either of the following queries:
*
* node(id: "baz") { ? } # where "baz" is a repository ID
* repository(owner: "foo", name: "bar") { ? }
*
* Note, however, that this query will fetch nodes from the _start_ of
* the connection. It would be wrong to append these results onto a
* connection for which we have already fetched data.
*
* The result of this query has type `ConnectionFieldResult`.
*
* This function is pure: it does not interact with the database.
*
* See: `_getEndCursor`.
* See: `_updateConnection`.
*/
_queryConnection(
fieldname: Schema.Fieldname,
endCursor: EndCursor | void,
connectionPageSize: number
): Queries.Selection[] {
const b = Queries.build;
const connectionArguments: Queries.Arguments = {
first: b.literal(connectionPageSize),
};
if (endCursor !== undefined) {
connectionArguments.after = b.literal(endCursor);
}
return [
b.field(fieldname, connectionArguments, [
b.field("totalCount"),
b.field("pageInfo", {}, [b.field("endCursor"), b.field("hasNextPage")]),
b.field("nodes", {}, this._queryShallow()),
]),
];
}
/**
* Ingest new entries in a connection on an existing object.
*
* The connection's last update will be set to the given value, which
* must be an existing update lest an error be thrown.
*
* If the object does not exist or does not have a connection by the
* given name, an error will be thrown.
*
* See: `_queryConnection`.
* See: `_createUpdate`.
*/
_updateConnection(
updateId: UpdateId,
objectId: Schema.ObjectId,
fieldname: Schema.Fieldname,
queryResult: ConnectionFieldResult
): void {
_inTransaction(this._db, () => {
this._nontransactionallyUpdateConnection(
updateId,
objectId,
fieldname,
queryResult
);
});
}
/**
* As `_updateConnection`, but do not enter any transactions. Other
* methods may call this method as a subroutine in a larger
* transaction.
*/
_nontransactionallyUpdateConnection(
updateId: UpdateId,
objectId: Schema.ObjectId,
fieldname: Schema.Fieldname,
queryResult: ConnectionFieldResult
): void {
const db = this._db;
const connectionId: number = this._db
.prepare(
dedent`\
SELECT rowid FROM connections
WHERE object_id = :objectId AND fieldname = :fieldname
`
)
.pluck()
.get({objectId, fieldname});
// There is a UNIQUE(object_id, fieldname) constraint, so we don't
// have to worry about pollution due to duplicates. But it's
// possible that no such connection exists, indicating that the
// object has not been registered. This is an error.
if (connectionId === undefined) {
const s = JSON.stringify;
throw new Error(`No such connection: ${s(objectId)}.${s(fieldname)}`);
}
db.prepare(
dedent`\
UPDATE connections
SET
last_update = :updateId,
total_count = :totalCount,
has_next_page = :hasNextPage,
end_cursor = :endCursor
WHERE rowid = :connectionId
`
).run({
updateId,
totalCount: queryResult.totalCount,
hasNextPage: +queryResult.pageInfo.hasNextPage,
endCursor: queryResult.pageInfo.endCursor,
connectionId,
});
let nextIndex: number = db
.prepare(
dedent`\
SELECT IFNULL(MAX(idx), 0) + 1 FROM connection_entries
WHERE connection_id = :connectionId
`
)
.pluck()
.get({connectionId});
const addEntry = db.prepare(
dedent`\
INSERT INTO connection_entries (connection_id, idx, child_id)
VALUES (:connectionId, :idx, :childId)
`
);
for (const node of queryResult.nodes) {
let childId = null;
if (node != null) {
const childObject = {typename: node.__typename, id: node.id};
this._nontransactionallyRegisterObject(childObject);
childId = childObject.id;
}
const idx = nextIndex++;
addEntry.run({connectionId, idx, childId});
}
}
}
/**
@ -524,6 +754,16 @@ type QueryPlan = {|
*/
type EndCursor = string | null;
/**
* Result of the selection set created by `_queryShallow`: the typename
* and ID of an object, or `null`.
*/
type NodeFieldResult = {|
+__typename: Schema.Typename,
+id: Schema.ObjectId,
|} | null;
/**
* Result of the selection set created by `_queryConnection`: the
* connection's total count, its paging state, and one shallow result
* per node on the fetched page. Entries of `nodes` may be `null`.
*/
type ConnectionFieldResult = {|
+totalCount: number,
+pageInfo: {|+hasNextPage: boolean, +endCursor: string | null|},
+nodes: $ReadOnlyArray<NodeFieldResult>,
|};
/**
* Execute a function inside a database transaction.
*

View File

@ -6,6 +6,7 @@ import tmp from "tmp";
import dedent from "../util/dedent";
import * as Schema from "./schema";
import * as Queries from "./queries";
import {_buildSchemaInfo, _inTransaction, Mirror} from "./mirror";
describe("graphql/mirror", () => {
@ -480,6 +481,400 @@ describe("graphql/mirror", () => {
expect(actual).toEqual(expected);
});
});
describe("_getEndCursor", () => {
it("fails when the object does not exist", () => {
const db = new Database(":memory:");
const mirror = new Mirror(db, buildGithubSchema());
expect(() => {
mirror._getEndCursor("foo/bar#1", "comments");
}).toThrow('No such connection: "foo/bar#1"."comments"');
});
it("fails when the object has no such connection", () => {
const db = new Database(":memory:");
const mirror = new Mirror(db, buildGithubSchema());
mirror.registerObject({typename: "Repository", id: "foo/bar#1"});
expect(() => {
mirror._getEndCursor("foo/bar#1", "comments");
}).toThrow('No such connection: "foo/bar#1"."comments"');
});
it("returns `undefined` for a never-fetched connection", () => {
const db = new Database(":memory:");
const mirror = new Mirror(db, buildGithubSchema());
mirror.registerObject({typename: "Issue", id: "foo/bar#1"});
expect(mirror._getEndCursor("foo/bar#1", "comments")).toBe(undefined);
});
it("returns a `null` cursor", () => {
const db = new Database(":memory:");
const mirror = new Mirror(db, buildGithubSchema());
mirror.registerObject({typename: "Issue", id: "foo/bar#1"});
const updateId = mirror._createUpdate(new Date(123));
db.prepare(
dedent`\
UPDATE connections
SET
last_update = :updateId,
total_count = 0,
has_next_page = 0,
end_cursor = NULL
WHERE object_id = :objectId AND fieldname = :fieldname
`
).run({updateId, objectId: "foo/bar#1", fieldname: "comments"});
expect(mirror._getEndCursor("foo/bar#1", "comments")).toBe(null);
});
it("returns a non-`null` cursor", () => {
const db = new Database(":memory:");
const mirror = new Mirror(db, buildGithubSchema());
mirror.registerObject({typename: "Issue", id: "foo/bar#1"});
const updateId = mirror._createUpdate(new Date(123));
db.prepare(
dedent`\
UPDATE connections
SET
last_update = :updateId,
total_count = 1,
end_cursor = :endCursor,
has_next_page = 0
WHERE object_id = :objectId AND fieldname = :fieldname
`
).run({
updateId,
endCursor: "c29tZS1jdXJzb3I=",
objectId: "foo/bar#1",
fieldname: "comments",
});
expect(mirror._getEndCursor("foo/bar#1", "comments")).toBe(
"c29tZS1jdXJzb3I="
);
});
});
describe("_queryConnection", () => {
it("creates a query when no cursor is specified", () => {
const db = new Database(":memory:");
const mirror = new Mirror(db, buildGithubSchema());
const pageLimit = 23;
const endCursor = undefined;
const actual = mirror._queryConnection("comments", endCursor, 23);
const b = Queries.build;
expect(actual).toEqual([
b.field("comments", {first: b.literal(pageLimit)}, [
b.field("totalCount"),
b.field("pageInfo", {}, [
b.field("endCursor"),
b.field("hasNextPage"),
]),
b.field("nodes", {}, [b.field("__typename"), b.field("id")]),
]),
]);
});
it("creates a query with a `null` end cursor", () => {
const db = new Database(":memory:");
const mirror = new Mirror(db, buildGithubSchema());
const pageLimit = 23;
const endCursor = null;
const actual = mirror._queryConnection("comments", endCursor, 23);
const b = Queries.build;
expect(actual).toEqual([
b.field(
"comments",
{first: b.literal(pageLimit), after: b.literal(null)},
[
b.field("totalCount"),
b.field("pageInfo", {}, [
b.field("endCursor"),
b.field("hasNextPage"),
]),
b.field("nodes", {}, [b.field("__typename"), b.field("id")]),
]
),
]);
});
it("creates a query with a non-`null` end cursor", () => {
const db = new Database(":memory:");
const mirror = new Mirror(db, buildGithubSchema());
const pageLimit = 23;
const endCursor = "c29tZS1jdXJzb3I=";
const actual = mirror._queryConnection("comments", endCursor, 23);
const b = Queries.build;
expect(actual).toEqual([
b.field(
"comments",
{first: b.literal(pageLimit), after: b.literal(endCursor)},
[
b.field("totalCount"),
b.field("pageInfo", {}, [
b.field("endCursor"),
b.field("hasNextPage"),
]),
b.field("nodes", {}, [b.field("__typename"), b.field("id")]),
]
),
]);
});
it("snapshot test for actual GitHub queries", () => {
// This test emits as a snapshot a valid query against GitHub's
// GraphQL API. You can copy-and-paste the snapshot into
// <https://developer.github.com/v4/explorer/> to run it. The
// resulting IDs in `initialQuery` and `updateQuery` should
// concatenate to match those in `expectedIds`. In particular,
// the following JQ program should output `true` when passed the
// query result from GitHub:
//
// jq '.data |
// ([.initialQuery, .updateQuery] | map(.issues.nodes[].id))
// == [.expectedIds.issues.nodes[].id]
// '
const db = new Database(":memory:");
const mirror = new Mirror(db, buildGithubSchema());
const exampleGithubRepoId = "MDEwOlJlcG9zaXRvcnkxMjMyNTUwMDY=";
const pageLimit = 2;
const b = Queries.build;
const initialQuery = mirror._queryConnection(
"issues",
undefined,
pageLimit
);
const expectedEndCursor = "Y3Vyc29yOnYyOpHOEe_nRA==";
const updateQuery = mirror._queryConnection(
"issues",
expectedEndCursor,
pageLimit
);
const query = b.query(
"TestQuery",
[],
[
b.alias(
"initialQuery",
b.field("node", {id: b.literal(exampleGithubRepoId)}, [
b.inlineFragment("Repository", initialQuery),
])
),
b.alias(
"updateQuery",
b.field("node", {id: b.literal(exampleGithubRepoId)}, [
b.inlineFragment("Repository", updateQuery),
])
),
b.alias(
"expectedIds",
b.field("node", {id: b.literal(exampleGithubRepoId)}, [
b.inlineFragment("Repository", [
b.field("issues", {first: b.literal(pageLimit * 2)}, [
b.field("nodes", {}, [b.field("id")]),
]),
]),
])
),
]
);
const format = (body: Queries.Body): string =>
Queries.stringify.body(body, Queries.multilineLayout(" "));
expect(format([query])).toMatchSnapshot();
});
});
describe("_updateConnection", () => {
const createResponse = (options: {
totalCount: number,
endCursor: string | null,
hasNextPage: boolean,
comments: $ReadOnlyArray<number | null>,
}) => ({
totalCount: options.totalCount,
pageInfo: {
hasNextPage: options.hasNextPage,
endCursor: options.endCursor,
},
nodes: options.comments.map(
(n) =>
n === null ? null : {__typename: "IssueComment", id: `comment:${n}`}
),
});
const createEmptyResponse = () =>
createResponse({
totalCount: 0,
endCursor: null,
hasNextPage: false,
comments: [],
});
it("fails when the object does not exist", () => {
const db = new Database(":memory:");
const mirror = new Mirror(db, buildGithubSchema());
const updateId = mirror._createUpdate(new Date(123));
expect(() => {
mirror._updateConnection(
updateId,
"foo/bar#1",
"comments",
createEmptyResponse()
);
}).toThrow('No such connection: "foo/bar#1"."comments"');
});
it("fails when the object has no such connection", () => {
const db = new Database(":memory:");
const mirror = new Mirror(db, buildGithubSchema());
const updateId = mirror._createUpdate(new Date(123));
mirror.registerObject({typename: "Repository", id: "foo/bar#1"});
expect(() => {
mirror._updateConnection(
updateId,
"foo/bar#1",
"comments",
createEmptyResponse()
);
}).toThrow('No such connection: "foo/bar#1"."comments"');
});
it("fails when there is no update with the given ID", () => {
const db = new Database(":memory:");
const mirror = new Mirror(db, buildGithubSchema());
const updateId = 777;
mirror.registerObject({typename: "Issue", id: "foo/bar#1"});
expect(() => {
mirror._updateConnection(
updateId,
"foo/bar#1",
"comments",
createEmptyResponse()
);
}).toThrow("FOREIGN KEY constraint failed");
});
it("properly updates under various circumstances", () => {
const db = new Database(":memory:");
const mirror = new Mirror(db, buildGithubSchema());
mirror.registerObject({typename: "Issue", id: "foo/bar#1"});
const connectionId: number = db
.prepare(
dedent`\
SELECT rowid FROM connections
WHERE object_id = :objectId AND fieldname = :fieldname
`
)
.pluck()
.get({objectId: "foo/bar#1", fieldname: "comments"});
const getEntries = (): $ReadOnlyArray<{|
+idx: number,
+child_id: Schema.ObjectId,
|}> =>
db
.prepare(
dedent`\
SELECT idx, child_id FROM connection_entries
WHERE connection_id = ?
ORDER BY idx ASC
`
)
.all(connectionId);
const getConnectionInfo = (): {|
+last_update: number | null,
+total_count: number | null,
+end_cursor: string | null,
+has_next_page: 0 | 1 | null,
|} =>
db
.prepare(
dedent`\
SELECT last_update, total_count, end_cursor, has_next_page
FROM connections
WHERE rowid = ?
`
)
.get(connectionId);
expect(getConnectionInfo()).toEqual({
last_update: null,
total_count: null,
end_cursor: null,
has_next_page: null,
});
expect(getEntries()).toEqual([]);
const firstUpdate = mirror._createUpdate(new Date(123));
mirror._updateConnection(
firstUpdate,
"foo/bar#1",
"comments",
createResponse({
totalCount: 4,
endCursor: "uno",
hasNextPage: true,
comments: [101, 102],
})
);
expect(getEntries()).toEqual([
{idx: 1, child_id: "comment:101"},
{idx: 2, child_id: "comment:102"},
]);
expect(getConnectionInfo()).toEqual({
last_update: firstUpdate,
total_count: 4,
end_cursor: "uno",
has_next_page: +true,
});
const secondUpdate = mirror._createUpdate(new Date(234));
mirror._updateConnection(
secondUpdate,
"foo/bar#1",
"comments",
createResponse({
totalCount: 5,
endCursor: "dos",
hasNextPage: false,
comments: [55, null, 54],
})
);
expect(getEntries()).toEqual([
{idx: 1, child_id: "comment:101"},
{idx: 2, child_id: "comment:102"},
{idx: 3, child_id: "comment:55"},
{idx: 4, child_id: null},
{idx: 5, child_id: "comment:54"},
]);
expect(getConnectionInfo()).toEqual({
last_update: secondUpdate,
total_count: 5,
end_cursor: "dos",
has_next_page: +false,
});
const thirdUpdate = mirror._createUpdate(new Date(345));
db.prepare(
dedent`\
DELETE FROM connection_entries
WHERE connection_id = :connectionId AND idx = :idx
`
).run({connectionId, idx: 3});
mirror._updateConnection(
thirdUpdate,
"foo/bar#1",
"comments",
createResponse({
totalCount: 6,
endCursor: "tres",
hasNextPage: false,
comments: [888, 889],
})
);
expect(getEntries()).toEqual([
{idx: 1, child_id: "comment:101"},
{idx: 2, child_id: "comment:102"},
{idx: 4, child_id: null},
{idx: 5, child_id: "comment:54"},
{idx: 6, child_id: "comment:888"},
{idx: 7, child_id: "comment:889"},
]);
expect(getConnectionInfo()).toEqual({
last_update: thirdUpdate,
total_count: 6,
end_cursor: "tres",
has_next_page: +false,
});
});
});
});
describe("_buildSchemaInfo", () => {