re-organized code; implemented getAggregate() where it was missing

This commit is contained in:
2024-08-11 18:08:54 -04:00
parent 15a5d7c67b
commit d6762fdae5
14 changed files with 193 additions and 86 deletions
@@ -0,0 +1,191 @@
import type { CalendarDatabase, CalendarKey } from "./interfaces.js";
import type { Aggregate } from "../interfaces.js";
import { query } from "../../lib/clickhouse.js";
function makeCalendarDatabase(): CalendarDatabase {
const calendarDatabase: Omit<CalendarDatabase, "getCalendars"> = {
getKeys: async ({ key: { symbol }, date }) => {
const calendarsForSymbolOnDate = await query<
Omit<CalendarKey, "symbol">
>(`
WITH today_option_contracts AS (
SELECT expirationDate, strike, type
FROM option_contract_existences
WHERE symbol = '${symbol}'
AND asOfDate = '${date}'
)
SELECT
front_option_contract.type as type,
front_option_contract.strike as strike,
front_option_contract.expirationDate as frontExpirationDate,
back_option_contract.expirationDate as backExpirationDate
FROM today_option_contracts AS front_option_contract
ASOF INNER JOIN today_option_contracts AS back_option_contract
ON front_option_contract.type = back_option_contract.type
AND front_option_contract.strike = back_option_contract.strike
AND front_option_contract.expirationDate < back_option_contract.expirationDate
`);
return calendarsForSymbolOnDate.map((calendarWithoutSymbol) => ({
...calendarWithoutSymbol,
symbol,
}));
},
getAggregate: async ({
key: { symbol, frontExpirationDate, backExpirationDate, strike, type },
tsStart,
}) => {
const tsStartString = new Date(tsStart).toISOString();
return (
await query<Omit<Aggregate<CalendarKey>, "key">>(`
WITH front_option_contract_candlestick AS (
SELECT
tsStart,
open,
close,
high,
low
FROM option_contract_aggregates
WHERE symbol = '${symbol}'
AND type = '${type}'
AND strike = '${strike}'
AND expirationDate = '${frontExpirationDate}'
AND tsStart = '${tsStartString}'
),
back_option_contract_candlestick AS (
SELECT
tsStart,
open,
close,
high,
low
FROM option_contract_aggregates
WHERE symbol = '${symbol}'
AND type = '${type}'
AND strike = '${strike}'
AND expirationDate = '${backExpirationDate}'
AND tsStart = '${tsStartString}'
)
SELECT
toUnixTimestamp(front_option_contract_candlestick.tsStart) as tsStart,
back_option_contract_candlestick.open - front_option_contract_candlestick.open as open,
back_option_contract_candlestick.close - front_option_contract_candlestick.close as close
FROM front_option_contract_candlestick
INNER JOIN back_option_contract_candlestick
ON front_option_contract_candlestick.tsStart = back_option_contract_candlestick.tsStart
ORDER BY front_option_contract_candlestick.tsStart ASC
`)
).map((aggregate) => ({
...aggregate,
tsStart: aggregate.tsStart * 1000, // unfortunately, `toUnixTimestamp` only returns second-precision
}))[0];
},
getAggregates: async ({
key: { symbol, frontExpirationDate, backExpirationDate, strike, type },
date,
}) => {
return (
await query<Omit<Aggregate<CalendarKey>, "key">>(`
WITH front_option_contract_candlestick AS (
SELECT
tsStart,
open,
close,
high,
low
FROM option_contract_aggregates
WHERE symbol = '${symbol}'
AND type = '${type}'
AND strike = '${strike}'
AND expirationDate = '${frontExpirationDate}'
AND toDate(tsStart) = '${date}'
),
back_option_contract_candlestick AS (
SELECT
tsStart,
open,
close,
high,
low
FROM option_contract_aggregates
WHERE symbol = '${symbol}'
AND type = '${type}'
AND strike = '${strike}'
AND expirationDate = '${backExpirationDate}'
AND toDate(tsStart) = '${date}'
)
SELECT
toUnixTimestamp(front_option_contract_candlestick.tsStart) as tsStart,
back_option_contract_candlestick.open - front_option_contract_candlestick.open as open,
back_option_contract_candlestick.close - front_option_contract_candlestick.close as close
FROM front_option_contract_candlestick
INNER JOIN back_option_contract_candlestick
ON front_option_contract_candlestick.tsStart = back_option_contract_candlestick.tsStart
ORDER BY front_option_contract_candlestick.tsStart ASC
`)
).map((aggregate) => ({
...aggregate,
tsStart: aggregate.tsStart * 1000, // unfortunately, `toUnixTimestamp` only returns second-precision
}));
},
insertAggregates: async (aggregates) => {
// no-op: we insert individual option contracts, not calendars
},
getClosingPrice: async ({
key: { symbol, strike, type, frontExpirationDate, backExpirationDate },
}) => {
return (
await query<{ calendarClosingPrice: number }>(`
WITH front_option_contract_candlestick AS (
SELECT
tsStart,
open,
close,
high,
low
FROM option_contract_aggregates
WHERE symbol = '${symbol}'
AND type = '${type}'
AND strike = '${strike}'
AND expirationDate = '${frontExpirationDate}'
AND toDate(tsStart) = '${frontExpirationDate}'
),
back_option_contract_candlestick AS (
SELECT
tsStart,
open,
close,
high,
low
FROM option_contract_aggregates
WHERE symbol = '${symbol}'
AND type = '${type}'
AND strike = '${strike}'
AND expirationDate = '${backExpirationDate}'
AND toDate(tsStart) = '${frontExpirationDate}'
)
SELECT
min(back_option_contract_candlestick.close - front_option_contract_candlestick.close) as calendarClosingPrice
FROM front_option_contract_candlestick
INNER JOIN back_option_contract_candlestick
ON front_option_contract_candlestick.tsStart = back_option_contract_candlestick.tsStart
`)
)[0]?.calendarClosingPrice;
},
getTargetPriceByProbability: async ({
symbol,
calendarSpan,
strikePercentageFromTheMoney,
historicalProbabilityOfSuccess,
}) => {
return 0.24;
},
};
return {
...calendarDatabase,
getCalendars: calendarDatabase.getKeys,
};
}
export const calendarDatabase: CalendarDatabase = makeCalendarDatabase();
@@ -0,0 +1,24 @@
import type { AggregateDatabase } from "../interfaces.js";
export type CalendarKey = {
symbol: string;
type: "call" | "put";
strike: number;
frontExpirationDate: string;
backExpirationDate: string;
};
export type CalendarDatabase = AggregateDatabase<CalendarKey> & {
getCalendars: AggregateDatabase<CalendarKey>["getKeys"];
getTargetPriceByProbability: ({
symbol,
calendarSpan,
strikePercentageFromTheMoney,
historicalProbabilityOfSuccess,
}: {
symbol: string;
calendarSpan: number;
strikePercentageFromTheMoney: number;
historicalProbabilityOfSuccess: number;
}) => Promise<number>;
};
@@ -0,0 +1,166 @@
import type { CalendarDatabase } from "./interfaces.js";
import { open } from "lmdbx";
const calendarAggregatesDb = open({
path: "./calendar-aggregates.db",
compression: true,
});
const calendarExistenceDb = open({
path: "./calendar-existence.db",
compression: true,
});
/** Largest possible key according to the `ordered-binary` (used by lmdbx) docs. */
const MAXIMUM_KEY = Buffer.from([0xff]);
function makeCalendarDatabase(): CalendarDatabase {
const calendarDatabase: Omit<CalendarDatabase, "getCalendars"> = {
getKeys: async ({ key: { symbol }, date }) => {
return calendarExistenceDb
.getRange({
start: [date, symbol],
end: [date, symbol, MAXIMUM_KEY],
})
.map(({ key }) => ({
symbol,
frontExpirationDate: key[2],
backExpirationDate: key[3],
strike: key[4],
type: key[5],
})).asArray;
},
getAggregates: async ({
key: { symbol, frontExpirationDate, backExpirationDate, strike, type },
date,
}) => {
const startOfDayUnix = new Date(`${date}T00:00:00Z`).valueOf();
const endOfDayUnix = startOfDayUnix + 3600 * 24 * 1000;
return calendarAggregatesDb
.getRange({
start: [
symbol,
frontExpirationDate,
backExpirationDate,
strike,
type,
startOfDayUnix,
],
end: [
symbol,
frontExpirationDate,
backExpirationDate,
strike,
type,
endOfDayUnix,
],
})
.map(({ value }) => ({
tsStart: value.tsStart,
open: value.open,
close: value.close,
high: value.high,
low: value.low,
})).asArray;
},
getAggregate: async ({
key: { symbol, frontExpirationDate, backExpirationDate, strike, type },
tsStart,
}) => {
return await calendarAggregatesDb.get([
symbol,
frontExpirationDate,
backExpirationDate,
strike,
type,
tsStart,
]);
},
insertAggregates: async (aggregates) => {
await calendarExistenceDb.batch(() => {
for (const aggregate of aggregates) {
calendarExistenceDb.put(
[
new Date(aggregate.tsStart).toISOString().substring(0, 10),
aggregate.key.symbol,
aggregate.key.frontExpirationDate,
aggregate.key.backExpirationDate,
aggregate.key.strike,
aggregate.key.type,
],
null
);
}
});
await calendarAggregatesDb.batch(() => {
for (const aggregate of aggregates) {
calendarAggregatesDb.put(
[
aggregate.key.symbol,
aggregate.key.frontExpirationDate,
aggregate.key.backExpirationDate,
aggregate.key.strike,
aggregate.key.type,
aggregate.tsStart,
],
{
open: aggregate.open,
close: aggregate.close,
high: aggregate.high,
low: aggregate.low,
}
);
}
});
},
getClosingPrice: async ({
key: { symbol, strike, type, frontExpirationDate, backExpirationDate },
}) => {
const startOfExpirationDateUnix = new Date(
`${frontExpirationDate}T23:59:59Z`
).valueOf();
const endOfExpirationDateUnix = new Date(
`${frontExpirationDate}T00:00:00Z`
).valueOf();
for (const { value } of calendarAggregatesDb.getRange({
start: [
symbol,
frontExpirationDate,
backExpirationDate,
strike,
type,
startOfExpirationDateUnix,
],
end: [
symbol,
frontExpirationDate,
backExpirationDate,
strike,
type,
endOfExpirationDateUnix,
],
reverse: true,
})) {
if (value.close > 0) {
return value.close;
}
}
return 0;
},
getTargetPriceByProbability: async ({
symbol,
calendarSpan,
strikePercentageFromTheMoney,
historicalProbabilityOfSuccess,
}) => {
return 0.24;
},
};
return {
...calendarDatabase,
getCalendars: calendarDatabase.getKeys,
};
}
export const calendarDatabase: CalendarDatabase = makeCalendarDatabase();
@@ -0,0 +1,207 @@
import { optionContractDatabase } from "../OptionContract/lmdbx.js";
import type { CalendarDatabase } from "./interfaces.js";
/** Largest possible key according to the `ordered-binary` (used by lmdbx) docs. */
const MAXIMUM_KEY = Buffer.from([0xff]);
function makeCalendarDatabase(): CalendarDatabase {
const getAggregatesSync = ({
key: { symbol, frontExpirationDate, backExpirationDate, strike, type },
date,
}) => {
const frontOptionContractAggregates =
optionContractDatabase.getAggregatesSync({
date,
key: { symbol, expirationDate: frontExpirationDate, strike, type },
});
const backOptionContractAggregates =
optionContractDatabase.getAggregatesSync({
date,
key: { symbol, expirationDate: backExpirationDate, strike, type },
});
const calendarAggregates = [];
let i = 0;
let j = 0;
while (
i < frontOptionContractAggregates.length &&
j < backOptionContractAggregates.length
) {
if (
frontOptionContractAggregates[i].tsStart ===
backOptionContractAggregates[j].tsStart
) {
calendarAggregates.push({
tsStart: frontOptionContractAggregates[i].tsStart,
open:
backOptionContractAggregates[j].open -
frontOptionContractAggregates[i].open,
close:
backOptionContractAggregates[j].close -
frontOptionContractAggregates[i].close,
// the high and low are not exactly correct since we don't know if each contract's high and low happened at the same moment as the other:
high:
backOptionContractAggregates[j].high -
frontOptionContractAggregates[i].high,
low:
backOptionContractAggregates[j].low -
frontOptionContractAggregates[i].low,
});
i++;
j++;
} else if (
frontOptionContractAggregates[i].tsStart >
backOptionContractAggregates[j].tsStart
) {
j++;
} else {
i++;
}
}
return calendarAggregates;
};
const calendarDatabase: Omit<CalendarDatabase, "getCalendars"> = {
getKeys: async ({ key: { symbol }, date }) => {
const optionContracts = await optionContractDatabase.getOptionContracts({
date,
key: { symbol },
});
return optionContracts.flatMap(
(frontOptionContract, i, optionContracts) =>
optionContracts
.filter(
(potientialBackOptionContract) =>
frontOptionContract.strike ===
potientialBackOptionContract.strike &&
frontOptionContract.type ===
potientialBackOptionContract.type &&
frontOptionContract.expirationDate <
potientialBackOptionContract.expirationDate
)
.map((backOptionContract) => ({
symbol,
frontExpirationDate: frontOptionContract.expirationDate,
backExpirationDate: backOptionContract.expirationDate,
strike: frontOptionContract.strike,
type: frontOptionContract.type,
}))
);
},
getAggregates: async ({
key: { symbol, frontExpirationDate, backExpirationDate, strike, type },
date,
}) =>
getAggregatesSync({
key: { symbol, frontExpirationDate, backExpirationDate, strike, type },
date,
}),
getAggregatesSync,
insertAggregates: async (aggregates) => {
// right now, no-op
},
getClosingPrice: async ({
key: { symbol, strike, type, frontExpirationDate, backExpirationDate },
}) => {
// get unix timestamp, in milliseconds, of the start of the last hour, which is 03:30PM in the `America/New_York` timezone on the front expiration date:
const startOfLastHourUnix = new Date(
`${frontExpirationDate}T19:30:00Z`
).getTime();
const endOfLastHourUnix = startOfLastHourUnix + 3600 * 1000;
const frontOptionContractAggregates = (
await optionContractDatabase.getAggregates({
date: frontExpirationDate,
key: { symbol, expirationDate: frontExpirationDate, strike, type },
})
).filter(
({ tsStart }) =>
tsStart >= startOfLastHourUnix && tsStart < endOfLastHourUnix
);
const backOptionContractAggregates = (
await optionContractDatabase.getAggregates({
date: frontExpirationDate,
key: { symbol, expirationDate: backExpirationDate, strike, type },
})
).filter(
({ tsStart }) =>
tsStart >= startOfLastHourUnix && tsStart < endOfLastHourUnix
);
let i = 0;
let j = 0;
let minPrice = 0;
while (
i < frontOptionContractAggregates.length &&
j < backOptionContractAggregates.length
) {
if (
frontOptionContractAggregates[i].tsStart ===
backOptionContractAggregates[j].tsStart
) {
const calendarClosePrice =
backOptionContractAggregates[j].close -
frontOptionContractAggregates[i].close;
if (calendarClosePrice < minPrice || minPrice === 0) {
minPrice = calendarClosePrice;
}
i++;
j++;
} else if (
frontOptionContractAggregates[i].tsStart >
backOptionContractAggregates[j].tsStart
) {
j++;
} else {
i++;
}
}
return minPrice;
},
getAggregate: async ({
key: { symbol, frontExpirationDate, backExpirationDate, strike, type },
tsStart,
}) => {
const [frontOptionContractAggregate, backOptionContractAggregate] =
await Promise.all([
optionContractDatabase.getAggregate({
key: { symbol, expirationDate: frontExpirationDate, strike, type },
tsStart,
}),
optionContractDatabase.getAggregate({
key: { symbol, expirationDate: backExpirationDate, strike, type },
tsStart,
}),
]);
// only return the calendar aggregate if its constituent front and back option contract aggregates exist:
if (frontOptionContractAggregate && backOptionContractAggregate) {
return {
tsStart,
open:
backOptionContractAggregate.open -
frontOptionContractAggregate.open,
close:
backOptionContractAggregate.close -
frontOptionContractAggregate.close,
high:
backOptionContractAggregate.high -
frontOptionContractAggregate.high,
low:
backOptionContractAggregate.low - frontOptionContractAggregate.low,
};
}
return undefined;
},
getTargetPriceByProbability: async ({
symbol,
calendarSpan,
strikePercentageFromTheMoney,
historicalProbabilityOfSuccess,
}) => {
return 0.24;
},
};
return {
...calendarDatabase,
getCalendars: calendarDatabase.getKeys,
};
}
export const calendarDatabase: CalendarDatabase = makeCalendarDatabase();
@@ -0,0 +1,115 @@
import type {
OptionContractDatabase,
OptionContractKey,
} from "./interfaces.js";
import type { Aggregate } from "../interfaces.js";
import { clickhouse, query } from "../../lib/clickhouse.js";
function makeOptionContractDatabase(): OptionContractDatabase {
const optionContractDatabase: Omit<
OptionContractDatabase,
"getOptionContracts"
> = {
getKeys: async ({ key: { symbol }, date }) => {
return (
await query<Omit<OptionContractKey, "symbol">>(`
SELECT expirationDate, strike, type
FROM option_contract_existences
WHERE symbol = '${symbol}'
AND asOfDate = '${date}'
`)
).map((optionContractWithoutKey) => ({
...optionContractWithoutKey,
symbol,
}));
},
getAggregates: async ({
key: { symbol, expirationDate, strike, type },
date,
}) => {
return (
await query<Omit<Aggregate<OptionContractKey>, "key">>(`
SELECT
toUnixTimestamp(tsStart) as tsStart,
open,
close,
high,
low
FROM option_contract_aggregates
WHERE symbol = '${symbol}'
AND type = '${type}'
AND strike = '${strike}'
AND expirationDate = '${expirationDate}'
AND toDate(tsStart) = '${date}'
ORDER BY tsStart ASC
`)
).map((aggregate) => ({
...aggregate,
tsStart: aggregate.tsStart * 1000, // unfortunately, `toUnixTimestamp` only returns second-precision
}));
},
getAggregate: async ({
key: { symbol, expirationDate, strike, type },
tsStart,
}) => {
const tsStartString = new Date(tsStart).toISOString();
return (
await query<Omit<Aggregate<OptionContractKey>, "key">>(`
SELECT
open,
close,
high,
low
FROM option_contract_aggregates
WHERE symbol = '${symbol}'
AND type = '${type}'
AND strike = '${strike}'
AND expirationDate = '${expirationDate}'
AND tsStart = '${tsStartString}'
ORDER BY tsStart ASC
`)
).map((aggregate) => ({
...aggregate,
tsStart,
}))[0];
},
insertAggregates: async (aggregates) => {
// stock existence is taken care of by clickhouse materialized view
await clickhouse.insert({
table: "option_contract_aggregates",
values: aggregates.map(
({
key: { symbol, expirationDate, strike, type },
tsStart,
open,
close,
high,
low,
}) => ({
symbol,
expirationDate,
strike,
type,
tsStart,
open,
close,
high,
low,
})
),
});
},
getClosingPrice: async ({ key }) => {
// no-op: not used since stocks don't have a "closing" price, unlike options.
return 0;
},
};
return {
...optionContractDatabase,
getOptionContracts: optionContractDatabase.getKeys,
};
}
export const optionContractDatabase: OptionContractDatabase =
makeOptionContractDatabase();
@@ -0,0 +1,12 @@
import type { AggregateDatabase } from "../interfaces.js";
export type OptionContractKey = {
symbol: string;
expirationDate: string;
strike: number;
type: "call" | "put";
};
export type OptionContractDatabase = AggregateDatabase<OptionContractKey> & {
getOptionContracts: AggregateDatabase<OptionContractKey>["getKeys"];
};
@@ -0,0 +1,139 @@
import type { OptionContractDatabase } from "./interfaces.js";
import { open } from "lmdbx";
const optionContractAggregatesDb = open({
path: "./option-contract-aggregates.db",
// any options go here, we can turn on compression like this:
compression: true,
});
const optionContractExistenceDb = open({
path: "./option-contract-existence.db",
// any options go here, we can turn on compression like this:
compression: true,
});
/** Largest possible key according to the `ordered-binary` (used by lmdbx) docs. */
const MAXIMUM_KEY = Buffer.from([0xff]);
function makeOptionContractDatabase(): OptionContractDatabase {
const getAggregatesSync = ({
key: { symbol, expirationDate, strike, type },
date,
}) => {
const startOfDayUnix = new Date(`${date}T00:00:00Z`).valueOf();
const endOfDayUnix = startOfDayUnix + 3600 * 24 * 1000;
return optionContractAggregatesDb
.getRange({
start: [symbol, expirationDate, strike, type, startOfDayUnix],
end: [symbol, expirationDate, strike, type, endOfDayUnix],
})
.map(({ key, value }) => ({
tsStart: key[4],
open: value.open,
close: value.close,
high: value.high,
low: value.low,
})).asArray;
};
const optionContractDatabase: Omit<
OptionContractDatabase,
"getOptionContracts"
> = {
getKeys: async ({ key: { symbol }, date }) => {
return optionContractExistenceDb
.getRange({
start: [date, symbol],
end: [date, symbol, MAXIMUM_KEY],
})
.map(({ key }) => ({
symbol,
expirationDate: key[2],
strike: key[3],
type: key[4],
})).asArray;
},
getAggregatesSync,
getAggregates: async ({
key: { symbol, expirationDate, strike, type },
date,
}) =>
getAggregatesSync({
key: { symbol, expirationDate, strike, type },
date,
}),
insertAggregates: async (aggregates) => {
await optionContractExistenceDb.batch(() => {
for (const aggregate of aggregates) {
optionContractExistenceDb.put(
[
new Date(aggregate.tsStart).toISOString().substring(0, 10),
aggregate.key.symbol,
aggregate.key.expirationDate,
aggregate.key.strike,
aggregate.key.type,
],
null
);
}
});
await optionContractAggregatesDb.batch(() => {
for (const aggregate of aggregates) {
optionContractAggregatesDb.put(
[
aggregate.key.symbol,
aggregate.key.expirationDate,
aggregate.key.strike,
aggregate.key.type,
aggregate.tsStart,
],
{
open: aggregate.open,
close: aggregate.close,
high: aggregate.high,
low: aggregate.low,
}
);
}
});
},
getClosingPrice: async ({
key: { symbol, strike, type, expirationDate },
}) => {
const startOfLastHourUnix = new Date(
`${expirationDate}T00:00:00Z`
).valueOf();
const endOfLastHourUnix = startOfLastHourUnix + 3600 * 1000;
let minPrice = 0;
for (const { value } of optionContractAggregatesDb.getRange({
start: [symbol, expirationDate, strike, type, startOfLastHourUnix],
end: [symbol, expirationDate, strike, type, endOfLastHourUnix],
})) {
if (value.close < minPrice || minPrice === 0) {
minPrice = value.close;
}
}
return minPrice;
},
getAggregate: async ({
key: { symbol, expirationDate, strike, type },
tsStart,
}) => {
return await optionContractAggregatesDb.get([
symbol,
expirationDate,
strike,
type,
tsStart,
]);
},
};
return {
...optionContractDatabase,
getOptionContracts: optionContractDatabase.getKeys,
};
}
export const optionContractDatabase: OptionContractDatabase =
makeOptionContractDatabase();
@@ -0,0 +1,79 @@
import type { StockDatabase, StockKey } from "./interfaces.js";
import type { Aggregate } from "../interfaces.js";
import { clickhouse, query } from "../../lib/clickhouse.js";
function makeStockDatabase(): StockDatabase {
const stockDatabase: Omit<StockDatabase, "getSymbols"> = {
getKeys: async ({ date, key }) => {
if (key?.symbol) {
return [key as StockKey];
}
return await query(`
SELECT DISTINCT symbol FROM stock_aggregates WHERE toDate(tsStart) = '${date}'
`);
},
getAggregates: async ({ key: { symbol }, date }) => {
return (
await query<Omit<Aggregate<StockKey>, "key">>(`
SELECT
toUnixTimestamp(tsStart) as tsStart,
open,
close,
high,
low
FROM stock_aggregates
WHERE symbol = '${symbol}'
AND toDate(tsStart) = '${date}'
ORDER BY tsStart ASC
`)
).map((aggregate) => ({
...aggregate,
tsStart: aggregate.tsStart * 1000, // unfortunately, `toUnixTimestamp` only returns second-precision
}));
},
getAggregate: async ({ key: { symbol }, tsStart }) => {
return (
await query<Omit<Aggregate<StockKey>, "key">>(`
SELECT
open,
close,
high,
low
FROM stock_aggregates
WHERE symbol = '${symbol}'
AND tsStart = '${tsStart}'
`)
).map((aggregate) => ({
...aggregate,
tsStart,
}))[0];
},
insertAggregates: async (aggregates) => {
// stock existence is taken care of by clickhouse materialized view
await clickhouse.insert({
table: "stock_aggregates",
values: aggregates.map(
({ key: { symbol }, tsStart, open, close, high, low }) => ({
symbol,
tsStart,
open,
close,
high,
low,
})
),
});
},
getClosingPrice: async ({ key }) => {
// no-op: not used since stocks don't have a "closing" price, unlike options.
return 0;
},
};
return {
...stockDatabase,
getSymbols: stockDatabase.getKeys,
};
}
export const stockDatabase: StockDatabase = makeStockDatabase();
@@ -0,0 +1,7 @@
import type { AggregateDatabase } from "../interfaces.js";
export type StockKey = { symbol: string };
export type StockDatabase = AggregateDatabase<StockKey> & {
getSymbols: AggregateDatabase<StockKey>["getKeys"];
};
@@ -0,0 +1,86 @@
import type { StockDatabase, StockKey } from "./interfaces.js";
import { open } from "lmdbx";
const stockAggregatesDb = open({
path: "./stock-aggregates.db",
// any options go here, we can turn on compression like this:
compression: true,
});
const stockExistenceDb = open({
path: "./stock-existence.db",
// any options go here, we can turn on compression like this:
compression: true,
});
/** Largest possible key according to the `ordered-binary` (used by lmdbx) docs. */
const MAXIMUM_KEY = Buffer.from([0xff]);
function makeStockDatabase(): StockDatabase {
const stockDatabase: Omit<StockDatabase, "getSymbols"> = {
getKeys: async ({ date, key }) => {
if (key?.symbol) {
return [key as StockKey];
}
return stockExistenceDb
.getRange({
start: [date],
end: [date, MAXIMUM_KEY],
})
.map(({ key }) => ({ symbol: key[1] })).asArray;
},
getAggregates: async ({ key: { symbol }, date }) => {
const startOfDayUnix = new Date(`${date}T00:00:00Z`).valueOf();
const endOfDayUnix = startOfDayUnix + 3600 * 24 * 1000;
return stockAggregatesDb
.getRange({
start: [symbol, startOfDayUnix],
end: [symbol, endOfDayUnix],
})
.map(({ key, value }) => ({
tsStart: key[1],
open: value.open,
close: value.close,
high: value.high,
low: value.low,
})).asArray;
},
getAggregate: async ({ key: { symbol }, tsStart }) => {
return stockAggregatesDb.get([symbol, tsStart]);
},
insertAggregates: async (aggregates) => {
await stockExistenceDb.batch(() => {
for (const aggregate of aggregates) {
stockExistenceDb.put(
[
new Date(aggregate.tsStart).toISOString().substring(0, 10),
aggregate.key.symbol,
],
null
);
}
});
await stockAggregatesDb.batch(() => {
for (const aggregate of aggregates) {
stockAggregatesDb.put([aggregate.key.symbol, aggregate.tsStart], {
open: aggregate.open,
close: aggregate.close,
high: aggregate.high,
low: aggregate.low,
});
}
});
},
getClosingPrice: async ({ key }) => {
// no-op: not used since stocks don't have a "closing" price, unlike options.
return 0;
},
};
return {
...stockDatabase,
getSymbols: stockDatabase.getKeys,
};
}
export const stockDatabase: StockDatabase = makeStockDatabase();
@@ -0,0 +1,46 @@
export type Candlestick = {
open: number;
close: number;
high: number;
low: number;
};
export type Aggregate<T> = {
key: T;
/** UNIX time in milliseconds */
tsStart: number;
} & Candlestick;
export type AggregateDatabase<T> = {
getKeys: ({
key,
date,
}: {
key?: T | Partial<T>;
date?: string;
}) => Promise<Array<T>>;
getAggregates: ({
key,
date,
}: {
key: T;
date: string;
}) => Promise<Array<Omit<Aggregate<T>, "key">>>;
/** Since an aggregate may not exist at the specified `tsStart`, return `undefined` if it doesn't exist. */
getAggregate: ({
key,
tsStart,
}: {
key: T;
tsStart: number;
}) => Promise<Omit<Aggregate<T>, "key"> | undefined>;
getAggregatesSync?: ({
key,
date,
}: {
key: T;
date: string;
}) => Array<Omit<Aggregate<T>, "key">>;
insertAggregates: (aggregates: Array<Aggregate<T>>) => Promise<void>;
getClosingPrice: ({ key }: { key: T }) => Promise<number>;
};