# data_handling/data_handler.py

# global imports
import ast
import io
import logging
import os
import pandas as pd
from typing import Any, Optional

# local imports
from source.data_handling import ApiDataCollectorBase
from source.indicators import IndicatorHandlerBase
from source.utils import Granularity, SingletonMeta

class DataHandler(metaclass = SingletonMeta):
    """
    Responsible for data handling, including data collection and preparation.
    """

    # local constants
    __EXPECTED_COLUMN_NAMES = ['time', 'low', 'high', 'open', 'close', 'volume']

    def __init__(self) -> None:
        """
        Class constructor. Initializes components needed for data handling.
        """

        self.__api_data_collectors: list[ApiDataCollectorBase] = []

    def register_api_data_collectors(self, api_data_collectors: list[ApiDataCollectorBase]) -> None:
        """
        Registers API data collectors for data collection.

        Parameters:
            api_data_collectors (list[ApiDataCollectorBase]): A list of instances of ApiDataCollectorBase or its subclasses.

        Raises:
            TypeError: If any element of api_data_collectors is not an instance of ApiDataCollectorBase or its subclass.
        """

        for api_data_collector in api_data_collectors:
            if not isinstance(api_data_collector, ApiDataCollectorBase):
                raise TypeError("Parameter api_data_collectors must only contain instances of ApiDataCollectorBase or its subclasses.")

        self.__api_data_collectors = api_data_collectors
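
    # A hedged usage sketch: CoinbaseApiDataCollector below is a hypothetical
    # ApiDataCollectorBase subclass used purely for illustration, not necessarily
    # part of this project.
    #
    #     handler = DataHandler()
    #     handler.register_api_data_collectors([CoinbaseApiDataCollector()])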

    async def prepare_data(self, input_source: str, start_date: str, end_date: str, granularity: Optional[Granularity] = None,
                           list_of_indicators: Optional[list[IndicatorHandlerBase]] = None) -> tuple[pd.DataFrame, dict[str, Any]]:
        """
        Collects data from a CSV file or from the registered API data collectors
        and extends it with the given list of indicators.

        Parameters:
            input_source (str): String representing a unique trading symbol or a path
                to the file to be preprocessed.
            start_date (str): String representing the date that collected data should start from.
            end_date (str): String representing the date that collected data should finish at.
            granularity (Optional[Granularity]): Enum specifying the resolution of collected data - e.g. each
                15 minutes, 1 hour or 6 hours is treated separately. It is optional when input_source is a file path,
                but must be provided when input_source is a ticker.
            list_of_indicators (Optional[list[IndicatorHandlerBase]]): List of indicators that should be
                calculated and added to the data. Defaults to None, which means no indicators
                will be added.

        Raises:
            ValueError: If the file format is unsupported, required columns are missing,
                or granularity is not provided for a ticker.
            RuntimeError: If the given trading pair symbol is not recognized or no data is collected.

        Returns:
            (tuple[pd.DataFrame, dict[str, Any]]): Preprocessed data extended with the given
                indicators, together with metadata (e.g. normalization groups).
        """

        data, meta_data = None, None
        if list_of_indicators is None:
            list_of_indicators = []

        # Input source is a file path
        if os.path.isfile(input_source):
            logging.info(f"Assuming input source '{input_source}' to be a file path.")
            if input_source.endswith('.csv'):
                data = pd.read_csv(input_source)
                data.columns = data.columns.str.lower()

                if not all(col in data.columns for col in self.__EXPECTED_COLUMN_NAMES):
                    logging.error(f"Found columns: {data.columns.tolist()}, "
                                  f"while expected columns are: {self.__EXPECTED_COLUMN_NAMES}")
                    raise ValueError(f"CSV file must contain columns: {', '.join(self.__EXPECTED_COLUMN_NAMES)}")

                data = data[self.__EXPECTED_COLUMN_NAMES]
                data['time'] = pd.to_datetime(data['time'])
                data.set_index('time', inplace = True)
                data.sort_index(inplace = True)

                data = data[(data.index >= pd.to_datetime(start_date)) &
                            (data.index < pd.to_datetime(end_date))]
                meta_data = { 'normalization_groups': [['low', 'high', 'open', 'close'], ['volume']] }
            else:
                raise ValueError("Unsupported file format. Please provide a CSV file.")

        # Input source is assumed to be a ticker otherwise
        else:
            logging.info(f"Assuming input source '{input_source}' to be a ticker.")
            if granularity is None:
                raise ValueError("Granularity must be provided when input source is a ticker.")

            # Try the registered collectors in order, falling through to the next one on failure
            for api_data_collector in self.__api_data_collectors:
                try:
                    data, meta_data = await api_data_collector.collect_data(input_source, start_date, end_date, granularity)
                    break
                except Exception:
                    logging.info(f"Did not manage to collect data for {input_source} using "
                                 f"{api_data_collector.__class__.__name__}... trying next one.")

        if data is None or meta_data is None:
            raise RuntimeError('Trading pair not recognized!')

        if data.empty:
            raise RuntimeError(f'No data collected for {input_source} between {start_date} and {end_date} with granularity {granularity}.')

        if len(list_of_indicators) > 0:
            indicators_data = []
            for indicator in list_of_indicators:
                indicators_data.append(indicator.calculate(data))
                if indicator.can_be_normalized():
                    columns = indicators_data[-1].columns.tolist()
                    # Guard against collector metadata that lacks 'normalization_groups'
                    meta_data.setdefault('normalization_groups', []).append(columns)
            data = pd.concat([data] + indicators_data, axis = 1)

        return data, meta_data
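
    # A hedged usage sketch (inside an async context); the ticker, dates and
    # granularity member below are illustrative assumptions:
    #
    #     data, meta_data = await handler.prepare_data('BTC-USD', '2024-01-01', '2024-02-01',
    #                                                  granularity = Granularity.ONE_HOUR)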

    def save_extended_data_into_csv_formatted_string_buffer(self, data: pd.DataFrame,
                                                            meta_data: Optional[dict[str, Any]] = None) -> io.StringIO:
        """
        Saves extended data into a CSV formatted string buffer.

        Parameters:
            data (pd.DataFrame): Data frame to be saved.
            meta_data (Optional[dict[str, Any]]): Optional metadata to include in the CSV.

        Returns:
            (io.StringIO): StringIO buffer containing the CSV formatted data.
        """

        file_content_string_buffer = io.StringIO()

        # Metadata is stored as a commented first line; it must contain only Python
        # literals so that it can be parsed back with ast.literal_eval.
        if meta_data is not None:
            file_content_string_buffer.write(f'# {meta_data}\n')

        data.to_csv(file_content_string_buffer, index = True)

        return file_content_string_buffer

    def read_extended_data_from_csv_formatted_string_buffer(self,
            file_content_string_buffer: io.StringIO) -> tuple[pd.DataFrame, Optional[dict[str, Any]]]:
        """
        Reads extended data from a CSV formatted string buffer.

        Parameters:
            file_content_string_buffer (io.StringIO): StringIO buffer containing the CSV formatted data.

        Returns:
            (tuple[pd.DataFrame, Optional[dict[str, Any]]]): Tuple containing the data frame with extended data
                and optional metadata.
        """

        meta_data = None

        first_line = file_content_string_buffer.readline().strip()
        if first_line.startswith('#'):
            # Parse the metadata comment safely instead of using eval
            meta_data = ast.literal_eval(first_line[1:].strip())
        else:
            # No metadata comment - rewind so the header line is not lost
            file_content_string_buffer.seek(0)

        data = pd.read_csv(file_content_string_buffer)

        return data, meta_data
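
# A minimal round-trip sketch for the buffer helpers above. The sample frame and
# metadata below are illustrative assumptions, not values from the project.
if __name__ == '__main__':
    handler = DataHandler()

    sample = pd.DataFrame(
        {
            'low': [1.0, 2.0],
            'high': [1.5, 2.5],
            'open': [1.2, 2.2],
            'close': [1.4, 2.4],
            'volume': [100.0, 200.0],
        },
        index = pd.to_datetime(['2025-01-01', '2025-01-02']),
    )
    sample.index.name = 'time'
    meta = { 'normalization_groups': [['low', 'high', 'open', 'close'], ['volume']] }

    buffer = handler.save_extended_data_into_csv_formatted_string_buffer(sample, meta)
    buffer.seek(0)  # rewind before reading - to_csv leaves the cursor at the end

    restored, restored_meta = handler.read_extended_data_from_csv_formatted_string_buffer(buffer)
    print(restored_meta)
    print(restored.head())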