mirror: add helpers for full queries and updates (#898)

Summary:
An update step is now roughly as simple as:

    _updateData(postQuery(_queryFromPlan(_findOutdated())))

(with some option/config parameters thrown in).

This makes progress toward #622.

Test Plan:
Unit tests included. They are light, because these functions are light.
They still retain full coverage.

wchargin-branch: mirror-full-pipeline
This commit is contained in:
William Chargin 2018-09-27 19:12:47 -07:00 committed by GitHub
parent 9a4c91887b
commit 65d811fb44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 636 additions and 0 deletions

View File

@ -4,6 +4,7 @@ import type Database, {BindingDictionary, Statement} from "better-sqlite3";
import stringify from "json-stable-stringify";
import dedent from "../util/dedent";
import * as MapUtil from "../util/map";
import * as NullUtil from "../util/null";
import * as Schema from "./schema";
import * as Queries from "./queries";
@ -457,6 +458,164 @@ export class Mirror {
});
}
/**
 * Create a GraphQL selection set to fetch data corresponding to the
 * given query plan.
 *
 * The resulting GraphQL should be embedded into a top-level query.
 *
 * The result of this query is an `UpdateResult`.
 *
 * Options:
 *   - `connectionLimit`: maximum number of connections from the plan
 *     to include in this query; connections past this limit are
 *     simply omitted (presumably to be fetched by a later update —
 *     TODO confirm against callers);
 *   - `connectionPageSize`: number of entries to request per
 *     connection (forwarded to `_queryConnection`).
 *
 * This function is pure: it does not interact with the database.
 */
_queryFromPlan(
queryPlan: QueryPlan,
options: {|
+connectionLimit: number,
+connectionPageSize: number,
|}
): Queries.Selection[] {
// Group objects by type, so that we only have to specify each
// type's fieldset once.
const objectsByType: Map<Schema.Typename, Schema.ObjectId[]> = new Map();
for (const object of queryPlan.objects) {
MapUtil.pushValue(objectsByType, object.typename, object.id);
}
// Group connections by object, so that we only have to fetch the
// node once. Only the first `options.connectionLimit` connections
// in the plan are considered; the rest are dropped from this query.
const connectionsByObject: Map<
Schema.ObjectId,
{|
+typename: Schema.Typename,
+connections: Array<{|
+fieldname: Schema.Fieldname,
+endCursor: EndCursor | void,
|}>,
|}
> = new Map();
for (const connection of queryPlan.connections.slice(
0,
options.connectionLimit
)) {
let existing = connectionsByObject.get(connection.objectId);
if (existing == null) {
existing = {typename: connection.objectTypename, connections: []};
connectionsByObject.set(connection.objectId, existing);
} else {
// A single object ID must have a single typename; disagreement
// means the query plan itself is inconsistent, so fail loudly.
if (connection.objectTypename !== existing.typename) {
const s = JSON.stringify;
throw new Error(
`Query plan has inconsistent typenames for ` +
`object ${s(connection.objectId)}: ` +
`${s(existing.typename)} vs. ${s(connection.objectTypename)}`
);
}
}
existing.connections.push({
fieldname: connection.fieldname,
endCursor: connection.endCursor,
});
}
const b = Queries.build;
// Each top-level field corresponds to either an object type
// (fetching own data for objects of that type) or a particular node
// (updating connections on that node). We alias each such field,
// which is necessary to ensure that their names are all unique. The
// names chosen are sufficient to identify which _kind_ of query the
// field corresponds to (type's own data vs node's connections), but
// do not need to identify the particular type or node in question.
// This is because all descendant selections are self-describing:
// they include the ID of any relevant objects.
return [].concat(
Array.from(objectsByType.entries()).map(([typename, ids]) => {
const name = `${_FIELD_PREFIXES.OWN_DATA}${typename}`;
return b.alias(
name,
b.field("nodes", {ids: b.list(ids.map((id) => b.literal(id)))}, [
b.inlineFragment(typename, this._queryOwnData(typename)),
])
);
}),
Array.from(connectionsByObject.entries()).map(
([id, {typename, connections}], i) => {
const name = `${_FIELD_PREFIXES.NODE_CONNECTIONS}${i}`;
return b.alias(
name,
b.field("node", {id: b.literal(id)}, [
b.field("id"),
b.inlineFragment(
typename,
[].concat(
...connections.map(({fieldname, endCursor}) =>
this._queryConnection(
typename,
fieldname,
endCursor,
options.connectionPageSize
)
)
)
),
])
);
}
)
);
}
/**
 * Ingest data given by an `UpdateResult`. This is porcelain over
 * `_updateConnection` and `_updateOwnData`.
 *
 * The entire ingestion runs inside a single database transaction, so
 * the update is applied atomically: either all of the query result is
 * stored, or none of it is.
 *
 * See: `_findOutdated`.
 * See: `_queryFromPlan`.
 */
_updateData(updateId: UpdateId, queryResult: UpdateResult): void {
_inTransaction(this._db, () => {
this._nontransactionallyUpdateData(updateId, queryResult);
});
}
/**
* As `_updateData`, but do not enter any transactions. Other methods
* may call this method as a subroutine in a larger transaction.
*/
_nontransactionallyUpdateData(
updateId: UpdateId,
queryResult: UpdateResult
): void {
for (const topLevelKey of Object.keys(queryResult)) {
if (topLevelKey.startsWith(_FIELD_PREFIXES.OWN_DATA)) {
const rawValue: OwnDataUpdateResult | NodeConnectionsUpdateResult =
queryResult[topLevelKey];
const updateRecord: OwnDataUpdateResult = (rawValue: any);
this._nontransactionallyUpdateOwnData(updateId, updateRecord);
} else if (topLevelKey.startsWith(_FIELD_PREFIXES.NODE_CONNECTIONS)) {
const rawValue: OwnDataUpdateResult | NodeConnectionsUpdateResult =
queryResult[topLevelKey];
const updateRecord: NodeConnectionsUpdateResult = (rawValue: any);
for (const fieldname of Object.keys(updateRecord)) {
if (fieldname === "id") {
continue;
}
this._nontransactionallyUpdateConnection(
updateId,
updateRecord.id,
fieldname,
updateRecord[fieldname]
);
}
} else {
throw new Error(
"Bad key in query result: " + JSON.stringify(topLevelKey)
);
}
}
}
/**
* Create a GraphQL selection set required to identify the typename
* and ID for an object of the given declared type, which may be
@ -1362,6 +1521,46 @@ type OwnDataUpdateResult = $ReadOnlyArray<{
| NodeFieldResult,
}>;
/**
 * Result describing new elements for connections on a single node.
 *
 * The `id` property identifies the node; every other property maps
 * the fieldname of a connection on that node to its fetched result.
 *
 * This type would be exact but for facebook/flow#2977, et al.
 */
type NodeConnectionsUpdateResult = {
+id: Schema.ObjectId,
+[connectionFieldname: Schema.Fieldname]: ConnectionFieldResult,
};
/**
 * Result describing both own-data updates and connection updates. Each
 * key's prefix determines what type of results the corresponding value
 * represents (see constants below). No field prefix is a prefix of
 * another, so this characterization is complete.
 *
 * This type would be exact but for facebook/flow#2977, et al.
 *
 * See: `_FIELD_PREFIXES`.
 * See: `_nontransactionallyUpdateData`, which consumes this type.
 */
type UpdateResult = {
// The prefix of each key determines what type of results the value
// represents. See constants below.
+[string]: OwnDataUpdateResult | NodeConnectionsUpdateResult,
};
// Key prefixes used to tag the top-level fields of an `UpdateResult`.
// Invariant: no prefix value may be a prefix of another, or the
// dispatch in `_nontransactionallyUpdateData` would be ambiguous; a
// unit test enforces this.
export const _FIELD_PREFIXES = Object.freeze({
/**
 * A key of an `UpdateResult` has this prefix if and only if the
 * corresponding value represents `OwnDataUpdateResult`s.
 */
OWN_DATA: "owndata_",
/**
 * A key of an `UpdateResult` has this prefix if and only if the
 * corresponding value represents `NodeConnectionsUpdateResult`s.
 */
NODE_CONNECTIONS: "node_",
});
/**
* Execute a function inside a database transaction.
*

View File

@ -8,6 +8,7 @@ import dedent from "../util/dedent";
import * as Schema from "./schema";
import * as Queries from "./queries";
import {
_FIELD_PREFIXES,
_buildSchemaInfo,
_inTransaction,
_makeSingleUpdateFunction,
@ -522,6 +523,428 @@ describe("graphql/mirror", () => {
});
});
describe("_queryFromPlan", () => {
it("errors if connections for an object have distinct typename", () => {
const db = new Database(":memory:");
const mirror = new Mirror(db, buildGithubSchema());
// Two connections on the same object ID ("hmmm") disagree about its
// typename, which should be rejected.
const plan = {
objects: [],
connections: [
{
objectTypename: "Issue",
objectId: "hmmm",
fieldname: "comments",
endCursor: null,
},
{
objectTypename: "Repository",
objectId: "hmmm",
fieldname: "issues",
endCursor: null,
},
],
};
expect(() => {
mirror._queryFromPlan(plan, {
connectionLimit: 5,
connectionPageSize: 23,
});
}).toThrow(
'Query plan has inconsistent typenames for object "hmmm": ' +
'"Issue" vs. "Repository"'
);
});
it("creates a good query", () => {
const db = new Database(":memory:");
const mirror = new Mirror(db, buildGithubSchema());
const plan = {
objects: [
{typename: "Issue", id: "i#1"},
{typename: "Repository", id: "repo#2"},
{typename: "Issue", id: "i#3"},
],
connections: [
{
objectTypename: "Issue",
objectId: "i#1",
fieldname: "comments",
endCursor: null,
},
{
objectTypename: "Issue",
objectId: "i#4",
fieldname: "comments",
endCursor: null,
},
{
objectTypename: "Issue",
objectId: "i#1",
fieldname: "timeline",
endCursor: undefined,
},
{
objectTypename: "Issue",
objectId: "i#4",
fieldname: "timeline",
endCursor: "cursor#998",
},
{
objectTypename: "Repository",
objectId: "repo#5",
fieldname: "issues",
endCursor: "cursor#998",
},
// cut off below this point due to the connection limit (5);
// note that it is the connection limit, not the page size,
// that truncates the plan
{
objectTypename: "Repository",
objectId: "repo#5",
fieldname: "pulls",
endCursor: "cursor#997",
},
{
objectTypename: "Repository",
objectId: "repo#6",
fieldname: "issues",
endCursor: "cursor#996",
},
],
};
// connectionPageSize (23) should be forwarded verbatim to each
// `_queryConnection` call below.
const actual = mirror._queryFromPlan(plan, {
connectionLimit: 5,
connectionPageSize: 23,
});
const b = Queries.build;
expect(actual).toEqual([
b.alias(
"owndata_Issue",
b.field(
"nodes",
{ids: b.list([b.literal("i#1"), b.literal("i#3")])},
[b.inlineFragment("Issue", mirror._queryOwnData("Issue"))]
)
),
b.alias(
"owndata_Repository",
b.field("nodes", {ids: b.list([b.literal("repo#2")])}, [
b.inlineFragment(
"Repository",
mirror._queryOwnData("Repository")
),
])
),
b.alias(
"node_0",
b.field("node", {id: b.literal("i#1")}, [
b.field("id"),
b.inlineFragment("Issue", [
...mirror._queryConnection("Issue", "comments", null, 23),
...mirror._queryConnection("Issue", "timeline", undefined, 23),
]),
])
),
b.alias(
"node_1",
b.field("node", {id: b.literal("i#4")}, [
b.field("id"),
b.inlineFragment("Issue", [
...mirror._queryConnection("Issue", "comments", null, 23),
...mirror._queryConnection(
"Issue",
"timeline",
"cursor#998",
23
),
]),
])
),
b.alias(
"node_2",
b.field("node", {id: b.literal("repo#5")}, [
b.field("id"),
b.inlineFragment("Repository", [
...mirror._queryConnection(
"Repository",
"issues",
"cursor#998",
23
),
]),
])
),
]);
});
});
describe("_updateData", () => {
it("throws if given a key with invalid prefix", () => {
const db = new Database(":memory:");
const mirror = new Mirror(db, buildGithubSchema());
const updateId = mirror._createUpdate(new Date(123));
const result = {wat_0: {id: "wot"}};
expect(() => {
mirror._nontransactionallyUpdateData(updateId, result);
}).toThrow('Bad key in query result: "wat_0"');
});
// We test the happy path lightly, because it just delegates to
// other methods, which are themselves tested. This test is
// sufficient to effect full coverage.
it("processes a reasonable input correctly", () => {
const db = new Database(":memory:");
const mirror = new Mirror(db, buildGithubSchema());
const updateId = mirror._createUpdate(new Date(123));
mirror.registerObject({typename: "Repository", id: "repo:foo/bar"});
mirror.registerObject({typename: "Issue", id: "issue:#1"});
mirror.registerObject({typename: "Issue", id: "issue:#2"});
// A simulated GraphQL response shaped like the result of a query
// from `_queryFromPlan`: own-data buckets under "owndata_*" keys
// and node-connections buckets under "node_*" keys. (Only the key
// prefixes matter to `_updateData`; the suffixes are arbitrary.)
const result = {
owndata_0: [
{
__typename: "Repository",
id: "repo:foo/bar",
url: "url://foo/bar",
},
],
owndata_1: [
{
__typename: "Issue",
id: "issue:#1",
url: "url://foo/bar/issue/1",
title: "something wicked",
repository: {
__typename: "Repository",
id: "repo:foo/bar",
},
author: {
__typename: "User",
id: "user:alice",
},
},
{
__typename: "Issue",
id: "issue:#2",
url: "url://foo/bar/issue/2",
title: "this way comes",
repository: {
__typename: "Repository",
id: "repo:foo/bar",
},
author: {
__typename: "User",
id: "user:alice",
},
},
],
node_0: {
id: "repo:foo/bar",
issues: {
totalCount: 2,
pageInfo: {
hasNextPage: true,
endCursor: "cursor:repo:foo/bar.issues@0",
},
nodes: [{__typename: "Issue", id: "issue:#1"}],
},
},
node_1: {
id: "issue:#1",
comments: {
totalCount: 1,
pageInfo: {
hasNextPage: false,
endCursor: "cursor:issue:#1.comments@0",
},
nodes: [{__typename: "IssueComment", id: "comment:#1.1"}],
},
timeline: {
totalCount: 1,
pageInfo: {
hasNextPage: false,
endCursor: "cursor:issue:#1.timeline@0",
},
nodes: [{__typename: "ClosedEvent", id: "issue:#1!closed#0"}],
},
},
};
mirror._updateData(updateId, result);
// Check that the right objects are in the database.
// (Objects only reachable through fetched links/connections have
// last_update null: they were registered but not yet updated.)
expect(
db.prepare("SELECT * FROM objects ORDER BY id ASC").all()
).toEqual([
{typename: "IssueComment", id: "comment:#1.1", last_update: null},
{typename: "Issue", id: "issue:#1", last_update: updateId},
{typename: "ClosedEvent", id: "issue:#1!closed#0", last_update: null},
{typename: "Issue", id: "issue:#2", last_update: updateId},
{typename: "Repository", id: "repo:foo/bar", last_update: updateId},
{typename: "User", id: "user:alice", last_update: null},
]);
// Check that some objects have the right primitives.
// (These poke at the internals of the storage format a bit.)
expect(
db
.prepare("SELECT * FROM primitives_Repository ORDER BY id ASC")
.all()
).toEqual([{id: "repo:foo/bar", url: JSON.stringify("url://foo/bar")}]);
expect(
db.prepare("SELECT * FROM primitives_Issue ORDER BY id ASC").all()
).toEqual([
{
id: "issue:#1",
url: JSON.stringify("url://foo/bar/issue/1"),
title: JSON.stringify("something wicked"),
},
{
id: "issue:#2",
url: JSON.stringify("url://foo/bar/issue/2"),
title: JSON.stringify("this way comes"),
},
]);
expect(
db.prepare("SELECT * FROM primitives_User ORDER BY id ASC").all()
).toEqual([{id: "user:alice", login: null, url: null}]);
expect(
db
.prepare("SELECT * FROM primitives_ClosedEvent ORDER BY id ASC")
.all()
).toEqual([{id: "issue:#1!closed#0"}]);
// Check that some links are correct.
expect(
db
.prepare(
"SELECT * FROM links WHERE parent_id LIKE 'issue:%' " +
"ORDER BY parent_id, fieldname"
)
.all()
).toEqual([
{
parent_id: "issue:#1",
fieldname: "author",
child_id: "user:alice",
rowid: expect.anything(),
},
{
parent_id: "issue:#1",
fieldname: "repository",
child_id: "repo:foo/bar",
rowid: expect.anything(),
},
{
parent_id: "issue:#1!closed#0",
fieldname: "actor",
child_id: null,
rowid: expect.anything(),
},
{
parent_id: "issue:#2",
fieldname: "author",
child_id: "user:alice",
rowid: expect.anything(),
},
{
parent_id: "issue:#2",
fieldname: "repository",
child_id: "repo:foo/bar",
rowid: expect.anything(),
},
]);
// Check that the connection metadata are correct.
expect(
db
.prepare("SELECT * FROM connections ORDER BY object_id, fieldname")
.all()
).toEqual([
// Issue #1 had both comments and timeline fetched.
{
object_id: "issue:#1",
fieldname: "comments",
last_update: updateId,
has_next_page: +false,
end_cursor: "cursor:issue:#1.comments@0",
total_count: 1,
rowid: expect.anything(),
},
{
object_id: "issue:#1",
fieldname: "timeline",
last_update: updateId,
has_next_page: +false,
end_cursor: "cursor:issue:#1.timeline@0",
total_count: 1,
rowid: expect.anything(),
},
// Issue #2 had no connections fetched.
{
object_id: "issue:#2",
fieldname: "comments",
last_update: null,
has_next_page: null,
end_cursor: null,
total_count: null,
rowid: expect.anything(),
},
{
object_id: "issue:#2",
fieldname: "timeline",
last_update: null,
has_next_page: null,
end_cursor: null,
total_count: null,
rowid: expect.anything(),
},
// The repository had one issue fetched.
{
object_id: "repo:foo/bar",
fieldname: "issues",
last_update: updateId,
has_next_page: +true,
end_cursor: "cursor:repo:foo/bar.issues@0",
total_count: 2,
rowid: expect.anything(),
},
]);
// Check that the connection entries are correct.
expect(
db
.prepare(
dedent`\
SELECT
connections.object_id AS objectId,
connections.fieldname AS fieldname,
connection_entries.idx AS idx,
connection_entries.child_id AS childId
FROM connections JOIN connection_entries
ON connection_entries.connection_id = connections.rowid
ORDER BY objectId, fieldname, idx
`
)
.all()
).toEqual([
{
objectId: "issue:#1",
fieldname: "comments",
idx: 1,
childId: "comment:#1.1",
},
{
objectId: "issue:#1",
fieldname: "timeline",
idx: 1,
childId: "issue:#1!closed#0",
},
{
objectId: "repo:foo/bar",
fieldname: "issues",
idx: 1,
childId: "issue:#1",
},
]);
});
});
describe("_queryShallow", () => {
it("fails when given a nonexistent type", () => {
const db = new Database(":memory:");
@ -2210,6 +2633,20 @@ describe("graphql/mirror", () => {
});
});
describe("_FIELD_PREFIXES", () => {
  it("has the identity prefix relation", () => {
    // `_nontransactionallyUpdateData` dispatches on the prefix
    // *values* ("owndata_", "node_"), so the invariant to check is
    // that no prefix value is a prefix of another. The original test
    // compared the constant names (`k1.startsWith(k2)`), which could
    // pass even if the values violated the invariant; compare the
    // values instead.
    for (const k1 of Object.keys(_FIELD_PREFIXES)) {
      for (const k2 of Object.keys(_FIELD_PREFIXES)) {
        const v1 = _FIELD_PREFIXES[k1];
        const v2 = _FIELD_PREFIXES[k2];
        expect({k1, k2, isPrefix: v1.startsWith(v2)}).toEqual({
          k1,
          k2,
          isPrefix: k1 === k2,
        });
      }
    }
  });
});
describe("_inTransaction", () => {
it("runs its callback inside a transaction", () => {
// We use an on-disk database file here because we need to open