retry clickhouse insert in case of socket hangup

main
Avraham Sakal 12 months ago
parent fc2526a4aa
commit f8279d4932

@ -3,6 +3,7 @@ import sqlite3 from "sqlite3";
import { open } from "sqlite"; import { open } from "sqlite";
import { clickhouse, query } from "./clickhouse.js"; import { clickhouse, query } from "./clickhouse.js";
import { OptionContract } from "./polygon.js"; import { OptionContract } from "./polygon.js";
import pRetry from "p-retry";
const sqliteDb = await open({ const sqliteDb = await open({
filename: "/tmp/sync-state.db", filename: "/tmp/sync-state.db",
@ -119,11 +120,16 @@ export async function pullOptionContracts(symbol: string, date: string) {
date date
)) { )) {
console.log(batch.length); console.log(batch.length);
await clickhouse.insert({ pRetry(
table: "option_contract_existences", async () => {
values: batch, await clickhouse.insert({
format: "JSONEachRow", table: "option_contract_existences",
}); values: batch,
format: "JSONEachRow",
});
},
{ forever: true, factor: 2, maxTimeout: 120000 }
);
} }
await setPullOptionContractsState(symbol, date, { await setPullOptionContractsState(symbol, date, {
status: OptionContractSyncStatus.COMPLETED, status: OptionContractSyncStatus.COMPLETED,
@ -157,11 +163,16 @@ export async function pullOptionContractAggregates(
new Date(batch[0].tsStart * 1000), new Date(batch[0].tsStart * 1000),
new Date(batch[batch.length - 1].tsStart * 1000) new Date(batch[batch.length - 1].tsStart * 1000)
); );
await clickhouse.insert({ pRetry(
table: "option_contract_aggregates", async () => {
values: batch, await clickhouse.insert({
format: "JSONEachRow", table: "option_contract_aggregates",
}); values: batch,
format: "JSONEachRow",
});
},
{ forever: true, factor: 2, maxTimeout: 120000 }
);
} }
} }
await setPullOptionContractAggregatesState(ticker, { await setPullOptionContractAggregatesState(ticker, {

Loading…
Cancel
Save