Coverage for source/data_handling/data_handler.py: 84%
51 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-07-30 20:59 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-07-30 20:59 +0000
1# data_handling/data_handler.py
3# global imports
4import io
5import logging
6import pandas as pd
7from typing import Any, Optional
9# local imports
10from source.data_handling import ApiDataCollectorBase
11from source.indicators import IndicatorHandlerBase
12from source.utils import Granularity, SingletonMeta
class DataHandler(metaclass = SingletonMeta):
    """
    Responsible for data handling. Including data collection and preparation.
    """

    def __init__(self) -> None:
        """
        Class constructor. Initializes components needed for data handling.
        """

        # Collectors are tried by prepare_data() in the order they are stored here.
        self.__api_data_collectors: list[ApiDataCollectorBase] = []

    def register_api_data_collectors(self, api_data_collectors: list[ApiDataCollectorBase]) -> None:
        """
        Registers API data collectors for data collection. Any previously registered
        collectors are replaced.

        Parameters:
            api_data_collectors (list[ApiDataCollectorBase]): A list of instances of ApiDataCollectorBase or its subclasses.

        Raises:
            TypeError: If any element is not an instance of ApiDataCollectorBase or its subclass.
                The previously registered collectors are kept in that case.
        """

        # Validate and store a snapshot: later mutation of the caller's list must not
        # alter the registered collectors, and a one-shot iterable must not be
        # exhausted by the validation loop before it is stored.
        collectors = list(api_data_collectors)
        for api_data_collector in collectors:
            if not isinstance(api_data_collector, ApiDataCollectorBase):
                raise TypeError("Parameter api_data_collector must be an instance of ApiDataCollectorBase or its subclass.")

        self.__api_data_collectors = collectors

    async def prepare_data(self, trading_pair: str, start_date: str, end_date: str,
                           granularity: Granularity, list_of_indicators: Optional[list[IndicatorHandlerBase]] = None) \
            -> tuple[pd.DataFrame, dict[str, Any]]:
        """
        Collects data using the registered API data collectors and extends it with list of indicators.

        Collectors are tried in registration order; the first one that does not raise
        provides the data and its meta data.

        Parameters:
            trading_pair (str): String representing unique trading pair symbol.
            start_date (str): String representing date that collected data should start from.
            end_date (str): String representing date that collected data should finish at.
            granularity (Granularity): Enum specifying resolution of collected data - e.g. each
                15 minutes or 1 hour or 6 hours is treated separately
            list_of_indicators (list[IndicatorHandlerBase]): List of indicators that should be
                calculated and added to the data. Defaults to None, which means no indicators
                will be added.

        Raises:
            RuntimeError: If no registered collector managed to provide data and meta data for
                the given trading pair (the last collector failure, if any, is attached as
                the cause), or if the collected data is empty.

        Returns:
            (tuple[pd.DataFrame, dict[str, Any]]): Collected data extended with given indicators,
                and the meta data returned by the collector. The columns of every indicator
                that can be normalized are appended to meta_data['normalization_groups'].
        """

        if list_of_indicators is None:
            list_of_indicators = []

        data, meta_data = None, None
        last_error: Optional[Exception] = None
        for api_data_collector in self.__api_data_collectors:
            try:
                data, meta_data = await api_data_collector._collect_data_for_ticker(trading_pair, start_date, end_date, granularity)
                break
            except Exception as error:
                # Deliberate best effort: a failing collector only means the next one is
                # tried. The failure is remembered so it can be reported as the cause.
                last_error = error
                logging.info(f"Did not manage to collect data for {trading_pair} using "
                             f"{api_data_collector.__class__.__name__}... trying next one.")

        if data is None or meta_data is None:
            raise RuntimeError('Trading pair not recognized!') from last_error

        if data.empty:
            raise RuntimeError(f'No data collected for {trading_pair} between {start_date} and {end_date} with granularity {granularity}.')

        if list_of_indicators:
            indicators_data = []
            for indicator in list_of_indicators:
                indicator_data = indicator.calculate(data)
                indicators_data.append(indicator_data)
                if indicator.can_be_normalized():
                    meta_data['normalization_groups'].append(indicator_data.columns.tolist())
            # Indicator frames are appended column-wise next to the collected data.
            data = pd.concat([data] + indicators_data, axis = 1)

        return data, meta_data

    def save_extended_data_into_csv_formatted_string_buffer(self, data: pd.DataFrame,
                                                            meta_data: Optional[dict[str, Any]] = None) -> io.StringIO:
        """
        Saves extended data into a CSV formatted string buffer.

        When meta data is given, its string representation is written first, as a single
        line prefixed with '# '. The data frame follows in CSV format, including its index.
        The buffer is returned without being rewound, so callers that want to read from
        it must call seek(0) first.

        Parameters:
            data (pd.DataFrame): Data frame to be saved.
            meta_data (Optional[dict[str, Any]]): Optional metadata to include in the CSV.

        Returns:
            (io.StringIO): StringIO buffer containing the CSV formatted data.
        """

        file_content_string_buffer = io.StringIO()

        if meta_data is not None:
            file_content_string_buffer.write(f'# {meta_data} \n')

        data.to_csv(file_content_string_buffer, index = True)

        return file_content_string_buffer

    def read_extended_data_from_csv_formatted_string_buffer(self,
            file_content_string_buffer: io.StringIO) -> tuple[pd.DataFrame, Optional[dict[str, Any]]]:
        """
        Reads extended data from a CSV formatted string buffer.

        Reading starts at the current position of the buffer. If the first line starts
        with '#', it is parsed as the metadata line; otherwise the buffer is rewound to
        where reading started, so that the line is parsed as part of the CSV content.

        Parameters:
            file_content_string_buffer (io.StringIO): StringIO buffer containing the CSV formatted data.

        Raises:
            ValueError: If the metadata line is not a valid Python literal.
            SyntaxError: If the metadata line is not syntactically valid.

        Returns:
            (tuple[pd.DataFrame, Optional[dict[str, Any]]]): Tuple containing the data frame with extended data
                and optional metadata.
        """

        # Local import keeps this method self-contained.
        import ast

        meta_data = None

        start_position = file_content_string_buffer.tell()
        first_line = file_content_string_buffer.readline().strip()
        if first_line.startswith('#'):
            # NOTE(security): this line used to be passed to eval(), which executes
            # arbitrary code found in the buffer. ast.literal_eval only accepts Python
            # literals, which is what the string representation of a plain metadata
            # dict consists of. The explicit strip() is needed because literal_eval
            # only tolerates leading whitespace from Python 3.10 onwards.
            meta_data = ast.literal_eval(first_line[1:].strip())
        else:
            # No metadata line: the line just consumed is the CSV header, so rewind
            # to let pandas parse it instead of mistaking the first data row for it.
            file_content_string_buffer.seek(start_position)

        data = pd.read_csv(file_content_string_buffer)

        return data, meta_data