import os import json import requests import urllib3 from datetime import datetime, timedelta CLOUDITOR_OAUTH2_HOST = os.environ.get("clouditor_oauth2_host") CLOUDITOR_OAUTH2_PORT = int(os.environ.get("clouditor_oauth2_port")) CLIENT_ID = os.environ.get("clouditor_client_id") CLIENT_SECRET = os.environ.get("clouditor_client_secret") class ClouditorAuthentication(object): def __init__(self, logger): self.logger = logger self.__access_token = None self.__token_expiration_time = None self.__token_url = 'http://{}:{}/v1/auth/token'.format(CLOUDITOR_OAUTH2_HOST, CLOUDITOR_OAUTH2_PORT) self.__data = {'grant_type': 'client_credentials'} self.request_token() def request_token(self): try: access_token_response = requests.post(self.__token_url, data=self.__data, verify=False, allow_redirects=False, auth=(CLIENT_ID, CLIENT_SECRET)) except (TimeoutError, urllib3.exceptions.NewConnectionError, urllib3.exceptions.MaxRetryError, requests.exceptions.ConnectionError) as err: self.logger.error(err) self.logger.error("Clouditor OAuth2 token endpoint not available") self.__access_token = None self.__token_expiration_time = None else: token = json.loads(access_token_response.text) self.__access_token = token['access_token'] self.__token_expiration_time = datetime.utcnow() + timedelta(seconds=(token['expires_in'] - 10)) self.logger.info("New OAuth2 token successfully acquired") self.logger.debug("OAuth2 token expiring at: " + str(self.__token_expiration_time)) def get_token(self): # In practice this condition isn't even needed as every scheduled job creates new ClouditorAuthentication object and acquires new token. if (self.__token_expiration_time != None and datetime.utcnow() > self.__token_expiration_time): self.logger.debug("OAuth2 token expired.") self.request_token() return self.__access_token