# ################################## HOW TO USE #################################### #
#                                                                                    #
# This is a Jupyter notebook formatted as a script                                   #
# Format: https://jupytext.readthedocs.io/en/latest/formats.html#the-percent-format  #
#                                                                                    #
# Save this file and remove the '.txt' extension                                     #
# In Jupyter Lab, right click on the Python file -> Open With -> Jupytext Notebook   #
# Make sure to have Jupytext installed: https://github.com/mwouts/jupytext           #
#                                                                                    #
# ################################################################################## #

# %% [markdown]
# # Generation
# ## Comparison

# %%
# No other import is visible in this file: `vbt`, `np`, `pd` and `njit` used
# below are all expected to come from this star import.
from vectorbtpro import *

# Pull one year of data for two symbols (requires network access).
data = vbt.BinanceData.pull(
    ["BTCUSDT", "ETHUSDT"],
    start="2021-01-01",
    end="2022-01-01"
)
data.get("Low")

# %%
# Bollinger Bands on the close price; parameters are wrapped in `vbt.Default`.
bb = vbt.talib("BBANDS").run(
    data.get("Close"),
    timeperiod=vbt.Default(14),
    nbdevup=vbt.Default(2),
    nbdevdn=vbt.Default(2)
)
bb.lowerband

# %%
# Mask of bars where the low price is below the lower band.
mask = data.get("Low") < bb.lowerband
mask

# %%
mask.sum()

# %%
# Same indicator with two values for each deviation parameter.
bb_mult = vbt.talib("BBANDS").run(
    data.get("Close"),
    timeperiod=vbt.Default(14),
    nbdevup=[2, 3],
    nbdevdn=[2, 3]
)
# NOTE(review): plain pandas comparison between two frames whose columns
# presumably differ (the next cell repeats it through the `.vbt` accessor);
# this cell likely raises when executed - confirm that this is intended.
mask = data.get("Low") < bb_mult.lowerband

# %%
mask = data.get("Low").vbt < bb_mult.lowerband
mask

# %%
mask.sum()

# %%
# Same comparison expressed through the indicator's own comparison method.
mask = bb_mult.lowerband_above(data.get("Low"))
mask.sum()

# %% [markdown]
# ### Thresholds

# %%
# Band width relative to the middle band.
bandwidth = (bb.upperband - bb.lowerband) / bb.middleband

# Compare against several thresholds at once.
mask = bandwidth.vbt > vbt.Param([0.15, 0.3], name="threshold")
mask.sum()

# %%
# The same two thresholds via `combine` with an explicit comparison function.
mask = bandwidth.vbt.combine(
    [0.15, 0.3],
    combine_func=np.greater,
    keys=pd.Index([0.15, 0.3], name="threshold")
)
mask.sum()

# %%
# The same two thresholds built manually with pandas.
mask = pd.concat(
    (bandwidth > 0.15, bandwidth > 0.3),
    keys=pd.Index([0.15, 0.3], name="threshold"),
    axis=1
)
mask.sum()

# %% [markdown]
# ### Crossovers

# %%
low_below_lband = data.get("Low") < bb.lowerband
mask = low_below_lband.vbt.signals.first()
mask.sum()

# %%
# Rename the series so that the plot traces get readable labels.
btc_low = data.get("Low", "BTCUSDT").rename("Low")
btc_lowerband = bb.lowerband["BTCUSDT"].rename("Lower Band")
btc_mask = mask["BTCUSDT"].rename("Signals")

fig = btc_low.vbt.plot()
btc_lowerband.vbt.plot(fig=fig)
btc_mask.vbt.signals.plot_as_markers(
    y=btc_low,
    trace_kwargs=dict(
        marker=dict(
            color="#DFFF00"
        )
    ),
    fig=fig
)
fig.show()

# %%
mask = low_below_lband.vbt.signals.first(after_false=True)
mask.sum()

# %%
# Small hand-made example; the band starts with two missing values.
sample_low = pd.Series([10, 9, 8, 9, 8])
sample_lband = pd.Series([np.nan, np.nan, 9, 8, 9])
sample_mask = sample_low < sample_lband
sample_mask.vbt.signals.first(after_false=True)

# %%
# Set the mask to True wherever the band has no value yet.
sample_mask[sample_lband.ffill().isnull()] = True
sample_mask.vbt.signals.first(after_false=True)

# %%
# Number of leading rows where the band is still missing.
buffer = sample_lband.ffill().isnull().sum(axis=0).max()
buffer

# %%
# Compute the signals only after the buffer, then write them into a full-length
# mask pre-filled with False.
sample_buf_mask = sample_low.iloc[buffer:] < sample_lband.iloc[buffer:]
sample_buf_mask = sample_buf_mask.vbt.signals.first(after_false=True)
sample_mask = sample_low.vbt.wrapper.fill(False)
sample_mask.loc[sample_buf_mask.index] = sample_buf_mask
sample_mask

# %%
mask = data.get("Low").vbt.crossed_below(bb.lowerband, wait=1)
mask.sum()

# %%
mask = bb.lowerband_crossed_above(data.get("Low"), wait=1)
mask.sum()

# %% [markdown]
# ## Logical operators

# %%
cond1 = data.get("Low") < bb.lowerband
cond2 = bandwidth > 0.3
cond3 = data.get("High") > bb.upperband
cond4 = bandwidth < 0.15

mask = (cond1 & cond2) | (cond3 & cond4)
mask.sum()

# %%
# Same logic with parameterized thresholds; the four parameter combinations
# are written out by hand in both lists.
cond1 = data.get("Low").vbt < bb.lowerband
cond2 = bandwidth.vbt > vbt.Param([0.3, 0.3, 0.4, 0.4], name="cond2_th")
cond3 = data.get("High").vbt > bb.upperband
cond4 = bandwidth.vbt < vbt.Param([0.1, 0.2, 0.1, 0.2], name="cond4_th")

mask = (cond1.vbt & cond2).vbt | (cond3.vbt & cond4)
mask.sum()

# %% [markdown]
# ### Cartesian product

# %%
cond1 = data.get("Low").vbt < bb.lowerband
cond2 = bandwidth.vbt > vbt.Param([0.3, 0.4], name="cond2_th")
cond3 = data.get("High").vbt > bb.upperband
cond4 = bandwidth.vbt < vbt.Param([0.1, 0.2], name="cond4_th")

# Split the column positions of each condition into groups of two columns.
i1 = np.split(np.arange(len(cond1.columns)), len(cond1.columns) // 2)
i2 = np.split(np.arange(len(cond2.columns)), len(cond2.columns) // 2)
i3 = np.split(np.arange(len(cond3.columns)), len(cond3.columns) // 2)
i4 = np.split(np.arange(len(cond4.columns)), len(cond4.columns) // 2)

i1

# %%
i2

# %%
i3

# %%
i4

# %%
# Imported explicitly so that this cell does not depend on the star import
# re-exporting `product`.
from itertools import product

# Combine every group of each condition with every group of the others.
i1, i2, i3, i4 = zip(*product(i1, i2, i3, i4))

i1

# %%
i2

# %%
i3

# %%
i4

# %%
# Flatten the groups into plain arrays of column positions.
i1 = np.asarray(i1).flatten()
i2 = np.asarray(i2).flatten()
i3 = np.asarray(i3).flatten()
i4 = np.asarray(i4).flatten()

i1

# %%
i2

# %%
i3

# %%
i4

# %%
# Select the columns so that all four conditions line up combination-wise.
cond1 = cond1.iloc[:, i1]
cond2 = cond2.iloc[:, i2]
cond3 = cond3.iloc[:, i3]
cond4 = cond4.iloc[:, i4]

mask = (cond1.vbt & cond2).vbt | (cond3.vbt & cond4)
mask.sum()

# %%
# The same product, delegated to the `x` helper of the accessor.
cond1 = data.get("Low").vbt < bb.lowerband
cond2 = bandwidth.vbt > vbt.Param([0.3, 0.4], name="cond2_th")
cond3 = data.get("High").vbt > bb.upperband
cond4 = bandwidth.vbt < vbt.Param([0.1, 0.2], name="cond4_th")

cond1, cond2, cond3, cond4 = vbt.pd_acc.x(cond1, cond2, cond3, cond4)
mask = (cond1.vbt & cond2).vbt | (cond3.vbt & cond4)

# %%
# The same logic written as an indicator expression, one statement per line.
MaskGenerator = vbt.IF.from_expr("""
upperband, middleband, lowerband = @res_talib_bbands
bandwidth = (upperband - lowerband) / middleband
cond1 = low < lowerband
cond2 = bandwidth > @p_cond2_th
cond3 = high > upperband
cond4 = bandwidth < @p_cond4_th
@out_mask:(cond1 & cond2) | (cond3 & cond4)
""")

vbt.phelp(MaskGenerator.run, incl_doc=False)

# %%
mask_generator = MaskGenerator.run(
    high=data.get("High"),
    low=data.get("Low"),
    close=data.get("Close"),
    cond2_th=[0.3, 0.4],
    cond4_th=[0.1, 0.2],
    bbands_timeperiod=vbt.Default(14),
    param_product=True
)
mask_generator.mask.sum()

# %% [markdown]
# ## Shifting

# %%
cond1 = data.get("Low") < bb.lowerband
# Band width is greater than one row earlier.
cond2 = bandwidth > bandwidth.shift(1)

mask = cond1 & cond2
mask.sum()

# %%
# First value of each 7-day window. `.iloc[0]` is used because integer keys in
# `x[0]` on a datetime-indexed window rely on deprecated positional lookup.
cond2 = bandwidth > bandwidth.rolling("7d").apply(lambda x: x.iloc[0])

mask = cond1 & cond2
mask.sum()

# %%
def exactly_ago(sr):
    """Return the first value of the window if it lies exactly 7 days before the last one.

    Returns NaN when the first and last index of `sr` are not exactly
    7 days apart.
    """
    if sr.index[0] == sr.index[-1] - vbt.timedelta("7d"):
        return sr.iloc[0]
    return np.nan

# raw=False so that the function receives a Series and can read its index.
cond_7d_ago = bandwidth.rolling("8d").apply(exactly_ago, raw=False)
cond2 = bandwidth > cond_7d_ago

mask = cond1 & cond2
mask.sum()

# %%
@njit
def exactly_ago_meta_nb(from_i, to_i, col, index, freq, arr):
    """Return `arr[from_i, col]` if `index[from_i]` is exactly `freq` before `index[to_i - 1]`.

    Returns NaN otherwise.
    """
    if index[from_i] == index[to_i - 1] - freq:
        return arr[from_i, col]
    return np.nan

cond_7d_ago = vbt.pd_acc.rolling_apply(
    "8d",
    exactly_ago_meta_nb,
    bandwidth.index.values,
    vbt.timedelta("7d").to_timedelta64(),
    vbt.to_2d_array(bandwidth),
    wrapper=bandwidth.vbt.wrapper
)
cond2 = bandwidth > cond_7d_ago

mask = cond1 & cond2
mask.sum()

# %%
cond2 = bandwidth > bandwidth.vbt.ago("7d")

mask = cond1 & cond2
mask.sum()

# %%
# Compare a raw value with the corresponding `ago` value.
bandwidth.iloc[-8]

# %%
bandwidth.vbt.ago("7d").iloc[-1]

# %% [markdown]
# ## Truth value testing

# %%
cond2 = data.get("Close").vbt.crossed_below(bb.middleband)
# True if any of the last 5 rows is True.
cond2 = cond2.rolling(5, min_periods=1).max().astype(bool)

mask = cond1 & cond2
mask.sum()

# %%
cond2 = data.get("Close").vbt.crossed_below(bb.middleband)
cond2 = cond2.vbt.rolling_any(5)

mask = cond1 & cond2
mask.sum()

# %%
cond2 = data.get("Close").vbt.crossed_below(bb.middleband)
cond2 = cond2.vbt.rolling_apply(
    "5d", "any",
    minp=1,
    wrap_kwargs=dict(fillna=0, dtype=bool)
)

mask = cond1 & cond2
mask.sum()

# %%
anchor_points = data.wrapper.get_index_points(
    every="M",
    start=0,
    exact_start=True
)
anchor_points

# %%
# For each row, the index position of the most recent anchor point.
left_bound = np.full(len(data.wrapper.index), np.nan)
left_bound[anchor_points] = anchor_points
left_bound = vbt.dt.to_ns(vbt.nb.ffill_1d_nb(left_bound))
left_bound = bandwidth.index[left_bound]
left_bound

# %%
right_bound = data.wrapper.index
right_bound

# %%
mask = (bandwidth <= 0.1).vbt.resample_between_bounds(
    left_bound,
    right_bound,
    "any",
    closed_lbound=True,
    closed_rbound=True,
    wrap_kwargs=dict(fillna=0, dtype=bool)
)
mask.index = right_bound
mask.astype(int).vbt.ts_heatmap().show()

# %% [markdown]
# ## Periodically

# %%
# One month of hourly data (requires network access).
min_data = vbt.BinanceData.pull(
    ["BTCUSDT", "ETHUSDT"],
    start="2021-01-01 UTC",
    end="2021-02-01 UTC",
    timeframe="1h"
)
index = min_data.wrapper.index
tuesday_index = index[index.weekday == 1]
tuesday_index

# %%
tuesday_1800_index = tuesday_index[tuesday_index.hour == 18]
tuesday_1800_index

# %%
tuesday_1730_index = index[
    (index.weekday == 1) &
    (index.hour == 17) &
    (index.minute == 30)
]
tuesday_1730_index

# %%
index.get_indexer([vbt.timestamp("2021-01-07", tz=index.tz)])

# %%
index.get_indexer([vbt.timestamp("2021-01-07 17:30:00", tz=index.tz)])

# %%
index[index.get_indexer(
    [vbt.timestamp("2021-01-07 17:30:00", tz=index.tz)],
    method="ffill"
)]

# %%
index[index.get_indexer(
    [vbt.timestamp("2021-01-07 17:30:00", tz=index.tz)],
    method="bfill"
)]

# %%
each_tuesday = vbt.date_range(index[0], index[-1], freq="tuesday")
each_tuesday_1730 = each_tuesday + pd.Timedelta(hours=17, minutes=30)
each_tuesday_1730

# %%
positions = index.get_indexer(each_tuesday_1730, method="bfill")
# `get_indexer` reports targets without a match as -1; drop them so that
# `iloc` does not interpret -1 as the last row.
positions = positions[positions != -1]

min_symbol_wrapper = min_data.get_symbol_wrapper()
mask = min_symbol_wrapper.fill(False)
mask.iloc[positions] = True
mask.sum()

# %%
mask[mask.any(axis=1)].index.strftime("%A %T")

# %%
# First bar in the span from Tuesday 17:00 (inclusive) to Wednesday 17:00.
tuesday_after_1700 = (index.weekday == 1) & (index.hour >= 17)
wednesday_before_1700 = (index.weekday == 2) & (index.hour < 17)
main_cond = tuesday_after_1700 | wednesday_before_1700
mask = min_symbol_wrapper.fill(False)
mask[main_cond] = True
mask = mask.vbt.signals.first()
mask[mask.any(axis=1)].index.strftime("%A %T")

# %%
mask = min_symbol_wrapper.fill(False)
mask.vbt.set(
    True,
    every="tuesday",
    at_time="17:30",
    inplace=True
)
mask[mask.any(axis=1)].index.strftime("%A %T")

# %%
mask = min_symbol_wrapper.fill(False)
mask.vbt.set(
    True,
    every="tuesday",
    at_time="18:00",
    add_delta=pd.Timedelta(nanoseconds=1),
    inplace=True
)
mask[mask.any(axis=1)].index.strftime("%A %T")

# %%
mask = min_symbol_wrapper.fill(False)
mask.vbt.set_between(
    True,
    every="monday",
    start_time="12:00",
    end_time="17:00",
    add_end_delta=pd.Timedelta(days=1),
    inplace=True
)
mask[mask.any(axis=1)].index.strftime("%A %T")

# %%
mask = min_symbol_wrapper.fill(False)
mask.vbt.set(
    True,
    on="January 7th 2021 UTC",
    indexer_method=None,
    inplace=True
)
mask[mask.any(axis=1)].index

# %%
mask = min_symbol_wrapper.fill(False)
mask.vbt.set_between(
    True,
    start=["2021-01-01 12:00:00", "2021-01-07 12:00:00"],
    end=["2021-01-02 12:00:00", "2021-01-08 12:00:00"],
    inplace=True
)
mask[mask.any(axis=1)].index

# %%
mask = min_symbol_wrapper.fill(False)
mask.vbt.set_between(
    True,
    every="monday",
    split_every=False,
    add_end_delta="2h",
    inplace=True
)
mask[mask.any(axis=1)].index

# %% [markdown]
# ## Iteratively

# %%
@njit
def generate_mask_1d_nb(
    high, low,
    uband, mband, lband,
    cond2_th, cond4_th
):
    """Build a boolean mask for a single column, element by element.

    An element is True when either
    (low < lband and bandwidth > cond2_th) or
    (high > uband and bandwidth < cond4_th),
    where bandwidth = (uband - lband) / mband.

    Returns an array with the shape of `high`.
    """
    out = np.full(high.shape, False)

    for i in range(high.shape[0]):
        bandwidth = (uband[i] - lband[i]) / mband[i]
        cond1 = low[i] < lband[i]
        cond2 = bandwidth > cond2_th
        cond3 = high[i] > uband[i]
        cond4 = bandwidth < cond4_th
        signal = (cond1 and cond2) or (cond3 and cond4)
        out[i] = signal

    return out

mask = generate_mask_1d_nb(
    data.get("High")["BTCUSDT"].values,
    data.get("Low")["BTCUSDT"].values,
    bb.upperband["BTCUSDT"].values,
    bb.middleband["BTCUSDT"].values,
    bb.lowerband["BTCUSDT"].values,
    0.30,
    0.15
)
# Wrap the raw array back into a pandas object for the selected symbol.
symbol_wrapper = data.get_symbol_wrapper()
mask = symbol_wrapper["BTCUSDT"].wrap(mask)
mask.sum()

# %%
@njit
def generate_mask_nb(
    high, low,
    uband, mband, lband,
    cond2_th, cond4_th
):
    """Apply `generate_mask_1d_nb` to every column of two-dimensional inputs.

    Returns a boolean array with the shape of `high`.
    """
    out = np.empty(high.shape, dtype=np.bool_)

    for col in range(high.shape[1]):
        out[:, col] = generate_mask_1d_nb(
            high[:, col], low[:, col],
            uband[:, col], mband[:, col], lband[:, col],
            cond2_th, cond4_th
        )

    return out

mask = generate_mask_nb(
    vbt.to_2d_array(data.get("High")),
    vbt.to_2d_array(data.get("Low")),
    vbt.to_2d_array(bb.upperband),
    vbt.to_2d_array(bb.middleband),
    vbt.to_2d_array(bb.lowerband),
    0.30,
    0.15
)
mask = symbol_wrapper.wrap(mask)
mask.sum()

# %%
# Turn the one-dimensional function into an indicator with two parameters.
MaskGenerator = vbt.IF(
    input_names=["high", "low", "uband", "mband", "lband"],
    param_names=["cond2_th", "cond4_th"],
    output_names=["mask"]
).with_apply_func(generate_mask_1d_nb, takes_1d=True)
mask_generator = MaskGenerator.run(
    data.get("High"),
    data.get("Low"),
    bb.upperband,
    bb.middleband,
    bb.lowerband,
    [0.3, 0.4],
    [0.1, 0.2],
    param_product=True
)
mask_generator.mask.sum()

# %%
@njit
def value_ago_1d_nb(arr, ago):
    """Return an array whose element `i` is `arr[i - ago]`.

    Elements for which `i - ago` is negative are set to NaN.
    """
    # `np.float64` instead of the `np.float_` alias, which NumPy 2.0 removed.
    out = np.empty(arr.shape, dtype=np.float64)
    for i in range(out.shape[0]):
        if i - ago >= 0:
            out[i] = arr[i - ago]
        else:
            out[i] = np.nan
    return out

arr = np.array([1, 2, 3])
value_ago_1d_nb(arr, 1)

# %%
@njit
def any_in_window_1d_nb(arr, window):
    """Return True at `i` if any of the last `window` elements up to `i` is True."""
    out = np.empty(arr.shape, dtype=np.bool_)
    for i in range(out.shape[0]):
        # The window ends at (and includes) the current element.
        from_i = max(0, i + 1 - window)
        to_i = i + 1
        out[i] = np.any(arr[from_i:to_i])
    return out

arr = np.array([False, True, True, False, False])
any_in_window_1d_nb(arr, 2)

# %%
@njit
def any_in_var_window_1d_nb(arr, index, freq):
    """Return True at `i` if any element with index in `(index[i] - freq, index[i]]` is True."""
    out = np.empty(arr.shape, dtype=np.bool_)
    from_i = 0
    for i in range(out.shape[0]):
        # Advance the left edge only when it has fallen out of the window;
        # `from_i` is kept between iterations, so it never moves backwards.
        if index[from_i] <= index[i] - freq:
            for j in range(from_i + 1, index.shape[0]):
                if index[j] > index[i] - freq:
                    from_i = j
                    break
        to_i = i + 1
        out[i] = np.any(arr[from_i:to_i])
    return out

arr = np.array([False, True, True, False, False])
index = vbt.date_range("2020", freq="5min", periods=len(arr)).values
freq = vbt.timedelta("10min").to_timedelta64()
any_in_var_window_1d_nb(arr, index, freq)

# %%
# The same call with the index and the frequency converted by `to_ns`.
any_in_var_window_1d_nb(arr, vbt.dt.to_ns(index), vbt.dt.to_ns(freq))

# %%
vbt.dt.to_ns(index)

# %%
vbt.dt.to_ns(index - np.datetime64(0, "ns"))

# %%
vbt.dt.to_ns(freq)

# %%
# Divide down from nanoseconds to minutes.
vbt.dt.to_ns(freq) / 1000 / 1000 / 1000 / 60

# %% [markdown]
# ## Generators

# %%
@njit
def place_func_nb(c, index):
    """Place a signal at every position whose weekday is 1 and hour is 17.

    Writes into `c.out` and returns the position (relative to `c.out`) of the
    last placed signal, or -1 if nothing was placed.
    """
    last_i = -1
    for out_i in range(len(c.out)):
        i = c.from_i + out_i
        weekday = vbt.dt_nb.weekday_nb(index[i])
        hour = vbt.dt_nb.hour_nb(index[i])
        if weekday == 1 and hour == 17:
            c.out[out_i] = True
            last_i = out_i
    return last_i

mask = vbt.pd_acc.signals.generate(
    symbol_wrapper.shape,
    place_func_nb,
    vbt.dt.to_ns(symbol_wrapper.index),
    wrapper=symbol_wrapper
)
mask.sum()

# %%
@njit
def place_func_nb(c, index):
    """Place a signal at weekday 1, hour 17, or at the first bar after that time.

    Writes into `c.out` and returns the position (relative to `c.out`) of the
    last placed signal, or -1 if nothing was placed.
    """
    last_i = -1
    for out_i in range(len(c.out)):
        i = c.from_i + out_i
        weekday = vbt.dt_nb.weekday_nb(index[i])
        hour = vbt.dt_nb.hour_nb(index[i])
        if weekday == 1 and hour == 17:
            c.out[out_i] = True
            last_i = out_i
        else:
            # NOTE(review): presumably the midnight of the most recent
            # weekday 1 - confirm against `past_weekday_nb`.
            past_target_midnight = vbt.dt_nb.past_weekday_nb(index[i], 1)
            past_target = past_target_midnight + 17 * vbt.dt_nb.h_ns
            # The target time falls strictly between the previous and the
            # current bar: place the signal on the current bar.
            if (
                i > 0
                and index[i - 1] < past_target
                and index[i] > past_target
            ):
                c.out[out_i] = True
                last_i = out_i
    return last_i

mask = vbt.pd_acc.signals.generate(
    symbol_wrapper.shape,
    place_func_nb,
    vbt.dt.to_ns(symbol_wrapper.index),
    wrapper=symbol_wrapper
)
mask.sum()

# %%
mask.index[mask.any(axis=1)].strftime('%A %m/%d/%Y')

# %%
@njit
def place_func_nb(c, weekday, hour, index):
    """Place a signal at every position matching the given weekday and hour.

    Writes into `c.out` and returns the position (relative to `c.out`) of the
    last placed signal, or -1 if nothing was placed.
    """
    last_i = -1
    for out_i in range(len(c.out)):
        i = c.from_i + out_i
        weekday_now = vbt.dt_nb.weekday_nb(index[i])
        hour_now = vbt.dt_nb.hour_nb(index[i])
        if weekday_now == weekday and hour_now == hour:
            c.out[out_i] = True
            last_i = out_i
    return last_i

EntryGenerator = vbt.SignalFactory(
    mode="entries",
    param_names=["weekday", "hour"]
).with_place_func(
    entry_place_func_nb=place_func_nb,
    entry_settings=dict(
        pass_params=["weekday", "hour"],
    ),
    var_args=True
)
entry_generator = EntryGenerator.run(
    symbol_wrapper.shape,
    1,
    [0, 17],
    vbt.dt.to_ns(symbol_wrapper.index),
    input_index=symbol_wrapper.index,
    input_columns=symbol_wrapper.columns
)
entry_generator.entries.sum()

# %%
# The generator above was run with weekday 1 and hours 0 and 17, so the
# selected column has to name weekday 1 (it previously named weekday 2).
entry_generator.plot(column=(1, 0, "BTCUSDT")).show()

# %% [markdown]
# ### Exits

# %%
@njit
def exit_place_func_nb(c):
    """Place an exit at the first available position and return that position."""
    c.out[0] = True
    return 0

entries = symbol_wrapper.fill(False)
entries.vbt.set(True, every="Q", inplace=True)
entries.index[entries.any(axis=1)]

# %%
exits = entries.vbt.signals.generate_exits(exit_place_func_nb)
exits.index[exits.any(axis=1)]

# %%
exits = entries.vbt.signals.generate_exits(
    exit_place_func_nb,
    wait=0
)
exits.index[exits.any(axis=1)]

# %%
@njit
def exit_place_func_nb(c, index, wait_td):
    """Place an exit at the first position at least `wait_td` after `index[c.from_i]`.

    Returns the position relative to `c.out`, or -1 if no such position exists.
    """
    for out_i in range(len(c.out)):
        i = c.from_i + out_i
        if index[i] >= index[c.from_i] + wait_td:
            # Mark the signal explicitly, as the other placement functions in
            # this file do, instead of only returning its position.
            c.out[out_i] = True
            return out_i
    return -1

exits = entries.vbt.signals.generate_exits(
    exit_place_func_nb,
    vbt.dt.to_ns(entries.index),
    vbt.dt.to_ns(vbt.timedelta("7d")),
    wait=0
)
exits.index[exits.any(axis=1)]

# %%
entries = symbol_wrapper.fill(False)
entries.vbt.set(True, every="5d", inplace=True)
exits = entries.vbt.signals.generate_exits(
    exit_place_func_nb,
    vbt.dt.to_ns(entries.index),
    vbt.dt.to_ns(vbt.timedelta("7d")),
    wait=0
)
exits.index[exits.any(axis=1)]

# %%
exits = entries.vbt.signals.generate_exits(
    exit_place_func_nb,
    vbt.dt.to_ns(entries.index),
    vbt.dt.to_ns(vbt.timedelta("7d")),
    wait=0,
    until_next=False
)
exits.index[exits.any(axis=1)]

# %%
exits = entries.vbt.signals.generate_exits(
    exit_place_func_nb,
    vbt.dt.to_ns(entries.index),
    vbt.dt.to_ns(vbt.timedelta("7d")),
    wait=0,
    until_next=False,
    skip_until_exit=True
)
exits.index[exits.any(axis=1)]

# %%
@njit
def exit_place_func_nb(c, wait_td, index):
    """Place an exit at the first position at least `wait_td` after `index[c.from_i]`.

    Returns the position relative to `c.out`, or -1 if no such position exists.
    """
    for out_i in range(len(c.out)):
        i = c.from_i + out_i
        if index[i] >= index[c.from_i] + wait_td:
            # Mark the signal explicitly, as the other placement functions in
            # this file do, instead of only returning its position.
            c.out[out_i] = True
            return out_i
    return -1

ExitGenerator = vbt.SignalFactory(
    mode="exits",
    param_names=["wait_td"]
).with_place_func(
    exit_place_func_nb=exit_place_func_nb,
    exit_settings=dict(
        pass_params=["wait_td"],
    ),
    var_args=True,
    wait=0,
    until_next=False,
    skip_until_exit=True,
    param_settings=dict(
        wait_td=dict(
            # Convert each parameter value into a timedelta string for the
            # column labels.
            post_index_func=lambda x: x.map(lambda y: str(vbt.timedelta(y)))
        )
    ),
)
exit_generator = ExitGenerator.run(
    entries,
    [
        vbt.timedelta("3d").to_timedelta64(),
        vbt.timedelta("7d").to_timedelta64()
    ],
    symbol_wrapper.index.values
)
exit_generator.exits.sum()

# %%
new_entries = exit_generator.entries.vbt.signals.first(
    reset_by=exit_generator.exits,
    allow_gaps=True,
)
new_entries.index[new_entries[("7 days 00:00:00", "BTCUSDT")]]

# %% [markdown]
# ### Both

# %%
@njit
def entry_place_func_nb(c, low, close, th):
    """Place an entry once the low falls `th` below the close at `c.from_i - c.wait`.

    When `c.from_i` is 0, the entry is placed at the first position instead.
    Returns the position relative to `c.out`, or -1 if the price was not hit.
    """
    if c.from_i == 0:
        c.out[0] = True
        return 0
    exit_i = c.from_i - c.wait
    exit_price = close[exit_i, c.col]
    hit_price = exit_price * (1 - th)
    for out_i in range(len(c.out)):
        i = c.from_i + out_i
        if low[i, c.col] <= hit_price:
            # Mark the signal explicitly, consistent with the branch above.
            c.out[out_i] = True
            return out_i
    return -1

@njit
def exit_place_func_nb(c, high, close, th):
    """Place an exit once the high rises `th` above the close at `c.from_i - c.wait`.

    Returns the position relative to `c.out`, or -1 if the price was not hit.
    """
    entry_i = c.from_i - c.wait
    entry_price = close[entry_i, c.col]
    hit_price = entry_price * (1 + th)
    for out_i in range(len(c.out)):
        i = c.from_i + out_i
        if high[i, c.col] >= hit_price:
            # Mark the signal explicitly, as the entry function does.
            c.out[out_i] = True
            return out_i
    return -1

entries, exits = vbt.pd_acc.signals.generate_both(
    symbol_wrapper.shape,
    entry_place_func_nb=entry_place_func_nb,
    entry_place_args=(vbt.Rep("low"), vbt.Rep("close"), 0.1),
    exit_place_func_nb=exit_place_func_nb,
    exit_place_args=(vbt.Rep("high"), vbt.Rep("close"), 0.2),
    wrapper=symbol_wrapper,
    broadcast_named_args=dict(
        high=data.get("High"),
        low=data.get("Low"),
        close=data.get("Close")
    ),
    broadcast_kwargs=dict(post_func=np.asarray)
)

fig = data.plot(
    symbol="BTCUSDT",
    ohlc_trace_kwargs=dict(opacity=0.5),
    plot_volume=False
)
entries["BTCUSDT"].vbt.signals.plot_as_entries(
    y=data.get("Close", "BTCUSDT"), fig=fig)
exits["BTCUSDT"].vbt.signals.plot_as_exits(
    y=data.get("Close", "BTCUSDT"), fig=fig)
fig.show()

# %%
BothGenerator = vbt.SignalFactory(
    mode="both",
    input_names=["high", "low", "close"],
    param_names=["entry_th", "exit_th"]
).with_place_func(
    entry_place_func_nb=entry_place_func_nb,
    entry_settings=dict(
        pass_inputs=["low", "close"],
        pass_params=["entry_th"],
    ),
    exit_place_func_nb=exit_place_func_nb,
    exit_settings=dict(
        pass_inputs=["high", "close"],
        pass_params=["exit_th"],
    )
)
both_generator = BothGenerator.run(
    data.get("High"),
    data.get("Low"),
    data.get("Close"),
    [0.1, 0.2],
    [0.2, 0.3],
    param_product=True
)
fig = data.plot(
    symbol="BTCUSDT",
    ohlc_trace_kwargs=dict(opacity=0.5),
    plot_volume=False
)
both_generator.plot(
    column=(0.1, 0.3, "BTCUSDT"),
    entry_y=data.get("Close", "BTCUSDT"),
    exit_y=data.get("Close", "BTCUSDT"),
    fig=fig
)
fig.show()

# %% [markdown]
# ### Chained exits

# %%
@njit
def exit_place_func_nb(c, low, request_price, fill_price_out):
    """Place an exit once the low reaches the price requested at `c.from_i - c.wait`.

    Stores the requested price in `fill_price_out` at the hit position.
    Returns the position relative to `c.out`, or -1 if the price was not hit.
    """
    entry_req_price = request_price[c.from_i - c.wait, c.col]
    for out_i in range(len(c.out)):
        i = c.from_i + out_i
        if low[i, c.col] <= entry_req_price:
            fill_price_out[i, c.col] = entry_req_price
            # Mark the signal explicitly, as the other placement functions in
            # this file do, instead of only returning its position.
            c.out[out_i] = True
            return out_i
    return -1

ChainGenerator = vbt.SignalFactory(
    mode="chain",
    input_names=["low", "request_price"],
    in_output_names=["fill_price_out"]
).with_place_func(
    exit_place_func_nb=exit_place_func_nb,
    exit_settings=dict(
        pass_inputs=["low", "request_price"],
        pass_in_outputs=["fill_price_out"],
    ),
    fill_price_out=np.nan
)

fast_ma = vbt.talib("SMA").run(
    data.get("Close"),
    vbt.Default(10),
    short_name="fast_ma"
)
slow_ma = vbt.talib("SMA").run(
    data.get("Close"),
    vbt.Default(20),
    short_name="slow_ma"
)
entries = fast_ma.real_crossed_above(slow_ma)
entries.sum()

# %%
# The requested price is 10% below the close.
chain_generator = ChainGenerator.run(
    entries,
    data.get("Low"),
    data.get("Close") * (1 - 0.1)
)
request_mask = chain_generator.new_entries
request_mask.sum()

# %%
request_price = chain_generator.request_price
request_price[request_mask.any(axis=1)]

# %%
fill_mask = chain_generator.exits
fill_mask.sum()

# %%
fill_price = chain_generator.fill_price_out
fill_price[fill_mask.any(axis=1)]

# %% [markdown]
# ## Preset generators
# ### Random

# %%
btcusdt_wrapper = symbol_wrapper["BTCUSDT"]
mask = vbt.pd_acc.signals.generate_random(
    btcusdt_wrapper.shape,
    prob=1 / 10,
    wrapper=btcusdt_wrapper,
    seed=42
)
# Average distance between consecutive signals.
mask_index = mask.index[mask]
(mask_index[1:] - mask_index[:-1]).mean()

# %%
monday_mask = btcusdt_wrapper.fill(False)
monday_mask.vbt.set(True, every="monday", inplace=True)
mask = monday_mask.vbt.signals.generate_random_exits(wait=0)
mask_index = mask.index[mask]
mask_index.strftime("%W %A")

# %%
# Probability values spaced evenly from 0 to 1, one per index row.
prob = np.linspace(0, 1, len(symbol_wrapper.index))
rprob = vbt.RPROB.run(
    symbol_wrapper.shape,
    vbt.Default(vbt.to_2d_pr_array(prob)),
    seed=42,
    input_index=symbol_wrapper.index,
    input_columns=symbol_wrapper.columns
)
rprob.entries.astype(int).vbt.ts_heatmap().show()

# %%
rprob = vbt.RPROB.run(
    symbol_wrapper.shape,
    [0.5, vbt.to_2d_pr_array(prob)],
    seed=42,
    input_index=symbol_wrapper.index,
    input_columns=symbol_wrapper.columns
)
rprob.entries.sum()

# %% [markdown]
# ### Stops

# %%
new_entries, exits = entries.vbt.signals.generate_stop_exits(
    data.get("Close"),
    data.get("High"),
    stop=0.1,
    chain=True
)
new_entries[new_entries.any(axis=1)]

# %%
exits[exits.any(axis=1)]

# %%
# Pass a dictionary to collect additional outputs such as "stop_ts".
out_dict = {}
new_entries, exits = entries.vbt.signals.generate_stop_exits(
    data.get("Close"),
    data.get("High"),
    stop=0.1,
    chain=True,
    out_dict=out_dict
)
out_dict["stop_ts"][exits.any(axis=1)]

# %%
stcx = vbt.STCX.run(
    entries,
    data.get("Open"),
    ts=data.get("Low"),
    follow_ts=data.get("High"),
    stop=-0.1,
    trailing=[False, True],
    wait=0
)
fig = data.plot(
    symbol="BTCUSDT",
    ohlc_trace_kwargs=dict(opacity=0.5),
    plot_volume=False
)
stcx.plot(
    column=(-0.1, True, "BTCUSDT"),
    entry_y="entry_ts",
    exit_y="stop_ts",
    fig=fig
)
fig.show()

# %%
ohlcstcx = vbt.OHLCSTCX.run(
    entries,
    data.get("Close"),
    data.get("Open"),
    data.get("High"),
    data.get("Low"),
    data.get("Close"),
    sl_stop=vbt.Default(0.1),
    tsl_stop=vbt.Default(0.15),
    is_entry_open=False
)
ohlcstcx.plot(column="BTCUSDT").show()

# %%
ohlcstcx.stop_type_readable[ohlcstcx.exits.any(axis=1)]

# %%
# The same stops with only the entry price provided.
ohlcstcx = vbt.OHLCSTCX.run(
    entries,
    data.get("Close"),
    sl_stop=vbt.Default(0.1),
    tsl_stop=vbt.Default(0.15),
    is_entry_open=False
)
ohlcstcx.plot(column="BTCUSDT").show()

# %%
# Every entry with an odd position rank is treated as a short entry.
entry_pos_rank = entries.vbt.signals.pos_rank(allow_gaps=True)
short_entries = (entry_pos_rank >= 0) & (entry_pos_rank % 2 == 1)

ohlcstcx = vbt.OHLCSTCX.run(
    entries,
    data.get("Close"),
    data.get("Open"),
    data.get("High"),
    data.get("Low"),
    data.get("Close"),
    tsl_th=vbt.Default(0.2),
    tsl_stop=vbt.Default(0.1),
    reverse=vbt.Default(short_entries),
    is_entry_open=False
)
ohlcstcx.plot(column="BTCUSDT").show()

# %%
# Split the generated entries and exits into long and short sides.
long_entries = ohlcstcx.new_entries.vbt & (~short_entries)
long_exits = ohlcstcx.exits.vbt.signals.first_after(long_entries)
short_entries = ohlcstcx.new_entries.vbt & short_entries
short_exits = ohlcstcx.exits.vbt.signals.first_after(short_entries)

fig = data.plot(
    symbol="BTCUSDT",
    ohlc_trace_kwargs=dict(opacity=0.5),
    plot_volume=False
)
long_entries["BTCUSDT"].vbt.signals.plot_as_entries(
    ohlcstcx.entry_price["BTCUSDT"],
    trace_kwargs=dict(marker=dict(color="limegreen"), name="Long entries"),
    fig=fig
)
long_exits["BTCUSDT"].vbt.signals.plot_as_exits(
    ohlcstcx.stop_price["BTCUSDT"],
    trace_kwargs=dict(marker=dict(color="orange"), name="Long exits"),
    fig=fig
)
short_entries["BTCUSDT"].vbt.signals.plot_as_entries(
    ohlcstcx.entry_price["BTCUSDT"],
    trace_kwargs=dict(marker=dict(color="magenta"), name="Short entries"),
    fig=fig
)
short_exits["BTCUSDT"].vbt.signals.plot_as_exits(
    ohlcstcx.stop_price["BTCUSDT"],
    trace_kwargs=dict(marker=dict(color="red"), name="Short exits"),
    fig=fig
)
fig.show()

# %%