import { join, dirname } from 'node:path' import fs from 'node:fs/promises'; import {sql} from './db.js'; import {parse} from 'csv-parse/sync'; try { await sql`CREATE TYPE OPTION_TYPE as ENUM ('put', 'call');`; await sql`CREATE TYPE OPTION_STYLE as ENUM ('A', 'E');`; } catch(err){} /* contract VARCHAR(20) GENERATED ALWAYS AS ( CONCAT( underlying , RIGHT(YEAR(expiration)::VARCHAR,2) , LPAD(MONTH(expiration)::VARCHAR,2,'0') , LPAD(DAY(expiration)::VARCHAR,2,'0') , (CASE WHEN type = 'call' THEN 'C' ELSE 'P' END) , LPAD(((strike*1000)::INTEGER)::VARCHAR,8,'0') ) ), */ await sql`CREATE TABLE IF NOT EXISTS option_quote ( underlying VARCHAR, expiration DATE, type OPTION_TYPE, strike FLOAT, style OPTION_STYLE, bid FLOAT, bid_size INTEGER DEFAULT 0, ask FLOAT, ask_size INTEGER DEFAULT 0, volume INTEGER, open_interest INTEGER, quote_date DATE, delta FLOAT, gamma FLOAT, theta FLOAT, vega FLOAT, implied_volatility FLOAT );`; await sql`CREATE TABLE IF NOT EXISTS stock_quote ( quote_date DATE, symbol VARCHAR, open FLOAT DEFAULT 0.0, high FLOAT DEFAULT 0.0, low FLOAT DEFAULT 0.0, close FLOAT DEFAULT 0.0, volume FLOAT DEFAULT 0.0, adjust_close FLOAT DEFAULT 0.0 );`; export async function ingestStocks(sourceDataDir:string):Promise{ // read each csv, and ingest each row into postgres: const csvFiles = await fs.readdir(sourceDataDir); await Promise.all(csvFiles.filter((csvFile)=>csvFile.substring(10,16)==='stocks').map(async (csvFile)=>{ const quoteDate = csvFile.substring(0,10); const rows = parse(await fs.readFile(join(sourceDataDir, csvFile))); await Promise.all(rows.map(async ([symbol, open, high, low, close, volume, adjust_close])=>{ open = Number(open); high = Number(high); low = Number(low); close = Number(close); volume = Number(volume); adjust_close = Number(adjust_close); try{ await sql`insert into "stock_quote" ( quote_date, symbol, open, high, low, close, volume, adjust_close ) values ( ${quoteDate}, ${symbol}, ${open}, ${high}, ${low}, ${close}, ${volume}, ${adjust_close || 0} );`; console.log(`${quoteDate} ${symbol}`); } catch(err){ console.error(err); } })); })); } export async function ingestOptions(sourceDataDir:string):Promise{ // read each csv, and ingest each row into postgres: const csvFiles = (await fs.readdir(sourceDataDir)).filter((csvFile)=>csvFile.substring(10,17)==='options'); for(const csvFile of csvFiles){ const quoteDate = csvFile.substring(0,10); const rows = parse(await fs.readFile(join(sourceDataDir, csvFile)), {columns:true}); for(let {underlying, expiration, type, strike, style, bid, bid_size, ask, ask_size, volume, open_interest, quote_date, delta, gamma, theta, vega, implied_volatility} of rows){ strike=Number(strike); bid=Number(bid); bid_size=Number(bid_size); ask=Number(ask); ask_size=Number(ask_size); volume=Number(volume); open_interest=Number(open_interest); delta=Number(delta); gamma=Number(gamma); theta=Number(theta); vega=Number(vega); implied_volatility=Number(implied_volatility); try{ await sql`insert into "option_quote" ( underlying, expiration, type, strike, style, bid, bid_size, ask, ask_size, volume, open_interest, quote_date, delta, gamma, theta, vega, implied_volatility ) values ( ${underlying}, ${expiration}, ${type}, ${strike}, ${style}, ${bid}, ${bid_size}, ${ask}, ${ask_size}, ${volume}, ${open_interest}, ${quote_date}, ${delta}, ${gamma}, ${theta}, ${vega}, ${implied_volatility} );`; //console.log(`options ${quoteDate} ${underlying}`); } catch(err){ console.error(`${quoteDate} ${underlying}`, err); } }; }; console.log("creating index"); await sql`CREATE INDEX ON "option_quote" USING btree ("underlying","expiration","type","strike","quote_date");`; await sql`CREATE MATERIALIZED VIEW "underlyings" AS SELECT DISTINCT "underlying" FROM "option_quote";`; }