Coverage for source/data_handling/coinbase_api_data_collector.py: 92%
52 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-01 20:51 +0000
1# data_handling/coinbase_api_data_collector.py
3# global imports
4import aiohttp
5import asyncio
6import math
7import pandas as pd
8import pytz
10# local imports
11from source.data_handling import ApiDataCollectorBase
12from source.utils import Granularity
class CoinbaseApiDataCollector(ApiDataCollectorBase):
    """
    Implements a data collector for Coinbase API.
    Responsible for collecting historical data for a given trading pair.
    """

    # local constants
    __MAX_NUMBER_OF_CANDLES_PER_REQUEST = 300
    __PRODUCTS_URL = 'https://api.exchange.coinbase.com/products'
    # Seconds to wait before re-sending a request that failed or hit the rate limit.
    __RETRY_DELAY_SECONDS = 5
    # Body message and HTTP status (429 Too Many Requests) that signal rate limiting.
    __RATE_LIMIT_MESSAGE = 'Public rate limit exceeded'
    __RATE_LIMIT_STATUS = 429
    # Transient failures worth retrying: aiohttp client errors, timeouts, socket errors
    # and bodies that cannot be decoded as JSON (json.JSONDecodeError is a ValueError).
    # Anything else, including asyncio.CancelledError, propagates to the caller.
    __RETRYABLE_ERRORS = (aiohttp.ClientError, asyncio.TimeoutError, OSError, ValueError)

    async def __send_request_to_coinbase(self, session: aiohttp.ClientSession, url: str, pid: int,
                                         max_retries: 'int | None' = None) -> list:
        """
        Sends request towards Coinbase API. Handles exceedance of public rates and transient
        network/decoding failures by repeating the request after a fixed delay.

        Parameters:
            session (aiohttp.ClientSession): Session used to send request with.
            url (str): URL address that certain request is sent towards.
            pid (int): Request identification number, used in error messages.
            max_retries (int | None): Maximum number of retries after the first attempt.
                None (default) keeps retrying until the request succeeds.

        Raises:
            RuntimeError: If the request still fails after `max_retries` retries. The last
                underlying failure is chained as the cause.

        Returns:
            (list): List of values returned by Coinbase API for certain request. Any decoded
                payload that is not a rate-limit response is returned unchanged.
        """

        retries = 0
        while True:
            try:
                async with session.get(url) as response:
                    data = await response.json()
                    status = response.status
            except self.__RETRYABLE_ERRORS as error:
                failure = error
            else:
                if not self.__is_rate_limited(status, data):
                    return data
                failure = RuntimeError(f"Exceeded public rate for request {pid}!")

            if max_retries is not None and retries >= max_retries:
                raise RuntimeError(f"Request {pid} to {url} failed after {retries + 1} attempts") from failure
            retries += 1
            # Iterative retry (instead of recursion) so persistent failures do not grow the stack.
            await asyncio.sleep(self.__RETRY_DELAY_SECONDS)

    @classmethod
    def __is_rate_limited(cls, status: int, data) -> bool:
        """
        Checks whether a decoded Coinbase response signals that the public rate limit was hit.

        Parameters:
            status (int): HTTP status code of the response.
            data: Decoded JSON body of the response.

        Returns:
            bool: True if the response is a rate-limit rejection, False otherwise.
        """

        if status == cls.__RATE_LIMIT_STATUS:
            return True
        return isinstance(data, dict) and data.get('message') == cls.__RATE_LIMIT_MESSAGE

    async def __get_possible_pairs(self) -> pd.DataFrame:
        """
        Collects data from Coinbase API regarding all possible trading pairs.

        Returns:
            (pd.DataFrame): Possible trading pairs sorted by id, indexed by 'id', with
                'base_currency' and 'quote_currency' columns.
        """

        async with aiohttp.ClientSession() as session:
            # A single request: await it directly, no need for asyncio.gather.
            products = await self.__send_request_to_coinbase(session, self.__PRODUCTS_URL, 0)

        data = [[product['id'], product['base_currency'], product['quote_currency']] for product in products]
        df = pd.DataFrame(sorted(data), columns=['id', 'base_currency', 'quote_currency'])
        df.set_index('id', inplace=True)
        return df

    async def _validate_ticker(self, ticker: str) -> bool:
        """
        Validates if the ticker is supported by the API.

        Parameters:
            ticker (str): Trading pair identifier to validate, matched against the 'id' of
                the products listed by Coinbase.

        Returns:
            bool: True if ticker is valid, False otherwise.
        """

        pairs = await self.__get_possible_pairs()
        return ticker in pairs.index

    def __to_utc_timestamp(self, date: str) -> int:
        """
        Converts a date string into an integer UNIX timestamp, interpreting the date as UTC.

        Parameters:
            date (str): Date in the format accepted by `_convert_date_to_datetime`.

        Returns:
            int: Seconds since the UNIX epoch.
        """

        return int(self._convert_date_to_datetime(date).replace(tzinfo=pytz.UTC).timestamp())

    async def _collect_data_for_ticker(self, ticker: str, start_date: str, end_date: str, granularity: Granularity) \
            -> tuple[pd.DataFrame, dict[str, list[list[str]]]]:
        """
        Collects data for a specific ticker from the API.

        The requested time range is split into chunks of at most
        `__MAX_NUMBER_OF_CANDLES_PER_REQUEST` candles, which are requested concurrently.

        Parameters:
            ticker (str): Trading pair identifier to collect data for.
            start_date (str): Start date for the data collection.
            end_date (str): End date for the data collection.
            granularity (Granularity): Data resolution; its value is the candle length in seconds.

        Raises:
            ValueError: If Coinbase answers a candle request with a non-empty payload that is
                not a list of candles (e.g. an error message).

        Returns:
            tuple[pd.DataFrame, dict[str, list[list[str]]]]: Candles indexed and sorted by
                'time' with 'low', 'high', 'open', 'close' and 'volume' columns, plus metadata
                describing which columns are normalized together.
        """

        start_timestamp = self.__to_utc_timestamp(start_date)
        end_timestamp = self.__to_utc_timestamp(end_date)
        granularity_seconds = granularity.value
        # Time span covered by one full request.
        chunk_seconds = self.__MAX_NUMBER_OF_CANDLES_PER_REQUEST * granularity_seconds
        total_periods = (end_timestamp - start_timestamp) // granularity_seconds
        requests_needed = math.ceil(total_periods / self.__MAX_NUMBER_OF_CANDLES_PER_REQUEST)

        async with aiohttp.ClientSession() as session:
            tasks = []
            for i in range(requests_needed):
                start_period = start_timestamp + i * chunk_seconds
                end_period = min(start_period + chunk_seconds, end_timestamp)
                url = (f'{self.__PRODUCTS_URL}/{ticker}/candles'
                       f'?start={start_period}&end={end_period}&granularity={granularity_seconds}')
                tasks.append(self.__send_request_to_coinbase(session, url, i))
            responses = await asyncio.gather(*tasks)

        candles = []
        for pid, response in enumerate(responses):
            if not response:
                continue
            # An error payload (e.g. {'message': ...}) would otherwise be flattened key by key.
            if not isinstance(response, list):
                raise ValueError(f"Coinbase returned an unexpected response for request {pid}: {response!r}")
            candles.extend(response)

        df = pd.DataFrame(candles, columns=['time', 'low', 'high', 'open', 'close', 'volume'])
        df['time'] = pd.to_datetime(df['time'], unit='s')
        df.set_index('time', inplace=True)
        df.sort_index(inplace=True)
        # Consecutive chunks share a boundary timestamp (end of one == start of the next);
        # NOTE(review): if the API treats both ends as inclusive, that candle arrives twice.
        df = df[~df.index.duplicated(keep='first')]
        return df, {'normalization_groups': [['low', 'high', 'open', 'close'], ['volume']]}