Coverage for source/data_handling/data_handler.py: 84%

51 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-07-30 20:59 +0000

1# data_handling/data_handler.py 

2 

3# global imports 

4import io 

5import logging 

6import pandas as pd 

7from typing import Any, Optional 

8 

9# local imports 

10from source.data_handling import ApiDataCollectorBase 

11from source.indicators import IndicatorHandlerBase 

12from source.utils import Granularity, SingletonMeta 

13 

class DataHandler(metaclass = SingletonMeta):
    """
    Responsible for data handling. Including data collection and preparation.

    Collection is delegated to the registered API data collectors, which are
    tried in registration order until one of them succeeds.
    """

    def __init__(self) -> None:
        """
        Class constructor. Initializes components needed for data handling.
        """

        # Collectors are tried in this order by prepare_data().
        self.__api_data_collectors: list[ApiDataCollectorBase] = []

    def register_api_data_collectors(self, api_data_collectors: list[ApiDataCollectorBase]) -> None:
        """
        Registers API data collectors for data collection. Replaces any
        previously registered collectors.

        Parameters:
            api_data_collectors (list[ApiDataCollectorBase]): A list of instances of ApiDataCollectorBase or its subclasses.

        Raises:
            TypeError: If any element is not an instance of ApiDataCollectorBase.
                In that case the previously registered collectors are kept.
        """

        for api_data_collector in api_data_collectors:
            if not isinstance(api_data_collector, ApiDataCollectorBase):
                raise TypeError("Parameter api_data_collector must be an instance of ApiDataCollectorBase or its subclass.")

        # Store a copy so that later mutation of the caller's list cannot
        # smuggle in collectors that skipped the validation above.
        self.__api_data_collectors = list(api_data_collectors)

    async def prepare_data(self, trading_pair: str, start_date: str, end_date: str,
                           granularity: Granularity, list_of_indicators: Optional[list[IndicatorHandlerBase]] = None) \
            -> tuple[pd.DataFrame, dict[str, Any]]:
        """
        Collects data using the registered API data collectors and extends it with list of indicators.

        Collectors are tried in registration order; the first one that does not
        raise provides the data and meta data.

        Parameters:
            trading_pair (str): String representing unique trading pair symbol.
            start_date (str): String representing date that collected data should start from.
            end_date (str): String representing date that collected data should finish at.
            granularity (Granularity): Enum specifying resolution of collected data - e.g. each
                15 minutes or 1 hour or 6 hours is treated separately
            list_of_indicators (list[IndicatorHandlerBase]): List of indicators that should be
                calculated and added to the data. Defaults to None, which means no indicators
                will be added.

        Raises:
            RuntimeError: If no registered collector managed to provide data and meta data
                for the given trading pair, or if the collected data is empty.

        Returns:
            (tuple[pd.DataFrame, dict[str, Any]]): Collected data extended with given indicators
                and the meta data returned by the collector, extended with the columns of
                every indicator that can be normalized.
        """

        if list_of_indicators is None:
            list_of_indicators = []

        data, meta_data = None, None
        for api_data_collector in self.__api_data_collectors:
            try:
                data, meta_data = await api_data_collector._collect_data_for_ticker(trading_pair, start_date, end_date, granularity)
                break
            except Exception:
                # Best effort: fall through to the next collector, but keep the
                # cause in the log so failures can be diagnosed.
                logging.info(f"Did not manage to collect data for {trading_pair} using "
                             f"{api_data_collector.__class__.__name__}... trying next one.",
                             exc_info = True)

        if data is None or meta_data is None:
            raise RuntimeError('Trading pair not recognized!')

        if data.empty:
            raise RuntimeError(f'No data collected for {trading_pair} between {start_date} and {end_date} with granularity {granularity}.')

        if list_of_indicators:
            indicators_data = []
            for indicator in list_of_indicators:
                indicator_data = indicator.calculate(data)
                indicators_data.append(indicator_data)
                if indicator.can_be_normalized():
                    # NOTE(review): assumes every collector provides a
                    # 'normalization_groups' list in its meta data - confirm.
                    meta_data['normalization_groups'].append(indicator_data.columns.tolist())
            # Indicator frames are appended column-wise next to the raw data.
            data = pd.concat([data] + indicators_data, axis = 1)

        return data, meta_data

    def save_extended_data_into_csv_formatted_string_buffer(self, data: pd.DataFrame,
                                                            meta_data: Optional[dict[str, Any]] = None) -> io.StringIO:
        """
        Saves extended data into a CSV formatted string buffer.

        When meta data is given, it is written first as a single line starting
        with '#', followed by the CSV content (including the index).

        Parameters:
            data (pd.DataFrame): Data frame to be saved.
            meta_data (Optional[dict[str, Any]]): Optional metadata to include in the CSV.

        Returns:
            (io.StringIO): StringIO buffer containing the CSV formatted data. The buffer
                is not rewound here.
        """

        file_content_string_buffer = io.StringIO()

        if meta_data is not None:
            file_content_string_buffer.write(f'# {meta_data} \n')

        data.to_csv(file_content_string_buffer, index = True)

        return file_content_string_buffer

    def read_extended_data_from_csv_formatted_string_buffer(self,
                                                            file_content_string_buffer: io.StringIO) -> tuple[pd.DataFrame, Optional[dict[str, Any]]]:
        """
        Reads extended data from a CSV formatted string buffer.

        Reading starts at the buffer's current position. If the first line starts
        with '#', it is parsed as the meta data literal and the CSV is read from
        the following line; otherwise the buffer is rewound to where reading
        started, so that the CSV header line is not lost.

        Parameters:
            file_content_string_buffer (io.StringIO): StringIO buffer containing the CSV formatted data.

        Raises:
            ValueError: If the meta data line is not a plain Python literal.
            SyntaxError: If the meta data line is not syntactically valid.

        Returns:
            (tuple[pd.DataFrame, Optional[dict[str, Any]]]): Tuple containing the data frame with extended data
                and optional metadata.
        """

        # Local import keeps this method self-contained; ast is standard library.
        import ast

        meta_data = None

        start_position = file_content_string_buffer.tell()
        first_line = file_content_string_buffer.readline().strip()
        if first_line.startswith('#'):
            # SECURITY: the buffer content may come from outside the process, so it
            # must not be passed to eval(). literal_eval only accepts plain Python
            # literals (dict, list, str, numbers, ...), which is what writing
            # str(dict) produces for plain meta data.
            meta_data = ast.literal_eval(first_line[1:].strip())
        else:
            # No meta data line: the line just consumed is the CSV header, so go
            # back to where reading started.
            file_content_string_buffer.seek(start_position)

        # NOTE(review): the save method writes the index (index = True) but no
        # index_col is given here, so the index comes back as a regular column -
        # confirm this is what callers expect.
        data = pd.read_csv(file_content_string_buffer)

        return data, meta_data