ingests to lmdb
parent
b5d6662b66
commit
34ff197320
@ -0,0 +1,2 @@
|
||||
*.lmdb
|
||||
*.lmdb-lock
|
@ -1,16 +1,80 @@
|
||||
import { open } from 'npm:lmdb';
|
||||
import { CsvParseStream } from 'https://deno.land/std@0.184.0/csv/mod.ts';
|
||||
|
||||
const fileStream = (await Deno.open('/home/brian/Downloads/options-data/2013-01-02options.cvs', {read: true})).readable;
|
||||
const csvParseStream = new CsvParseStream({skipFirstRow: true});
|
||||
const db = open({
|
||||
path: 'options.lmdb',
|
||||
compression: false,
|
||||
encoding: 'msgpack',
|
||||
sharedStructuresKey: Symbol.for('structures')
|
||||
});
|
||||
interface Row {
|
||||
contract: string,
|
||||
underlying: string,
|
||||
expiration: string,
|
||||
type: string,
|
||||
strike: string,
|
||||
style: string,
|
||||
bid: string,
|
||||
bid_size: string,
|
||||
ask: string,
|
||||
ask_size: string,
|
||||
volume: string,
|
||||
open_interest: string,
|
||||
quote_date: string,
|
||||
delta: string,
|
||||
gamma: string,
|
||||
theta: string,
|
||||
vega: string,
|
||||
implied_volatility: string,
|
||||
};
|
||||
|
||||
const rowStream = fileStream
|
||||
.pipeThrough(new TextDecoderStream())
|
||||
.pipeThrough(csvParseStream)
|
||||
;
|
||||
const rowReader = rowStream.getReader();
|
||||
async function getCsvRowStream(){
|
||||
const fileStream = (await Deno.open('/home/brian/Downloads/options-data/2013-01-02options.cvs', {read: true})).readable;
|
||||
const csvParseStream = new CsvParseStream({skipFirstRow: true});
|
||||
|
||||
const rowStream = fileStream
|
||||
.pipeThrough(new TextDecoderStream())
|
||||
.pipeThrough(csvParseStream)
|
||||
;
|
||||
return rowStream;
|
||||
}
|
||||
|
||||
async function ingestCsvToLmdb(){
|
||||
const rowStream = await getCsvRowStream();
|
||||
const rowReader = rowStream.getReader();
|
||||
|
||||
while(true){
|
||||
const row = (await rowReader.read()).value as unknown as Row;
|
||||
if(typeof row !== 'undefined'){
|
||||
db.put([row.underlying, row.quote_date, row.expiration, row.strike, row.type], row);
|
||||
}
|
||||
else{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for(let i = 0; i<20; i++){
|
||||
console.log(await rowReader.read());
|
||||
async function countRowsInCsv(){
|
||||
const rowStream = await getCsvRowStream();
|
||||
const rowReader = rowStream.getReader();
|
||||
|
||||
let i = 0;
|
||||
while(true){
|
||||
const row = await rowReader.read();
|
||||
if(typeof row !== 'undefined' && typeof row.value !== 'undefined'){
|
||||
i++;
|
||||
}
|
||||
else{
|
||||
break;
|
||||
}
|
||||
}
|
||||
return i;
|
||||
}
|
||||
|
||||
function countRowsInLmdb(){
|
||||
return db.getKeysCount();
|
||||
}
|
||||
|
||||
//console.log('rows-in-csv:', await countRowsInCsv());
|
||||
//console.log('rows-in-lmdb:', countRowsInLmdb());
|
||||
await ingestCsvToLmdb();
|
Loading…
Reference in New Issue