Coverage for source/environment/trading_environment.py: 82%
203 statements
coverage.py v7.8.0, created at 2025-08-04 21:16 +0000
# environment/trading_environment.py

# global imports
import copy
import logging
import math
import numpy as np
import pandas as pd
import random
from enum import Enum
from gym import Env
from gym.spaces import Box, Discrete
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from types import SimpleNamespace
from typing import Any, Optional

# local imports
from source.environment import Broker, LabelAnnotatorBase, LabeledDataBalancer, RewardValidatorBase
class TradingEnvironment(Env):
    """
    Implements a stock market environment in which an actor can perform actions (place orders).
    It is used to train various models using various approaches and can be
    configured to award points and impose penalties in several ways.
    """

    class TradingMode(Enum):
        """
        Enumeration of the available trading modes.
        """

        IMPLICIT_ORDER_CLOSING = 0
        EXPLICIT_ORDER_CLOSING = 1

    # global class constants
    TRAIN_MODE = 'train'
    TEST_MODE = 'test'
    def __init__(self, data: pd.DataFrame, initial_budget: float, max_amount_of_trades: int,
                 window_size: int, validator: RewardValidatorBase, label_annotator: LabelAnnotatorBase,
                 sell_stop_loss: float, sell_take_profit: float, buy_stop_loss: float, buy_take_profit: float,
                 test_ratio: float = 0.2, penalty_starts: int = 0, penalty_stops: int = 10,
                 static_reward_adjustment: float = 1, labeled_data_balancer: Optional[LabeledDataBalancer] = None,
                 meta_data: Optional[dict[str, Any]] = None, trading_mode: Optional[TradingMode] = None) -> None:
        """
        Class constructor. Allows defining all crucial constants, the reward validation method,
        environmental penalty policies, etc.

        Parameters:
            data (pd.DataFrame): DataFrame containing historical market data.
            initial_budget (float): Initial budget the trader starts from.
            max_amount_of_trades (int): Maximum number of trades that can be ongoing at the same time.
                Setting this constant prevents the trader from placing orders at random and defines
                the amount of money that can be assigned to a single order at a given iteration.
            window_size (int): Constant defining how far into the past the trader can look
                at a given iteration.
            validator (RewardValidatorBase): Validator implementing the policy used to award points
                for closed trades.
            label_annotator (LabelAnnotatorBase): Annotator implementing the policy used to label
                data with target values. It provides supervised agents with information about
                the target class value for a given iteration.
            sell_stop_loss (float): Constant defining the losing boundary at which a sell order
                (short) is closed.
            sell_take_profit (float): Constant defining the winning boundary at which a sell order
                (short) is closed.
            buy_stop_loss (float): Constant defining the losing boundary at which a buy order
                (long) is closed.
            buy_take_profit (float): Constant defining the winning boundary at which a buy order
                (long) is closed.
            test_ratio (float): Ratio of the data that should be used for testing purposes.
            penalty_starts (int): Constant defining how many trading periods the trader can go
                without placing an order before a penalty is imposed. Between the start and stop
                constants the penalty is calculated as a fraction of the positive reward and
                subtracted from the actual reward.
            penalty_stops (int): Constant defining the trading period at which the penalty no longer
                increases. The reward for trading periods exceeding it equals minus the static
                reward adjustment.
            static_reward_adjustment (float): Constant used to penalize the trader for bad choices
                or reward it for good ones.
            labeled_data_balancer (Optional[LabeledDataBalancer]): Balancer used to balance
                labeled data. If None, no balancing will be performed.
            meta_data (Optional[dict[str, Any]]): Dictionary containing metadata about the dataset.
            trading_mode (Optional[TradingMode]): Mode of the environment, either IMPLICIT_ORDER_CLOSING
                or EXPLICIT_ORDER_CLOSING. Defaults to IMPLICIT_ORDER_CLOSING.
        """
        if test_ratio < 0.0 or test_ratio >= 1.0:
            raise ValueError(f"Invalid test_ratio: {test_ratio}. It should be in range [0, 1).")

        if trading_mode is None:
            trading_mode = TradingEnvironment.TradingMode.IMPLICIT_ORDER_CLOSING

        self.__data: dict[str, pd.DataFrame] = self.__split_data(data, test_ratio)
        self.__meta_data: Optional[dict[str, Any]] = meta_data
        self.__mode = TradingEnvironment.TRAIN_MODE
        self.__trading_mode: TradingEnvironment.TradingMode = trading_mode
        self.__broker: Broker = Broker()
        self.__validator: RewardValidatorBase = validator
        self.__label_annotator: LabelAnnotatorBase = label_annotator
        self.__labeled_data_balancer: Optional[LabeledDataBalancer] = labeled_data_balancer

        self.__trading_data: SimpleNamespace = SimpleNamespace()
        self.__trading_data.current_budget: float = initial_budget
        self.__trading_data.currently_invested: float = 0
        self.__trading_data.no_trades_placed_for: int = 0
        self.__trading_data.currently_placed_trades: int = 0

        self.__trading_consts = SimpleNamespace()
        self.__trading_consts.INITIAL_BUDGET: float = initial_budget
        self.__trading_consts.MAX_AMOUNT_OF_TRADES: int = max_amount_of_trades
        self.__trading_consts.WINDOW_SIZE: int = window_size
        self.__trading_consts.SELL_STOP_LOSS: float = sell_stop_loss
        self.__trading_consts.SELL_TAKE_PROFIT: float = sell_take_profit
        self.__trading_consts.BUY_STOP_LOSS: float = buy_stop_loss
        self.__trading_consts.BUY_TAKE_PROFIT: float = buy_take_profit
        self.__trading_consts.STATIC_REWARD_ADJUSTMENT: float = static_reward_adjustment
        self.__trading_consts.PENALTY_STARTS: int = penalty_starts
        self.__trading_consts.PENALTY_STOPS: int = penalty_stops
        self.__trading_consts.PROFITABILITY_FUNCTION = lambda x: -1.0 * math.exp(-x + 1) + 1
        self.__trading_consts.PENALTY_FUNCTION = lambda x: \
            min(1, 1 - math.tanh(-3.0 * (x - penalty_stops) / (penalty_stops - penalty_starts)))
        self.__trading_consts.OUTPUT_CLASSES: int = vars(self.__label_annotator.get_output_classes())
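        # Shape of the helper functions above (derived from the formulas):
        # PROFITABILITY_FUNCTION maps the normalized budget to (-inf, 1): it is 0 when the budget
        # equals the initial budget (x = 1), negative below it and saturating towards 1 above it.
        # PENALTY_FUNCTION ramps from ~0 at x = penalty_starts to 1 at x = penalty_stops, e.g. with
        # penalty_starts = 0 and penalty_stops = 10: f(0) ~= 0.005, f(5) ~= 0.1, f(10) = 1.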
        self.current_iteration: int = self.__trading_consts.WINDOW_SIZE
        self.state: list[float] = self.__prepare_state_data()
        self.action_space: Discrete = Discrete(3)
        self.observation_space: Box = Box(low = np.ones(len(self.state)) * -3,
                                          high = np.ones(len(self.state)) * 3,
                                          dtype = np.float64)
    def __split_data(self, data: pd.DataFrame, test_size: float) -> dict[str, pd.DataFrame]:
        """
        Splits the given DataFrame into training and testing sets based on the specified test size ratio.

        Parameters:
            data (pd.DataFrame): DataFrame containing the stock market data.
            test_size (float): Ratio of the data to be used for testing.

        Returns:
            (dict[str, pd.DataFrame]): Dictionary containing training and testing data frames,
                keyed by TradingEnvironment.TRAIN_MODE and TradingEnvironment.TEST_MODE.
        """

        dividing_index = int(len(data) * (1 - test_size))

        return {
            TradingEnvironment.TRAIN_MODE: data.iloc[:dividing_index].reset_index(drop=True),
            TradingEnvironment.TEST_MODE: data.iloc[dividing_index:].reset_index(drop=True)
        }
    def __prepare_labeled_data(self, env_length_range: Optional[tuple[int, int]] = None) -> tuple[pd.DataFrame, pd.Series]:
        """
        Prepares labeled data for training models with a classification approach.
        It extracts the relevant features and labels from the environment's data.

        Parameters:
            env_length_range (Optional[tuple[int, int]]): Optional range limiting which
                iterations of the environment are used.

        Returns:
            (tuple[pd.DataFrame, pd.Series]): A DataFrame with the features and the corresponding labels.
        """

        if env_length_range is None:
            env_length_range = (self.current_iteration, self.get_environment_length() - 1)

        new_rows = []
        for i in range(env_length_range[0], env_length_range[1]):
            data_row = self.__prepare_state_data(slice(i - self.__trading_consts.WINDOW_SIZE, i), include_trading_data = False)
            new_rows.append(data_row)

        new_data = pd.DataFrame(new_rows, columns=[f"feature_{i}" for i in range(len(new_rows[0]))])
        # shift the annotations so that the labels line up with the first generated feature row
        labels = self.__label_annotator.annotate(
            self.__data[self.__mode].iloc[:env_length_range[1]].copy()
        ).shift(-env_length_range[0]).dropna()

        return new_data, labels
    def __prepare_state_data(self, index: Optional[slice] = None, include_trading_data: bool = True) -> list[float]:
        """
        Calculates state data as a list of floats representing the current iteration's observation.
        The observation contains all input data within the window size and a couple of coefficients
        giving insight into the current budget and order situation.

        Parameters:
            index (Optional[slice]): Slice of the data to build the observation from. Defaults to
                the window preceding the current iteration.
            include_trading_data (bool): Whether to append the budget and order coefficients
                to the observation. Defaults to True.

        Returns:
            (list[float]): List with current observations for the environment.
        """

        if index is None:
            index = slice(self.current_iteration - self.__trading_consts.WINDOW_SIZE, self.current_iteration)

        current_market_data = self.__data[self.__mode].iloc[index]
        current_market_data_no_index = current_market_data.select_dtypes(include = [np.number])

        if self.__meta_data is not None and \
           self.__meta_data.get('normalization_groups', None) is not None:
            grouped_columns_names = self.__meta_data['normalization_groups']
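            # 'normalization_groups' is expected to be a list of column-name groups; the columns in
            # each group are standardized together with a single shared scaler. A hypothetical
            # example: [['open', 'high', 'low', 'close'], ['volume']].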
            preprocessed_data_pieces = []
            left_over_columns_names = set(current_market_data_no_index.columns)
            for columns_names_to_normalize in grouped_columns_names:
                left_over_columns_names -= set(columns_names_to_normalize)
                data_frame_piece_to_normalize = current_market_data_no_index[columns_names_to_normalize]
                normalized_data_frame_piece = StandardScaler().fit_transform(data_frame_piece_to_normalize.values.reshape(-1, 1))
                preprocessed_data_pieces.append(normalized_data_frame_piece.reshape(*data_frame_piece_to_normalize.shape))
            for column in left_over_columns_names:
                preprocessed_data_pieces.append(current_market_data_no_index[column].values.reshape(-1, 1))
            normalized_current_market_data_values = np.hstack(preprocessed_data_pieces)
        else:
            normalized_current_market_data_values = StandardScaler().fit_transform(current_market_data_no_index)
        current_marked_data_list = normalized_current_market_data_values.ravel().tolist()

        if include_trading_data:
            current_normalized_budget = 1.0 * self.__trading_data.current_budget / self.__trading_consts.INITIAL_BUDGET
            current_profitability_coeff = self.__trading_consts.PROFITABILITY_FUNCTION(current_normalized_budget)
            current_trades_occupancy_coeff = 1.0 * self.__trading_data.currently_placed_trades / self.__trading_consts.MAX_AMOUNT_OF_TRADES
            current_no_trades_penalty_coeff = self.__trading_consts.PENALTY_FUNCTION(self.__trading_data.no_trades_placed_for)
            current_inner_state_list = [current_profitability_coeff, current_trades_occupancy_coeff, current_no_trades_penalty_coeff]
            current_marked_data_list += current_inner_state_list

        return current_marked_data_list
    def set_mode(self, mode: str) -> None:
        """
        Sets the mode of the environment to either TRAIN_MODE or TEST_MODE.

        Parameters:
            mode (str): Mode to set for the environment.

        Raises:
            ValueError: If the provided mode is not valid.
        """

        if mode not in [TradingEnvironment.TRAIN_MODE, TradingEnvironment.TEST_MODE]:
            raise ValueError(f"Invalid mode: {mode}. Use TradingEnvironment.TRAIN_MODE or TradingEnvironment.TEST_MODE.")
        self.__mode = mode

    def get_mode(self) -> str:
        """
        Mode getter.

        Returns:
            (str): Current mode of the environment.
        """

        return copy.copy(self.__mode)

    def get_trading_data(self) -> SimpleNamespace:
        """
        Trading data getter.

        Returns:
            (SimpleNamespace): Copy of the namespace with all trading data.
        """

        return copy.copy(self.__trading_data)
    def get_number_of_trading_points_per_year(self) -> int:
        """
        Returns the number of trading points per year.

        Returns:
            (int): Number of trading points per year.
        """

        temp_data = {"time": pd.to_datetime(self.__data[self.TRAIN_MODE]['time'])}
        temp_df = pd.DataFrame(temp_data)
        temp_df['year'] = temp_df['time'].dt.year

        trading_points_per_year = temp_df.groupby('year').size()
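        # The first and last calendar years are usually only partially covered by the data, so the
        # estimate below prefers the fully covered years in the middle; for daily equity data the
        # result is typically around 252 trading points per year.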
        if len(trading_points_per_year) > 3:
            # If there are more than three years, return the mode
            # of the central years
            return trading_points_per_year.iloc[1:-1].mode()[0]
        elif len(trading_points_per_year) > 2:
            # If there are exactly three years, return the middle year
            return trading_points_per_year.values[-2]
        else:
            # If there are at most two years, return the maximum
            return max(trading_points_per_year.values)
    def get_trading_consts(self) -> SimpleNamespace:
        """
        Trading constants getter.

        Returns:
            (SimpleNamespace): Copy of the namespace with all trading constants.
        """

        return copy.copy(self.__trading_consts)

    def get_broker(self) -> Broker:
        """
        Broker getter.

        Returns:
            (Broker): Copy of the broker used by the environment.
        """

        return copy.copy(self.__broker)

    def get_environment_length(self) -> int:
        """
        Environment length getter.

        Returns:
            (int): Length of the environment, i.e. the number of data rows for the current mode.
        """

        return len(self.__data[self.__mode])

    def get_environment_spatial_data_dimension(self) -> tuple[int, int]:
        """
        Environment spatial data dimensionality getter.

        Returns:
            (tuple[int, int]): Dimension of spatial data in the environment.
        """

        return (self.__trading_consts.WINDOW_SIZE, self.__data[self.__mode].shape[1] - 1)
    def get_labeled_data(self, should_split: bool = True, should_balance: bool = True,
                         verbose: bool = True, env_length_range: Optional[tuple[int, int]] = None) \
            -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """
        Prepares labeled data for training or testing the model.
        It extracts the relevant features and labels from the environment's data.

        Parameters:
            should_split (bool): Whether to split the data into training and testing sets.
                Defaults to True. If set to False, testing data will be empty.
            should_balance (bool): Whether to balance the labeled data. Defaults to True.
                Will be ignored if labeled_data_balancer is None.
            verbose (bool): Whether to log the class cardinality before and after balancing.
                Defaults to True.
            env_length_range (Optional[tuple[int, int]]): Optional range limiting which iterations
                of the environment are used.

        Returns:
            (tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]): A tuple containing the
                input data, output data, test input data, and test output data.
        """

        input_data, output_data = self.__prepare_labeled_data(env_length_range)
        input_data_test, output_data_test = [], []
        if verbose:
            logging.info(f"Original class cardinality: {np.array(to_categorical(output_data)).sum(axis = 0)}")

        if self.__mode == TradingEnvironment.TRAIN_MODE:
            if should_split:
                input_data, input_data_test, output_data, output_data_test = \
                    train_test_split(input_data, output_data, test_size = 0.1, random_state = 42,
                                     stratify = output_data)

            if self.__labeled_data_balancer is not None and should_balance:
                input_data, output_data = self.__labeled_data_balancer.balance(input_data, output_data)
                if verbose:
                    logging.info(f"Balanced class cardinality: {np.array(to_categorical(output_data)).sum(axis = 0)}")

        return copy.copy((np.array(input_data), np.array(output_data),
                          np.array(input_data_test), np.array(output_data_test)))
    def get_data_for_iteration(self, columns: list[str], start: int = 0, stop: Optional[int] = None,
                               step: int = 1) -> list[float]:
        """
        Data getter for certain iterations.

        Parameters:
            columns (list[str]): List of column names to extract from data.
            start (int): Start iteration index. Defaults to 0.
            stop (Optional[int]): Stop iteration index (inclusive). Defaults to the environment
                length minus one.
            step (int): Step between iterations. Defaults to 1.

        Returns:
            (list[float]): Copy of part of the data with the specified columns
                over the specified iterations.
        """

        if stop is None:
            stop = self.get_environment_length() - 1

        return copy.copy(self.__data[self.__mode].loc[start:stop:step, columns].values.ravel().tolist())
    def step(self, action: int) -> tuple[list[float], float, bool, dict]:
        """
        Performs the specified action on the environment. It results in the generation of a new
        observation. This function causes trades to be handled, the reward to be calculated and
        the environment to be updated.

        Parameters:
            action (int): Number specifying the action. Possible values are 0 for the buy action,
                1 for the wait action and 2 for the sell action.

        Returns:
            (tuple[list[float], float, bool, dict]): Tuple containing the next observation
                state, reward, finish indication and additional info dictionary.
        """

        self.current_iteration += 1
        self.state = self.__prepare_state_data()

        # relative change of the close price between the previous and the current iteration
        close_changes = self.__data[self.__mode].iloc[self.current_iteration - 2 : self.current_iteration]['close'].values
        stock_change_coeff = 1 + (close_changes[1] - close_changes[0]) / close_changes[0]
        closed_orders = self.__broker.update_orders(stock_change_coeff)

        if self.__trading_mode == TradingEnvironment.TradingMode.EXPLICIT_ORDER_CLOSING:
            current_orders = self.__broker.get_current_orders()
            if len(current_orders) > 0:
                was_last_order_placed_as_buy = current_orders[-1].is_buy_order
                # in explicit order closing mode, an action opposite to the open position force-closes it
                if (action == 0 and not was_last_order_placed_as_buy) or \
                   (action == 2 and was_last_order_placed_as_buy):
                    closed_orders += self.__broker.force_close_orders()

        # settle orders closed by the broker and release their value back into the budget
        reward = self.__validator.validate_orders(closed_orders)
        self.__trading_data.currently_placed_trades -= len(closed_orders)
        self.__trading_data.current_budget += np.sum([trade.current_value for trade in closed_orders])
        self.__trading_data.currently_invested -= np.sum([trade.initial_value for trade in closed_orders])

        # the remaining budget is split evenly across the free order slots
        number_of_possible_trades = self.__trading_consts.MAX_AMOUNT_OF_TRADES - self.__trading_data.currently_placed_trades
        money_to_trade = 0
        if number_of_possible_trades > 0:
            money_to_trade = 1.0 / number_of_possible_trades * self.__trading_data.current_budget

        if action == 0:
            is_buy_order = True
            stop_loss = self.__trading_consts.SELL_STOP_LOSS
            take_profit = self.__trading_consts.SELL_TAKE_PROFIT
        elif action == 2:
            is_buy_order = False
            stop_loss = self.__trading_consts.BUY_STOP_LOSS
            take_profit = self.__trading_consts.BUY_TAKE_PROFIT

        # reward placing an order while slots are free, penalize trying to trade with none left,
        # and reward waiting only when all slots are already occupied
        if action != 1:
            if number_of_possible_trades > 0:
                self.__trading_data.current_budget -= money_to_trade
                self.__trading_data.currently_invested += money_to_trade
                self.__broker.place_order(money_to_trade, is_buy_order, stop_loss, take_profit)
                self.__trading_data.currently_placed_trades += 1
                self.__trading_data.no_trades_placed_for = 0
                reward += self.__trading_consts.STATIC_REWARD_ADJUSTMENT
            else:
                self.__trading_data.no_trades_placed_for += 1
                reward -= self.__trading_consts.STATIC_REWARD_ADJUSTMENT
        else:
            self.__trading_data.no_trades_placed_for += 1
            if number_of_possible_trades == 0:
                reward += self.__trading_consts.STATIC_REWARD_ADJUSTMENT

        # shrink a positive reward the longer no order has been placed; keep penalizing past PENALTY_STOPS
        if number_of_possible_trades > 0:
            reward *= (1 - self.__trading_consts.PENALTY_FUNCTION(self.__trading_data.no_trades_placed_for)) \
                if reward > 0 else 1
            if self.__trading_consts.PENALTY_STOPS < self.__trading_data.no_trades_placed_for:
                reward -= self.__trading_consts.STATIC_REWARD_ADJUSTMENT

        # finish when the data ends, the budget grows tenfold, or total equity drops below 80% of the initial budget
        if (self.current_iteration >= self.get_environment_length() - 1 or
                self.__trading_data.current_budget > 10 * self.__trading_consts.INITIAL_BUDGET or
                (self.__trading_data.current_budget + self.__trading_data.currently_invested) / self.__trading_consts.INITIAL_BUDGET < 0.8):
            done = True
        else:
            done = False

        info = {'coeff': stock_change_coeff,
                'iteration': self.current_iteration,
                'number_of_closed_orders': len(closed_orders),
                'money_to_trade': money_to_trade,
                'action': action,
                'current_budget': self.__trading_data.current_budget,
                'currently_invested': self.__trading_data.currently_invested,
                'no_trades_placed_for': self.__trading_data.no_trades_placed_for,
                'currently_placed_trades': self.__trading_data.currently_placed_trades}

        return self.state, reward, done, info
    def render(self) -> None:
        """
        Renders environment visualization. Will be implemented later.
        """

        # TODO: Visualization to be implemented
        pass

    def reset(self, randkey: Optional[int] = None) -> list[float]:
        """
        Resets the environment. Typically used when the environment is finished,
        i.e. when there are no more steps to be taken within the environment
        or the finish conditions are fulfilled.

        Parameters:
            randkey (Optional[int]): Value indicating which iteration
                should be treated as the starting point after the reset.

        Returns:
            (list[float]): Current iteration observation state.
        """

        if randkey is None:
            randkey = random.randint(self.__trading_consts.WINDOW_SIZE, self.get_environment_length() - 1)
        self.__trading_data.current_budget = self.__trading_consts.INITIAL_BUDGET
        self.__trading_data.currently_invested = 0
        self.__trading_data.no_trades_placed_for = 0
        self.__trading_data.currently_placed_trades = 0
        self.__broker.reset()
        self.current_iteration = randkey
        self.state = self.__prepare_state_data()

        return self.state
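
# Minimal usage sketch (not part of the module's public API). It assumes a data frame with at
# least 'time' and 'close' columns and concrete RewardValidatorBase / LabelAnnotatorBase
# implementations; 'my_validator', 'my_annotator' and 'market_data.csv' are placeholders and the
# numeric values are purely illustrative.
#
#     data = pd.read_csv('market_data.csv')
#     env = TradingEnvironment(data=data, initial_budget=10000, max_amount_of_trades=5,
#                              window_size=20, validator=my_validator, label_annotator=my_annotator,
#                              sell_stop_loss=0.95, sell_take_profit=1.05,
#                              buy_stop_loss=0.95, buy_take_profit=1.05)
#     state = env.reset()
#     done = False
#     while not done:
#         action = env.action_space.sample()          # 0 = buy, 1 = wait, 2 = sell
#         state, reward, done, info = env.step(action)
#     env.set_mode(TradingEnvironment.TEST_MODE)      # switch to the held-out split for evaluation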