Coverage for source/data_handling/data_handler.py: 97%
38 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-07-27 17:11 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-07-27 17:11 +0000
1# data_handling/data_handler.py
3# global imports
4import io
5import pandas as pd
6from typing import Any, Optional
8# local imports
9from source.data_handling import CoinBaseHandler
10from source.indicators import IndicatorHandlerBase
11from source.utils import Granularity, SingletonMeta
class DataHandler(metaclass = SingletonMeta):
    """
    Responsible for data handling. Including data collection and preparation.
    """

    def __init__(self) -> None:
        """
        Class constructor. Initializes needed components for data handling.
        """

        self.__coinbase: CoinBaseHandler = CoinBaseHandler()

    async def prepare_data(self, trading_pair: str, start_date: str, end_date: str,
                           granularity: Granularity,
                           list_of_indicators: Optional[list[IndicatorHandlerBase]] = None) \
            -> tuple[pd.DataFrame, dict[str, Any]]:
        """
        Collects data from coinbase API and extends it with list of indicators.

        Parameters:
            trading_pair (str): String representing unique trading pair symbol.
            start_date (str): String representing date that collected data should start from.
            end_date (str): String representing date that collected data should finish at.
            granularity (Granularity): Enum specifying resolution of collected data - e.g. each
                15 minutes or 1 hour or 6 hours is treated separately
            list_of_indicators (list[IndicatorHandlerBase]): List of indicators that should be
                calculated and added to the data. Defaults to None, which means no indicators
                will be added.

        Raises:
            RuntimeError: If given trading pair symbol is not recognized.

        Returns:
            (tuple[pd.DataFrame, dict[str, Any]]): Collected data extended with given indicators,
                and the metadata returned by the coinbase handler. For every indicator that
                reports it can be normalized, the list of its column names is appended to
                meta_data['normalization_groups'].
        """

        if list_of_indicators is None:
            list_of_indicators = []

        # Validate the symbol against the pairs known to the coinbase handler
        # before requesting any candles.
        possible_trading_pairs = await self.__coinbase.get_possible_pairs()
        if trading_pair not in possible_trading_pairs.index:
            raise RuntimeError('Trading pair not recognized!')

        data, meta_data = await self.__coinbase.get_candles_for(trading_pair, start_date,
                                                                end_date, granularity)

        if list_of_indicators:
            indicators_data = []
            for indicator in list_of_indicators:
                # Every indicator is calculated from the raw candle data, not from
                # the output of previously calculated indicators.
                indicator_data = indicator.calculate(data)
                indicators_data.append(indicator_data)
                if indicator.can_be_normalized():
                    meta_data['normalization_groups'].append(indicator_data.columns.tolist())

            # Append all indicator columns side by side with the candle columns.
            data = pd.concat([data, *indicators_data], axis = 1)

        return data, meta_data

    def save_extended_data_into_csv_formatted_string_buffer(self, data: pd.DataFrame,
            meta_data: Optional[dict[str, Any]] = None) -> io.StringIO:
        """
        Saves extended data into a CSV formatted string buffer.

        If metadata is given, its string representation is written as a single leading
        line prefixed with '#', followed by the CSV content (including the index).

        Parameters:
            data (pd.DataFrame): Data frame to be saved.
            meta_data (Optional[dict[str, Any]]): Optional metadata to include in the CSV.

        Returns:
            (io.StringIO): StringIO buffer containing the CSV formatted data. The buffer is
                not rewound after writing.
        """

        file_content_string_buffer = io.StringIO()

        if meta_data is not None:
            file_content_string_buffer.write(f'# {meta_data} \n')

        data.to_csv(file_content_string_buffer, index = True)

        return file_content_string_buffer

    def read_extended_data_from_csv_formatted_string_buffer(self,
            file_content_string_buffer: io.StringIO) -> tuple[pd.DataFrame, Optional[dict[str, Any]]]:
        """
        Reads extended data from a CSV formatted string buffer.

        Reading starts at the buffer's current position. If the first line starts with '#',
        the rest of that line is parsed as metadata; otherwise the buffer is rewound to where
        reading started so that the CSV header line is not lost.

        Parameters:
            file_content_string_buffer (io.StringIO): StringIO buffer containing the CSV formatted data.

        Raises:
            ValueError: If the metadata line is not a valid Python literal.
            SyntaxError: If the metadata line cannot be parsed at all.

        Returns:
            (tuple[pd.DataFrame, Optional[dict[str, Any]]]): Tuple containing the data frame with extended data
                and optional metadata.
        """

        # Local import keeps this method self-contained; 'ast' is part of the standard library.
        import ast

        meta_data = None

        start_position = file_content_string_buffer.tell()
        first_line = file_content_string_buffer.readline().strip()
        if first_line.startswith('#'):
            # SECURITY: the metadata line comes from the buffer content, so it is parsed
            # with ast.literal_eval instead of eval. Only Python literals (strings,
            # numbers, tuples, lists, dicts, sets, booleans, None) are accepted.
            meta_data = ast.literal_eval(first_line[1:].strip())
        else:
            # No metadata line: the line just consumed is the CSV header, so step back.
            file_content_string_buffer.seek(start_position)

        # NOTE(review): the save method writes the index (index = True) but no index_col
        # is passed here, so the saved index comes back as a regular first column -
        # confirm whether callers rely on this before changing it.
        data = pd.read_csv(file_content_string_buffer)

        return data, meta_data