Merge pull request #1400 from ranaroussi/feature/improve-performance

Optimise recent new features in `history`
pull/1421/head
ValueRaider 2023-02-12 14:58:36 +00:00 committed by GitHub
commit a0046439d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 50 additions and 54 deletions

View File

@ -974,7 +974,7 @@ class TickerBase:
for g in dts_groups:
print(f"- {g[0]} -> {g[-1]}")
# Add some good data to each group, so can calibrate later:
# Add some good data to each group, so can calibrate prices later:
for i in range(len(dts_groups)):
g = dts_groups[i]
g0 = g[0]
@ -1125,7 +1125,7 @@ class TickerBase:
df_new["Volume"] *= ratio_rcp
# Repair!
bad_dts = df_block.index[(df_block[price_cols+["Volume"]]==tag).any(axis=1)]
bad_dts = df_block.index[(df_block[price_cols+["Volume"]]==tag).to_numpy().any(axis=1)]
if debug:
no_fine_data_dts = []
@ -1149,7 +1149,7 @@ class TickerBase:
df_fine = df_fine.loc[idx:]
df_bad_row = df.loc[idx]
bad_fields = df_bad_row.index[df_bad_row==tag].values
bad_fields = df_bad_row.index[df_bad_row==tag].to_numpy()
if "High" in bad_fields:
df_v2.loc[idx, "High"] = df_new_row["High"]
if "Low" in bad_fields:
@ -1189,7 +1189,7 @@ class TickerBase:
if df.index.tz is None:
df2.index = df2.index.tz_localize(tz_exchange)
else:
elif df2.index.tz != tz_exchange:
df2.index = df2.index.tz_convert(tz_exchange)
# Only import scipy if users actually want function. To avoid
@ -1198,7 +1198,7 @@ class TickerBase:
data_cols = ["High", "Open", "Low", "Close", "Adj Close"] # Order important, separate High from Low
data_cols = [c for c in data_cols if c in df2.columns]
f_zeroes = (df2[data_cols]==0).any(axis=1)
f_zeroes = (df2[data_cols]==0).any(axis=1).to_numpy()
if f_zeroes.any():
df2_zeroes = df2[f_zeroes]
df2 = df2[~f_zeroes]
@ -1206,8 +1206,9 @@ class TickerBase:
df2_zeroes = None
if df2.shape[0] <= 1:
return df
median = _ndimage.median_filter(df2[data_cols].values, size=(3, 3), mode="wrap")
ratio = df2[data_cols].values / median
df2_data = df2[data_cols].to_numpy()
median = _ndimage.median_filter(df2_data, size=(3, 3), mode="wrap")
ratio = df2_data / median
ratio_rounded = (ratio / 20).round() * 20 # round ratio to nearest 20
f = ratio_rounded == 100
if not f.any():
@ -1220,14 +1221,15 @@ class TickerBase:
c = data_cols[i]
df2.loc[fi, c] = tag
n_before = (df2[data_cols].to_numpy()==tag).sum()
n_before = df2_data.sum()
df2 = self._reconstruct_intervals_batch(df2, interval, prepost, tag, silent)
df2_tagged = df2[data_cols].to_numpy()==tag
n_after = (df2[data_cols].to_numpy()==tag).sum()
if n_after > 0:
# This second pass will *crudely* "fix" any remaining errors in High/Low
# simply by ensuring they don't contradict e.g. Low = 100x High.
f = df2[data_cols].to_numpy()==tag
f = df2_tagged
for i in range(f.shape[0]):
fi = f[i,:]
if not fi.any():
@ -1259,7 +1261,10 @@ class TickerBase:
if fi[j]:
df2.loc[idx, c] = df2.loc[idx, ["Open", "Close"]].min()
n_after_crude = (df2[data_cols].to_numpy()==tag).sum()
df2_tagged = df2[data_cols].to_numpy()==tag
n_after_crude = df2_tagged.sum()
else:
n_after_crude = n_after
n_fixed = n_before - n_after_crude
n_fixed_crudely = n_after - n_after_crude
@ -1271,7 +1276,7 @@ class TickerBase:
print(report_msg)
# Restore original values where repair failed
f = df2[data_cols].values==tag
f = df2_tagged
for j in range(len(data_cols)):
fj = f[:,j]
if fj.any():
@ -1301,7 +1306,7 @@ class TickerBase:
if df2.index.tz is None:
df2.index = df2.index.tz_localize(tz_exchange)
else:
elif df2.index.tz != tz_exchange:
df2.index = df2.index.tz_convert(tz_exchange)
price_cols = [c for c in ["Open", "High", "Low", "Close", "Adj Close"] if c in df2.columns]
@ -1309,18 +1314,17 @@ class TickerBase:
df2_reserve = None
if intraday:
# Ignore days with >50% intervals containing NaNs
df_nans = pd.DataFrame(f_prices_bad.any(axis=1), columns=["nan"])
df_nans["_date"] = df_nans.index.date
grp = df_nans.groupby("_date")
grp = pd.Series(f_prices_bad.any(axis=1), name="nan").groupby(f_prices_bad.index.date)
nan_pct = grp.sum() / grp.count()
dts = nan_pct.index[nan_pct["nan"]>0.5]
dts = nan_pct.index[nan_pct>0.5]
f_zero_or_nan_ignore = _np.isin(f_prices_bad.index.date, dts)
df2_reserve = df2[f_zero_or_nan_ignore]
df2 = df2[~f_zero_or_nan_ignore]
f_prices_bad = (df2[price_cols] == 0.0) | df2[price_cols].isna()
f_high_low_good = (~df2["High"].isna()) & (~df2["Low"].isna())
f_vol_bad = (df2["Volume"]==0).to_numpy() & f_high_low_good & (df2["High"]!=df2["Low"]).to_numpy()
f_high_low_good = (~df2["High"].isna().to_numpy()) & (~df2["Low"].isna().to_numpy())
f_change = df2["High"].to_numpy() != df2["Low"].to_numpy()
f_vol_bad = (df2["Volume"]==0).to_numpy() & f_high_low_good & f_change
# Check whether worth attempting repair
f_prices_bad = f_prices_bad.to_numpy()
@ -1347,14 +1351,15 @@ class TickerBase:
f_vol_zero_or_nan = (df2["Volume"].to_numpy()==0) | (df2["Volume"].isna().to_numpy())
df2.loc[f_prices_bad.any(axis=1) & f_vol_zero_or_nan, "Volume"] = tag
# If volume=0 or NaN but price moved in interval, then tag volume for repair
f_change = df2["High"].to_numpy() != df2["Low"].to_numpy()
df2.loc[f_change & f_vol_zero_or_nan, "Volume"] = tag
n_before = (df2[data_cols].to_numpy()==tag).sum()
dts_tagged = df2.index[(df2[data_cols].to_numpy()==tag).any(axis=1)]
df2 = self._reconstruct_intervals_batch(df2, interval, prepost, tag, silent)
n_after = (df2[data_cols].to_numpy()==tag).sum()
dts_not_repaired = df2.index[(df2[data_cols].to_numpy()==tag).any(axis=1)]
df2_tagged = df2[data_cols].to_numpy()==tag
n_before = df2_tagged.sum()
dts_tagged = df2.index[df2_tagged.any(axis=1)]
df3 = self._reconstruct_intervals_batch(df2, interval, prepost, tag, silent)
df3_tagged = df3[data_cols].to_numpy()==tag
n_after = df3_tagged.sum()
dts_not_repaired = df3.index[df3_tagged.any(axis=1)]
n_fixed = n_before - n_after
if not silent and n_fixed > 0:
msg = f"{self.ticker}: fixed {n_fixed}/{n_before} value=0 errors in {interval} price data"
@ -1364,18 +1369,17 @@ class TickerBase:
print(msg)
if df2_reserve is not None:
df2 = _pd.concat([df2, df2_reserve])
df2 = df2.sort_index()
df3 = _pd.concat([df3, df2_reserve]).sort_index()
# Restore original values where repair failed (i.e. remove tag values)
f = df2[data_cols].values==tag
f = df3[data_cols].to_numpy()==tag
for j in range(len(data_cols)):
fj = f[:,j]
if fj.any():
c = data_cols[j]
df2.loc[fj, c] = df.loc[fj, c]
df3.loc[fj, c] = df.loc[fj, c]
return df2
return df3
def _get_ticker_tz(self, debug_mode, proxy, timeout):
if self._tz is not None:

View File

@ -719,36 +719,28 @@ def format_history_metadata(md):
tps = md["tradingPeriods"]
if isinstance(tps, list):
# Only regular times
regs_dict = [tps[i][0] for i in range(len(tps))]
pres_dict = None
posts_dict = None
df = _pd.DataFrame.from_records(_np.hstack(tps))
df = df.drop(["timezone", "gmtoffset"], axis=1)
df["start"] = _pd.to_datetime(df["start"], unit='s', utc=True).dt.tz_convert(tz)
df["end"] = _pd.to_datetime(df["end"], unit='s', utc=True).dt.tz_convert(tz)
elif isinstance(tps, dict):
# Includes pre- and post-market
pres_dict = [tps["pre"][i][0] for i in range(len(tps["pre"]))]
posts_dict = [tps["post"][i][0] for i in range(len(tps["post"]))]
regs_dict = [tps["regular"][i][0] for i in range(len(tps["regular"]))]
pre_df = _pd.DataFrame.from_records(_np.hstack(tps["pre"]))
post_df = _pd.DataFrame.from_records(_np.hstack(tps["post"]))
regular_df = _pd.DataFrame.from_records(_np.hstack(tps["post"]))
pre_df = pre_df.rename(columns={"start":"pre_start", "end":"pre_end"}).drop(["timezone", "gmtoffset"], axis=1)
post_df = post_df.rename(columns={"start":"post_start", "end":"post_end"}).drop(["timezone", "gmtoffset"], axis=1)
regular_df = regular_df.drop(["timezone", "gmtoffset"], axis=1)
df = regular_df.join(pre_df).join(post_df)
for c in ["start", "end", "pre_start", "pre_end", "post_start", "post_end"]:
df[c] = _pd.to_datetime(df[c], unit='s', utc=True).dt.tz_convert(tz)
else:
raise Exception()
def _dict_to_table(d):
df = _pd.DataFrame.from_dict(d).drop(["timezone", "gmtoffset"], axis=1)
df["end"] = _pd.to_datetime(df["end"], unit='s', utc=True).dt.tz_convert(tz)
df["start"] = _pd.to_datetime(df["start"], unit='s', utc=True).dt.tz_convert(tz)
df.index = _pd.to_datetime(df["start"].dt.date)
df.index = df.index.tz_localize(tz)
return df
df = _dict_to_table(regs_dict)
df_cols = ["start", "end"]
if pres_dict is not None:
pre_df = _dict_to_table(pres_dict)
df = df.merge(pre_df.rename(columns={"start":"pre_start", "end":"pre_end"}), left_index=True, right_index=True)
df_cols = ["pre_start", "pre_end"]+df_cols
if posts_dict is not None:
post_df = _dict_to_table(posts_dict)
df = df.merge(post_df.rename(columns={"start":"post_start", "end":"post_end"}), left_index=True, right_index=True)
df_cols = df_cols+["post_start", "post_end"]
df = df[df_cols]
df.index = _pd.to_datetime(df["start"].dt.date)
df.index = df.index.tz_localize(tz)
df.index.name = "Date"
md["tradingPeriods"] = df