# ################################## HOW TO USE ####################################
#
# This is a Jupyter notebook formatted as a script.
# Format: https://jupytext.readthedocs.io/en/latest/formats.html#the-percent-format
#
# Save this file and remove the '.txt' extension.
# In Jupyter Lab, right-click on the Python file -> Open With -> Jupytext Notebook.
# Make sure to have Jupytext installed: https://github.com/mwouts/jupytext
#
# ##################################################################################

# %% [markdown]
# # Pairs trading
# ## Selection

# %%
from vectorbtpro import *

SYMBOLS = vbt.BinanceData.list_symbols("*USDT")
POOL_FILE = "temp/data_pool.h5"
START = "2018"
END = "2023"

# vbt.remove_dir("temp", with_contents=True, missing_ok=True)
vbt.make_dir("temp")

if not vbt.file_exists(POOL_FILE):
    with vbt.ProgressBar(total=len(SYMBOLS)) as pbar:
        collected = 0
        for symbol in SYMBOLS:
            try:
                data = vbt.BinanceData.pull(
                    symbol,
                    start=START,
                    end=END,
                    show_progress=False,
                    silence_warnings=True
                )
                data.to_hdf(POOL_FILE)
                collected += 1
            except Exception:
                pass
            pbar.set_prefix(f"{symbol} ({collected})")
            pbar.update()

# %%
SELECT_START = "2020"
SELECT_END = "2021"

data = vbt.HDFData.pull(
    POOL_FILE,
    start=SELECT_START,
    end=SELECT_END,
    silence_warnings=True
)
print(len(data.symbols))

# %%
data = data.select([
    k for k, v in data.data.items()
    if not v.isnull().any().any()
])
print(len(data.symbols))

# %%
@vbt.parameterized(
    merge_func="concat",
    engine="pathos",
    distribute="chunks",
    n_chunks="auto"
)
def coint_pvalue(close, s1, s2):
    import statsmodels.tsa.stattools as ts
    import numpy as np
    return ts.coint(np.log(close[s1]), np.log(close[s2]))[1]

COINT_FILE = "temp/coint_pvalues.pickle"

# vbt.remove_file(COINT_FILE, missing_ok=True)
if not vbt.file_exists(COINT_FILE):
    coint_pvalues = coint_pvalue(
        data.close,
        vbt.Param(data.symbols, condition="s1 != s2"),
        vbt.Param(data.symbols)
    )
    vbt.save(coint_pvalues, COINT_FILE)
else:
    coint_pvalues = vbt.load(COINT_FILE)

# %%
coint_pvalues = coint_pvalues.sort_values()
print(coint_pvalues)

# %%
S1, S2 = "ALGOUSDT", "QTUMUSDT"
data.plot(column="Close", symbol=[S1, S2], base=1).show()

# %%
S1_log = np.log(data.get("Close", S1))
S2_log = np.log(data.get("Close", S2))
log_diff = (S2_log - S1_log).rename("Log diff")
fig = log_diff.vbt.plot()
fig.add_hline(y=log_diff.mean(), line_color="yellow", line_dash="dot")
fig.show()

# %% [markdown]
# ## Testing

# %%
DATA_FILE = "temp/data.pickle"

# vbt.remove_file(DATA_FILE, missing_ok=True)
if not vbt.file_exists(DATA_FILE):
    data = vbt.BinanceData.pull(
        [S1, S2],
        start=SELECT_END,
        end=END,
        timeframe="hourly"
    )
    vbt.save(data, DATA_FILE)
else:
    data = vbt.load(DATA_FILE)
print(len(data.index))

# %% [markdown]
# ### Level: Researcher

# %%
import scipy.stats as st

WINDOW = 24 * 30
UPPER = st.norm.ppf(1 - 0.05 / 2)
LOWER = -st.norm.ppf(1 - 0.05 / 2)

S1_close = data.get("Close", S1)
S2_close = data.get("Close", S2)
ols = vbt.OLS.run(S1_close, S2_close, window=vbt.Default(WINDOW))
spread = ols.error.rename("Spread")
zscore = ols.zscore.rename("Z-score")
print(pd.concat((spread, zscore), axis=1))

# %%
upper_crossed = zscore.vbt.crossed_above(UPPER)
lower_crossed = zscore.vbt.crossed_below(LOWER)

fig = zscore.vbt.plot()
fig.add_hline(y=UPPER, line_color="orangered", line_dash="dot")
fig.add_hline(y=0, line_color="yellow", line_dash="dot")
fig.add_hline(y=LOWER, line_color="limegreen", line_dash="dot")
upper_crossed.vbt.signals.plot_as_exits(zscore, fig=fig)
lower_crossed.vbt.signals.plot_as_entries(zscore, fig=fig)
fig.show()
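# %% [markdown]
# As a quick sanity check, the rolling z-score can be approximated with plain pandas.
# This is a minimal sketch, assuming `vbt.OLS` regresses the second input on the first
# over a rolling window and standardizes the residual over that same window; since the
# manual residual uses per-bar coefficients, expect a near-1 correlation rather than
# exact equality.

# %%
roll_cov = S1_close.rolling(WINDOW).cov(S2_close)
roll_var = S1_close.rolling(WINDOW).var()
beta = roll_cov / roll_var  # rolling slope of S2 ~ S1
alpha = S2_close.rolling(WINDOW).mean() - beta * S1_close.rolling(WINDOW).mean()
resid = S2_close - (alpha + beta * S1_close)  # rolling OLS residual
manual_zscore = (resid - resid.rolling(WINDOW).mean()) / resid.rolling(WINDOW).std()
print(manual_zscore.corr(zscore))  # should be close to 1 if the assumptions hold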
# %%
long_entries = data.symbol_wrapper.fill(False)
short_entries = data.symbol_wrapper.fill(False)

short_entries.loc[upper_crossed, S1] = True
long_entries.loc[upper_crossed, S2] = True
long_entries.loc[lower_crossed, S1] = True
short_entries.loc[lower_crossed, S2] = True
print(long_entries.sum())

# %%
print(short_entries.sum())

# %%
pf = vbt.Portfolio.from_signals(
    data,
    entries=long_entries,
    short_entries=short_entries,
    size=10,
    size_type="valuepercent100",
    group_by=True,
    cash_sharing=True,
    call_seq="auto"
)

# %%
fig = pf.plot_allocations()
rebalancing_dates = data.index[np.unique(pf.orders.idx.values)]
for date in rebalancing_dates:
    fig.add_vline(x=date, line_color="teal", line_dash="dot")
fig.show()

# %%
pf.stats()

# %%
allocations = data.symbol_wrapper.fill()
allocations.loc[upper_crossed, S1] = -0.1
allocations.loc[upper_crossed, S2] = 0.1
allocations.loc[lower_crossed, S1] = 0.1
allocations.loc[lower_crossed, S2] = -0.1
pfo = vbt.PortfolioOptimizer.from_filled_allocations(allocations)
print(pfo.allocations)

# %%
pfo.plot().show()

# %%
pf = pfo.simulate(data, pf_method="from_signals")
pf.total_return

# %%
PTS_expr = """
PTS:
x = @in_close.iloc[:, 0]
y = @in_close.iloc[:, 1]
ols = vbt.OLS.run(x, y, window=@p_window, hide_params=True)
upper = st.norm.ppf(1 - @p_upper_alpha / 2)
lower = -st.norm.ppf(1 - @p_lower_alpha / 2)
upper_crossed = ols.zscore.vbt.crossed_above(upper)
lower_crossed = ols.zscore.vbt.crossed_below(lower)
long_entries = wrapper.fill(False)
short_entries = wrapper.fill(False)
short_entries.loc[upper_crossed, x.name] = True
long_entries.loc[upper_crossed, y.name] = True
long_entries.loc[lower_crossed, x.name] = True
short_entries.loc[lower_crossed, y.name] = True
long_entries, short_entries
"""

PTS = vbt.IF.from_expr(PTS_expr, keep_pd=True, st=st)
vbt.phelp(PTS.run)

# %%
WINDOW_SPACE = np.arange(5, 50).tolist()
ALPHA_SPACE = (np.arange(1, 100) / 1000).tolist()

long_entries, short_entries = data.run(
    PTS,
    window=WINDOW_SPACE,
    upper_alpha=ALPHA_SPACE,
    lower_alpha=ALPHA_SPACE,
    param_product=True,
    random_subset=1000,
    seed=42,
    unpack=True
)
print(long_entries.columns)

# %%
pf = vbt.Portfolio.from_signals(
    data,
    entries=long_entries,
    short_entries=short_entries,
    size=10,
    size_type="valuepercent100",
    group_by=vbt.ExceptLevel("symbol"),
    cash_sharing=True,
    call_seq="auto"
)
opt_results = pd.concat((
    pf.total_return,
    pf.trades.expectancy,
), axis=1)
print(opt_results.sort_values(by="total_return", ascending=False))

# %%
best_index = opt_results.idxmax()["expectancy"]
best_long_entries = long_entries[best_index]
best_short_entries = short_entries[best_index]

STOP_SPACE = [np.nan] + np.arange(1, 100).tolist()

pf = vbt.Portfolio.from_signals(
    data,
    entries=best_long_entries,
    short_entries=best_short_entries,
    size=10,
    size_type="valuepercent100",
    group_by=vbt.ExceptLevel("symbol"),
    cash_sharing=True,
    call_seq="auto",
    sl_stop=vbt.Param(STOP_SPACE),
    tsl_stop=vbt.Param(STOP_SPACE),
    tp_stop=vbt.Param(STOP_SPACE),
    delta_format="percent100",
    stop_exit_price="close",
    broadcast_kwargs=dict(random_subset=1000, seed=42)
)
opt_results = pd.concat((
    pf.total_return,
    pf.trades.expectancy,
), axis=1)
print(opt_results.sort_values(by="total_return", ascending=False))
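# %% [markdown]
# Because `random_subset=1000` draws only a tiny fraction of the full stop grid, it's
# worth checking how well the sample covers each stop dimension before trusting the
# per-level medians plotted below (a small sketch; `nunique` skips the NaN level that
# disables a stop).

# %%
param_index = pf.total_return.index
for level in ("sl_stop", "tsl_stop", "tp_stop"):
    sampled = param_index.get_level_values(level)
    print(level, sampled.nunique(), "of", len(STOP_SPACE), "levels sampled")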
# %%
def plot_metric_by_stop(stop_name, metric_name, stat_name, smooth):
    from scipy.signal import savgol_filter

    values = pf.deep_getattr(metric_name)
    values = values.vbt.select_levels(stop_name)
    values = getattr(values.groupby(values.index), stat_name)()
    smooth_values = savgol_filter(values, smooth, 1)
    smooth_values = values.vbt.wrapper.wrap(smooth_values)
    fig = values.rename(metric_name).vbt.plot()
    smooth_values.rename(f"{metric_name} (smoothed)").vbt.plot(
        trace_kwargs=dict(line=dict(dash="dot", color="yellow")),
        fig=fig,
    )
    return fig

# %%
plot_metric_by_stop(
    "sl_stop",
    "trades.expectancy",
    "median",
    10
).show()

# %%
plot_metric_by_stop(
    "tsl_stop",
    "trades.expectancy",
    "median",
    10
).show()

# %%
plot_metric_by_stop(
    "tp_stop",
    "trades.expectancy",
    "median",
    10
).show()

# %% [markdown]
# ### Level: Engineer :satellite_orbital:

# %%
@njit(nogil=True)
def pt_signals_nb(close, window=WINDOW, upper=UPPER, lower=LOWER):
    x = np.expand_dims(close[:, 0], 1)
    y = np.expand_dims(close[:, 1], 1)
    _, _, zscore = vbt.ind_nb.ols_nb(x, y, window)
    zscore_1d = zscore[:, 0]
    upper_ts = np.full_like(zscore_1d, upper, dtype=np.float_)
    lower_ts = np.full_like(zscore_1d, lower, dtype=np.float_)
    upper_crossed = vbt.nb.crossed_above_1d_nb(zscore_1d, upper_ts)
    # lower threshold crossing above the z-score == z-score crossing below the lower threshold
    lower_crossed = vbt.nb.crossed_above_1d_nb(lower_ts, zscore_1d)
    long_entries = np.full_like(close, False, dtype=np.bool_)
    short_entries = np.full_like(close, False, dtype=np.bool_)
    short_entries[upper_crossed, 0] = True
    long_entries[upper_crossed, 1] = True
    long_entries[lower_crossed, 0] = True
    short_entries[lower_crossed, 1] = True
    return long_entries, short_entries

# %%
long_entries, short_entries = pt_signals_nb(data.close.values)
long_entries = data.symbol_wrapper.wrap(long_entries)
short_entries = data.symbol_wrapper.wrap(short_entries)
print(long_entries.sum())

# %%
print(short_entries.sum())

# %%
@njit(nogil=True)
def pt_portfolio_nb(
    open, high, low, close,
    long_entries, short_entries,
    sl_stop=np.nan,
    tsl_stop=np.nan,
    tp_stop=np.nan,
):
    target_shape = close.shape
    group_lens = np.array([2])  # both columns form a single pair (group)
    sim_out = vbt.pf_nb.from_signals_nb(
        target_shape=target_shape,
        group_lens=group_lens,
        auto_call_seq=True,
        open=open,
        high=high,
        low=low,
        close=close,
        long_entries=long_entries,
        short_entries=short_entries,
        size=10,
        size_type=vbt.pf_enums.SizeType.ValuePercent100,
        sl_stop=sl_stop,
        tsl_stop=tsl_stop,
        tp_stop=tp_stop,
        delta_format=vbt.pf_enums.DeltaFormat.Percent100,
        stop_exit_price=vbt.pf_enums.StopExitPrice.Close
    )
    return sim_out

# %%
sim_out = pt_portfolio_nb(
    data.open.values,
    data.high.values,
    data.low.values,
    data.close.values,
    long_entries.values,
    short_entries.values
)

# %%
pf = vbt.Portfolio(
    data.symbol_wrapper.regroup(group_by=True),
    sim_out,
    open=data.open,
    high=data.high,
    low=data.low,
    close=data.close,
    cash_sharing=True,
    init_cash=100
)
print(pf.total_return)

# %%
@njit(nogil=True)
def pt_metrics_nb(close, sim_out):
    target_shape = close.shape
    group_lens = np.array([2])
    filled_close = vbt.nb.fbfill_nb(close)
    col_map = vbt.rec_nb.col_map_nb(
        col_arr=sim_out.order_records["col"],
        n_cols=target_shape[1]
    )
    total_profit = vbt.pf_nb.total_profit_nb(
        target_shape=target_shape,
        close=filled_close,
        order_records=sim_out.order_records,
        col_map=col_map
    )
    total_profit_grouped = vbt.pf_nb.total_profit_grouped_nb(
        total_profit=total_profit,
        group_lens=group_lens,
    )[0]
    total_return = total_profit_grouped / 100  # init_cash=100
    trade_records = vbt.pf_nb.get_exit_trades_nb(
        order_records=sim_out.order_records,
        close=filled_close,
        col_map=col_map
    )
    trade_records = trade_records[
        trade_records["status"] == vbt.pf_enums.TradeStatus.Closed
    ]
    expectancy = vbt.pf_nb.expectancy_reduce_nb(
        pnl_arr=trade_records["pnl"]
    )
    return total_return, expectancy

# %%
pt_metrics_nb(data.close.values, sim_out)
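# %% [markdown]
# The compiled metrics should agree with their high-level counterparts. A quick
# cross-check against the `Portfolio` object built above (a sanity sketch; the
# expectancy may differ slightly because `pt_metrics_nb` keeps closed trades only):

# %%
nb_total_return, nb_expectancy = pt_metrics_nb(data.close.values, sim_out)
print(np.isclose(nb_total_return, pf.total_return))
print(np.isclose(nb_expectancy, pf.trades.expectancy))  # may be False if open trades remain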
# %%
@njit(nogil=True)
def pt_pipeline_nb(
    open, high, low, close,
    window=WINDOW,
    upper=UPPER,
    lower=LOWER,
    sl_stop=np.nan,
    tsl_stop=np.nan,
    tp_stop=np.nan,
):
    long_entries, short_entries = pt_signals_nb(
        close, window=window, upper=upper, lower=lower
    )
    sim_out = pt_portfolio_nb(
        open, high, low, close,
        long_entries, short_entries,
        sl_stop=sl_stop,
        tsl_stop=tsl_stop,
        tp_stop=tp_stop
    )
    return pt_metrics_nb(close, sim_out)

pt_pipeline_nb(
    data.open.values,
    data.high.values,
    data.low.values,
    data.close.values
)

# %%
%%timeit
pt_pipeline_nb(
    data.open.values,
    data.high.values,
    data.low.values,
    data.close.values
)

# %%
param_pt_pipeline = vbt.parameterized(
    pt_pipeline_nb,
    merge_func="concat",
    seed=42,
    engine="threadpool",
    chunk_len="auto"
)

UPPER_SPACE = [st.norm.ppf(1 - x / 2) for x in ALPHA_SPACE]
LOWER_SPACE = [-st.norm.ppf(1 - x / 2) for x in ALPHA_SPACE]

POPT_FILE = "temp/param_opt.pickle"

# vbt.remove_file(POPT_FILE, missing_ok=True)
if not vbt.file_exists(POPT_FILE):
    param_opt = param_pt_pipeline(
        data.open.values,
        data.high.values,
        data.low.values,
        data.close.values,
        window=vbt.Param(WINDOW_SPACE),
        upper=vbt.Param(UPPER_SPACE),
        lower=vbt.Param(LOWER_SPACE)
    )
    vbt.save(param_opt, POPT_FILE)
else:
    param_opt = vbt.load(POPT_FILE)
total_return, expectancy = param_opt

# %%
print(total_return)

# %%
grouped_metric = total_return.groupby(level=["upper", "lower"]).mean()
grouped_metric.vbt.heatmap(
    trace_kwargs=dict(colorscale="RdBu", zmid=0),
    yaxis=dict(autorange="reversed")
).show()

# %%
@njit(nogil=True)
def pt_pipeline_mult_nb(
    n_params: int,
    open: tp.Array2d,
    high: tp.Array2d,
    low: tp.Array2d,
    close: tp.Array2d,
    window: tp.FlexArray1dLike = WINDOW,
    upper: tp.FlexArray1dLike = UPPER,
    lower: tp.FlexArray1dLike = LOWER,
    sl_stop: tp.FlexArray1dLike = np.nan,
    tsl_stop: tp.FlexArray1dLike = np.nan,
    tp_stop: tp.FlexArray1dLike = np.nan,
):
    window_ = vbt.to_1d_array_nb(np.asarray(window))
    upper_ = vbt.to_1d_array_nb(np.asarray(upper))
    lower_ = vbt.to_1d_array_nb(np.asarray(lower))
    sl_stop_ = vbt.to_1d_array_nb(np.asarray(sl_stop))
    tsl_stop_ = vbt.to_1d_array_nb(np.asarray(tsl_stop))
    tp_stop_ = vbt.to_1d_array_nb(np.asarray(tp_stop))

    total_return = np.empty(n_params, dtype=np.float_)
    expectancy = np.empty(n_params, dtype=np.float_)
    for i in range(n_params):
        total_return[i], expectancy[i] = pt_pipeline_nb(
            open, high, low, close,
            window=vbt.flex_select_1d_nb(window_, i),
            upper=vbt.flex_select_1d_nb(upper_, i),
            lower=vbt.flex_select_1d_nb(lower_, i),
            sl_stop=vbt.flex_select_1d_nb(sl_stop_, i),
            tsl_stop=vbt.flex_select_1d_nb(tsl_stop_, i),
            tp_stop=vbt.flex_select_1d_nb(tp_stop_, i),
        )
    return total_return, expectancy

# %%
pt_pipeline_mult_nb(
    3,
    data.open.values,
    data.high.values,
    data.low.values,
    data.close.values,
    window=np.array([10, 20, 30])
)

# %%
chunked_pt_pipeline = vbt.chunked(
    pt_pipeline_mult_nb,
    size=vbt.ArgSizer(arg_query="n_params"),
    arg_take_spec=dict(
        n_params=vbt.CountAdapter(),
        open=None,
        high=None,
        low=None,
        close=None,
        window=vbt.FlexArraySlicer(),
        upper=vbt.FlexArraySlicer(),
        lower=vbt.FlexArraySlicer(),
        sl_stop=vbt.FlexArraySlicer(),
        tsl_stop=vbt.FlexArraySlicer(),
        tp_stop=vbt.FlexArraySlicer()
    ),
    chunk_len=1000,
    merge_func="concat",
    execute_kwargs=dict(
        chunk_len="auto",
        engine="threadpool"
    )
)
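# %% [markdown]
# Before launching the full grid, it's worth validating that the chunked wrapper
# returns the same numbers as the raw multi-parameter pipeline (a minimal sketch over
# three window values; all other parameters keep their defaults, and `equal_nan=True`
# tolerates combinations without closed trades):

# %%
raw_out = pt_pipeline_mult_nb(
    3,
    data.open.values,
    data.high.values,
    data.low.values,
    data.close.values,
    window=np.array([10, 20, 30])
)
chunked_out = chunked_pt_pipeline(
    3,
    data.open.values,
    data.high.values,
    data.low.values,
    data.close.values,
    window=np.array([10, 20, 30])
)
print(
    np.allclose(raw_out[0], chunked_out[0], equal_nan=True),
    np.allclose(raw_out[1], chunked_out[1], equal_nan=True)
)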
# %%
param_product, param_index = vbt.combine_params(
    dict(
        window=vbt.Param(WINDOW_SPACE),
        upper=vbt.Param(UPPER_SPACE),
        lower=vbt.Param(LOWER_SPACE)
    )
)

COPT_FILE = "temp/chunked_opt.pickle"

# vbt.remove_file(COPT_FILE, missing_ok=True)
if not vbt.file_exists(COPT_FILE):
    chunked_opt = chunked_pt_pipeline(
        len(param_index),
        data.open.values,
        data.high.values,
        data.low.values,
        data.close.values,
        window=param_product["window"],
        upper=param_product["upper"],
        lower=param_product["lower"]
    )
    vbt.save(chunked_opt, COPT_FILE)
else:
    chunked_opt = vbt.load(COPT_FILE)

# %%
total_return, expectancy = chunked_opt
total_return = pd.Series(total_return, index=param_index)
expectancy = pd.Series(expectancy, index=param_index)

# %%
GRID_LEN = len(WINDOW_SPACE) * \
    len(UPPER_SPACE) * \
    len(LOWER_SPACE) * \
    len(STOP_SPACE) ** 3
print(GRID_LEN)

# %%
GRID = dict(
    window=WINDOW_SPACE,
    upper=UPPER_SPACE,
    lower=LOWER_SPACE,
    sl_stop=STOP_SPACE,
    tsl_stop=STOP_SPACE,
    tp_stop=STOP_SPACE,
)
vbt.pprint(vbt.pick_from_param_grid(GRID, 123_456_789))

# %%
FOUND_FILE = "temp/found.pickle"
BEST_N = 100
BEST_TH = 1.0
CHUNK_LEN = 10_000

# vbt.remove_file(FOUND_FILE, missing_ok=True)
if vbt.file_exists(FOUND_FILE):
    found = vbt.load(FOUND_FILE)
else:
    found = None

with (
    vbt.ProgressBar(
        desc="Found",
        initial=0 if found is None else len(found),
        total=BEST_N
    ) as pbar1,
    vbt.ProgressBar(
        desc="Processed"
    ) as pbar2
):
    while found is None or len(found) < BEST_N:
        param_df = pd.DataFrame([
            vbt.pick_from_param_grid(GRID)
            for _ in range(CHUNK_LEN)
        ])
        param_index = pd.MultiIndex.from_frame(param_df)
        _, expectancy = chunked_pt_pipeline(
            CHUNK_LEN,
            data.open.values,
            data.high.values,
            data.low.values,
            data.close.values,
            window=param_df["window"],
            upper=param_df["upper"],
            lower=param_df["lower"],
            sl_stop=param_df["sl_stop"],
            tsl_stop=param_df["tsl_stop"],
            tp_stop=param_df["tp_stop"],
            # this call already processes exactly one batch, so disable inner chunking
            _chunk_len=None,
            _execute_kwargs=dict(
                chunk_len=None
            )
        )
        expectancy = pd.Series(expectancy, index=param_index)
        best_mask = expectancy >= BEST_TH
        if best_mask.any():
            best = expectancy[best_mask]
            if found is None:
                found = best
            else:
                found = pd.concat((found, best))
                found = found[~found.index.duplicated(keep="first")]
            vbt.save(found, FOUND_FILE)
            pbar1.update_to(len(found))
            pbar1.refresh()
        pbar2.update(len(expectancy))

# %%
def get_param_median(param):
    return found.index.get_level_values(param).to_series().median()

pt_pipeline_nb(
    data.open.values,
    data.high.values,
    data.low.values,
    data.close.values,
    window=int(get_param_median("window")),
    upper=get_param_median("upper"),
    lower=get_param_median("lower"),
    sl_stop=get_param_median("sl_stop"),
    tsl_stop=get_param_median("tsl_stop"),
    tp_stop=get_param_median("tp_stop")
)
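# %% [markdown]
# Medians compress the found set quite aggressively; the spread of each parameter
# across all found combinations shows how localized the profitable region actually is
# (plain pandas, no extra assumptions):

# %%
found_params = found.index.to_frame(index=False)
print(found_params.describe().loc[["25%", "50%", "75%"]])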
# %%
import optuna

optuna.logging.disable_default_handler()
optuna.logging.set_verbosity(optuna.logging.WARNING)

def objective(trial):
    window = trial.suggest_categorical("window", WINDOW_SPACE)
    upper = trial.suggest_categorical("upper", UPPER_SPACE)
    lower = trial.suggest_categorical("lower", LOWER_SPACE)
    sl_stop = trial.suggest_categorical("sl_stop", STOP_SPACE)
    tsl_stop = trial.suggest_categorical("tsl_stop", STOP_SPACE)
    tp_stop = trial.suggest_categorical("tp_stop", STOP_SPACE)
    total_return, expectancy = pt_pipeline_nb(
        data.open.values,
        data.high.values,
        data.low.values,
        data.close.values,
        window=window,
        upper=upper,
        lower=lower,
        sl_stop=sl_stop,
        tsl_stop=tsl_stop,
        tp_stop=tp_stop
    )
    if np.isnan(total_return):
        raise optuna.TrialPruned()
    if np.isnan(expectancy):
        raise optuna.TrialPruned()
    return total_return, expectancy

study = optuna.create_study(directions=["maximize", "maximize"])
study.optimize(objective, n_trials=1000)

trials_df = study.trials_dataframe(attrs=["params", "values"])
trials_df.set_index([
    "params_window",
    "params_upper",
    "params_lower",
    "params_sl_stop",
    "params_tsl_stop",
    "params_tp_stop"
], inplace=True)
trials_df.index.rename([
    "window",
    "upper",
    "lower",
    "sl_stop",
    "tsl_stop",
    "tp_stop"
], inplace=True)
trials_df.columns = ["total_return", "expectancy"]
trials_df = trials_df[~trials_df.index.duplicated(keep="first")]
print(trials_df.sort_values(by="total_return", ascending=False))

# %% [markdown]
# ### Level: Architect :flying_saucer:

# %%
InOutputs = namedtuple("InOutputs", ["spread", "zscore"])

@njit(nogil=True, boundscheck=True)
def can_execute_nb(c, wait_days):
    if c.order_counts[c.col] == 0:
        return True
    last_order = c.order_records[c.order_counts[c.col] - 1, c.col]
    ns_delta = c.index[c.i] - c.index[last_order.idx]
    if ns_delta >= wait_days * vbt.dt_nb.d_ns:
        return True
    return False

@njit(nogil=True, boundscheck=True)
def create_signals_nb(c, upper, lower, wait_days):
    _upper = vbt.pf_nb.select_nb(c, upper)
    _lower = vbt.pf_nb.select_nb(c, lower)
    _wait_days = vbt.pf_nb.select_nb(c, wait_days)

    # return order: (long_entry, long_exit, short_entry, short_exit)
    if c.i > 0:
        prev_zscore = c.in_outputs.zscore[c.i - 1, c.group]
        zscore = c.in_outputs.zscore[c.i, c.group]
        if prev_zscore < _upper and zscore > _upper:
            if can_execute_nb(c, _wait_days):
                if c.col % 2 == 0:  # even column = first (x) leg: go short
                    return False, False, True, False
                return True, False, False, False
        if prev_zscore > _lower and zscore < _lower:
            if can_execute_nb(c, _wait_days):
                if c.col % 2 == 0:  # even column = first (x) leg: go long
                    return True, False, False, False
                return False, False, True, False
    return False, False, False, False

@njit(nogil=True, boundscheck=True)
def signal_func_nb(c, window, upper, lower, wait_days):
    _window = vbt.pf_nb.select_nb(c, window)

    # compute the spread and z-score once per pair, on the first leg
    if c.col % 2 == 0:
        x = vbt.pf_nb.select_nb(c, c.close, col=c.col)
        y = vbt.pf_nb.select_nb(c, c.close, col=c.col + 1)
        c.in_outputs.spread[c.i, c.group] = np.log(y) - np.log(x)
        window_start = c.i - _window + 1
        window_end = c.i + 1
        if window_start >= 0:
            s = c.in_outputs.spread[window_start : window_end, c.group]
            s_mean = np.nanmean(s)
            s_std = np.nanstd(s)
            c.in_outputs.zscore[c.i, c.group] = (s[-1] - s_mean) / s_std
    return create_signals_nb(c, upper, lower, wait_days)

# %%
WAIT_DAYS = 30

def iter_pt_portfolio(
    window=WINDOW,
    upper=UPPER,
    lower=LOWER,
    wait_days=WAIT_DAYS,
    signal_func_nb=signal_func_nb,
    more_signal_args=(),
    **kwargs
):
    return vbt.Portfolio.from_signals(
        data,
        broadcast_named_args=dict(
            window=window,
            upper=upper,
            lower=lower,
            wait_days=wait_days
        ),
        in_outputs=vbt.RepEval("""
            InOutputs(
                np.full((target_shape[0], target_shape[1] // 2), np.nan),
                np.full((target_shape[0], target_shape[1] // 2), np.nan)
            )
        """, context=dict(InOutputs=InOutputs)),
        signal_func_nb=signal_func_nb,
        signal_args=(
            vbt.Rep("window"),
            vbt.Rep("upper"),
            vbt.Rep("lower"),
            vbt.Rep("wait_days"),
            *more_signal_args
        ),
        size=10,
        size_type="valuepercent100",
        group_by=vbt.ExceptLevel("symbol"),
        cash_sharing=True,
        call_seq="auto",
        delta_format="percent100",
        stop_exit_price="close",
        **kwargs
    )

pf = iter_pt_portfolio()
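# %% [markdown]
# The `wait_days` filter in `can_execute_nb` works on raw epoch-nanosecond integers,
# which is what `c.index` holds inside the simulation. A minimal sketch of that
# arithmetic on this dataset's index (assuming hourly, gap-free bars here):

# %%
ns_index = data.index.values.astype(np.int64)  # epoch nanoseconds, like c.index
ns_delta = ns_index[24] - ns_index[0]  # 24 hourly bars apart
print(ns_delta >= 1 * vbt.dt_nb.d_ns)  # True: at least one full day has passed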
# %%
fig = vbt.make_subplots(
    rows=2,
    cols=1,
    vertical_spacing=0,
    shared_xaxes=True
)
zscore = pf.get_in_output("zscore").rename("Z-score")
zscore.vbt.plot(
    add_trace_kwargs=dict(row=1, col=1),
    fig=fig
)
fig.add_hline(row=1, y=UPPER, line_color="orangered", line_dash="dot")
fig.add_hline(row=1, y=0, line_color="yellow", line_dash="dot")
fig.add_hline(row=1, y=LOWER, line_color="limegreen", line_dash="dot")

orders = pf.orders.regroup(group_by=False).iloc[:, 0]
exit_mask = orders.side_sell.get_pd_mask(idx_arr="signal_idx")
entry_mask = orders.side_buy.get_pd_mask(idx_arr="signal_idx")
upper_crossed = zscore.vbt.crossed_above(UPPER)
lower_crossed = zscore.vbt.crossed_below(LOWER)

(upper_crossed & ~exit_mask).vbt.signals.plot_as_exits(
    pf.get_in_output("zscore"),
    trace_kwargs=dict(
        name="Exits (ignored)",
        marker=dict(color="lightgray"),
        opacity=0.5
    ),
    add_trace_kwargs=dict(row=1, col=1),
    fig=fig
)
(lower_crossed & ~entry_mask).vbt.signals.plot_as_entries(
    pf.get_in_output("zscore"),
    trace_kwargs=dict(
        name="Entries (ignored)",
        marker=dict(color="lightgray"),
        opacity=0.5
    ),
    add_trace_kwargs=dict(row=1, col=1),
    fig=fig
)
exit_mask.vbt.signals.plot_as_exits(
    pf.get_in_output("zscore"),
    add_trace_kwargs=dict(row=1, col=1),
    fig=fig
)
entry_mask.vbt.signals.plot_as_entries(
    pf.get_in_output("zscore"),
    add_trace_kwargs=dict(row=1, col=1),
    fig=fig
)

pf.plot_allocations(
    add_trace_kwargs=dict(row=2, col=1),
    fig=fig
)
rebalancing_dates = data.index[np.unique(orders.idx.values)]
for date in rebalancing_dates:
    fig.add_vline(row=2, x=date, line_color="teal", line_dash="dot")
fig.update_layout(height=600)
fig.show()

# %%
WAIT_SPACE = np.arange(30, 370, 5).tolist()

pf = iter_pt_portfolio(wait_days=vbt.Param(WAIT_SPACE))
pf.orders.count().vbt.scatterplot(
    xaxis_title="Wait days",
    yaxis_title="Order count"
).show()

# %%
with (vbt.Timer() as timer, vbt.MemTracer() as tracer):
    iter_pt_portfolio(wait_days=vbt.Param(WAIT_SPACE))
print(timer.elapsed())

# %%
print(tracer.peak_usage())

# %%
zscore_state_dt = np.dtype(
    [
        ("cumsum", np.float_),
        ("cumsum_sq", np.float_),
        ("nancnt", np.int_)
    ],
    align=True,
)

@njit(nogil=True, boundscheck=True)
def stream_signal_func_nb(
    c, window, upper, lower, wait_days, zscore_state
):
    _window = vbt.pf_nb.select_nb(c, window)

    if c.col % 2 == 0:
        x = vbt.pf_nb.select_nb(c, c.close, col=c.col)
        y = vbt.pf_nb.select_nb(c, c.close, col=c.col + 1)
        c.in_outputs.spread[c.i, c.group] = np.log(y) - np.log(x)
        value = c.in_outputs.spread[c.i, c.group]
        pre_i = c.i - _window
        if pre_i >= 0:
            pre_window_value = c.in_outputs.spread[pre_i, c.group]
        else:
            pre_window_value = np.nan
        zscore_in_state = vbt.enums.RollZScoreAIS(
            i=c.i,
            value=value,
            pre_window_value=pre_window_value,
            cumsum=zscore_state["cumsum"][c.group],
            cumsum_sq=zscore_state["cumsum_sq"][c.group],
            nancnt=zscore_state["nancnt"][c.group],
            window=_window,
            minp=_window,
            ddof=0
        )
        zscore_out_state = vbt.nb.rolling_zscore_acc_nb(zscore_in_state)
        c.in_outputs.zscore[c.i, c.group] = zscore_out_state.value
        zscore_state["cumsum"][c.group] = zscore_out_state.cumsum
        zscore_state["cumsum_sq"][c.group] = zscore_out_state.cumsum_sq
        zscore_state["nancnt"][c.group] = zscore_out_state.nancnt
    return create_signals_nb(c, upper, lower, wait_days)

# %%
stream_pt_portfolio = partial(
    iter_pt_portfolio,
    signal_func_nb=stream_signal_func_nb,
    more_signal_args=(
        vbt.RepEval(
            """
            n_groups = target_shape[1] // 2
            zscore_state = np.empty(n_groups, dtype=zscore_state_dt)
            zscore_state["cumsum"] = 0.0
            zscore_state["cumsum_sq"] = 0.0
            zscore_state["nancnt"] = 0
            zscore_state
            """,
            context=dict(zscore_state_dt=zscore_state_dt)
        ),
    )
)

# %%
stream_pf = stream_pt_portfolio()
print(stream_pf.total_return)

# %%
pf = iter_pt_portfolio()
print(pf.total_return)

# %%
with (vbt.Timer() as timer, vbt.MemTracer() as tracer):
    stream_pt_portfolio(wait_days=vbt.Param(WAIT_SPACE))
print(timer.elapsed())

# %%
print(tracer.peak_usage())

# %%
chunked_stream_pt_portfolio = partial(
    stream_pt_portfolio,
    chunked=dict(
        engine="threadpool",
        arg_take_spec=dict(
            signal_args=vbt.ArgsTaker(
                vbt.flex_array_gl_slicer,
                vbt.flex_array_gl_slicer,
                vbt.flex_array_gl_slicer,
                vbt.flex_array_gl_slicer,
                vbt.ArraySlicer(axis=0)
            ),
            in_outputs=vbt.SequenceTaker([
                vbt.ArraySlicer(axis=1),
                vbt.ArraySlicer(axis=1)
            ])
        )
    )
)
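# %% [markdown]
# Since each parameter combination forms an independent two-column group, slicing
# `in_outputs` along the column axis is safe. A sanity sketch (on a small subset of
# wait days, to keep it fast) that the chunked run agrees with the serial one:

# %%
small_space = WAIT_SPACE[:4]
serial_pf = stream_pt_portfolio(wait_days=vbt.Param(small_space))
chunked_pf = chunked_stream_pt_portfolio(wait_days=vbt.Param(small_space))
print((serial_pf.total_return - chunked_pf.total_return).abs().max())  # ~0 expected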
# %%
with (vbt.Timer() as timer, vbt.MemTracer() as tracer):
    chunked_stream_pt_portfolio(wait_days=vbt.Param(WAIT_SPACE))
print(timer.elapsed())

# %%
print(tracer.peak_usage())

# %%