Coverage for source/environment/trading_environment.py: 94%
132 statements
# environment/trading_environment.py

from gym import Env
from gym.spaces import Discrete, Box
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
import math
import random
from types import SimpleNamespace
from typing import Optional
import copy

from .broker import Broker
from .reward_validator_base import RewardValidatorBase

class TradingEnvironment(Env):
    """
    Implements a stock market environment in which an actor can perform actions (place orders).
    It is used to train neural network models with a reinforcement learning approach. It can be
    configured to award points and impose penalties in several ways.
    """

    TRAIN_MODE = 'train'
    TEST_MODE = 'test'

    def __init__(self, data_path: str, initial_budget: float, max_amount_of_trades: int, window_size: int,
                 validator: RewardValidatorBase, sell_stop_loss: float, sell_take_profit: float,
                 buy_stop_loss: float, buy_take_profit: float, test_ratio: float = 0.2, penalty_starts: int = 0,
                 penalty_stops: int = 10, static_reward_adjustment: float = 1) -> None:
        """
        Class constructor. Allows defining all crucial constants, reward validation methods,
        environmental penalty policies, etc.

        Parameters:
            data_path (str): Path to CSV data that should be used as the environmental stock market.
            initial_budget (float): Initial budget for the trader to start from.
            max_amount_of_trades (int): Maximum number of trades that can be ongoing at the same time.
                Setting this constant prevents traders from placing orders randomly and defines the
                amount of money that can be assigned to a single order at a given iteration.
            window_size (int): Constant defining how far into the past the trader will be able to look
                at a given iteration.
            validator (RewardValidatorBase): Validator implementing the policy used to award points
                for closed trades.
            sell_stop_loss (float): Constant used to define the losing boundary at which a sell order
                (short) is closed.
            sell_take_profit (float): Constant used to define the winning boundary at which a sell order
                (short) is closed.
            buy_stop_loss (float): Constant used to define the losing boundary at which a buy order
                (long) is closed.
            buy_take_profit (float): Constant used to define the winning boundary at which a buy order
                (long) is closed.
            test_ratio (float): Fraction of the data reserved for the test split.
            penalty_starts (int): Constant defining how many trading periods the trader can go without
                placing an order before a penalty is imposed. In the range between the start and stop
                constants the penalty is calculated as a fraction of the positive reward and subtracted
                from the actual reward.
            penalty_stops (int): Constant defining the trading period at which the penalty stops increasing.
                The reward for trading periods exceeding this constant equals minus the static reward adjustment.
            static_reward_adjustment (float): Constant used to penalize the trader for bad choices or
                reward it for good ones.
        """

        self.__data: dict[str, pd.DataFrame] = self.__load_data(data_path, test_ratio)
        self.__mode = TradingEnvironment.TRAIN_MODE
        self.__broker: Broker = Broker()
        self.__validator: RewardValidatorBase = validator

        self.__trading_data: SimpleNamespace = SimpleNamespace()
        self.__trading_data.current_budget: float = initial_budget
        self.__trading_data.currently_invested: float = 0
        self.__trading_data.no_trades_placed_for: int = 0
        self.__trading_data.currently_placed_trades: int = 0

        self.__trading_consts = SimpleNamespace()
        self.__trading_consts.INITIAL_BUDGET: float = initial_budget
        self.__trading_consts.MAX_AMOUNT_OF_TRADES: int = max_amount_of_trades
        self.__trading_consts.WINDOW_SIZE: int = window_size
        self.__trading_consts.SELL_STOP_LOSS: float = sell_stop_loss
        self.__trading_consts.SELL_TAKE_PROFIT: float = sell_take_profit
        self.__trading_consts.BUY_STOP_LOSS: float = buy_stop_loss
        self.__trading_consts.BUY_TAKE_PROFIT: float = buy_take_profit
        self.__trading_consts.STATIC_REWARD_ADJUSTMENT: float = static_reward_adjustment
        self.__trading_consts.PENALTY_STARTS: int = penalty_starts
        self.__trading_consts.PENALTY_STOPS: int = penalty_stops
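        # Maps the normalized budget (current budget / initial budget) to a profitability score:
        # 0 when the budget equals the initial budget, negative below it, saturating towards 1 above it.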
        self.__trading_consts.PROFITABILITY_FUNCTION = lambda x: -1.0 * math.exp(-x + 1) + 1
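        # Ramps from roughly 0 at PENALTY_STARTS idle periods up to 1 at PENALTY_STOPS and stays clipped
        # at 1 afterwards; used both as a state coefficient and to scale down positive rewards.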
        self.__trading_consts.PENALTY_FUNCTION = lambda x: \
            min(1, 1 - math.tanh(-3.0 * (x - penalty_stops) / (penalty_stops - penalty_starts)))

        self.current_iteration: int = self.__trading_consts.WINDOW_SIZE
        self.state: list[float] = self.__prepare_state_data()
        self.action_space: Discrete = Discrete(3)
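        # The +/-3 observation bounds presumably reflect that the window features are standardized
        # (z-scores), so values rarely fall outside three standard deviations.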
        self.observation_space: Box = Box(low = np.ones(len(self.state)) * -3,
                                          high = np.ones(len(self.state)) * 3,
                                          dtype = np.float64)

    def __load_data(self, data_path: str, test_size: float) -> dict[str, pd.DataFrame]:
        """
        Loads CSV data from the given path and splits it into train and test parts.

        Returns:
            (dict[str, pd.DataFrame]): Data frames for train and test mode keyed by mode name.
        """

        data_frame = pd.read_csv(data_path)
        dividing_index = int(len(data_frame) * (1 - test_size))
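        # The split is chronological (no shuffling), so the test period always follows the training period.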

        return {
            TradingEnvironment.TRAIN_MODE: data_frame.iloc[:dividing_index],
            TradingEnvironment.TEST_MODE: data_frame.iloc[dividing_index:]
        }

    def __prepare_state_data(self) -> list[float]:
        """
        Calculates state data as a list of floats representing the current iteration's observation.
        The observation contains all input data trimmed to the window size and a couple of coefficients
        giving an insight into the current budget and orders situation.

        Returns:
            (list[float]): List with current observations for the environment.
        """

        current_market_data = self.__data[self.__mode].iloc[self.current_iteration - self.__trading_consts.WINDOW_SIZE : self.current_iteration]
        current_market_data_no_index = current_market_data.select_dtypes(include = [np.number])
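        # Each numeric column is standardized (zero mean, unit variance) within the current window only,
        # so the observation reflects relative movement inside the window rather than absolute price levels.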
        normalized_current_market_data_values = pd.DataFrame(StandardScaler().fit_transform(current_market_data_no_index),
                                                             columns = current_market_data_no_index.columns).values
        current_marked_data_list = normalized_current_market_data_values.ravel().tolist()

        current_normalized_budget = 1.0 * self.__trading_data.current_budget / self.__trading_consts.INITIAL_BUDGET
        current_profitability_coeff = self.__trading_consts.PROFITABILITY_FUNCTION(current_normalized_budget)
        current_trades_occupancy_coeff = 1.0 * self.__trading_data.currently_placed_trades / self.__trading_consts.MAX_AMOUNT_OF_TRADES
        current_no_trades_penalty_coeff = self.__trading_consts.PENALTY_FUNCTION(self.__trading_data.no_trades_placed_for)
        current_inner_state_list = [current_profitability_coeff, current_trades_occupancy_coeff, current_no_trades_penalty_coeff]

        return current_marked_data_list + current_inner_state_list

    def set_mode(self, mode: str) -> None:
        """
        Mode setter. Switches the environment between the train and test data split.
        """

        if mode not in [TradingEnvironment.TRAIN_MODE, TradingEnvironment.TEST_MODE]:
            raise ValueError(f"Invalid mode: {mode}. Use TradingEnvironment.TRAIN_MODE or TradingEnvironment.TEST_MODE.")
        self.__mode = mode

    def get_mode(self) -> str:
        """
        Mode getter. Returns a copy of the current mode.
        """

        return copy.copy(self.__mode)

    def get_trading_data(self) -> SimpleNamespace:
        """
        Trading data getter.

        Returns:
            (SimpleNamespace): Copy of the namespace with all trading data.
        """

        return copy.copy(self.__trading_data)

    def get_trading_consts(self) -> SimpleNamespace:
        """
        Trading constants getter.

        Returns:
            (SimpleNamespace): Copy of the namespace with all trading constants.
        """

        return copy.copy(self.__trading_consts)

    def get_broker(self) -> Broker:
        """
        Broker getter.

        Returns:
            (Broker): Copy of the broker used by the environment.
        """

        return copy.copy(self.__broker)

    def get_environment_length(self) -> int:
        """
        Environment length getter.

        Returns:
            (int): Length of the environment data for the current mode.
        """

        return len(self.__data[self.__mode])

    def get_environment_spatial_data_dimension(self) -> tuple[int, int]:
        """
        Environment spatial data dimensionality getter.

        Returns:
            (tuple[int, int]): Dimension of the spatial data in the environment.
        """

        return (self.__trading_consts.WINDOW_SIZE, self.__data[self.__mode].shape[1] - 1)

    def get_data_for_iteration(self, columns: list[str], start: int, stop: int, step: int = 1) -> list[float]:
        """
        Data for certain iterations getter.

        Returns:
            (list[float]): Copy of part of data with specified columns
                over specified iterations.
        """

        return copy.copy(self.__data[self.__mode].loc[start:stop:step, columns].values.ravel().tolist())

    def step(self, action: int) -> tuple[list[float], float, bool, dict]:
        """
        Performs the specified action on the environment. It results in generation of the new
        observation. This function causes trades to be handled, the reward to be calculated and
        the environment to be updated.

        Parameters:
            action (int): Number specifying the action. Possible values are 0 for the buy action,
                1 for the wait action and 2 for the sell action.

        Returns:
            (tuple[list[float], float, bool, dict]): Tuple containing the next observation
                state, reward, finish indication and an additional info dictionary.
        """

        self.current_iteration += 1
        self.state = self.__prepare_state_data()

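        # Relative close-to-close price change between the previous and the current iteration,
        # passed to the broker so it can update open orders and report the ones that were closed.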
        close_changes = self.__data[self.__mode].iloc[self.current_iteration - 2 : self.current_iteration]['close'].values
        stock_change_coeff = 1 + (close_changes[1] - close_changes[0]) / close_changes[0]
        closed_orders = self.__broker.update_orders(stock_change_coeff)

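        # Closed orders are turned into reward points by the configured validator; their current value
        # flows back into the free budget and their initial value is released from the invested pool.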
        reward = self.__validator.validate_orders(closed_orders)
        self.__trading_data.currently_placed_trades -= len(closed_orders)
        self.__trading_data.current_budget += np.sum([trade.current_value for trade in closed_orders])
        self.__trading_data.currently_invested -= np.sum([trade.initial_value for trade in closed_orders])

        number_of_possible_trades = self.__trading_consts.MAX_AMOUNT_OF_TRADES - self.__trading_data.currently_placed_trades
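        # The free budget is split evenly across the remaining trade slots, so each new order
        # gets 1 / number_of_possible_trades of the currently available money.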
        money_to_trade = 0
        if number_of_possible_trades > 0:
            money_to_trade = 1.0 / number_of_possible_trades * self.__trading_data.current_budget

        if action == 0:
            is_buy_order = True
            stop_loss = self.__trading_consts.SELL_STOP_LOSS
            take_profit = self.__trading_consts.SELL_TAKE_PROFIT
        elif action == 2:
            is_buy_order = False
            stop_loss = self.__trading_consts.BUY_STOP_LOSS
            take_profit = self.__trading_consts.BUY_TAKE_PROFIT

        if action != 1:
            if number_of_possible_trades > 0:
                self.__trading_data.current_budget -= money_to_trade
                self.__trading_data.currently_invested += money_to_trade
                self.__broker.place_order(money_to_trade, is_buy_order, stop_loss, take_profit)
                self.__trading_data.currently_placed_trades += 1
                self.__trading_data.no_trades_placed_for = 0
                reward += self.__trading_consts.STATIC_REWARD_ADJUSTMENT
            else:
                self.__trading_data.no_trades_placed_for += 1
                reward -= self.__trading_consts.STATIC_REWARD_ADJUSTMENT
        else:
            self.__trading_data.no_trades_placed_for += 1
            if number_of_possible_trades == 0:
                reward += self.__trading_consts.STATIC_REWARD_ADJUSTMENT

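        # While free trade slots exist, a positive reward is scaled down the longer the trader stays idle;
        # once the idle streak exceeds PENALTY_STOPS an additional static penalty is subtracted.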
        if number_of_possible_trades > 0:
            reward *= (1 - self.__trading_consts.PENALTY_FUNCTION(self.__trading_data.no_trades_placed_for)) \
                if reward > 0 else 1
            if self.__trading_consts.PENALTY_STOPS < self.__trading_data.no_trades_placed_for:
                reward -= self.__trading_consts.STATIC_REWARD_ADJUSTMENT

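        # The episode ends when the data for the current mode is exhausted, the budget grows to ten times
        # the initial budget, or the total equity (budget plus invested money) drops below 80% of the initial budget.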
        if (self.current_iteration >= len(self.__data[self.__mode]) or
                self.__trading_data.current_budget > 10 * self.__trading_consts.INITIAL_BUDGET or
                (self.__trading_data.current_budget + self.__trading_data.currently_invested) / self.__trading_consts.INITIAL_BUDGET < 0.8):
            done = True
        else:
            done = False

        info = {'coeff': stock_change_coeff,
                'iteration': self.current_iteration,
                'number_of_closed_orders': len(closed_orders),
                'money_to_trade': money_to_trade,
                'action': action,
                'current_budget': self.__trading_data.current_budget,
                'currently_invested': self.__trading_data.currently_invested,
                'no_trades_placed_for': self.__trading_data.no_trades_placed_for,
                'currently_placed_trades': self.__trading_data.currently_placed_trades}

        return self.state, reward, done, info

    def render(self) -> None:
        """
        Renders environment visualization. Will be implemented later.
        """

        # TODO: Visualization to be implemented
        pass

    def reset(self, randkey: Optional[int] = None) -> list[float]:
        """
        Resets the environment. Typically used when the environment is finished,
        i.e. when there are no more steps to be taken within the environment
        or the finish conditions are fulfilled.

        Parameters:
            randkey (Optional[int]): Value indicating which iteration
                should be treated as the starting point after the reset.

        Returns:
            (list[float]): Current iteration observation state.
        """

        if randkey is None:
            randkey = random.randint(self.__trading_consts.WINDOW_SIZE, len(self.__data[self.__mode]) - 1)
        self.__trading_data.current_budget = self.__trading_consts.INITIAL_BUDGET
        self.__trading_data.currently_invested = 0
        self.__trading_data.no_trades_placed_for = 0
        self.__trading_data.currently_placed_trades = 0
        self.__broker.reset()
        self.current_iteration = randkey
        self.state = self.__prepare_state_data()

        return self.state
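

if __name__ == '__main__':
    # Minimal usage sketch, not part of the covered module. The CSV path below is a placeholder and
    # _FlatRewardValidator is a hypothetical stand-in for a concrete RewardValidatorBase implementation;
    # substitute the data file and validator your project actually provides.
    class _FlatRewardValidator(RewardValidatorBase):
        def validate_orders(self, closed_orders):
            # Awards one point per closed order, regardless of profit.
            return float(len(closed_orders))

    env = TradingEnvironment(data_path='data/stock_data.csv', initial_budget=10000.0,
                             max_amount_of_trades=5, window_size=20,
                             validator=_FlatRewardValidator(),
                             sell_stop_loss=0.95, sell_take_profit=1.05,
                             buy_stop_loss=0.95, buy_take_profit=1.05)

    state = env.reset()
    done = False
    while not done:
        action = env.action_space.sample()  # random policy as a stand-in for a trained model
        state, reward, done, info = env.step(action)
    print(info['current_budget'])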