# ################################## HOW TO USE #################################### #
#                                                                                    #
# This is a Jupyter notebook formatted as a script                                   #
# Format: https://jupytext.readthedocs.io/en/latest/formats.html#the-percent-format  #
#                                                                                    #
# Save this file and remove the '.txt' extension                                     #
# In Jupyter Lab, right click on the Python file -> Open With -> Jupytext Notebook   #
# Make sure to have Jupytext installed: https://github.com/mwouts/jupytext           #
#                                                                                    #
# ################################################################################## #

# %% [markdown]
# #  Scheduling
# ## Updating

# %%
from vectorbtpro import *

# Pull the most recent ten minutes of one-minute BTCUSDT bars from Binance.
data = vbt.BinanceData.pull(
    "BTCUSDT",
    start="10 minutes ago UTC",
    end="now UTC",
    timeframe="1m"
)
data.close

# %%
import imageio.v2 as imageio


class OHLCFigUpdater(vbt.DataUpdater):
    """Data updater that redraws an OHLC figure after every update.

    On each call to `update`, the wrapped data is updated through
    `vbt.DataUpdater.update`, the first trace of `fig` is overwritten with
    the (optionally windowed) OHLC columns, and, when a writer is supplied,
    a PNG snapshot of the figure is appended to it. Once `stop_after` has
    elapsed since construction, `update` raises `vbt.CancelledError`.

    Args:
        data: Data instance handed to `vbt.DataUpdater`.
        fig: Figure whose first trace (`fig.data[0]`) receives the OHLC values.
        writer: Optional object with an `append_data` method that receives
            one image per update.
        display_last: Optional offset subtracted from the last index value;
            only rows from that point onwards are drawn.
        stop_after: Optional duration compared against the time elapsed
            since this instance was created.
        **kwargs: Forwarded to `vbt.DataUpdater.__init__`.
    """

    _expected_keys = None

    def __init__(self, data, fig, writer=None, display_last=None, stop_after=None, **kwargs):
        # NOTE(review): `fig` is stored on the instance but is not forwarded to
        # the base initializer, unlike the other arguments -- confirm intentional.
        vbt.DataUpdater.__init__(
            self,
            data,
            writer=writer,
            display_last=display_last,
            stop_after=stop_after,
            **kwargs
        )

        self._fig = fig
        self._writer = writer
        self._display_last = display_last
        self._stop_after = stop_after
        # Reference point for the `stop_after` check performed in `update`.
        self._start_dt = vbt.utc_datetime()

    @property
    def fig(self):
        """Figure being redrawn."""
        return self._fig

    @property
    def writer(self):
        """Writer that receives a snapshot per update, or None."""
        return self._writer

    @property
    def display_last(self):
        """Offset defining the trailing window to draw, or None for all rows."""
        return self._display_last

    @property
    def stop_after(self):
        """Duration after which `update` raises `vbt.CancelledError`, or None."""
        return self._stop_after

    @property
    def start_dt(self):
        """Value of `vbt.utc_datetime()` captured at construction."""
        return self._start_dt

    def update(self, **kwargs):
        """Update the data, redraw the figure, and optionally record a frame.

        Args:
            **kwargs: Forwarded to `vbt.DataUpdater.update`.

        Raises:
            vbt.CancelledError: If `stop_after` is set and at least that much
                time has passed since `start_dt`.
        """
        vbt.DataUpdater.update(self, **kwargs)

        df = self.data.get()
        if self.display_last is not None:
            # Keep only the rows inside the trailing window ending at the last index.
            df = df[df.index[-1] - self.display_last:]

        trace = self.fig.data[0]
        # Apply all trace changes in a single batch.
        with self.fig.batch_update():
            trace.x = df["Close"].index
            trace.open = df["Open"].values
            trace.high = df["High"].values
            trace.low = df["Low"].values
            trace.close = df["Close"].values

        if self.writer is not None:
            # Render the figure to PNG bytes and decode them into a frame.
            fig_data = imageio.imread(self.fig.to_image(format="png"))
            self.writer.append_data(fig_data)

        if self.stop_after is not None:
            now_dt = vbt.utc_datetime()
            if now_dt - self.start_dt >= self.stop_after:
                raise vbt.CancelledError

# %%
import logging

logging.basicConfig(level=logging.INFO)

# %%
fig = data.plot(ohlc_type="candlestick", plot_volume=False)
# NOTE(review): a bare expression that is not the last statement of a cell is
# not rendered by Jupyter by default -- move `fig` to its own cell if the
# figure is meant to be displayed before the updater starts.
fig

with imageio.get_writer("ohlc_fig_updater.gif", duration=250, loop=0) as writer:
    ohlc_fig_updater = OHLCFigUpdater(
        data=data,
        fig=fig,
        writer=writer,
        display_last=pd.Timedelta(minutes=10),
        stop_after=pd.Timedelta(minutes=5)
    )
    # Interval of 10 -- presumably seconds; confirm against the vectorbtpro docs.
    ohlc_fig_updater.update_every(10)

# %%
ohlc_fig_updater.data.close

# %% [markdown]
# ## Saving

# %%
data = vbt.BinanceData.pull(
    "BTCUSDT",
    start="10 minutes ago UTC",
    end="now UTC",
    timeframe="1m"
)
csv_saver = vbt.CSVDataSaver(data)
csv_saver.update_every(10, init_save=True)

# %%
csv_saver.data.close

# %%
pd.read_csv("BTCUSDT.csv", index_col=0, parse_dates=True)["Close"]

# %%
vbt.CSVData.pull("BTCUSDT.csv").close

# %%
vbt.CSVData.pull("BTCUSDT.csv").to_csv()

pd.read_csv("BTCUSDT.csv", index_col=0, parse_dates=True)["Close"]

# %%
csv_saver.update_every(10)

# %%
csv_saver.save("csv_saver")

# %%
# The imports are repeated here, so this cell does not depend on the earlier
# import cells having been run.
import logging

logging.basicConfig(level=logging.INFO)

from vectorbtpro import *

csv_saver = vbt.CSVDataSaver.load("csv_saver")
csv_saver.update_every(10)

# %%
csv_saver = vbt.CSVDataSaver(
    save_kwargs=dict(
        path_or_buf="data",
        mkdir_kwargs=dict(mkdir=True)
    )
)

# %%