Coverage for source/data_handling/data_handler.py: 68%

71 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-08-24 10:18 +0000

# data_handling/data_handler.py

# global imports
import ast
import io
import logging
import os
import pandas as pd
from typing import Any, Optional

# local imports
from source.data_handling import ApiDataCollectorBase
from source.indicators import IndicatorHandlerBase
from source.utils import Granularity, SingletonMeta

14 

class DataHandler(metaclass = SingletonMeta):
    """
    Responsible for data handling. Including data collection and preparation.
    """

    # Columns a CSV input file must provide (checked after lower-casing headers).
    __EXPECTED_COLUMN_NAMES = ['time', 'low', 'high', 'open', 'close', 'volume']

    def __init__(self) -> None:
        """
        Class constructor. Initializes components needed for data handling.
        """

        self.__api_data_collectors: list[ApiDataCollectorBase] = []

    def register_api_data_collectors(self, api_data_collectors: list[ApiDataCollectorBase]) -> None:
        """
        Registers API data collectors for data collection.

        Parameters:
            api_data_collectors (list[ApiDataCollectorBase]): A list of instances of
                ApiDataCollectorBase or its subclasses.

        Raises:
            TypeError: If any element is not an ApiDataCollectorBase instance.
        """

        for api_data_collector in api_data_collectors:
            if not isinstance(api_data_collector, ApiDataCollectorBase):
                raise TypeError("Parameter api_data_collector must be an instance of ApiDataCollectorBase or its subclass.")

        self.__api_data_collectors = api_data_collectors

    async def prepare_data(self, input_source: str, start_date: str, end_date: str, granularity: Optional[Granularity] = None,
                           list_of_indicators: Optional[list[IndicatorHandlerBase]] = None) -> tuple[pd.DataFrame, dict[str, Any]]:
        """
        Collects data from a CSV file or from the registered API data collectors
        and extends it with a list of indicators.

        Parameters:
            input_source (str): String representing unique trading symbol or path
                to the file to be preprocessed.
            start_date (str): String representing date that collected data should start from.
            end_date (str): String representing date that collected data should finish at.
            granularity (Optional[Granularity]): Enum specifying resolution of collected data - e.g. each
                15 minutes or 1 hour or 6 hours is treated separately. It is optional when input_source
                is a file path, but must be provided when input_source is a ticker.
            list_of_indicators (Optional[list[IndicatorHandlerBase]]): List of indicators that should be
                calculated and added to the data. Defaults to None, which means no indicators
                will be added.

        Raises:
            ValueError: If a non-CSV file is given, or granularity is missing for a ticker,
                or the CSV lacks the expected columns.
            RuntimeError: If given trading pair symbol is not recognized, or no data
                was obtained for the requested period.

        Returns:
            (tuple[pd.DataFrame, dict[str, Any]]): Preprocessed data extended with given
                indicators, together with its metadata (normalization groups).
        """

        data, meta_data = None, None
        if list_of_indicators is None:
            list_of_indicators = []

        # Input source is a file path
        if os.path.isfile(input_source):
            logging.info(f"Assuming input source '{input_source}' to be a file path.")
            if input_source.endswith('.csv'):
                data = pd.read_csv(input_source)
                data.columns = data.columns.str.lower()

                if not all(col in data.columns for col in self.__EXPECTED_COLUMN_NAMES):
                    logging.error(f"Found columns: {data.columns.tolist()}, "
                                  f"while expected columns are: {self.__EXPECTED_COLUMN_NAMES}")
                    raise ValueError(f"CSV file must contain columns: {', '.join(self.__EXPECTED_COLUMN_NAMES)}")

                # Explicit copy: the column projection returns a view, and the
                # assignments below would otherwise trigger SettingWithCopyWarning.
                data = data[self.__EXPECTED_COLUMN_NAMES].copy()
                data['time'] = pd.to_datetime(data['time'])
                data.set_index('time', inplace = True)
                data.sort_index(inplace = True)

                # Keep only rows inside the half-open interval [start_date, end_date).
                data = data[(data.index >= pd.to_datetime(start_date)) &
                            (data.index < pd.to_datetime(end_date))]
                meta_data = { 'normalization_groups': [['low', 'high', 'open', 'close'], ['volume']] }
            else:
                raise ValueError("Unsupported file format. Please provide a CSV file.")

        # Input source is assumed to be a ticker otherwise
        else:
            logging.info(f"Assuming input source '{input_source}' to be a ticker.")
            if granularity is None:
                raise ValueError("Granularity must be provided when input source is a ticker.")

            # Try each registered collector in order; first success wins.
            for api_data_collector in self.__api_data_collectors:
                try:
                    data, meta_data = await api_data_collector.collect_data(input_source, start_date, end_date, granularity)
                    break
                except Exception as error:
                    # Best-effort fallback chain: record the failure (including the
                    # cause) and move on to the next collector.
                    logging.info(f"Did not manage to collect data for {input_source} using "
                                 f"{api_data_collector.__class__.__name__}... trying next one.",
                                 exc_info = error)

        if data is None or meta_data is None:
            raise RuntimeError('Trading pair not recognized!')

        if data.empty:
            raise RuntimeError(f'No data collected for {input_source} between {start_date} and {end_date} with granularity {granularity}.')

        if len(list_of_indicators) > 0:
            indicators_data = []
            for indicator in list_of_indicators:
                indicators_data.append(indicator.calculate(data))
                # Indicators that can be normalized contribute their columns as an
                # extra normalization group in the metadata.
                if indicator.can_be_normalized():
                    columns = indicators_data[-1].columns.tolist()
                    meta_data['normalization_groups'].append(columns)
            data = pd.concat([data] + indicators_data, axis = 1)

        return data, meta_data

    def save_extended_data_into_csv_formatted_string_buffer(self, data: pd.DataFrame,
                                                            meta_data: Optional[dict[str, Any]] = None) -> io.StringIO:
        """
        Saves extended data into a CSV formatted string buffer.

        Parameters:
            data (pd.DataFrame): Data frame to be saved.
            meta_data (Optional[dict[str, Any]]): Optional metadata to include in the CSV.

        Returns:
            (io.StringIO): StringIO buffer containing the CSV formatted data,
                rewound to the beginning so it can be read back directly.
        """

        file_content_string_buffer = io.StringIO()

        if meta_data is not None:
            # Metadata is serialized as a single commented-out header line.
            file_content_string_buffer.write(f'# {meta_data} \n')

        data.to_csv(file_content_string_buffer, index = True)

        # Rewind so consumers (e.g. read_extended_data_from_csv_formatted_string_buffer)
        # can read from the start without an explicit seek.
        file_content_string_buffer.seek(0)

        return file_content_string_buffer

    def read_extended_data_from_csv_formatted_string_buffer(self,
            file_content_string_buffer: io.StringIO) -> tuple[pd.DataFrame, Optional[dict[str, Any]]]:
        """
        Reads extended data from a CSV formatted string buffer.

        Parameters:
            file_content_string_buffer (io.StringIO): StringIO buffer containing the CSV formatted data.

        Returns:
            (tuple[pd.DataFrame, Optional[dict[str, Any]]]): Tuple containing the data frame with extended data
                and optional metadata (None when no metadata header line is present).
        """

        meta_data = None

        start_position = file_content_string_buffer.tell()
        first_line = file_content_string_buffer.readline().strip()
        if first_line.startswith('#'):
            # literal_eval instead of eval: the buffer may originate from an
            # untrusted file, and the metadata header only ever holds literals
            # (a dict of strings/lists written by the save method above).
            meta_data = ast.literal_eval(first_line[1:])
        else:
            # No metadata header: rewind so the consumed line (the CSV header
            # row) is not lost for pd.read_csv below.
            file_content_string_buffer.seek(start_position)

        data = pd.read_csv(file_content_string_buffer)

        return data, meta_data