Newer
Older
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import os
import json
import requests
import urllib3
from datetime import datetime, timedelta
# Deployment settings, read once from the process environment when this module is imported.
# The flag and the port are dereferenced immediately (.lower() / int()), so importing
# fails if "local_clouditor_deploy" or "clouditor_oauth2_port" is not set.
LOCAL_CLOUDITOR_DEPLOY = os.getenv("local_clouditor_deploy").lower() in ('true', '1', 't')
# Host of the token endpoint; for remote deployments it may carry a path after the first "/".
CLOUDITOR_OAUTH2_HOST = os.getenv("clouditor_oauth2_host")
CLOUDITOR_OAUTH2_PORT = int(os.getenv("clouditor_oauth2_port"))
# Client credentials sent as HTTP basic auth when requesting a token; None when unset.
CLIENT_ID = os.getenv("clouditor_client_id")
CLIENT_SECRET = os.getenv("clouditor_client_secret")
class ClouditorAuthentication(object):
    """Acquire and cache an OAuth2 client-credentials token for Clouditor.

    The token endpoint and request payload are derived from the module-level
    settings (LOCAL_CLOUDITOR_DEPLOY, CLOUDITOR_OAUTH2_HOST,
    CLOUDITOR_OAUTH2_PORT). A token is requested as soon as the object is
    constructed; failures are logged rather than raised, in which case
    ``get_token()`` returns ``None``.
    """

    def __init__(self, logger):
        """Configure the token endpoint and immediately request a token.

        Args:
            logger: logger-like object providing ``info``, ``debug`` and
                ``exception``.
        """
        self.logger = logger
        self.__access_token = None
        self.__token_expiration_time = None
        self.__token_url = self._build_token_url(
            CLOUDITOR_OAUTH2_HOST, CLOUDITOR_OAUTH2_PORT, LOCAL_CLOUDITOR_DEPLOY)
        self.__data = {'grant_type': 'client_credentials'}
        if not LOCAL_CLOUDITOR_DEPLOY:
            # K8s/other remotely deployed Clouditor additionally sends a scope.
            self.__data['scope'] = os.environ.get("clouditor_oauth2_scope")
        self.request_token()

    @staticmethod
    def _build_token_url(host, port, local_deploy):
        """Return the token endpoint URL for the given deployment.

        Args:
            host: host name; for remote deployments it may be followed by
                ``/`` and a path (e.g. ``"idp.example.com/realms/x/token"``).
            port: TCP port of the token endpoint.
            local_deploy: True when Clouditor is deployed locally, in a VM.

        Returns:
            ``http://host:port/v1/auth/token`` for local deployments,
            otherwise ``https://host:port[/path]``.
        """
        if local_deploy:
            return 'http://{}:{}/v1/auth/token'.format(host, port)
        # partition() never raises when there is no "/", unlike indexing the
        # result of split("/", 1), which made the path-less case crash.
        hostname, separator, path = host.partition("/")
        if separator:
            return 'https://{}:{}/{}'.format(hostname, port, path)
        return 'https://{}:{}'.format(hostname, port)

    def _reset_token(self):
        """Forget the cached token and its expiration time."""
        self.__access_token = None
        self.__token_expiration_time = None

    def request_token(self):
        """Request a new token and cache it together with its expiration time.

        Never raises for request or parsing problems: every failure is logged
        with its traceback and the cached token and expiration are reset to
        ``None``.
        """
        try:
            # NOTE(review): verify=False disables TLS certificate verification
            # and no timeout is passed, so a stalled endpoint can block the
            # caller indefinitely - confirm both are intended.
            response = requests.post(
                self.__token_url,
                data=self.__data,
                verify=False,
                allow_redirects=False,
                auth=(CLIENT_ID, CLIENT_SECRET),
            )
            token = json.loads(response.text)
            access_token = token['access_token']
            # Treat the token as expired 10 seconds before the reported lifetime ends.
            expiration_time = datetime.utcnow() + timedelta(seconds=(token['expires_in'] - 10))
            # Commit both values together so a partially parsed response never
            # leaves a token without an expiration time.
            self.__access_token = access_token
            self.__token_expiration_time = expiration_time
            self.logger.info("New OAuth2 token successfully acquired")
            self.logger.debug("OAuth2 token expiring at: " + str(self.__token_expiration_time))
        except (TimeoutError, urllib3.exceptions.NewConnectionError, OSError,
                urllib3.exceptions.MaxRetryError, requests.exceptions.ConnectionError):
            self.logger.exception("Acquiring Clouditor OAuth2 token failed")
            self._reset_token()
        except ValueError:
            # json.loads raises a ValueError subclass when the body is not valid JSON.
            self.logger.exception("Invalid Clouditor OAuth2 token format")
            self._reset_token()
        except Exception:
            # Covers e.g. a JSON body without 'access_token' / 'expires_in'.
            self.logger.exception("Unknown exception occured while acquiring Clouditor OAuth2 token")
            self._reset_token()

    def get_token(self):
        """Return the cached access token, refreshing it first if it has expired.

        Returns:
            The access token, or ``None`` when the last acquisition failed.
            A failed acquisition is not retried here, because no expiration
            time is recorded for it.
        """
        # In practice this condition isn't even needed as every scheduled job
        # creates a new ClouditorAuthentication object and acquires a new token.
        if (self.__token_expiration_time is not None
                and datetime.utcnow() > self.__token_expiration_time):
            self.logger.debug("OAuth2 token expired")
            self.request_token()
        return self.__access_token