Coverage for source/data_handling/coinbase_api_data_collector.py: 92%

52 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-07-30 20:59 +0000

1# data_handling/coinbase_api_data_collector.py 

2 

3# global imports 

4import aiohttp 

5import asyncio 

6import math 

7import pandas as pd 

8import pytz 

9 

10# local imports 

11from source.data_handling import ApiDataCollectorBase 

12from source.utils import Granularity 

13 

class CoinbaseApiDataCollector(ApiDataCollectorBase):
    """
    Implements a data collector for Coinbase API.
    Responsible for collecting historical data for a given trading pair.
    """

    # local constants
    __MAX_NUMBER_OF_CANDLES_PER_REQUEST = 300
    __PRODUCTS_URL = 'https://api.exchange.coinbase.com/products'
    # Delay before re-sending a request that failed or hit the public rate limit.
    __RETRY_DELAY_SECONDS = 5
    # Exact 'message' value the API returns when the public rate limit is exceeded.
    __RATE_LIMIT_MESSAGE = 'Public rate limit exceeded'

    async def __send_request_to_coinbase(self, session: aiohttp.ClientSession, url: str, pid: int) -> list:
        """
        Sends request towards Coinbase API and repeats it after a fixed delay whenever
        the request fails or the public rate limit is exceeded.

        Parameters:
            session (aiohttp.ClientSession): Session used to send request with.
            url (str): URL address that certain request is sent towards.
            pid (int): Request identification number. Not used by this method; kept for
                interface compatibility.

        Returns:
            (list): Decoded JSON returned by Coinbase API for certain request. If the API
                answers with a JSON object other than the rate-limit message (e.g. an error
                payload), that object is returned unchanged.

        Notes:
            Retries are unbounded: the coroutine only returns once a response other than
            the rate-limit message is decoded. Only `Exception` subclasses are retried, so
            task cancellation (asyncio.CancelledError) propagates instead of being swallowed.
        """

        # A loop instead of recursion: repeated rate limiting must not grow the call chain.
        while True:
            try:
                async with session.get(url) as response:
                    data = await response.json()
            except Exception:
                # Connection errors, timeouts and undecodable bodies are treated as transient.
                await asyncio.sleep(self.__RETRY_DELAY_SECONDS)
                continue

            if isinstance(data, dict) and data.get('message') == self.__RATE_LIMIT_MESSAGE:
                await asyncio.sleep(self.__RETRY_DELAY_SECONDS)
                continue

            return data

    async def __get_possible_pairs(self) -> pd.DataFrame:
        """
        Collects data from Coinbase API regarding all possible trading pairs.

        Returns:
            (pd.DataFrame): Trading pairs sorted by id, indexed by 'id', with columns
                'base_currency' and 'quote_currency'.
        """

        async with aiohttp.ClientSession() as session:
            products = await self.__send_request_to_coinbase(session, self.__PRODUCTS_URL, 0)

        data = [[product['id'], product['base_currency'], product['quote_currency']] for product in products]
        df = pd.DataFrame(sorted(data), columns=['id', 'base_currency', 'quote_currency'])
        df.set_index('id', inplace=True)
        return df

    async def _validate_ticker(self, ticker: str) -> bool:
        """
        Validates if the ticker is supported by the API.

        Parameters:
            ticker (str): Trading pair id to validate (matched against the products' 'id').

        Returns:
            bool: True if ticker is valid, False otherwise.
        """

        df = await self.__get_possible_pairs()
        return ticker in df.index

    async def _collect_data_for_ticker(self, ticker: str, start_date: str, end_date: str, granularity: Granularity) \
            -> tuple[pd.DataFrame, dict[str, list[list[str]]]]:
        """
        Collects data for a specific ticker from the API.

        The requested range is split into windows of at most
        __MAX_NUMBER_OF_CANDLES_PER_REQUEST candles, and the windows are fetched concurrently.

        Parameters:
            ticker (str): Trading pair id to collect data for.
            start_date (str): Start date for the data collection (interpreted as UTC).
            end_date (str): End date for the data collection (interpreted as UTC).
            granularity (Granularity): Data resolution; its value is used as seconds per candle.

        Raises:
            ValueError: If the API answers any window with a JSON object (error payload)
                instead of a list of candles.

        Returns:
            tuple[pd.DataFrame, dict[str, list[list[str]]]]: Candles indexed by 'time'
                (sorted ascending, duplicate timestamps removed) with columns 'low', 'high',
                'open', 'close', 'volume', plus metadata describing normalization groups.
        """

        start_timestamp = int(self._convert_date_to_datetime(start_date).replace(tzinfo = pytz.UTC).timestamp())
        end_timestamp = int(self._convert_date_to_datetime(end_date).replace(tzinfo = pytz.UTC).timestamp())
        granularity_seconds = granularity.value
        window_seconds = self.__MAX_NUMBER_OF_CANDLES_PER_REQUEST * granularity_seconds
        total_periods = (end_timestamp - start_timestamp) // granularity_seconds
        requests_needed = math.ceil(total_periods / self.__MAX_NUMBER_OF_CANDLES_PER_REQUEST)

        async with aiohttp.ClientSession() as session:
            tasks = []
            for i in range(requests_needed):
                start_period = start_timestamp + i * window_seconds
                end_period = min(start_period + window_seconds, end_timestamp)
                url = (f'{self.__PRODUCTS_URL}/{ticker}/candles'
                       f'?start={start_period}&end={end_period}&granularity={granularity_seconds}')
                tasks.append(self.__send_request_to_coinbase(session, url, i))

            responses = await asyncio.gather(*tasks)

        candles = []
        for response in responses:
            if not response:
                continue
            if isinstance(response, dict):
                # Flattening a dict would yield its keys as "candles" and fail with an
                # opaque shape error; surface the API message instead.
                raise ValueError(
                    f"Coinbase API returned an error for '{ticker}': {response.get('message', response)}")
            candles.extend(response)

        df = pd.DataFrame(candles, columns=['time', 'low', 'high', 'open', 'close', 'volume'])
        df['time'] = pd.to_datetime(df['time'], unit = 's')
        df.set_index('time', inplace = True)
        df.sort_index(inplace = True)
        # Consecutive windows share a boundary timestamp (one window's end is the next one's
        # start); if the API treats both bounds as inclusive, that candle arrives twice.
        df = df[~df.index.duplicated(keep = 'first')]
        return df, { 'normalization_groups': [['low', 'high', 'open', 'close'], ['volume']] }