#
# Copyright (c) 2018, 2023 Oracle and/or its affiliates. All rights reserved.
#
# Licensed under the Universal Permissive License v 1.0 as shown at
# https://oss.oracle.com/licenses/upl/
#
from base64 import b64encode
from json import loads
from threading import Lock, Timer
from time import time
from traceback import format_exc
from requests import ConnectionError, Session, codes
try:
# noinspection PyCompatibility
from urlparse import urlparse
except ImportError:
# noinspection PyUnresolvedReferences,PyCompatibility
from urllib.parse import urlparse
import borneo.http
import borneo.config
import borneo.operations
from borneo.auth import AuthorizationProvider
from borneo.common import (
CheckValue, HttpConstants, LogUtils, SSLAdapter, synchronized)
from borneo.exception import (
IllegalArgumentException, InvalidAuthorizationException, NoSQLException)
class StoreAccessTokenProvider(AuthorizationProvider):
    """
    On-premise only.

    StoreAccessTokenProvider is an :py:class:`borneo.AuthorizationProvider`
    that performs the following functions:

    * Initial (bootstrap) login to store, using credentials provided.
    * Storage of bootstrap login token for re-use.
    * Optionally renews the login token before it expires.
    * Logs out of the store when closed.

    To access a store without security enabled, no parameters need to be
    passed to the constructor.

    To access a secure store, the constructor requires a valid user name and
    password. That user must already exist in the NoSQL Database and have
    sufficient permission to perform the table operations required by the
    application. The user's identity is used to authorize all database
    operations performed by the application.

    :param user_name: the user name to use for the store. This user must exist
        in the NoSQL Database and is the identity that is used for authorizing
        all database operations.
    :type user_name: str
    :param password: the password for the user.
    :type password: str
    :raises IllegalArgumentException: raises the exception if one or more of the
        parameters is malformed or a required parameter is missing.
    """

    # Used when we send user:password pair.
    _BASIC_PREFIX = 'Basic '
    # The general prefix for the login token.
    _BEARER_PREFIX = 'Bearer '
    # Login service end point name.
    _LOGIN_SERVICE = '/login'
    # Login token renew service end point name.
    _RENEW_SERVICE = '/renew'
    # Logout service end point name.
    _LOGOUT_SERVICE = '/logout'
    # Default timeout when sending http request to server
    _HTTP_TIMEOUT_MS = 30000

    def __init__(self, user_name=None, password=None):
        """
        Creates a StoreAccessTokenProvider

        :param user_name: the user name to use for the store. This user must
            exist in the NoSQL Database and is the identity that is used for
            authorizing all database operations.
        :type user_name: str
        :param password: the password for the user.
        :type password: str
        :raises IllegalArgumentException: raises the exception if one or more
            of the parameters is malformed or a required parameter is missing.
        """
        # Validate the credentials first so that nothing (in particular the
        # HTTP session) is allocated when the arguments are rejected.
        if user_name is None and password is None:
            # Used to access to a store without security enabled.
            is_secure = False
        else:
            if user_name is None or password is None:
                raise IllegalArgumentException('Invalid input arguments.')
            CheckValue.check_str(user_name, 'user_name')
            CheckValue.check_str(password, 'password')
            is_secure = True
        self._is_secure = is_secure
        # Always defined (None for a non-secure store) so that later reads
        # never hit a missing attribute.
        self._user_name = user_name
        self._password = password

        self._endpoint = None
        self._url = None
        self._auth_string = None
        self._auto_renew = True
        self._is_closed = False
        # The base path for security related services.
        self._base_path = HttpConstants.KV_SECURITY_PATH
        # The login token expiration time, compared against the current time
        # in milliseconds by _schedule_refresh.
        self._expiration_time = 0
        self._logger = None
        self._logutils = LogUtils(self._logger)
        self._sess = Session()
        self._request_utils = borneo.http.RequestUtils(
            self._sess, self._logutils)
        # Guards the compare-and-set of the auth string in _refresh_task.
        self._lock = Lock()
        self._timer = None
        # NOTE(review): presumably the lock acquired by the @synchronized
        # decorator from borneo.common -- confirm before renaming/removing.
        self.lock = Lock()

    @synchronized
    def bootstrap_login(self):
        """
        Bootstrap login using the provided credentials.

        Sends the user name and password to the login service using HTTP
        basic authentication, stores the returned login token as the bearer
        authorization string and schedules the token renewal. Does nothing
        for a non-secure or closed provider.

        :raises ConnectionError: re-raised unchanged if the request fails with
            a connection error.
        :raises InvalidAuthorizationException: raises the exception if the
            login service does not answer with an OK status.
        :raises NoSQLException: raises the exception for any other failure.
        """
        if not self._is_secure or self._is_closed:
            return
        # Convert the username:password pair in base 64 format.
        pair = self._user_name + ':' + self._password
        try:
            # Works when the pair is a byte string (Python 2 str).
            encoded_pair = b64encode(pair)
        except TypeError:
            # Text strings must be encoded to bytes first (Python 3).
            encoded_pair = b64encode(pair.encode()).decode()
        try:
            # Send request to server for login token.
            response = self._send_request(
                StoreAccessTokenProvider._BASIC_PREFIX + encoded_pair,
                StoreAccessTokenProvider._LOGIN_SERVICE)
            content = response.get_content()
            # Login fail
            if response.get_status_code() != codes.ok:
                raise InvalidAuthorizationException(
                    'Fail to login to service: ' + content)
            if self._is_closed:
                return
            # Generate the authentication string using login token.
            self._auth_string = (StoreAccessTokenProvider._BEARER_PREFIX +
                                 self._parse_json_result(content))
            # Schedule login token renew thread.
            self._schedule_refresh()
        except (ConnectionError, InvalidAuthorizationException):
            self._logutils.log_debug(format_exc())
            # Bare raise keeps the original traceback.
            raise
        except Exception as e:
            self._logutils.log_debug(format_exc())
            raise NoSQLException('Bootstrap login fail.', e)

    @synchronized
    def close(self):
        """
        Close the provider, releasing resources such as a stored login token.
        """
        # Don't do anything for non-secure case.
        if not self._is_secure or self._is_closed:
            return
        # Send request for logout. There is nothing to log out from when no
        # login token was ever obtained, so the request is skipped then.
        if self._auth_string is not None:
            try:
                response = self._send_request(
                    self._auth_string,
                    StoreAccessTokenProvider._LOGOUT_SERVICE)
                if response.get_status_code() != codes.ok:
                    self._logutils.log_error(
                        'Failed to logout user ' + self._user_name + ': ' +
                        response.get_content())
            except Exception as e:
                # Logout is best effort: log and carry on with the clean up.
                self._logutils.log_error(
                    'Failed to logout user ' + self._user_name + ': ' + str(e))
        # Clean up.
        self._is_closed = True
        self._auth_string = None
        self._expiration_time = 0
        self._password = None
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        if self._sess is not None:
            self._sess.close()

    def get_authorization_string(self, request=None):
        """
        Returns the authorization string to send with a request, logging in
        first if no login token is cached.

        :param request: the request being authorized, or None.
        :type request: Request
        :returns: the authorization string, or None if the store is not
            secure or the provider is closed.
        :rtype: str or None
        :raises IllegalArgumentException: raises the exception if request is
            neither None nor an instance of Request.
        """
        if (request is not None and
                not isinstance(request, borneo.operations.Request)):
            raise IllegalArgumentException(
                'get_authorization_string requires an instance of Request or ' +
                'None as parameter.')
        if not self._is_secure or self._is_closed:
            return None
        # If there is no cached auth string, re-authentication to retrieve the
        # login token and generate the auth string.
        if self._auth_string is None:
            self.bootstrap_login()
        return self._auth_string

    def is_secure(self):
        """
        Returns whether the provider is accessing a secured store.

        :returns: True if accessing a secure store, otherwise False.
        :rtype: bool
        """
        return self._is_secure

    def set_auto_renew(self, auto_renew):
        """
        Sets the auto-renew state. If True, automatic renewal of the login token
        is enabled.

        :param auto_renew: set to True to enable auto-renew.
        :type auto_renew: bool
        :returns: self.
        :raises IllegalArgumentException: raises the exception if auto_renew is
            not True or False.
        """
        CheckValue.check_boolean(auto_renew, 'auto_renew')
        self._auto_renew = auto_renew
        return self

    def is_auto_renew(self):
        """
        Returns whether the login token is to be automatically renewed.

        :returns: True if auto-renew is set, otherwise False.
        :rtype: bool
        """
        return self._auto_renew

    def set_endpoint(self, endpoint):
        """
        Sets the endpoint of the on-prem proxy.

        :param endpoint: the endpoint.
        :type endpoint: str
        :returns: self.
        :raises IllegalArgumentException: raises the exception if endpoint is
            not a string, or if the provider is secure and the endpoint does
            not use https.
        """
        CheckValue.check_str(endpoint, 'endpoint')
        url = borneo.config.NoSQLHandleConfig.create_url(endpoint, '')
        if self._is_secure and url.scheme.lower() != 'https':
            raise IllegalArgumentException(
                'StoreAccessTokenProvider requires use of https.')
        # Commit only after validation so that a rejected endpoint leaves the
        # previously configured endpoint and url untouched.
        self._endpoint = endpoint
        self._url = url
        return self

    def get_endpoint(self):
        """
        Returns the endpoint of the on-prem proxy.

        :returns: the endpoint.
        :rtype: str
        """
        return self._endpoint

    def set_logger(self, logger):
        """
        Sets the logger used by this provider.

        :param logger: the logger.
        :returns: self.
        """
        CheckValue.check_logger(logger, 'logger')
        self._logger = logger
        self._logutils = LogUtils(logger)
        return self

    def get_logger(self):
        """
        Returns the logger set with :py:meth:`set_logger`, or None.

        :returns: the logger.
        """
        return self._logger

    def set_ssl_context(self, ssl_ctx):
        # Internal use only. Mounts an SSL adapter for the scheme of the
        # configured url, so set_endpoint must have been called before.
        adapter = SSLAdapter(ssl_ctx)
        self._sess.mount(self._url.scheme + '://', adapter)

    def set_url_for_test(self):
        # Test hook: downgrade the configured url from https to http. Only the
        # scheme is rewritten, so a host or path that happens to contain the
        # text "https" is left intact.
        parsed = urlparse(self._url.geturl())
        if parsed.scheme == 'https':
            parsed = parsed._replace(scheme='http')
        self._url = parsed
        return self

    def validate_auth_string(self, auth_string):
        """
        Checks that an authorization string is present when the store is
        secure.

        :param auth_string: the authorization string to check.
        :type auth_string: str
        :raises IllegalArgumentException: raises the exception if the provider
            is secure and auth_string is None.
        """
        if self._is_secure and auth_string is None:
            raise IllegalArgumentException(
                'Secured StoreAccessProvider requires a non-none string.')

    def _parse_json_result(self, json_result):
        # Retrieve login token from JSON string.
        result = loads(json_result)
        # Extract expiration time from JSON result.
        self._expiration_time = result['expireAt']
        # Extract login token from JSON result.
        return result['token']

    def _refresh_task(self):
        """
        This task sends a request to the server for login session extension.
        Depending on the server policy, a new login token with new expiration
        time may or may not be granted.

        Any failure is logged and cancels the pending renewal timer; nothing
        is raised to the caller.
        """
        if not self._is_secure or not self._auto_renew or self._is_closed:
            return
        try:
            old_auth = self._auth_string
            response = self._send_request(
                old_auth, StoreAccessTokenProvider._RENEW_SERVICE)
            content = response.get_content()
            # Check the status before touching the body: an error response is
            # reported with the server message and must not update the stored
            # expiration time.
            if response.get_status_code() != codes.ok:
                raise InvalidAuthorizationException(content)
            if self._is_closed:
                return
            token = self._parse_json_result(content)
            with self._lock:
                # Only replace the auth string if nobody changed it while the
                # renew request was in flight.
                if self._auth_string == old_auth:
                    self._auth_string = (
                        StoreAccessTokenProvider._BEARER_PREFIX + token)
            self._schedule_refresh()
        except Exception as e:
            self._logutils.log_error('Failed to renew login token: ' + str(e))
            if self._timer is not None:
                self._timer.cancel()
                self._timer = None

    def _schedule_refresh(self):
        # Schedule a login token renew when half of the token life time is
        # reached.
        if not self._is_secure or not self._auto_renew:
            return
        # Clean up any existing timer
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        if self._expiration_time <= 0:
            return
        now_ms = int(round(time() * 1000))
        remaining_ms = self._expiration_time - now_ms
        # If it is 10 seconds before expiration, don't do further renew to
        # avoid too many renew requests in the last few seconds.
        if remaining_ms > 10000:
            self._timer = Timer(
                float(remaining_ms // 2) / 1000, self._refresh_task)
            # Daemon thread: a pending renewal must not keep the interpreter
            # alive when the application exits without calling close().
            self._timer.daemon = True
            self._timer.start()

    def _send_request(self, auth_header, service_name):
        # Send HTTPS request to login/renew/logout service location with proper
        # authentication information. Requires the url configured through
        # set_endpoint.
        headers = {'Host': self._url.hostname, 'Authorization': auth_header}
        return self._request_utils.do_get_request(
            self._url.geturl() + self._base_path + service_name, headers,
            StoreAccessTokenProvider._HTTP_TIMEOUT_MS)