option aggregate table; "ingest" scripts; unthrottled polygon

This commit is contained in:
Avraham Sakal
2024-03-03 22:31:22 -05:00
parent de48720d17
commit 0ecbcad0b0
4 changed files with 526 additions and 205 deletions
@@ -0,0 +1,271 @@
import { clickhouse, query } from "../clickhouse.js";
import { getApiKey } from "./polygon.js";
import pAll from "p-all";
import pQueue from "p-queue";
import pSeries from "p-series";
type PolygonResponse = {
next_url?: string;
results: Array<{
ticker: string;
expiration_date: string;
strike_price: number;
contract_type: "call" | "put";
}>;
};
async function getOptionContracts(underlyingSymbol, asOfDate) {
// first mark the sync of this particular symbol and asOfDate as "pending":
await clickhouse.insert({
table: "option_contract_sync_statuses",
values: [{ symbol: underlyingSymbol, asOfDate, status: "pending" }],
format: "JSONEachRow",
});
// then commence the sync with the initial request:
let latestBatchResponse = (await (
await fetch(
`https://api.polygon.io/v3/reference/options/contracts?underlying_ticker=${underlyingSymbol}&as_of=${asOfDate}&sort=ticker&limit=1000&apiKey=${await getApiKey()}`
)
).json()) as PolygonResponse;
let latestBatch = latestBatchResponse.results.map((result) => ({
asOfDate,
symbol: underlyingSymbol,
expirationDate: result.expiration_date,
strike: result.strike_price,
type: result.contract_type,
}));
await clickhouse.insert({
table: "option_contracts",
values: latestBatch,
format: "JSONEachRow",
});
//console.log(latestBatch.results.map((r)=>r.ticker));
// as long as there's a `next_url`, call that:
while (latestBatchResponse.hasOwnProperty("next_url")) {
latestBatchResponse = (await (
await fetch(`${latestBatchResponse.next_url}&apiKey=${await getApiKey()}`)
).json()) as PolygonResponse;
latestBatch = latestBatchResponse.results.map((result) => ({
asOfDate,
symbol: underlyingSymbol,
expirationDate: result.expiration_date,
strike: result.strike_price,
type: result.contract_type,
}));
//console.log(latestBatch.results.map((r)=>r.ticker));
await clickhouse.insert({
table: "option_contracts",
values: latestBatch,
format: "JSONEachRow",
});
}
await clickhouse.insert({
table: "option_contract_sync_statuses",
values: [{ symbol: underlyingSymbol, asOfDate, status: "done" }],
format: "JSONEachRow",
});
}
async function getNextBatchOfUnstartedSymbolsAndAsOfDates(
previousUnstartedSymbolAndAsOfDate:
| { symbol: string; asOfDate: string }
| undefined,
limit: number
) {
if (typeof previousUnstartedSymbolAndAsOfDate === "undefined") {
return null;
}
// the `OR symbol >` and `OR symbol <` need to be there separately and in that order, to maintain predicatable syncing order
const rows = await query<{ symbol: string; earliestAsOfDate: string }>(`
SELECT
symbol,
first_value(asOfDate) as earliestAsOfDate
FROM (
SELECT
symbol,
asOfDate,
last_value(status) as latestStatus
FROM (
SELECT *
FROM option_contract_sync_statuses
ORDER BY asOfDate ASC, symbol ASC
)
GROUP BY symbol, asOfDate
HAVING latestStatus = 'not-started'
ORDER BY symbol ASC, asOfDate ASC
)
GROUP BY symbol
HAVING (
symbol = '${previousUnstartedSymbolAndAsOfDate.symbol}'
AND asOfDate > '${previousUnstartedSymbolAndAsOfDate.asOfDate}'
)
OR (
symbol > '${previousUnstartedSymbolAndAsOfDate.symbol}'
)
OR (
symbol < '${previousUnstartedSymbolAndAsOfDate.symbol}'
)
ORDER BY symbol ASC
LIMIT ${limit}
`);
return rows.map((row) => ({
symbol: row.symbol,
asOfDate: row.earliestAsOfDate,
}));
}
/**
* For each symbol in `symbols` table, check the latest `asOfDate`
* in `option_contract_sync_statuses` for that symbol. Then fill-in the rest
* of the dates until today's date.
*/
async function fillSyncStatuses() {
const symbols = (
await query<{ symbol: string }>(`
SELECT symbol from symbols
`)
).map(({ symbol }) => symbol);
console.log("symbols", symbols);
await pAll(
symbols.map(
(symbol) => () =>
query<{ latestAsOfDate: string }>(`
SELECT
latestAsOfDate
FROM (
SELECT last_value(asOfDate) as latestAsOfDate
FROM (
SELECT *
FROM option_contract_sync_statuses
WHERE symbol = '${symbol}'
ORDER BY asOfDate ASC
)
)
WHERE latestAsOfDate > '2022-02-18'
`).then((rows) =>
clickhouse
.command({
query: `
INSERT INTO option_contract_sync_statuses
SELECT
'${symbol}' as symbol,
Date(dateAdd(DAY,number,'${
rows[0]?.latestAsOfDate || "2022-02-19"
}')) as asOfDate,
'not-started' as status
FROM system.numbers
WHERE number < dateDiff('days',Date('${
rows[0]?.latestAsOfDate || "2022-02-19"
}'), Date(now()))
AND number > 0
`,
})
.then(() => {
console.log(`Done ${symbol}`);
})
)
),
{ concurrency: 6 }
);
}
/** First, make sure we know which symbol-asOfDate combinations are
* yet un-synced.
*/
await fillSyncStatuses();
/**
* Second, since this is startup time, obviously anything `pending` is not really running.
* So, for each `pending` combo, delete its status, and all contracts synced so far, so as to start afresh.
*/
const pendingSymbolsAndAsOfDates = await query<{
symbol: string;
asOfDate: string;
latestStatus: "not-started" | "pending" | "done";
}>(`
SELECT
symbol,
asOfDate,
last_value(status) as latestStatus
FROM (
SELECT *
FROM option_contract_sync_statuses
ORDER BY asOfDate ASC, symbol ASC
)
GROUP BY symbol, asOfDate
HAVING latestStatus = 'pending'
ORDER BY symbol ASC, asOfDate ASC
`);
console.log(
"Pending operations:",
pendingSymbolsAndAsOfDates.map(
({ symbol, asOfDate }) => `${symbol} ${asOfDate}`
)
);
await pAll(
pendingSymbolsAndAsOfDates.map(
({ symbol, asOfDate }) =>
() =>
pSeries([
// Delete option_contracts first, in case this `pAll` operation fails and we need to restart; so `option_contract_sync_statuses` "pending" rows are still there for the restart
() =>
clickhouse.command({
query: `DELETE FROM option_contracts WHERE symbol = '${symbol}' AND asOfDate = '${asOfDate}'`,
}),
() =>
clickhouse.command({
query: `DELETE FROM option_contract_sync_statuses WHERE symbol = '${symbol}' AND asOfDate = '${asOfDate}' AND status = 'pending'`,
}),
])
)
);
/** Second, for each symbol-asOfDate combination whose option contracts
* are not known, make them known.
*
* This queries Polygon with a concurrency of 6.
*/
const q = new pQueue({ concurrency: 6 });
/** Initialized with the lowest possible symbol and the earliest possible asOfDate.
* It's passed into `getNextUnstartedSymbolAndAsOfDate()`.
*/
let nextBatchOfUnstartedSymbolsAndAsOfDates = [
{ symbol: "A", asOfDate: "2022-02-01" },
];
while (
(nextBatchOfUnstartedSymbolsAndAsOfDates =
await getNextBatchOfUnstartedSymbolsAndAsOfDates(
nextBatchOfUnstartedSymbolsAndAsOfDates.pop(),
200
)) !== null
) {
await pAll(
nextBatchOfUnstartedSymbolsAndAsOfDates.map(
(unstartedSymbolAndAsOfDate) => () =>
q.add(async () => {
console.log(
`Getting contracts for ${unstartedSymbolAndAsOfDate.symbol} at ${unstartedSymbolAndAsOfDate.asOfDate}`
);
await getOptionContracts(
unstartedSymbolAndAsOfDate.symbol,
unstartedSymbolAndAsOfDate.asOfDate
);
})
)
);
// don't loop again until the queue has less than 50 items; we don't want it to grow in memory without bound:
console.log("Waiting till less than 50 in queue");
await q.onSizeLessThan(50);
}
// wait until pending queue operations are done:
await q.onSizeLessThan(1);
/**
* For each option contract, find its earliest date of existence, and get all quotes from
* then on.
*/
/*** TODOs ***/
/*
+ Gracefully recover from errors in individual operations.
*/
@@ -4,248 +4,266 @@ import pAll from "p-all";
import pQueue from "p-queue";
import pSeries from "p-series";
const optionContractToTicker = ({ symbol, expirationDate, strike, type }) =>
`O:${symbol}${expirationDate.substring(2, 4)}${expirationDate.substring(
5,
7
)}${expirationDate.substring(8, 10)}${
type === "call" ? "C" : "P"
}${Math.floor(strike * 1000)
.toString()
.padStart(8, "0")}`;
type PolygonResponse = {
next_url?: string;
results: Array<{
ticker: string;
expiration_date: string;
strike_price: number;
contract_type: "call" | "put";
c: number;
h: number;
n: number;
l: number;
o: number;
t: number;
v: number;
vw: number;
}>;
};
async function getOptionContracts(underlyingSymbol, asOfDate) {
// first mark the sync of this particular symbol and asOfDate as "pending":
async function getOptionAggregates(
asOfDate,
underlyingSymbol,
expirationDate,
strike,
type
) {
const optionContractTicker = optionContractToTicker({
symbol: underlyingSymbol,
expirationDate,
strike,
type,
});
// first mark the sync of this particular option contract as "pending":
await clickhouse.insert({
table: "option_contract_sync_statuses",
values: [{ symbol: underlyingSymbol, asOfDate, status: "pending" }],
table: "option_aggregate_sync_statuses",
values: [
{
asOfDate,
symbol: underlyingSymbol,
expirationDate,
strike,
type,
status: "pending",
},
],
format: "JSONEachRow",
});
// then commence the sync with the initial request:
let latestBatchResponse = (await (
await fetch(
`https://api.polygon.io/v3/reference/options/contracts?underlying_ticker=${underlyingSymbol}&as_of=${asOfDate}&sort=ticker&limit=1000&apiKey=${await getApiKey()}`
`https://api.polygon.io/v2/aggs/ticker/${optionContractTicker}/range/1/minute/${asOfDate}/${asOfDate}?adjusted=false&sort=asc&limit=50000&apiKey=${await getApiKey()}`
)
).json()) as PolygonResponse;
let latestBatch = latestBatchResponse.results.map((result) => ({
asOfDate,
symbol: underlyingSymbol,
expirationDate: result.expiration_date,
strike: result.strike_price,
type: result.contract_type,
expirationDate,
strike,
type,
tsStart: (result.t || 0) / 1000,
open: result.o,
close: result.c,
low: result.l,
high: result.h,
}));
await clickhouse.insert({
table: "option_contracts",
table: "option_aggregates",
values: latestBatch,
format: "JSONEachRow",
});
//console.log(latestBatch.results.map((r)=>r.ticker));
// as long as there's a `next_url`, call that:
while (latestBatchResponse.hasOwnProperty("next_url")) {
latestBatchResponse = (await (
await fetch(`${latestBatchResponse.next_url}&apiKey=${await getApiKey()}`)
).json()) as PolygonResponse;
latestBatch = latestBatchResponse.results.map((result) => ({
asOfDate,
symbol: underlyingSymbol,
expirationDate: result.expiration_date,
strike: result.strike_price,
type: result.contract_type,
}));
//console.log(latestBatch.results.map((r)=>r.ticker));
await clickhouse.insert({
table: "option_contracts",
values: latestBatch,
format: "JSONEachRow",
});
}
await clickhouse.insert({
table: "option_contract_sync_statuses",
values: [{ symbol: underlyingSymbol, asOfDate, status: "done" }],
table: "option_contract_quote_sync_statuses",
values: [
{
asOfDate,
symbol: underlyingSymbol,
expirationDate,
strike,
type,
status: "done",
},
],
format: "JSONEachRow",
});
}
async function getNextBatchOfUnstartedSymbolsAndAsOfDates(
previousUnstartedSymbolAndAsOfDate:
| { symbol: string; asOfDate: string }
| undefined,
type OptionContract = {
symbol: string;
expirationDate: string;
strike: number;
type: "call" | "put";
};
type OptionContractDay = OptionContract & { asOfDate: string };
async function getNextBatchOfUnstartedOptionAggregates(
previousUnstartedOptionContract: OptionContractDay | undefined,
limit: number
) {
if (typeof previousUnstartedSymbolAndAsOfDate === "undefined") {
): Promise<Array<OptionContractDay>> {
if (typeof previousUnstartedOptionContract === "undefined") {
return;
}
const rows = await query<{ symbol: string; earliestAsOfDate: string }>(`
SELECT
symbol,
first_value(asOfDate) as earliestAsOfDate
const optionContractsWithoutAggregates = await query<OptionContractDay>(`
SELECT
asOfDate,
symbol,
expirationDate,
strike,
type,
last_value(status) as latestStatus
FROM (
SELECT
symbol,
asOfDate,
last_value(status) as latestStatus
FROM (
SELECT *
FROM option_contract_sync_statuses
ORDER BY asOfDate ASC, symbol ASC
SELECT *
FROM option_aggregate_sync_statuses
ORDER BY ts ASC
)
GROUP BY asOfDate, symbol, expirationDate, strike, type
HAVING latestStatus = 'not-started'
AND (
(
asOfDate = '${previousUnstartedOptionContract.asOfDate}'
AND symbol = '${previousUnstartedOptionContract.symbol}'
AND expirationDate = '${previousUnstartedOptionContract.expirationDate}'
AND strike = ${previousUnstartedOptionContract.strike}
AND type > '${previousUnstartedOptionContract.type}'
)
OR
(
asOfDate = '${previousUnstartedOptionContract.asOfDate}'
AND symbol = '${previousUnstartedOptionContract.symbol}'
AND expirationDate = '${previousUnstartedOptionContract.expirationDate}'
AND strike > ${previousUnstartedOptionContract.strike}
)
OR (
asOfDate = '${previousUnstartedOptionContract.asOfDate}'
AND symbol = '${previousUnstartedOptionContract.symbol}'
AND expirationDate > '${previousUnstartedOptionContract.expirationDate}'
)
OR (
asOfDate = '${previousUnstartedOptionContract.asOfDate}'
AND symbol > '${previousUnstartedOptionContract.symbol}'
)
OR (
asOfDate > '${previousUnstartedOptionContract.asOfDate}'
)
GROUP BY symbol, asOfDate
HAVING latestStatus = 'not-started'
ORDER BY symbol ASC, asOfDate ASC
)
GROUP BY symbol
HAVING (
symbol = '${previousUnstartedSymbolAndAsOfDate.symbol}'
AND asOfDate > '${previousUnstartedSymbolAndAsOfDate.asOfDate}'
)
OR (
symbol > '${previousUnstartedSymbolAndAsOfDate.symbol}'
)
ORDER BY symbol ASC
ORDER BY asOfDate, symbol, expirationDate, strike, type
LIMIT ${limit}
`);
return rows.map((row) => ({
symbol: row.symbol,
asOfDate: row.earliestAsOfDate,
}));
return optionContractsWithoutAggregates;
}
/**
* For each symbol in `symbols` table, check the latest `asOfDate`
* in `option_contract_sync_statuses` for that symbol. Then fill-in the rest
* of the dates until today's date.
* First, since this is startup time, obviously anything `pending` is not
* really running. So, for each `pending` option contract (i.e. unique
* combinations of `(symbol, expirationDate, strike, type)` in the
* `option_contracts` table), delete its status, and all quotes synced so far,
* so as to start afresh.
*/
async function fillSyncStatuses() {
const symbols = (
await query<{ symbol: string }>(`
SELECT symbol from symbols
`)
).map(({ symbol }) => symbol);
console.log("symbols", symbols);
async function revertPendingSyncs() {
const pendingOptionContracts = await query<{
symbol: string;
expirationDate: string;
strike: number;
type: "call" | "put";
latestStatus: "not-started" | "pending" | "done";
}>(`
SELECT
symbol,
expirationDate,
strike,
type,
last_value(status) as latestStatus
FROM (
SELECT *
FROM option_aggregate_sync_statuses
ORDER BY symbol, expirationDate, strike, type, ts ASC
)
GROUP BY symbol, expirationDate, strike, type
HAVING latestStatus = 'pending'
ORDER BY symbol, expirationDate, strike, type
`);
console.log(
"Pending operations:",
pendingOptionContracts.map(
({ symbol, expirationDate, strike, type }) =>
`${symbol} ${expirationDate} ${strike} ${type}`
)
);
await pAll(
symbols.map(
(symbol) => () =>
query<{ latestAsOfDate: string }>(`
SELECT
latestAsOfDate
FROM (
SELECT last_value(asOfDate) as latestAsOfDate
FROM (
SELECT *
FROM option_contract_sync_statuses
WHERE symbol = '${symbol}'
ORDER BY asOfDate ASC
)
)
WHERE latestAsOfDate > '2022-02-18'
`).then((rows) =>
clickhouse
.command({
query: `
INSERT INTO option_contract_sync_statuses
SELECT
'${symbol}' as symbol,
Date(dateAdd(DAY,number,'${
rows[0]?.latestAsOfDate || "2022-02-19"
}')) as asOfDate,
'not-started' as status
FROM system.numbers
WHERE number < dateDiff('days',Date('${
rows[0]?.latestAsOfDate || "2022-02-19"
}'), Date(now()))
AND number > 0
`,
})
.then(() => {
console.log(`Done ${symbol}`);
})
)
),
{ concurrency: 6 }
pendingOptionContracts.map(
({ symbol, expirationDate, strike, type }) =>
() =>
pSeries([
// Delete option_contracts first, in case this `pAll` operation fails and we need to restart; so `option_contract_sync_statuses` "pending" rows are still there for the restart
() =>
clickhouse.command({
query: `
DELETE FROM option_aggregates
WHERE symbol = '${symbol}'
AND expirationDate = '${expirationDate}'
AND strike = ${strike}
AND type = '${type}'
`,
}),
() =>
clickhouse.command({
query: `
DELETE FROM option_aggregate_sync_statuses
WHERE symbol = '${symbol}'
AND expirationDate = '${expirationDate}'
AND strike = ${strike}
AND type = '${type}'
AND status = 'pending'`,
}),
])
)
);
}
//await revertPendingSyncs();
/** First, make sure we know which symbol-asOfDate combinations are
* yet un-synced.
*/
await fillSyncStatuses();
/**
* Second, since this is startup time, obviously anything `pending` is not really running.
* So, for each `pending` combo, delete its status, and all contracts synced so far, so as to start afresh.
*/
const pendingSymbolsAndAsOfDates = await query<{
symbol: string;
asOfDate: string;
latestStatus: "not-started" | "pending" | "done";
}>(`
SELECT
symbol,
asOfDate,
last_value(status) as latestStatus
FROM (
SELECT *
FROM option_contract_sync_statuses
ORDER BY asOfDate ASC, symbol ASC
)
GROUP BY symbol, asOfDate
HAVING latestStatus = 'pending'
ORDER BY symbol ASC, asOfDate ASC
`);
console.log(
"Pending operations:",
pendingSymbolsAndAsOfDates.map(
({ symbol, asOfDate }) => `${symbol} ${asOfDate}`
)
);
await pAll(
pendingSymbolsAndAsOfDates.map(
({ symbol, asOfDate }) =>
() =>
pSeries([
// Delete option_contracts first, in case this `pAll` operation fails and we need to restart; so `option_contract_sync_statuses` "pending" rows are still there for the restart
() =>
clickhouse.command({
query: `DELETE FROM option_contracts WHERE symbol = '${symbol}' AND asOfDate = '${asOfDate}'`,
}),
() =>
clickhouse.command({
query: `DELETE FROM option_contract_sync_statuses WHERE symbol = '${symbol}' AND asOfDate = '${asOfDate}' AND status = 'pending'`,
}),
])
)
);
/** Second, for each symbol-asOfDate combination whose option contracts
* are not known, make them known.
/** Second, for each option contract, get all of its quotes.
*
* This queries Polygon with a concurrency of 6.
*/
const q = new pQueue({ concurrency: 6 });
/** Initialized with the lowest possible symbol and the earliest possible asOfDate.
/** Initialized with the lowest possible option contract.
* It's passed into `getNextUnstartedSymbolAndAsOfDate()`.
*/
let nextBatchOfUnstartedSymbolsAndAsOfDates = [
{ symbol: "A", asOfDate: "2022-02-01" },
let nextBatchOfUnstartedOptionContracts: Array<OptionContractDay> = [
{
asOfDate: "2022-02-01",
symbol: "A",
expirationDate: "2022-02-01",
strike: 0,
type: "call",
},
];
while (
(nextBatchOfUnstartedSymbolsAndAsOfDates =
await getNextBatchOfUnstartedSymbolsAndAsOfDates(
nextBatchOfUnstartedSymbolsAndAsOfDates.pop(),
(nextBatchOfUnstartedOptionContracts =
await getNextBatchOfUnstartedOptionAggregates(
nextBatchOfUnstartedOptionContracts.pop(),
200
)) !== null
) {
await pAll(
nextBatchOfUnstartedSymbolsAndAsOfDates.map(
(unstartedSymbolAndAsOfDate) => () =>
nextBatchOfUnstartedOptionContracts.map(
(unstartedOptionContract) => () =>
q.add(async () => {
console.log(
`Getting contracts for ${unstartedSymbolAndAsOfDate.symbol} at ${unstartedSymbolAndAsOfDate.asOfDate}`
);
await getOptionContracts(
unstartedSymbolAndAsOfDate.symbol,
unstartedSymbolAndAsOfDate.asOfDate
`Getting aggregates for ${unstartedOptionContract.asOfDate} ${unstartedOptionContract.symbol} at ${unstartedOptionContract.expirationDate} ${unstartedOptionContract.strike} ${unstartedOptionContract.type}`
);
// await getOptionAggregates(
// unstartedOptionContract.asOfDate,
// unstartedOptionContract.symbol,
// unstartedOptionContract.expirationDate,
// unstartedOptionContract.strike,
// unstartedOptionContract.type
// );
})
)
);
@@ -256,11 +274,6 @@ while (
// wait until pending queue operations are done:
await q.onSizeLessThan(1);
/**
* For each option contract, find its earliest date of existence, and get all quotes from
* then on.
*/
/*** TODOs ***/
/*
+ Gracefully recover from errors in individual operations.
+4 -3
View File
@@ -1,4 +1,5 @@
import pThrottle from 'p-throttle';
//import pThrottle from 'p-throttle';
const apiKey = 'H95NTsatM1iTWLUwDLxM2J5zhUVYdCEz';
export const getApiKey = pThrottle({limit: 5, interval: 60000})(()=>apiKey);
const apiKey = "H95NTsatM1iTWLUwDLxM2J5zhUVYdCEz";
//export const getApiKey = pThrottle({limit: 5, interval: 60000})(()=>apiKey);
export const getApiKey = () => apiKey;