Coverage for source/environment/trading_environment.py: 0%

166 statements  


# environment/trading_environment.py

# global imports
import copy
import logging
import math
import numpy as np
import pandas as pd
import random
from gym import Env
from gym.spaces import Box, Discrete
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from types import SimpleNamespace
from typing import Optional

# local imports
from source.environment import Broker, LabelAnnotatorBase, LabeledDataBalancer, RewardValidatorBase

class TradingEnvironment(Env):
    """
    Implements a stock market environment in which an actor can perform actions
    (place orders). It is used to train various models using various approaches
    and can be configured to award points and impose penalties in several ways.
    """

    # global class constants
    TRAIN_MODE = 'train'
    TEST_MODE = 'test'

    def __init__(self, data_path: str, initial_budget: float, max_amount_of_trades: int, window_size: int,
                 validator: RewardValidatorBase, label_annotator: LabelAnnotatorBase, sell_stop_loss: float,
                 sell_take_profit: float, buy_stop_loss: float, buy_take_profit: float, test_ratio: float = 0.2,
                 penalty_starts: int = 0, penalty_stops: int = 10, static_reward_adjustment: float = 1,
                 labeled_data_balancer: Optional[LabeledDataBalancer] = None) -> None:
        """
        Class constructor. Allows defining all crucial constants, reward validation methods,
        environmental penalty policies, etc.

        Parameters:
            data_path (str): Path to the CSV data that should be used as the environmental stock market.
            initial_budget (float): Initial budget for the trader to start from.
            max_amount_of_trades (int): Max amount of trades that can be ongoing at the same time.
                Setting this constant prevents traders from placing orders randomly and defines
                the amount of money that can be assigned to a single order at a certain iteration.
            window_size (int): Constant defining how far into the past the trader can look
                at a certain iteration.
            validator (RewardValidatorBase): Validator implementing the policy used to award points
                for closed trades.
            label_annotator (LabelAnnotatorBase): Annotator implementing the policy used to label
                data with target values. It provides supervised agents with information
                about the target class value for a certain iteration.
            sell_stop_loss (float): Constant defining the losing boundary at which a sell order
                (short) is closed.
            sell_take_profit (float): Constant defining the winning boundary at which a sell order
                (short) is closed.
            buy_stop_loss (float): Constant defining the losing boundary at which a buy order
                (long) is closed.
            buy_take_profit (float): Constant defining the winning boundary at which a buy order
                (long) is closed.
            test_ratio (float): Ratio of data that should be used for testing purposes.
            penalty_starts (int): Constant defining how many trading periods the trader can go
                without placing an order before a penalty is imposed. Between the start and stop
                constants, the penalty is calculated as a percentage of a positive reward and
                subtracted from the actual reward.
            penalty_stops (int): Constant defining the trading period at which the penalty stops
                increasing. Rewards for trading periods exceeding this constant are additionally
                reduced by the static reward adjustment.
            static_reward_adjustment (float): Constant used to penalize the trader for bad choices
                or reward it for good ones.
            labeled_data_balancer (Optional[LabeledDataBalancer]): Balancer used to balance
                labeled data. If None, no balancing will be performed.
        """

        if test_ratio < 0.0 or test_ratio >= 1.0:
            raise ValueError(f"Invalid test_ratio: {test_ratio}. It should be in range [0, 1).")

        self.__data: dict[str, pd.DataFrame] = self.__load_data(data_path, test_ratio)
        self.__mode = TradingEnvironment.TRAIN_MODE
        self.__broker: Broker = Broker()
        self.__validator: RewardValidatorBase = validator
        self.__label_annotator: LabelAnnotatorBase = label_annotator
        self.__labeled_data_balancer: Optional[LabeledDataBalancer] = labeled_data_balancer

        self.__trading_data: SimpleNamespace = SimpleNamespace()
        self.__trading_data.current_budget: float = initial_budget
        self.__trading_data.currently_invested: float = 0
        self.__trading_data.no_trades_placed_for: int = 0
        self.__trading_data.currently_placed_trades: int = 0

        self.__trading_consts = SimpleNamespace()
        self.__trading_consts.INITIAL_BUDGET: float = initial_budget
        self.__trading_consts.MAX_AMOUNT_OF_TRADES: int = max_amount_of_trades
        self.__trading_consts.WINDOW_SIZE: int = window_size
        self.__trading_consts.SELL_STOP_LOSS: float = sell_stop_loss
        self.__trading_consts.SELL_TAKE_PROFIT: float = sell_take_profit
        self.__trading_consts.BUY_STOP_LOSS: float = buy_stop_loss
        self.__trading_consts.BUY_TAKE_PROFIT: float = buy_take_profit
        self.__trading_consts.STATIC_REWARD_ADJUSTMENT: float = static_reward_adjustment
        self.__trading_consts.PENALTY_STARTS: int = penalty_starts
        self.__trading_consts.PENALTY_STOPS: int = penalty_stops
        self.__trading_consts.PROFITABILITY_FUNCTION = lambda x: -1.0 * math.exp(-x + 1) + 1
        self.__trading_consts.PENALTY_FUNCTION = lambda x: \
            min(1, 1 - math.tanh(-3.0 * (x - penalty_stops) / (penalty_stops - penalty_starts)))
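        # Shape notes: PROFITABILITY_FUNCTION(1.0) == 0 (budget unchanged), tends
        # to 1 as the budget grows and turns negative once the budget shrinks.
        # PENALTY_FUNCTION ramps from ~0.005 at penalty_starts (1 - tanh(3)) up
        # to 1.0 at penalty_stops and stays clamped at 1 beyond it.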

        self.__trading_consts.OUTPUT_CLASSES: int = len(vars(self.__label_annotator.get_output_classes()))

        self.current_iteration: int = self.__trading_consts.WINDOW_SIZE
        self.state: list[float] = self.__prepare_state_data()
        self.action_space: Discrete = Discrete(3)
        self.observation_space: Box = Box(low = np.ones(len(self.state)) * -3,
                                          high = np.ones(len(self.state)) * 3,
                                          dtype = np.float64)
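        # Bounds of [-3, 3] reflect the per-window standardization of the market
        # features (zero mean, unit variance), so values rarely exceed three
        # standard deviations; the appended trading coefficients also fall
        # within this range.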

    def __load_data(self, data_path: str, test_size: float) -> dict[str, pd.DataFrame]:
        """
        Loads data from a CSV file and splits it into training and testing sets based on the
        specified test size ratio.

        Parameters:
            data_path (str): Path to the CSV file containing the stock market data.
            test_size (float): Ratio of the data to be used for testing.

        Returns:
            (dict[str, pd.DataFrame]): Dictionary containing training and testing data frames.
        """

        data_frame = pd.read_csv(data_path)
        dividing_index = int(len(data_frame) * (1 - test_size))
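        # Example: 1000 rows with test_size=0.2 give dividing_index=800, i.e.
        # the first 800 rows for training and the last 200 for testing; the
        # split is chronological, so no future data leaks into training.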

        return {
            TradingEnvironment.TRAIN_MODE: data_frame.iloc[:dividing_index].reset_index(drop=True),
            TradingEnvironment.TEST_MODE: data_frame.iloc[dividing_index:].reset_index(drop=True)
        }

    def __prepare_labeled_data(self) -> tuple[pd.DataFrame, pd.Series]:
        """
        Prepares labeled data for training the model with a classification approach.
        It extracts the relevant features and labels from the environment's data.

        Returns:
            (tuple[pd.DataFrame, pd.Series]): A DataFrame containing the features and
                a Series containing the corresponding labels for training.
        """

        new_rows = []
        for i in range(self.current_iteration, self.get_environment_length() - 1):
            data_row = self.__prepare_state_data(slice(i - self.__trading_consts.WINDOW_SIZE, i), include_trading_data = False)
            new_rows.append(data_row)

        new_data = pd.DataFrame(new_rows, columns=[f"feature_{i}" for i in range(len(new_rows[0]))])
        labels = self.__label_annotator.annotate(self.__data[self.__mode]).shift(-self.current_iteration)
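        # The shift by -current_iteration aligns each label with the feature row
        # whose window ends at that iteration; dropna() below trims the tail
        # rows left without a label by the shift.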

        return new_data, labels.dropna()

    def __prepare_state_data(self, index: Optional[slice] = None, include_trading_data: bool = True) -> list[float]:
        """
        Calculates state data as a list of floats representing the current iteration's observation.
        Each observation contains all input data refined to the window size and a couple of
        coefficients giving an insight into the current budget and orders situation.

        Parameters:
            index (Optional[slice]): Slice of the data to build the observation from.
                Defaults to the window ending at the current iteration.
            include_trading_data (bool): Whether to append the budget and order
                coefficients to the observation. Defaults to True.

        Returns:
            (list[float]): List with current observations for environment.
        """

        if index is None:
            index = slice(self.current_iteration - self.__trading_consts.WINDOW_SIZE, self.current_iteration)

        current_market_data = self.__data[self.__mode].iloc[index]
        current_market_data_no_index = current_market_data.select_dtypes(include = [np.number])
        normalized_current_market_data_values = pd.DataFrame(StandardScaler().fit_transform(current_market_data_no_index),
                                                             columns = current_market_data_no_index.columns).values
        current_market_data_list = normalized_current_market_data_values.ravel().tolist()
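        # After flattening, the market portion of the observation holds
        # WINDOW_SIZE * <number of numeric columns> values.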

        if include_trading_data:
            current_normalized_budget = 1.0 * self.__trading_data.current_budget / self.__trading_consts.INITIAL_BUDGET
            current_profitability_coeff = self.__trading_consts.PROFITABILITY_FUNCTION(current_normalized_budget)
            current_trades_occupancy_coeff = 1.0 * self.__trading_data.currently_placed_trades / self.__trading_consts.MAX_AMOUNT_OF_TRADES
            current_no_trades_penalty_coeff = self.__trading_consts.PENALTY_FUNCTION(self.__trading_data.no_trades_placed_for)
            current_inner_state_list = [current_profitability_coeff, current_trades_occupancy_coeff, current_no_trades_penalty_coeff]
            current_market_data_list += current_inner_state_list

        return current_market_data_list

    def set_mode(self, mode: str) -> None:
        """
        Sets the mode of the environment to either TRAIN_MODE or TEST_MODE.

        Parameters:
            mode (str): Mode to set for the environment.

        Raises:
            ValueError: If the provided mode is not valid.
        """

        if mode not in [TradingEnvironment.TRAIN_MODE, TradingEnvironment.TEST_MODE]:
            raise ValueError(f"Invalid mode: {mode}. Use TradingEnvironment.TRAIN_MODE or TradingEnvironment.TEST_MODE.")
        self.__mode = mode

    def get_mode(self) -> str:
        """
        Mode getter.

        Returns:
            (str): Current mode of the environment.
        """

        return copy.copy(self.__mode)

    def get_trading_data(self) -> SimpleNamespace:
        """
        Trading data getter.

        Returns:
            (SimpleNamespace): Copy of the namespace with all trading data.
        """

        return copy.copy(self.__trading_data)

    def get_trading_consts(self) -> SimpleNamespace:
        """
        Trading constants getter.

        Returns:
            (SimpleNamespace): Copy of the namespace with all trading constants.
        """

        return copy.copy(self.__trading_consts)

    def get_broker(self) -> Broker:
        """
        Broker getter.

        Returns:
            (Broker): Copy of the broker used by the environment.
        """

        return copy.copy(self.__broker)

    def get_environment_length(self) -> int:
        """
        Environment length getter.

        Returns:
            (int): Length of the environment.
        """

        return len(self.__data[self.__mode])

    def get_environment_spatial_data_dimension(self) -> tuple[int, int]:
        """
        Environment spatial data dimensionality getter.

        Returns:
            (tuple[int, int]): Dimensions of the spatial data in the environment.
        """

        return (self.__trading_consts.WINDOW_SIZE, self.__data[self.__mode].shape[1] - 1)

    def get_labeled_data(self, should_split: bool = True, should_balance: bool = True,
                         verbose: bool = True) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """
        Prepares labeled data for training or testing the model.
        It extracts the relevant features and labels from the environment's data.

        Parameters:
            should_split (bool): Whether to split the data into training and testing sets.
                Defaults to True. If set to False, testing data will be empty.
            should_balance (bool): Whether to balance the labeled data. Defaults to True.
                Will be ignored if labeled_data_balancer is None.
            verbose (bool): Whether to log the class cardinality before and after balancing.
                Defaults to True.

        Returns:
            (tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]): A tuple containing the
                input data, output data, test input data, and test output data.
        """

        input_data, output_data = self.__prepare_labeled_data()
        input_data_test, output_data_test = [], []
        if verbose:
            logging.info(f"Original class cardinality: {np.array(to_categorical(output_data)).sum(axis = 0)}")

        if self.__mode == TradingEnvironment.TRAIN_MODE:
            if should_split:
                input_data, input_data_test, output_data, output_data_test = \
                    train_test_split(input_data, output_data, test_size = 0.1, random_state = 42,
                                     stratify = output_data)

            if self.__labeled_data_balancer is not None and should_balance:
                input_data, output_data = self.__labeled_data_balancer.balance(input_data, output_data)
                if verbose:
                    logging.info(f"Balanced class cardinality: {np.array(to_categorical(output_data)).sum(axis = 0)}")

        return copy.copy((np.array(input_data), np.array(output_data),
                          np.array(input_data_test), np.array(output_data_test)))

    def get_data_for_iteration(self, columns: list[str], start: int = 0, stop: Optional[int] = None,
                               step: int = 1) -> list[float]:
        """
        Data getter for certain iterations.

        Parameters:
            columns (list[str]): List of column names to extract from the data.
            start (int): Start iteration index. Defaults to 0.
            stop (Optional[int]): Stop iteration index. Defaults to environment length minus one.
            step (int): Step between iterations. Defaults to 1.

        Returns:
            (list[float]): Copy of part of the data with specified columns
                over specified iterations.
        """

        if stop is None:
            stop = self.get_environment_length() - 1

        return copy.copy(self.__data[self.__mode].loc[start:stop:step, columns].values.ravel().tolist())
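
    # Example: get_data_for_iteration(['close'], start=0, stop=100) returns the
    # first 101 closing prices of the current mode's data as a flat list
    # (label-based .loc slicing is end-inclusive).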

    def step(self, action: int) -> tuple[list[float], float, bool, dict]:
        """
        Performs the specified action on the environment, resulting in the generation of
        new observations. This function causes trades to be handled, the reward to be
        calculated and the environment to be updated.

        Parameters:
            action (int): Number specifying the action. Possible values are 0 for the buy
                action, 1 for the wait action and 2 for the sell action.

        Returns:
            (tuple[list[float], float, bool, dict]): Tuple containing the next observation
                state, reward, finish indication and additional info dictionary.
        """

        self.current_iteration += 1
        self.state = self.__prepare_state_data()

        close_changes = self.__data[self.__mode].iloc[self.current_iteration - 2 : self.current_iteration]['close'].values
        stock_change_coeff = 1 + (close_changes[1] - close_changes[0]) / close_changes[0]
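        # Example: a close moving from 100.0 to 102.0 yields a coefficient of
        # 1.02, which the broker uses below to update the value of open orders.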

        closed_orders = self.__broker.update_orders(stock_change_coeff)

        reward = self.__validator.validate_orders(closed_orders)
        self.__trading_data.currently_placed_trades -= len(closed_orders)
        self.__trading_data.current_budget += np.sum([trade.current_value for trade in closed_orders])
        self.__trading_data.currently_invested -= np.sum([trade.initial_value for trade in closed_orders])

        number_of_possible_trades = self.__trading_consts.MAX_AMOUNT_OF_TRADES - self.__trading_data.currently_placed_trades
        money_to_trade = 0
        if number_of_possible_trades > 0:
            money_to_trade = 1.0 / number_of_possible_trades * self.__trading_data.current_budget
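        # The remaining budget is spread evenly over the free trade slots, e.g.
        # a budget of 1000 with 4 free slots allocates 250 to a new order.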

        # long orders are closed per the buy thresholds, shorts per the sell ones
        if action == 0:
            is_buy_order = True
            stop_loss = self.__trading_consts.BUY_STOP_LOSS
            take_profit = self.__trading_consts.BUY_TAKE_PROFIT
        elif action == 2:
            is_buy_order = False
            stop_loss = self.__trading_consts.SELL_STOP_LOSS
            take_profit = self.__trading_consts.SELL_TAKE_PROFIT

        if action != 1:
            if number_of_possible_trades > 0:
                self.__trading_data.current_budget -= money_to_trade
                self.__trading_data.currently_invested += money_to_trade
                self.__broker.place_order(money_to_trade, is_buy_order, stop_loss, take_profit)
                self.__trading_data.currently_placed_trades += 1
                self.__trading_data.no_trades_placed_for = 0
                reward += self.__trading_consts.STATIC_REWARD_ADJUSTMENT
            else:
                self.__trading_data.no_trades_placed_for += 1
                reward -= self.__trading_consts.STATIC_REWARD_ADJUSTMENT
        else:
            self.__trading_data.no_trades_placed_for += 1
            if number_of_possible_trades == 0:
                reward += self.__trading_consts.STATIC_REWARD_ADJUSTMENT

        if number_of_possible_trades > 0:
            # positive rewards are damped the longer the trader stays inactive
            reward *= (1 - self.__trading_consts.PENALTY_FUNCTION(self.__trading_data.no_trades_placed_for)) \
                if reward > 0 else 1
            if self.__trading_consts.PENALTY_STOPS < self.__trading_data.no_trades_placed_for:
                reward -= self.__trading_consts.STATIC_REWARD_ADJUSTMENT

        # the episode ends at the data boundary, after a tenfold budget gain,
        # or once total equity drops below 80% of the initial budget
        if (self.current_iteration >= self.get_environment_length() - 1 or
                self.__trading_data.current_budget > 10 * self.__trading_consts.INITIAL_BUDGET or
                (self.__trading_data.current_budget + self.__trading_data.currently_invested) / self.__trading_consts.INITIAL_BUDGET < 0.8):
            done = True
        else:
            done = False

        info = {'coeff': stock_change_coeff,
                'iteration': self.current_iteration,
                'number_of_closed_orders': len(closed_orders),
                'money_to_trade': money_to_trade,
                'action': action,
                'current_budget': self.__trading_data.current_budget,
                'currently_invested': self.__trading_data.currently_invested,
                'no_trades_placed_for': self.__trading_data.no_trades_placed_for,
                'currently_placed_trades': self.__trading_data.currently_placed_trades}

        return self.state, reward, done, info

    def render(self) -> None:
        """
        Renders environment visualization. Will be implemented later.
        """

        # TODO: Visualization to be implemented
        pass

    def reset(self, randkey: Optional[int] = None) -> list[float]:
        """
        Resets the environment. Typically used when the environment is finished,
        i.e. when there are no more steps to be taken within the environment
        or the finish conditions are fulfilled.

        Parameters:
            randkey (Optional[int]): Value indicating which iteration
                should be treated as the starting point after the reset.

        Returns:
            (list[float]): Current iteration observation state.
        """

        if randkey is None:
            randkey = random.randint(self.__trading_consts.WINDOW_SIZE, self.get_environment_length() - 1)
        self.__trading_data.current_budget = self.__trading_consts.INITIAL_BUDGET
        self.__trading_data.currently_invested = 0
        self.__trading_data.no_trades_placed_for = 0
        self.__trading_data.currently_placed_trades = 0
        self.__broker.reset()
        self.current_iteration = randkey
        self.state = self.__prepare_state_data()

        return self.state
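
# --------------------------------------------------------------------------
# Minimal usage sketch (illustrative only): `MyValidator` and `MyAnnotator`
# are hypothetical stand-ins for project-specific subclasses of
# RewardValidatorBase and LabelAnnotatorBase, and the CSV path and threshold
# values are placeholders to be replaced with real ones.
# --------------------------------------------------------------------------
if __name__ == '__main__':
    env = TradingEnvironment(data_path = 'data/stock_data.csv',        # placeholder path
                             initial_budget = 10000.0,
                             max_amount_of_trades = 5,
                             window_size = 10,
                             validator = MyValidator(),                # hypothetical validator
                             label_annotator = MyAnnotator(),          # hypothetical annotator
                             sell_stop_loss = 0.95, sell_take_profit = 1.05,
                             buy_stop_loss = 0.95, buy_take_profit = 1.05)
    state = env.reset()
    done = False
    while not done:
        action = env.action_space.sample()                             # random policy
        state, reward, done, info = env.step(action)
    logging.info(f"Episode finished at iteration {info['iteration']} "
                 f"with budget {info['current_budget']:.2f}")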