# ################################## HOW TO USE #################################### # # # # This is a Jupyter notebook formatted as a script # # Format: https://jupytext.readthedocs.io/en/latest/formats.html#the-percent-format # # # # Save this file and remove the '.txt' extension # # In Jupyter Lab, right click on the Python file -> Open With -> Jupytext Notebook # # Make sure to have Jupytext installed: https://github.com/mwouts/jupytext # # # # ################################################################################## # # %% [markdown] # # Portfolio optimization # ## Data # %% from vectorbtpro import * data = vbt.BinanceData.pull( ["BTCUSDT", "ETHUSDT", "BNBUSDT", "XRPUSDT", "ADAUSDT"], start="2020-01-01 UTC", end="2021-01-01 UTC", timeframe="1h" ) # %% data.to_hdf() data = vbt.HDFData.pull("BinanceData.h5") # %% [markdown] # ## Allocation # ### Manually # #### Index points # %% ms_points = data.wrapper.get_index_points(every="M") ms_points # %% data.wrapper.index.get_indexer( pd.Series(index=data.wrapper.index).resample(vbt.offset("M")).asfreq().index, method="bfill" ) # %% data.wrapper.index[ms_points] # %% example_points = data.wrapper.get_index_points(every=24 * 30) data.wrapper.index[example_points] # %% date_offset = pd.offsets.WeekOfMonth(week=3, weekday=4) example_points = data.wrapper.get_index_points( every=date_offset, add_delta=pd.Timedelta(hours=17) ) data.wrapper.index[example_points] # %% example_points = data.wrapper.get_index_points( start="April 1st 2020", every="M" ) data.wrapper.index[example_points] # %% example_points = data.wrapper.get_index_points( on=["April 1st 2020 19:45", "17 September 2020 00:01"] ) data.wrapper.index[example_points] # %% [markdown] # #### Filling # %% symbol_wrapper = data.get_symbol_wrapper(freq="1h") filled_allocations = symbol_wrapper.fill() filled_allocations # %% np.random.seed(42) def random_allocate_func(): weights = np.random.uniform(size=symbol_wrapper.shape[1]) return weights / weights.sum() for idx in ms_points: filled_allocations.iloc[idx] = random_allocate_func() allocations = filled_allocations[~filled_allocations.isnull().any(axis=1)] allocations # %% [markdown] # #### Simulation # %% pf = vbt.Portfolio.from_orders( close=data.get("Close"), size=filled_allocations, size_type="targetpercent", group_by=True, cash_sharing=True, call_seq="auto" ) # %% sim_alloc = pf.get_asset_value(group_by=False).vbt / pf.value sim_alloc # %% sim_alloc.vbt.plot( trace_kwargs=dict(stackgroup="one"), use_gl=False ).show() # %% pf.plot_allocations().show() # %% np.isclose(allocations, sim_alloc.iloc[ms_points]) # %% [markdown] # ### Allocation method # %% np.random.seed(42) pfo = vbt.PortfolioOptimizer.from_allocate_func( symbol_wrapper, random_allocate_func, every="M" ) # %% pfo.allocations # %% pfo.filled_allocations # %% pfo.alloc_records.records_readable # %% pfo.plot().show() # %% pfo.stats() # %% pf = vbt.Portfolio.from_optimizer(data, pfo, freq="1h") pf.sharpe_ratio # %% pf = pfo.simulate(data, freq="1h") pf.sharpe_ratio # %% [markdown] # #### Once # %% def const_allocate_func(target_alloc): return target_alloc pfo = vbt.PortfolioOptimizer.from_allocate_func( symbol_wrapper, const_allocate_func, [0.5, 0.2, 0.1, 0.1, 0.1], on=0 ) pfo.plot().show() # %% pfo = vbt.PortfolioOptimizer.from_initial( symbol_wrapper, [0.5, 0.2, 0.1, 0.1, 0.1] ) pfo.plot().show() # %% [markdown] # #### Custom array # %% custom_index = vbt.date_range("2020-01-01", "2021-01-01", freq="Q") custom_allocations = pd.DataFrame( [ [0.5, 0.2, 0.1, 0.1, 0.1], [0.1, 0.5, 0.2, 0.1, 0.1], [0.1, 0.1, 0.5, 0.2, 0.1], [0.1, 0.1, 0.1, 0.5, 0.2] ], index=custom_index, columns=symbol_wrapper.columns ) # %% pfo = vbt.PortfolioOptimizer.from_allocations( symbol_wrapper, allocations ) pfo.allocations # %% pfo = vbt.PortfolioOptimizer.from_allocations( symbol_wrapper, custom_allocations.values, start="2020-01-01", end="2021-01-01", every="Q" ) pfo.allocations # %% pfo = vbt.PortfolioOptimizer.from_filled_allocations( pfo.fill_allocations() ) pfo.allocations # %% [markdown] # #### Templates # %% def rotation_allocate_func(wrapper, i): weights = np.full(len(wrapper.columns), 0) weights[i % len(wrapper.columns)] = 1 return weights pfo = vbt.PortfolioOptimizer.from_allocate_func( symbol_wrapper, rotation_allocate_func, vbt.Rep("wrapper"), vbt.Rep("i"), every="M" ) pfo.plot().show() # %% def rotation_allocate_func(symbols, chosen_symbol): return {s: 1 if s == chosen_symbol else 0 for s in symbols} pfo = vbt.PortfolioOptimizer.from_allocate_func( symbol_wrapper, rotation_allocate_func, vbt.RepEval("wrapper.columns"), vbt.RepEval("wrapper.columns[i % len(wrapper.columns)]"), every="M" ) pfo.allocations # %% [markdown] # #### Groups # %% pfo = vbt.PortfolioOptimizer.from_allocate_func( symbol_wrapper, const_allocate_func, [0.5, 0.2, 0.1, 0.1, 0.1], every=vbt.Param(["1M", "2M", "3M"]) ) pf = pfo.simulate(data, freq="1h") pf.total_return # %% pfo = vbt.PortfolioOptimizer.from_allocate_func( symbol_wrapper, const_allocate_func, vbt.Param([ [0.5, 0.2, 0.1, 0.1, 0.1], [0.2, 0.1, 0.1, 0.1, 0.5] ], keys=pd.Index(["w1", "w2"], name="weights")), every=vbt.Param(["1M", "2M", "3M"]) ) # %% pfo.wrapper.grouper.get_index() # %% pfo.wrapper.columns # %% pfo[("3M", "w2")].stats() # %% pfo = vbt.PortfolioOptimizer.from_allocate_func( symbol_wrapper, const_allocate_func, group_configs=[ dict(args=([0.5, 0.2, 0.1, 0.1, 0.1],), every="1M"), dict(args=([0.2, 0.1, 0.1, 0.1, 0.5],), every="2M"), dict(args=([0.1, 0.1, 0.1, 0.5, 0.2],), every="3M"), dict(args=([0.1, 0.1, 0.5, 0.2, 0.1],), every="1M"), dict(args=([0.1, 0.5, 0.2, 0.1, 0.1],), every="2M"), dict(args=([0.5, 0.2, 0.1, 0.1, 0.1],), every="3M"), ] ) # %% pfo = vbt.PortfolioOptimizer.from_allocate_func( symbol_wrapper, const_allocate_func, group_configs=[ dict( allocate_func=const_allocate_func, args=([0.5, 0.2, 0.1, 0.1, 0.1],), _name="const" ), dict( allocate_func=random_allocate_func, every="M", _name="random" ), ] ) pfo.wrapper.grouper.get_index() # %% pfo = vbt.PortfolioOptimizer.from_allocate_func( symbol_wrapper, const_allocate_func, group_configs={ "const": dict( allocate_func=const_allocate_func, args=([0.5, 0.2, 0.1, 0.1, 0.1],) ), "random": dict( allocate_func=random_allocate_func, ), }, every=vbt.Param(["1M", "2M", "3M"]) ) pfo.wrapper.grouper.get_index() # %% [markdown] # #### Numba # %% @njit def rotation_allocate_func_nb(i, idx, n_cols): weights = np.full(n_cols, 0) weights[i % n_cols] = 1 return weights pfo = vbt.PortfolioOptimizer.from_allocate_func( symbol_wrapper, rotation_allocate_func_nb, vbt.RepEval("len(wrapper.columns)"), every="D", jitted_loop=True ) pfo.allocations.head() # %% [markdown] # #### Distribution # %% pfo = vbt.PortfolioOptimizer.from_allocate_func( symbol_wrapper, rotation_allocate_func_nb, vbt.Rep("i"), vbt.Rep("index_point"), vbt.RepEval("len(wrapper.columns)"), every="D", execute_kwargs=dict(engine="dask") ) pfo.allocations.head() # %% pfo = vbt.PortfolioOptimizer.from_allocate_func( symbol_wrapper, rotation_allocate_func_nb, vbt.RepEval("len(wrapper.columns)"), every="D", jitted_loop=True, chunked=dict( arg_take_spec=dict(args=vbt.ArgsTaker(None)), engine="dask" ) ) # %% pfo = vbt.PortfolioOptimizer.from_allocate_func( symbol_wrapper, rotation_allocate_func_nb, vbt.RepEval("len(wrapper.columns)"), every="D", jitted_loop=True, jitted=dict(parallel=True) ) pfo.allocations.head() # %% [markdown] # #### Previous allocation # %% def randomize_prev_allocate_func(i, allocations, mean, std): if i == 0: return allocations[0] prev_allocation = allocations[-1] log_returns = np.random.uniform(mean, std, size=len(prev_allocation)) returns = np.exp(log_returns) - 1 new_allocation = prev_allocation * (1 + returns) new_allocation = new_allocation / new_allocation.sum() allocations.append(new_allocation) return new_allocation np.random.seed(42) n_symbols = len(symbol_wrapper.columns) init_allocation = np.full(n_symbols, 1 / n_symbols) pfo = vbt.PortfolioOptimizer.from_allocate_func( symbol_wrapper, randomize_prev_allocate_func, i=vbt.Rep("i"), allocations=[init_allocation], mean=0, std=0.5, every="W", start=0, exact_start=True ) pfo.plot().show() # %% [markdown] # #### Current allocation # %% def current_allocate_func(price, index_point, alloc_info): prev_alloc_info = alloc_info[-1] prev_index_point = prev_alloc_info["index_point"] prev_allocation = prev_alloc_info["allocation"] if prev_index_point is None: current_allocation = prev_allocation else: prev_price_period = price.iloc[prev_index_point:index_point] prev_pfo = vbt.PFO.from_initial(prev_price_period.vbt.wrapper, prev_allocation) prev_pf = prev_pfo.simulate(prev_price_period) current_allocation = prev_pf.allocations.iloc[-1] alloc_info.append(dict( index_point=index_point, allocation=current_allocation, )) return current_allocation n_symbols = len(symbol_wrapper.columns) init_allocation = np.full(n_symbols, 1 / n_symbols) pfo = vbt.PortfolioOptimizer.from_allocate_func( symbol_wrapper, current_allocate_func, price=data.get("Close"), index_point=vbt.Rep("index_point"), alloc_info=[dict(index_point=None, allocation=init_allocation)], every="W", start=0, exact_start=True ) pfo.plot().show() # %% init_pfo = vbt.PFO.from_initial(symbol_wrapper, init_allocation) continuous_pf = pfo.simulate(data.get("Close")) index_points = symbol_wrapper.get_index_points(every="W", start=0, exact_start=True) discrete_pfo = vbt.PFO.from_allocations(symbol_wrapper, continuous_pf.allocations.iloc[index_points]) discrete_pfo.plot().show() # %% [markdown] # ## Optimization # ### Index ranges # %% example_ranges = data.wrapper.get_index_ranges(every="M") example_ranges[0] # %% example_ranges[1] # %% data.wrapper.index[example_ranges[0][0]:example_ranges[1][0]] # %% example_ranges = data.wrapper.get_index_ranges( every="M", lookback_period="3M" ) def get_index_bounds(range_starts, range_ends): for i in range(len(range_starts)): start_idx = range_starts[i] end_idx = range_ends[i] range_index = data.wrapper.index[start_idx:end_idx] yield range_index[0], range_index[-1] list(get_index_bounds(*example_ranges)) # %% example_ranges = data.wrapper.get_index_ranges( start=["2020-01-01", "2020-04-01", "2020-08-01"], end=["2020-04-01", "2020-08-01", "2020-12-01"] ) list(get_index_bounds(*example_ranges)) # %% example_ranges = data.wrapper.get_index_ranges( start="2020-01-01", end=["2020-04-01", "2020-08-01", "2020-12-01"] ) list(get_index_bounds(*example_ranges)) # %% example_ranges = data.wrapper.get_index_ranges( every="Q", exact_start=True, fixed_start=True ) list(get_index_bounds(*example_ranges)) # %% [markdown] # ### Optimization method # %% def inv_rank_optimize_func(price, index_slice): price_period = price.iloc[index_slice] first_price = price_period.iloc[0] last_price = price_period.iloc[-1] ret = (last_price - first_price) / first_price ranks = ret.rank(ascending=False) return ranks / ranks.sum() pfo = vbt.PortfolioOptimizer.from_optimize_func( symbol_wrapper, inv_rank_optimize_func, data.get("Close"), vbt.Rep("index_slice"), every="M" ) pfo.allocations # %% def inv_rank_optimize_func(price): first_price = price.iloc[0] last_price = price.iloc[-1] ret = (last_price - first_price) / first_price ranks = ret.rank(ascending=False) return ranks / ranks.sum() pfo = vbt.PortfolioOptimizer.from_optimize_func( symbol_wrapper, inv_rank_optimize_func, vbt.Takeable(data.get("Close")), every="M" ) # %% pfo.alloc_records.records_readable # %% start_idx = pfo.alloc_records.values[0]["start_idx"] end_idx = pfo.alloc_records.values[0]["end_idx"] close_period = data.get("Close").iloc[start_idx:end_idx] close_period.vbt.rebase(1).vbt.plot().show() # %% pfo.stats() # %% pfo.plots().show() # %% [markdown] # #### Waiting # %% pfo = vbt.PortfolioOptimizer.from_optimize_func( symbol_wrapper, inv_rank_optimize_func, vbt.Takeable(data.get("Close")) ) pfo.allocations # %% pfo = vbt.PortfolioOptimizer.from_optimize_func( symbol_wrapper, inv_rank_optimize_func, vbt.Takeable(data.get("Close")), alloc_wait=0 ) pfo.allocations # %% [markdown] # #### Numba # %% @njit def inv_rank_optimize_func_nb(i, start_idx, end_idx, price): price_period = price[start_idx:end_idx] first_price = price_period[0] last_price = price_period[-1] ret = (last_price - first_price) / first_price ranks = vbt.nb.rank_1d_nb(-ret) return ranks / ranks.sum() pfo = vbt.PortfolioOptimizer.from_optimize_func( symbol_wrapper, inv_rank_optimize_func_nb, data.get("Close").values, every="M", jitted_loop=True ) pfo.allocations # %%