Coverage for source/coinbase/coinbase_handler.py: 87%

55 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-30 15:13 +0000

1# coinbase/coinbase_handler.py

2 

3import aiohttp 

4import asyncio 

5from datetime import datetime 

6import pytz 

7import math 

8import pandas as pd 

9from ..utils import Granularity 

10 

# Number of candles requested per call: get_candles_for splits a date range
# into windows of at most this many granularity periods.
# NOTE(review): presumably mirrors the per-request candle cap of the Coinbase API - confirm.
11MAX_NUMBER_OF_CANDLES_PER_REQUEST = 300 

# Base URL of the Coinbase Exchange products endpoint; fetched as-is to list
# trading pairs and extended with '/<pair>/candles?...' to fetch candles.
12PRODUCTS_URL = 'https://api.exchange.coinbase.com/products' 

13 

class CoinBaseHandler:
    """
    Responsible for handling requests towards Coinbase API.
    """

    # Seconds to wait before re-sending a request that failed or was rate limited.
    _RETRY_DELAY_SECONDS = 5

    def __convert_date_to_timestamp(self, date_str: str, date_format: str = "%Y-%m-%d %H:%M:%S", target_timezone = pytz.UTC) -> int:
        """
        Converts date given by string into integer timestamp.

        Parameters:
            date_str (str): String representing certain date.
            date_format (str): String representing format that certain date is written in.
            target_timezone (Any): Timezone that the parsed date is interpreted in. Both pytz
                timezones and plain datetime.tzinfo objects are accepted.

        Raises:
            ValueError: In case of invalid date conversion.

        Returns:
            (int): Timestamp converted from input date.
        """

        try:
            naive_date = datetime.strptime(date_str, date_format)

            # pytz timezones must be attached with localize(); replace(tzinfo=...)
            # silently picks a wrong (LMT) offset for most non-UTC pytz zones.
            localize = getattr(target_timezone, 'localize', None)
            if callable(localize):
                aware_date = localize(naive_date)
            else:
                aware_date = naive_date.replace(tzinfo=target_timezone)

            return int(aware_date.timestamp())
        except (TypeError, ValueError, OverflowError) as error:
            raise ValueError(f'Invalid data format! Expected was {date_format}.') from error

    async def __send_request_to_coinbase(self, session: aiohttp.ClientSession, url: str, pid: int) -> list:
        """
        Sends request towards Coinbase API and handles exceedance of public rates by repeating
        request after certain time.

        The request is repeated until it succeeds: any failure while sending the request or
        decoding its JSON body, as well as a 'Public rate limit exceeded' answer, triggers
        another attempt after a short pause. There is no upper bound on the number of attempts.

        Parameters:
            session (aiohttp.ClientSession): Session used to send request with.
            url (str): URL address that certain request is sent towards.
            pid (int): Request identification number. Currently not used by the method.

        Raises:
            asyncio.CancelledError: If the surrounding task is cancelled. Cancellation is
                never swallowed by the retry logic.

        Returns:
            (list): Decoded JSON value returned by Coinbase API for certain request.
        """

        # Iterating instead of recursing keeps the call stack flat however long the outage lasts.
        while True:
            try:
                async with session.get(url) as response:
                    data = await response.json()
            except asyncio.CancelledError:
                # Explicit re-raise so cancellation also propagates on Python versions
                # where CancelledError still derives from Exception.
                raise
            except Exception:
                # Best-effort by design: treat every request/decoding failure as transient.
                pass
            else:
                rate_limited = isinstance(data, dict) and data.get('message') == 'Public rate limit exceeded'
                if not rate_limited:
                    return data

            await asyncio.sleep(self._RETRY_DELAY_SECONDS)

    async def get_candles_for(self, trading_pair: str, start_date: str, end_date: str, granularity: Granularity) -> pd.DataFrame:
        """
        Collects data from Coinbase API given starting date, ending date, granularity and trading pair. Dependent on amount of
        data segments to fetch, might take some time. Especially, if request exceeds public rates.

        Parameters:
            trading_pair (str): String representing unique trading pair symbol.
            start_date (str): String representing date that collected data should start from.
            end_date (str): String representing date that collected data should finish at.
            granularity (Granularity): Enum specifying resolution of collected data - e.g. each
                15 minutes or 1 hour or 6 hours is treated separately.

        Raises:
            ValueError: If given granularity is not a member of Granularity enum, if any of the
                dates cannot be converted, or if Coinbase API answers with an error object
                instead of a list of candles.

        Returns:
            (pd.DataFrame): Collected data frame indexed by candle time in ascending order,
            with columns 'low', 'high', 'open', 'close' and 'volume'.
        """

        # isinstance() instead of "in Granularity": the containment test raises TypeError for
        # non-members on Python < 3.12 and accepts raw values on newer versions.
        if not isinstance(granularity, Granularity):
            raise ValueError(f"{granularity} is not an value of Granularity enum!")

        start_timestamp = self.__convert_date_to_timestamp(start_date)
        end_timestamp = self.__convert_date_to_timestamp(end_date)
        granularity_seconds = granularity.value
        total_periods = (end_timestamp - start_timestamp) // granularity_seconds
        requests_needed = math.ceil(total_periods / MAX_NUMBER_OF_CANDLES_PER_REQUEST)

        # Time span covered by a single request.
        window_seconds = MAX_NUMBER_OF_CANDLES_PER_REQUEST * granularity_seconds

        async with aiohttp.ClientSession() as session:
            tasks = []
            for i in range(requests_needed):
                start_period = start_timestamp + i * window_seconds
                end_period = min(start_period + window_seconds, end_timestamp)
                url = f'{PRODUCTS_URL}/{trading_pair}/candles?start={start_period}&end={end_period}&granularity={granularity_seconds}'
                tasks.append(self.__send_request_to_coinbase(session, url, i))

            responses = await asyncio.gather(*tasks)

        candles = []
        for payload in responses:
            if not payload:
                continue
            if isinstance(payload, dict):
                # An object instead of a list of candles is an error answer; report it
                # directly rather than letting pandas fail on a malformed row.
                raise ValueError(f"Coinbase API returned an error for {trading_pair}: {payload.get('message', payload)}")
            candles.extend(payload)

        df = pd.DataFrame(candles, columns=['time', 'low', 'high', 'open', 'close', 'volume'])
        df['time'] = pd.to_datetime(df['time'], unit='s')
        # Requests finish in arbitrary order, so the rows are ordered by time explicitly.
        return df.set_index('time').sort_values(by='time')

    async def get_possible_pairs(self) -> pd.DataFrame:
        """
        Collects data from Coinbase API regarding all possible trading pairs.

        Returns:
            (pd.DataFrame): Fetched possible trading pairs inside data frame, indexed by 'id'
            (sorted ascending) with columns 'base_currency' and 'quote_currency'.
        """

        async with aiohttp.ClientSession() as session:
            products = await self.__send_request_to_coinbase(session, PRODUCTS_URL, 0)

        rows = sorted(
            [product['id'], product['base_currency'], product['quote_currency']]
            for product in products
        )
        df = pd.DataFrame(rows, columns=['id', 'base_currency', 'quote_currency'])
        return df.set_index('id')