From f8279d4932f74786a4d1e56b2afa84073e6c842b Mon Sep 17 00:00:00 2001 From: Avraham Sakal Date: Sun, 30 Jun 2024 21:45:51 -0400 Subject: [PATCH] retry clickhouse insert in case of socket hangup --- server/src/lib/sync.ts | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/server/src/lib/sync.ts b/server/src/lib/sync.ts index 61b7a56..e6f9aa2 100644 --- a/server/src/lib/sync.ts +++ b/server/src/lib/sync.ts @@ -3,6 +3,7 @@ import sqlite3 from "sqlite3"; import { open } from "sqlite"; import { clickhouse, query } from "./clickhouse.js"; import { OptionContract } from "./polygon.js"; +import pRetry from "p-retry"; const sqliteDb = await open({ filename: "/tmp/sync-state.db", @@ -119,11 +120,16 @@ export async function pullOptionContracts(symbol: string, date: string) { date )) { console.log(batch.length); - await clickhouse.insert({ - table: "option_contract_existences", - values: batch, - format: "JSONEachRow", - }); + pRetry( + async () => { + await clickhouse.insert({ + table: "option_contract_existences", + values: batch, + format: "JSONEachRow", + }); + }, + { forever: true, factor: 2, maxTimeout: 120000 } + ); } await setPullOptionContractsState(symbol, date, { status: OptionContractSyncStatus.COMPLETED, @@ -157,11 +163,16 @@ export async function pullOptionContractAggregates( new Date(batch[0].tsStart * 1000), new Date(batch[batch.length - 1].tsStart * 1000) ); - await clickhouse.insert({ - table: "option_contract_aggregates", - values: batch, - format: "JSONEachRow", - }); + pRetry( + async () => { + await clickhouse.insert({ + table: "option_contract_aggregates", + values: batch, + format: "JSONEachRow", + }); + }, + { forever: true, factor: 2, maxTimeout: 120000 } + ); } } await setPullOptionContractAggregatesState(ticker, {