From 34ff197320d73d5a973cfee8583e796379fb3200 Mon Sep 17 00:00:00 2001 From: Brian Sakal Date: Wed, 26 Apr 2023 23:04:49 -0400 Subject: [PATCH] ingests to lmdb --- .gitignore | 2 ++ index.ts | 84 +++++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 76 insertions(+), 10 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..da37e43 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*.lmdb +*.lmdb-lock \ No newline at end of file diff --git a/index.ts b/index.ts index 96e1b4e..4b74650 100644 --- a/index.ts +++ b/index.ts @@ -1,16 +1,80 @@ import { open } from 'npm:lmdb'; import { CsvParseStream } from 'https://deno.land/std@0.184.0/csv/mod.ts'; -const fileStream = (await Deno.open('/home/brian/Downloads/options-data/2013-01-02options.cvs', {read: true})).readable; -const csvParseStream = new CsvParseStream({skipFirstRow: true}); +const db = open({ + path: 'options.lmdb', + compression: false, + encoding: 'msgpack', + sharedStructuresKey: Symbol.for('structures') +}); +interface Row { + contract: string, + underlying: string, + expiration: string, + type: string, + strike: string, + style: string, + bid: string, + bid_size: string, + ask: string, + ask_size: string, + volume: string, + open_interest: string, + quote_date: string, + delta: string, + gamma: string, + theta: string, + vega: string, + implied_volatility: string, +}; -const rowStream = fileStream - .pipeThrough(new TextDecoderStream()) - .pipeThrough(csvParseStream) -; -const rowReader = rowStream.getReader(); +async function getCsvRowStream(){ + const fileStream = (await Deno.open('/home/brian/Downloads/options-data/2013-01-02options.cvs', {read: true})).readable; + const csvParseStream = new CsvParseStream({skipFirstRow: true}); + const rowStream = fileStream + .pipeThrough(new TextDecoderStream()) + .pipeThrough(csvParseStream) + ; + return rowStream; +} -for(let i = 0; i<20; i++){ - console.log(await rowReader.read()); -} \ No newline at end of file +async function ingestCsvToLmdb(){ + const rowStream = await getCsvRowStream(); + const rowReader = rowStream.getReader(); + + while(true){ + const row = (await rowReader.read()).value as unknown as Row; + if(typeof row !== 'undefined'){ + db.put([row.underlying, row.quote_date, row.expiration, row.strike, row.type], row); + } + else{ + break; + } + } +} + +async function countRowsInCsv(){ + const rowStream = await getCsvRowStream(); + const rowReader = rowStream.getReader(); + + let i = 0; + while(true){ + const row = await rowReader.read(); + if(typeof row !== 'undefined' && typeof row.value !== 'undefined'){ + i++; + } + else{ + break; + } + } + return i; +} + +function countRowsInLmdb(){ + return db.getKeysCount(); +} + +//console.log('rows-in-csv:', await countRowsInCsv()); +//console.log('rows-in-lmdb:', countRowsInLmdb()); +await ingestCsvToLmdb(); \ No newline at end of file