Coverage for source/utils/aws_handler.py: 78%

50 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-09-29 20:04 +0000

1# utils/aws_handler.py 

2 

3# global imports 

4import botocore.exceptions 

5import boto3 

6import functools 

7import io 

8import os 

9from typing import Any 

10 

11# local imports 

12from source.utils import SingletonMeta 

13 

class AWSHandler(metaclass=SingletonMeta):
    """
    Responsible for handling communication with Amazon AWS services.

    On construction the handler assumes an IAM role through STS and builds an
    S3 client from the temporary credentials it receives. Public S3 operations
    are wrapped with `with_session_renewal`, which re-assumes the role and
    retries the operation once when the failure looks session related.
    """

    # local constants
    __DEFAULT_REGION = "eu-central-1"
    # Service error codes (carried by botocore ClientError) that indicate the
    # temporary credentials are no longer valid.
    __EXPIRED_TOKEN_CODES = frozenset({"ExpiredToken", "ExpiredTokenException"})

    def __renew_s3_client_session(self) -> Any:
        """
        Assumes a role in AWS and renews S3 client session. Functionality is
        put into a separate method to allow for easier session renewal. The
        credentials are expired after 1 hour by default.

        Returns:
            Any: S3 client created from the temporary credentials returned by
                the STS assume-role call.
        """

        sts_client = self.__credential_session.client('sts')
        credentials = sts_client.assume_role(
            RoleArn=self.__role_arn,
            RoleSessionName='S3_bucket_user_session')['Credentials']

        return boto3.client('s3',
                            aws_access_key_id=credentials['AccessKeyId'],
                            aws_secret_access_key=credentials['SecretAccessKey'],
                            aws_session_token=credentials['SessionToken'],
                            region_name=self.__region_name)

    def __init__(self, region_name: str = __DEFAULT_REGION) -> None:
        """
        Class constructor. Before calling it AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
        ACCOUNT_ID and ROLE_NAME should be available as environmental variables.

        Parameters:
            region_name (str): Region name to connect to.

        Raises:
            RuntimeError: If any of the required environmental variables is
                missing or empty.
        """

        access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
        secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
        account_id = os.getenv('ACCOUNT_ID')
        role_name = os.getenv('ROLE_NAME')
        if not all((access_key_id, secret_access_key, account_id, role_name)):
            raise RuntimeError("AWS credentials or account ID not found in environment variables!")

        self.__credential_session = boto3.Session(aws_access_key_id=access_key_id,
                                                  aws_secret_access_key=secret_access_key)
        self.__role_arn = f'arn:aws:iam::{account_id}:role/{role_name}'
        self.__region_name = region_name
        self.__s3_session = self.__renew_s3_client_session()

    def with_session_renewal(method):
        """
        Decorator to handle session renewal on ExpiredToken error.

        The wrapped method is called once. If it fails with a botocore/boto3
        library error, or with a ClientError whose error code marks the token
        as expired, the S3 session is renewed and the method is called a
        second time. Every failure is reported to the caller as RuntimeError.

        Parameters:
            method: Instance method performing an S3 operation.

        Returns:
            Wrapped method with the same signature.
        """
        @functools.wraps(method)
        def wrapper(self, *args, **kwargs):
            try:
                return method(self, *args, **kwargs)
            except Exception as e:
                if isinstance(e, botocore.exceptions.ClientError):
                    # ClientError is not a BotoCoreError subclass, so expired
                    # tokens reported by the service must be detected by code.
                    error_code = e.response.get('Error', {}).get('Code')
                    renewable = error_code in self.__EXPIRED_TOKEN_CODES
                else:
                    renewable = isinstance(e, (botocore.exceptions.BotoCoreError,
                                               boto3.exceptions.Boto3Error))
                if not renewable:
                    raise RuntimeError(f"Did not manage to perform operation for unknown reason! Original error: {e}") from e

                # Renewal is kept inside the try so that a failing assume-role
                # call also surfaces as the documented RuntimeError.
                try:
                    self.__s3_session = self.__renew_s3_client_session()
                    return method(self, *args, **kwargs)
                except Exception as e2:
                    raise RuntimeError(f"Operation failed after session renewal! Original error: {e2}") from e2
        return wrapper

    @with_session_renewal
    def upload_file_to_s3(self, bucket_name: str, file_path: str, desired_name: str = "") -> None:
        """
        Attempts to upload local file specified by path to S3 Amazon bucket.

        Parameters:
            bucket_name (str): String denoting bucket name.
            file_path (str): String representing path to the file that should be uploaded.
            desired_name (str): Desired name to be given to the file after being uploaded.
                If left unspecified, the base name of file_path is used.

        Raises:
            RuntimeError: If approached problem during file uploading.
        """

        if not desired_name:
            # os.path.basename understands the separators of the local platform.
            desired_name = os.path.basename(file_path)

        self.__s3_session.upload_file(file_path, bucket_name, desired_name)

    @with_session_renewal
    def upload_buffer_to_s3(self, bucket_name: str, buffer: io.StringIO, desired_name: str) -> None:
        """
        Attempts to upload buffer as file body directly to S3 Amazon bucket.

        Parameters:
            bucket_name (str): String denoting bucket name.
            buffer (io.StringIO): Buffer containing data that should be directly
                written to bucket.
            desired_name (str): Desired name to be given to the file after being uploaded.

        Raises:
            RuntimeError: If approached problem during file uploading.
        """

        # getvalue() does not consume the buffer, so a retry sends the same body.
        self.__s3_session.put_object(Bucket=bucket_name, Key=desired_name, Body=buffer.getvalue())

    @with_session_renewal
    def download_file_from_s3(self, bucket_name: str, file_name: str, desired_path: str = "") -> None:
        """
        Downloads a file from an S3 bucket to a local path.

        Parameters:
            bucket_name (str): The name of the S3 bucket.
            file_name (str): The key/path of the file in the S3 bucket.
            desired_path (str, optional): The local path where the file will be saved.
                If not provided, the file will be downloaded to the current working directory
                with the original filename.

        Raises:
            RuntimeError: If the download operation fails.
        """

        if not desired_path:
            # The key is split on '/' explicitly; only the local part of the
            # path is built with the platform-aware os.path.join.
            desired_path = os.path.join(os.getcwd(), file_name.rsplit('/', 1)[-1])

        self.__s3_session.download_file(bucket_name, file_name, desired_path)

138 

139 self.__s3_session.download_file(bucket_name, file_name, desired_path)