# Source code for borneo.iam.iam

#
# Copyright (c) 2018, 2023 Oracle and/or its affiliates. All rights reserved.
#
# Licensed under the Universal Permissive License v 1.0 as shown at
#  https://oss.oracle.com/licenses/upl/
#

# NOTE(review): nothing in the visible code reads this name. Presumably it is
# meant to tell the oci package to skip importing its service clients when it
# is imported below -- TODO confirm how the oci SDK consumes this setting.
OCI_PYTHON_SDK_NO_SERVICE_IMPORTS = True

from os import path
from threading import Lock, Timer
from time import sleep, time

from requests import Request

# The oci package is treated as optional at import time. If any of the imports
# below fails, the module-level flag ``oci`` is set to None instead of raising;
# SignatureProvider._check_oci() later turns that into an ImportError when a
# SignatureProvider is actually created.
try:
    # noinspection PyUnresolvedReferences
    from oci.signer import Signer
    # noinspection PyUnresolvedReferences
    from oci.auth.signers import SecurityTokenSigner
    # noinspection PyUnresolvedReferences
    from oci.auth.signers import EphemeralResourcePrincipalSigner
    # noinspection PyUnresolvedReferences
    from oci.auth.signers import InstancePrincipalsSecurityTokenSigner
    # noinspection PyUnresolvedReferences
    from oci.auth.signers import get_resource_principals_signer
    # noinspection PyUnresolvedReferences
    from oci.config import from_file

    # Any non-None value marks the oci package as importable.
    oci = 'yes'
except ImportError:
    # Marks the oci package as unavailable; checked by _check_oci().
    oci = None

from borneo.auth import AuthorizationProvider
from borneo.common import (
    CheckValue, HttpConstants, LogUtils, Memoize, synchronized)
from borneo.config import Region, Regions
from borneo.exception import IllegalArgumentException


class SignatureProvider(AuthorizationProvider):
    """
    Cloud service only.

    An instance of :py:class:`borneo.AuthorizationProvider` that generates and
    caches signature for each request as authorization string. A number of
    pieces of information are required for configuration. See `Required Keys
    and OCIDs <https://docs.cloud.oracle.com/iaas/Content/API/Concepts/
    apisigningkey.htm>`_ for information and instructions on how to create the
    required keys and OCIDs for configuration. The required information
    includes:

     * A signing key, used to sign requests.
     * A pass phrase for the key, if it is encrypted.
     * The fingerprint of the key pair used for signing.
     * The OCID of the tenancy.
     * The OCID of a user in the tenancy.

    All of this information is required to authenticate and authorize access to
    the service. See :ref:`creds-label` for information on how to acquire this
    information.

    There are three different ways to authorize an application:

    1. Using a specific user's identity.
    2. Using an Instance Principal, which can be done when running on a
       compute instance in the Oracle Cloud Infrastructure (OCI). See
       :py:meth:`create_with_instance_principal` and `Calling Services from
       Instances <https://docs.cloud.oracle.com/iaas/Content/Identity/Tasks/
       callingservicesfrominstances.htm>`_.
    3. Using a Resource Principal, which can be done when running within a
       Function within the Oracle Cloud Infrastructure (OCI). See
       :py:meth:`create_with_resource_principal` and `Accessing Other Oracle
       Cloud Infrastructure Resources from Running Functions <https://docs.
       cloud.oracle.com/en-us/iaas/Content/Functions/Tasks/
       functionsaccessingociresources.htm>`_.

    The latter 2 limit the ability to use a compartment name vs OCID when
    naming compartments and tables in :py:class:`Request` classes and when
    naming tables in queries. A specific user identity is best for naming
    flexibility, allowing both compartment names and OCIDs.

    When using a specific user's identity there are 3 options for providing the
    required information:

    1. Using an instance of oci.signer.Signer or
       oci.auth.signers.SecurityTokenSigner
    2. Directly providing the credentials via parameters
    3. Using a configuration file

    Only one method of providing credentials can be used, and if they are mixed
    the priority from high to low is:

    * Signer or SecurityTokenSigner (provider is used)
    * Credentials as arguments (tenant_id, etc used)
    * Configuration file (config_file is used)

    :param provider: an instance of oci.signer.Signer or
        oci.auth.signers.SecurityTokenSigner.
    :type provider: Signer or SecurityTokenSigner
    :param config_file: path of configuration file.
    :type config_file: str
    :param profile_name: user profile name. Only valid with config_file.
    :type profile_name: str
    :param tenant_id: id of the tenancy
    :type tenant_id: str
    :param user_id: id of a specific user
    :type user_id: str
    :param private_key: path to private key or private key content
    :type private_key: str
    :param fingerprint: fingerprint for the private key
    :type fingerprint: str
    :param pass_phrase: pass_phrase for the private key if created
    :type pass_phrase: str
    :param region: identifies the region will be accessed by the NoSQLHandle.
        When ``provider`` is supplied a region id string is accepted as well.
        When the credentials are read from a configuration file the region
        entry of that file is used instead.
    :type region: Region
    :param duration_seconds: the signature cache duration in seconds.
    :type duration_seconds: int
    :param refresh_ahead: the refresh time before signature cache expiry in
        seconds.
    :type refresh_ahead: int
    :raises IllegalArgumentException: raises the exception if the parameters
        are not valid.
    """

    CACHE_KEY = 'signature'
    """Cache key name."""

    # Use 240 so that it expires well before the 300s token lifetime
    MAX_ENTRY_LIFE_TIME = 240
    """Maximum lifetime of signature 240 seconds."""

    DEFAULT_REFRESH_AHEAD = 10
    """Default refresh time before signature expiry, 10 seconds."""

    def __init__(self, provider=None, config_file=None, profile_name=None,
                 tenant_id=None, user_id=None, fingerprint=None,
                 private_key=None, pass_phrase=None, region=None,
                 duration_seconds=MAX_ENTRY_LIFE_TIME,
                 refresh_ahead=DEFAULT_REFRESH_AHEAD):
        """
        The SignatureProvider that generates and caches request signature.
        """
        #
        # This class depends on the oci package
        #
        SignatureProvider._check_oci()

        CheckValue.check_int_gt_zero(duration_seconds, 'duration_seconds')
        CheckValue.check_int_gt_zero(refresh_ahead, 'refresh_ahead')
        if duration_seconds > SignatureProvider.MAX_ENTRY_LIFE_TIME:
            raise IllegalArgumentException(
                'Access token cannot be cached longer than ' +
                str(SignatureProvider.MAX_ENTRY_LIFE_TIME) + ' seconds.')

        self._region = None
        if provider is not None:
            # Highest priority: a ready-made signer supplied by the caller.
            if not isinstance(provider, (Signer, SecurityTokenSigner)):
                raise IllegalArgumentException(
                    'provider should be an instance of oci.signer.Signer or ' +
                    'oci.auth.signers.SecurityTokenSigner.')
            self._provider = provider
            if isinstance(region, Region):
                # The documented type: use it directly rather than handing a
                # Region object to the id-string lookup below.
                self._region = region
            else:
                # Otherwise treat region as a region id, falling back to the
                # region attribute of the signer when it has one.
                region_id = (region if region is not None
                             else getattr(provider, 'region', None))
                if region_id is not None:
                    self._region = Regions.from_region_id(region_id)
        elif (tenant_id is None or user_id is None or fingerprint is None or
              private_key is None):
            # Lowest priority: credentials are incomplete, read them from an
            # OCI configuration file.
            CheckValue.check_str(config_file, 'config_file', True)
            CheckValue.check_str(profile_name, 'profile_name', True)
            # Only pass what the caller specified so that from_file() applies
            # its own defaults (default path ~/.oci/config and default
            # profile) for anything left out.
            config_args = {}
            if config_file is not None:
                config_args['file_location'] = config_file
            if profile_name is not None:
                config_args['profile_name'] = profile_name
            config = from_file(**config_args)
            self._provider = Signer(
                config['tenancy'], config['user'], config['fingerprint'],
                config['key_file'], config.get('pass_phrase'),
                config.get('key_content'))
            region_id = config.get('region')
            if region_id is not None:
                self._provider.region = region_id
                self._region = Regions.from_region_id(region_id)
        else:
            # Credentials passed explicitly as arguments.
            CheckValue.check_str(tenant_id, 'tenant_id')
            CheckValue.check_str(user_id, 'user_id')
            CheckValue.check_str(fingerprint, 'fingerprint')
            CheckValue.check_str(private_key, 'private_key')
            CheckValue.check_str(pass_phrase, 'pass_phrase', True)
            # private_key is either a path to a key file or the key content.
            if path.isfile(private_key):
                key_file = private_key
                key_content = None
            else:
                key_file = None
                key_content = private_key
            self._provider = Signer(
                tenant_id, user_id, fingerprint, key_file, pass_phrase,
                key_content)
            if region is not None:
                if not isinstance(region, Region):
                    raise IllegalArgumentException(
                        'region must be an instance of Region.')
                self._provider.region = region.get_region_id()
                self._region = region

        self._signature_cache = Memoize(duration_seconds)
        self._refresh_ahead = refresh_ahead
        # Seconds to wait before refreshing the cached signature; 0 disables
        # the background refresh.
        self._refresh_interval_s = (duration_seconds - refresh_ahead if
                                    duration_seconds > refresh_ahead else 0)

        # Refresh timer.
        self._timer = None
        self._service_url = None
        self._logger = None
        self._logutils = LogUtils()
        self.lock = Lock()

    def close(self):
        """
        Closes the signature provider.
        """
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None

    def get_authorization_string(self, request=None):
        """
        Returns the cached (or freshly generated) authorization string.

        :param request: not used by this provider.
        :returns: the value of the 'authorization' signature header, or None
            if no signature details are available.
        :rtype: str
        :raises IllegalArgumentException: raises the exception if the service
            url has not been set.
        """
        if self._service_url is None:
            raise IllegalArgumentException(
                'Unable to find service url, use set_service_url to load ' +
                'from NoSQLHandleConfig')
        sig_details = self._get_signature_details()
        if sig_details is not None:
            return sig_details['authorization']
        return None

    def get_logger(self):
        """
        Returns the logger set by :py:meth:`set_logger`, or None.
        """
        return self._logger

    def get_region(self):
        # Internal use only.
        return self._region

    def get_resource_principal_claim(self, key):
        """
        Resource principal session tokens carry JWT claims. Permit the
        retrieval of the value from the token by given key. See
        :py:class:`borneo.ResourcePrincipalClaimKeys`.

        :param key: the name of a claim in the session token.
        :type key: str
        :returns: the claim value.
        :rtype: str
        :raises IllegalArgumentException: raises the exception if the provider
            is not an ephemeral resource principal signer.
        """
        if not isinstance(self._provider, EphemeralResourcePrincipalSigner):
            raise IllegalArgumentException(
                'Only ephemeral resource principal support.')
        return self._provider.get_claim(key)

    def set_logger(self, logger):
        """
        Sets the logger used by this provider.

        :param logger: the logger.
        :type logger: Logger
        :returns: self.
        """
        CheckValue.check_logger(logger, 'logger')
        self._logger = logger
        self._logutils = LogUtils(logger)
        return self

    def set_required_headers(self, request, auth_string, headers):
        """
        Adds the signature headers and the compartment header to ``headers``.

        Does nothing if no signature details are available.

        :param request: the request, used to look up its compartment.
        :param auth_string: not used by this provider.
        :param headers: the mutable header mapping to update.
        :raises IllegalArgumentException: raises the exception if neither the
            request nor the provider yields a compartment.
        """
        sig_details = self._get_signature_details()
        if sig_details is None:
            return
        headers[HttpConstants.AUTHORIZATION] = sig_details['authorization']
        headers[HttpConstants.DATE] = sig_details['date']
        if sig_details.get(HttpConstants.OPC_OBO_TOKEN) is not None:
            headers[HttpConstants.OPC_OBO_TOKEN] = sig_details['opc-obo-token']

        compartment = request.get_compartment()
        if compartment is None:
            # If request doesn't has compartment, set the tenant id as the
            # default compartment, which is the root compartment in IAM if
            # using user principal. If using an instance principal this value
            # is None.
            compartment = self._get_tenant_ocid()

        if compartment is not None:
            headers[HttpConstants.REQUEST_COMPARTMENT_ID] = compartment
        else:
            raise IllegalArgumentException(
                'Compartment is None. When authenticating using an Instance ' +
                'Principal the compartment for the operation must be specified.'
            )

    def set_service_url(self, config):
        """
        Records the URL that request signatures are generated for.

        :param config: the configuration providing ``get_service_url()``.
        :returns: self.
        :raises IllegalArgumentException: raises the exception if the
            configuration has no service url.
        """
        service_url = config.get_service_url()
        if service_url is None:
            raise IllegalArgumentException('Must set service URL first.')
        self._service_url = (service_url.scheme + '://' +
                             service_url.hostname + '/' +
                             HttpConstants.NOSQL_DATA_PATH)
        return self

    @staticmethod
    def create_with_instance_principal(iam_auth_uri=None, region=None,
                                       logger=None):
        """
        Creates a SignatureProvider using an instance principal. This method
        may be used when calling the Oracle NoSQL Database Cloud Service from
        an Oracle Cloud compute instance. It authenticates with the instance
        principal and uses a security token issued by IAM to do the actual
        request signing.

        When using an instance principal the compartment id (OCID) must be
        specified on each request or defaulted by using
        :py:meth:`borneo.NoSQLHandleConfig.set_default_compartment`. If the
        compartment is not specified for an operation an exception will be
        thrown.

        See `Calling Services from Instances <https://docs.cloud.oracle.com/
        iaas/Content/Identity/Tasks/callingservicesfrominstances.htm>`_

        :param iam_auth_uri: the URI is usually detected automatically, specify
            the URI if you need to overwrite the default, or encounter the
            *Invalid IAM URI* error, it is optional.
        :type iam_auth_uri: str
        :param region: identifies the region will be accessed by the
            NoSQLHandle, it is optional.
        :type region: Region
        :param logger: the logger used by the SignatureProvider, it is
            optional.
        :type logger: Logger
        :returns: a SignatureProvider.
        :rtype: SignatureProvider
        """
        SignatureProvider._check_oci()
        if iam_auth_uri is None:
            provider = InstancePrincipalsSecurityTokenSigner()
        else:
            provider = InstancePrincipalsSecurityTokenSigner(
                federation_endpoint=iam_auth_uri)
        if region is not None:
            # The constructor below picks the region up from the signer.
            provider.region = region.get_region_id()
        signature_provider = SignatureProvider(provider)
        return (signature_provider if logger is None else
                signature_provider.set_logger(logger))

    @staticmethod
    def create_with_resource_principal(logger=None):
        """
        Creates a SignatureProvider using a resource principal. This method may
        be used when calling the Oracle NoSQL Database Cloud Service from other
        Oracle Cloud service resource such as Functions. It uses a resource
        provider session token (RPST) that enables the resource such as
        function to authenticate itself.

        When using a resource principal the compartment id (OCID) must be
        specified on each request or defaulted by using
        :py:meth:`borneo.NoSQLHandleConfig.set_default_compartment`. If the
        compartment id is not specified for an operation an exception will be
        thrown.

        See `Accessing Other Oracle Cloud Infrastructure Resources from Running
        Functions <https://docs.cloud.oracle.com/en-us/iaas/Content/Functions/
        Tasks/functionsaccessingociresources.htm>`_.

        :param logger: the logger used by the SignatureProvider, it is
            optional.
        :type logger: Logger
        :returns: a SignatureProvider.
        :rtype: SignatureProvider
        """
        SignatureProvider._check_oci()
        signature_provider = SignatureProvider(
            get_resource_principals_signer())
        return (signature_provider if logger is None else
                signature_provider.set_logger(logger))

    @synchronized
    def get_signature_details_internal(self):
        # Visible for testing.
        # Signs a prepared POST to the service url, caches the resulting
        # headers and (re)schedules the background refresh.
        request = Request(method='post', url=self._service_url)
        request = self._provider.without_content_headers(request.prepare())
        sig_details = request.headers
        self._signature_cache.set(SignatureProvider.CACHE_KEY, sig_details)
        self._schedule_refresh()
        return sig_details

    @staticmethod
    def _check_oci():
        # Fail with a clear message when the optional oci package could not be
        # imported at module load time.
        if oci is None:
            raise ImportError('Package "oci" is required; please install.')

    def _get_signature_details(self):
        # Return the cached signature headers, generating them on a miss.
        sig_details = self._signature_cache.get(SignatureProvider.CACHE_KEY)
        if sig_details is not None:
            return sig_details
        return self.get_signature_details_internal()

    def _get_tenant_ocid(self):
        """
        Get tenant OCID if using user principal.

        :returns: tenant OCID of user, or None if the provider is not a Signer.
        :rtype: str
        """
        # NOTE(review): this takes the text before the first '/' of api_key.
        # If token based signers are also Signer subclasses their api_key may
        # not have that form -- confirm against the oci SDK.
        if isinstance(self._provider, Signer):
            return self._provider.api_key.split('/')[0]
        return None

    def _refresh_task(self):
        # Runs on the timer thread: regenerate the cached signature, retrying
        # on failure for at most refresh_ahead seconds.
        timeout = self._refresh_ahead
        # refresh_ahead is expressed in seconds while the elapsed time below
        # is measured in milliseconds, so convert before comparing.
        timeout_ms = timeout * 1000
        start_ms = int(round(time() * 1000))
        error_logged = False
        while True:
            try:
                # refresh security token before create new signature
                if isinstance(self._provider,
                              (InstancePrincipalsSecurityTokenSigner,
                               EphemeralResourcePrincipalSigner)):
                    self._provider.refresh_security_token()
                self.get_signature_details_internal()
                return
            except Exception as e:
                # Ignore the refresh failure, then sleep and try again until
                # the timeout. Log the failure the first time only. If the
                # refresh failure continues until the task times out the
                # driver will attempt to generate a signature in the next
                # request. If that operation fails, it will be reported to
                # the user as an exception
                if not error_logged:
                    self._logutils.log_error(
                        'Unable to refresh cached request signature, ' +
                        str(e))
                    error_logged = True
            # check for timeout in the loop
            if int(round(time() * 1000)) - start_ms >= timeout_ms:
                self._logutils.log_error(
                    'Request signature refresh timed out after ' +
                    str(timeout))
                break
            sleep(0.1)
        # if we get here the refresh failed and timed out. Cancel the timer.
        # It will get re-created when the next in-line call to get signature
        # details is called. close() may already have cleared the timer from
        # another thread, so work on a local reference.
        timer = self._timer
        if timer is not None:
            timer.cancel()
        self._timer = None

    def _schedule_refresh(self):
        # If refresh interval is 0, don't schedule a refresh.
        if self._refresh_interval_s == 0:
            return
        # Replace any pending timer with a fresh one.
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        self._timer = Timer(self._refresh_interval_s, self._refresh_task)
        self._timer.start()