Coverage for source/data_handling/coinbase_handler.py: 87%

55 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-07-27 17:11 +0000

1# data_handling/coinbase_handler.py 

2 

3# global imports 

4import aiohttp 

5import asyncio 

6import math 

7import pandas as pd 

8import pytz 

9from datetime import datetime 

10 

11# local imports 

12from source.utils import Granularity 

13 

class CoinBaseHandler:
    """
    Responsible for handling requests towards the Coinbase Exchange public API.
    """

    # local constants
    __MAX_NUMBER_OF_CANDLES_PER_REQUEST = 300
    __PRODUCTS_URL = 'https://api.exchange.coinbase.com/products'
    # Message body Coinbase sends when the public rate limit is hit.
    __RATE_LIMIT_MESSAGE = 'Public rate limit exceeded'
    # Seconds to wait before re-sending a rate-limited or transiently failed request.
    __RETRY_DELAY_SECONDS = 5
    # Upper bound on attempts per request, so a persistent failure surfaces as an
    # error instead of being retried forever.
    __MAX_REQUEST_ATTEMPTS = 20

    def __convert_date_to_timestamp(self, date_str: str, date_format: str = "%Y-%m-%d %H:%M:%S", target_timezone = pytz.UTC) -> int:
        """
        Converts date given by string into integer (Unix epoch seconds) timestamp.

        Parameters:
            date_str (str): String representing certain date.
            date_format (str): String representing format that certain date is written in.
            target_timezone (Any): Timezone the parsed (naive) date is interpreted in. Either a
                pytz timezone (localized via its `localize` method) or any `tzinfo` instance.

        Raises:
            ValueError: In case of invalid date conversion.

        Returns:
            (int): Timestamp converted from input date.
        """

        try:
            naive_date = datetime.strptime(date_str, date_format)
            # pytz zones must be attached with `localize`; `replace(tzinfo=...)` would pick
            # the zone's historical LMT offset for anything other than UTC.
            localize = getattr(target_timezone, 'localize', None)
            aware_date = localize(naive_date) if localize is not None else naive_date.replace(tzinfo=target_timezone)
            result = int(aware_date.timestamp())
        except (TypeError, ValueError, OverflowError, OSError) as error:
            raise ValueError(f'Invalid data format! Expected was {date_format}.') from error

        return result

    async def __send_request_to_coinbase(self, session: aiohttp.ClientSession, url: str, pid: int) -> list:
        """
        Sends a GET request towards Coinbase API and retries it after a fixed delay when the
        public rate limit is exceeded or a transient network/decoding error occurs.

        Parameters:
            session (aiohttp.ClientSession): Session used to send request with.
            url (str): URL address that certain request is sent towards.
            pid (int): Request identification number (used in error messages).

        Raises:
            RuntimeError: If Coinbase answers with an error message other than the rate limit,
                or if the request still fails after the maximum number of attempts.

        Returns:
            (list): List of values returned by Coinbase API for certain request.
        """

        last_error = None
        for _ in range(self.__MAX_REQUEST_ATTEMPTS):
            try:
                async with session.get(url) as response:
                    rate_limited = response.status == 429
                    data = None if rate_limited else await response.json()
            except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as error:
                # Connection problems, timeouts and undecodable bodies are treated as transient.
                last_error = error
            else:
                if isinstance(data, dict) and data.get('message') == self.__RATE_LIMIT_MESSAGE:
                    rate_limited = True
                if not rate_limited:
                    if isinstance(data, dict) and 'message' in data:
                        # Any other error payload (e.g. unknown product) will not fix itself on retry.
                        raise RuntimeError(f"Coinbase API returned an error for request {pid}: {data['message']}")
                    return data
                last_error = RuntimeError("Exceeded public rate! Retrying in 5s...")
            await asyncio.sleep(self.__RETRY_DELAY_SECONDS)

        raise RuntimeError(
            f'Request {pid} to {url} failed after {self.__MAX_REQUEST_ATTEMPTS} attempts.'
        ) from last_error

    async def get_candles_for(self, trading_pair: str, start_date: str, end_date: str, granularity: Granularity) \
            -> tuple[pd.DataFrame, dict[str, list[list[str]]]]:
        """
        Collects data from Coinbase API given starting date, ending date, granularity and trading pair.
        Dependent on amount of data segments to fetch, might take some time, especially if requests
        exceed public rates.

        Parameters:
            trading_pair (str): String representing unique trading pair symbol.
            start_date (str): String representing date that collected data should start from.
            end_date (str): String representing date that collected data should finish at.
            granularity (Granularity): Enum specifying resolution of collected data - e.g. each
                15 minutes or 1 hour or 6 hours is treated separately.

        Raises:
            ValueError: If given granularity is not a member of Granularity enum, if a date cannot
                be parsed, or if end_date precedes start_date.
            RuntimeError: If Coinbase returns an error or a request keeps failing.

        Returns:
            (tuple[pd.DataFrame, dict[str, list[list[str]]]]): Tuple containing data frame with
                collected data (indexed and sorted by time) and meta data.
        """

        # `in Granularity` raises TypeError for non-members on Python < 3.12 and accepts raw
        # values on 3.12+, neither of which matches the documented contract.
        if not isinstance(granularity, Granularity):
            raise ValueError(f"{granularity} is not an value of Granularity enum!")

        start_timestamp = self.__convert_date_to_timestamp(start_date)
        end_timestamp = self.__convert_date_to_timestamp(end_date)
        if end_timestamp < start_timestamp:
            raise ValueError(f'End date {end_date} precedes start date {start_date}!')

        granularity_seconds = granularity.value
        window_seconds = self.__MAX_NUMBER_OF_CANDLES_PER_REQUEST * granularity_seconds
        total_periods = (end_timestamp - start_timestamp) // granularity_seconds
        requests_needed = math.ceil(total_periods / self.__MAX_NUMBER_OF_CANDLES_PER_REQUEST)

        async with aiohttp.ClientSession() as session:
            tasks = []
            for i in range(requests_needed):
                start_period = start_timestamp + i * window_seconds
                end_period = min(start_period + window_seconds, end_timestamp)
                url = f'{self.__PRODUCTS_URL}/{trading_pair}/candles?start={start_period}&end={end_period}&granularity={granularity_seconds}'
                tasks.append(self.__send_request_to_coinbase(session, url, i))

            responses = await asyncio.gather(*tasks)

        candles = [candle for response in responses if response for candle in response]
        df = pd.DataFrame(candles, columns=['time', 'low', 'high', 'open', 'close', 'volume'])
        df['time'] = pd.to_datetime(df['time'], unit = 's')
        df.set_index('time', inplace = True)
        df.sort_index(inplace = True)
        # Consecutive windows share their boundary timestamp, so the same candle may be
        # returned by two requests; keep only one copy.
        df = df[~df.index.duplicated(keep = 'first')]
        return df, { 'normalization_groups': [['low', 'high', 'open', 'close'], ['volume']] }

    async def get_possible_pairs(self) -> pd.DataFrame:
        """
        Collects data from Coinbase API regarding all possible trading pairs.

        Raises:
            RuntimeError: If Coinbase returns an error or the request keeps failing.

        Returns:
            (pd.DataFrame): Fetched possible trading pairs (id, base and quote currency), indexed
                by id and sorted.
        """

        async with aiohttp.ClientSession() as session:
            products = await self.__send_request_to_coinbase(session, self.__PRODUCTS_URL, 0)

        data = sorted(
            [product['id'], product['base_currency'], product['quote_currency']] for product in products
        )
        df = pd.DataFrame(data, columns = ['id', 'base_currency', 'quote_currency'])
        df.set_index('id', inplace = True)
        return df