#
# Copyright (c) 2018, 2026 Oracle and/or its affiliates. All rights reserved.
#
# Licensed under the Universal Permissive License v 1.0 as shown at
# https://oss.oracle.com/licenses/upl/
#
from time import sleep, time
from ..common import CheckValue
from ..exception import (
IllegalArgumentException, IllegalStateException, NoSQLException,
OperationNotSupportedException)
from ..operations import (
ChangeStreamConsumerRequest, ChangeStreamPollRequest, GetTableRequest)
from .models import MessageBundle, StartLocation
_CHANGE_STREAM_NOT_SUPPORTED = 'Change Streams not supported by server'
_TABLE_OCID_PREFIX = 'ocid1.nosqltable.'
def _is_table_ocid(table_name):
return (CheckValue.is_str(table_name) and
table_name.lower().startswith(_TABLE_OCID_PREFIX))
def _raise_if_change_stream_not_supported(exc):
msg = str(exc)
lower_msg = msg.lower() if msg is not None else ''
if ('unknown opcode' in lower_msg or
'unknown operation' in lower_msg):
raise OperationNotSupportedException(_CHANGE_STREAM_NOT_SUPPORTED)
if (isinstance(exc, OperationNotSupportedException) and
'change stream' in lower_msg and
'serial version' not in lower_msg):
raise OperationNotSupportedException(_CHANGE_STREAM_NOT_SUPPORTED)
[docs]
class ConsumerBuilder(object):
"""
Builder used to create a Change Streams :py:class:`Consumer`.
"""
class TableConfig(object):
"""
Internal table configuration used by Change Streams requests.
"""
def __init__(self, table_name, compartment=None,
start_location=None, is_remove=False):
CheckValue.check_str(table_name, 'table_name')
CheckValue.check_str(compartment, 'compartment', True)
if (start_location is not None and
not isinstance(start_location, StartLocation)):
raise IllegalArgumentException(
'start_location must be an instance of StartLocation.')
CheckValue.check_boolean(is_remove, 'is_remove')
self._table_name = None
self._table_ocid = None
if _is_table_ocid(table_name):
self._table_ocid = table_name
else:
self._table_name = table_name
self._compartment = compartment
if start_location is None:
self._start_location = StartLocation.first_uncommitted()
else:
self._start_location = start_location
self._is_remove = is_remove
def get_table_name(self):
return self._table_name
def get_table_ocid(self):
return self._table_ocid
def get_compartment(self):
return self._compartment
def get_start_location(self):
return self._start_location
def is_remove(self):
return self._is_remove
def _set_table_ocid(self, table_ocid):
CheckValue.check_str(table_ocid, 'table_ocid')
self._table_ocid = table_ocid
return self
def __str__(self):
return ('TableConfig [table_name=' + str(self._table_name) +
', table_ocid=' + str(self._table_ocid) +
', compartment=' + str(self._compartment) +
', start_location=' + str(self._start_location) +
', is_remove=' + str(self._is_remove) + ']')
def __init__(self):
self._compartment = None
self._force_reset = False
self._group_id = None
self._handle = None
self._manual_commit = False
self._max_poll_interval_ms = None
self._tables = None
[docs]
def set_handle(self, handle):
"""
Sets the NoSQL handle used for all Change Streams operations.
"""
if handle is None:
raise IllegalArgumentException('handle must be not-none.')
self._handle = handle
return self
[docs]
def get_handle(self):
return self._handle
[docs]
def add_table(self, table_name, compartment=None, start_location=None):
"""
Adds a table to the Change Streams consumer configuration.
The table name may be a table OCID. The compartment is used to resolve
the table name to a table OCID. If compartment is not set, the
configured default compartment is used for that table lookup.
"""
if self._table_index(table_name, compartment) >= 0:
return self
table_config = self.TableConfig(
table_name, compartment, start_location)
if self._tables is None:
self._tables = list()
self._tables.append(table_config)
return self
[docs]
def remove_table(self, table_name, compartment=None):
"""
Adds a table-removal entry to the Change Streams consumer config.
The table name may be a table OCID. The compartment is used to resolve
the table name to a table OCID. If compartment is not set, the
configured default compartment is used for that table lookup.
"""
table_config = self.TableConfig(
table_name, compartment, None, True)
if self._tables is None:
self._tables = list()
self._tables.append(table_config)
return self
[docs]
def set_group_id(self, group_id):
"""
Sets the Change Streams consumer group ID.
"""
CheckValue.check_str(group_id, 'group_id')
self._group_id = group_id
return self
[docs]
def get_group_id(self):
return self._group_id
[docs]
def set_compartment(self, compartment):
"""
Sets the compartment used for the Change Streams consumer group.
Tables in the group may be in different compartments, as specified by
add_table() and remove_table().
"""
CheckValue.check_str(compartment, 'compartment', True)
self._compartment = compartment
return self
[docs]
def get_compartment(self):
return self._compartment
[docs]
def set_commit_automatic(self):
"""
Sets automatic commit mode for the consumer.
"""
self._manual_commit = False
return self
[docs]
def set_commit_manual(self):
"""
Sets manual commit mode for the consumer.
"""
self._manual_commit = True
return self
[docs]
def is_manual_commit(self):
return self._manual_commit
[docs]
def set_max_poll_interval(self, max_poll_interval_ms):
"""
Sets the maximum interval between consumer poll calls, in milliseconds.
"""
CheckValue.check_int_gt_zero(
max_poll_interval_ms, 'max_poll_interval_ms')
self._max_poll_interval_ms = max_poll_interval_ms
return self
[docs]
def get_max_poll_interval(self):
return self._max_poll_interval_ms
[docs]
def set_force_reset_start_location(self):
"""
Forces existing consumer-group start locations to be reset.
"""
self._force_reset = True
return self
[docs]
def get_force_reset(self):
return self._force_reset
[docs]
def get_tables(self):
return self._tables
[docs]
def get_num_tables(self):
if self._tables is None:
return 0
return len(self._tables)
[docs]
def validate(self):
"""
Validates this builder and resolves table names to table OCIDs.
"""
if self._handle is None:
raise IllegalArgumentException(
'Consumer builder missing NoSQLHandle.')
if self._tables is None or len(self._tables) == 0:
raise IllegalArgumentException(
'Consumer builder missing tables information.')
for table_config in self._tables:
self.validate_table_config(table_config, self._handle)
return self
[docs]
@staticmethod
def validate_table_config(table_config, handle):
"""
Validates one table config and resolves its table OCID when needed.
"""
if not isinstance(table_config, ConsumerBuilder.TableConfig):
raise IllegalArgumentException(
'table_config must be an instance of TableConfig.')
if table_config.get_table_ocid() is not None:
return
table_name = table_config.get_table_name()
if table_name is None or len(table_name) == 0:
raise IllegalArgumentException(
'missing table name in consumer configuration.')
if handle is None or not callable(getattr(handle, 'get_table', None)):
raise IllegalArgumentException(
'NoSQLHandle is required to resolve table OCIDs.')
req = GetTableRequest().set_table_name(table_name)
if table_config.get_compartment() is not None:
req.set_compartment(table_config.get_compartment())
try:
res = handle.get_table(req)
table_ocid = res.get_table_id()
if table_ocid is None:
raise IllegalArgumentException(
'table OCID is not available.')
table_config._set_table_ocid(table_ocid)
except Exception as exc:
if isinstance(exc, IllegalArgumentException):
detail = str(exc)
else:
detail = exc.__class__.__name__ + ': ' + str(exc)
raise IllegalArgumentException(
"Can't get table '" + table_name + "' information: " +
detail)
[docs]
def build(self):
"""
Creates a Change Streams consumer using this builder configuration.
"""
return Consumer(self)
def _table_index(self, table_name, compartment):
if self._tables is None:
return -1
CheckValue.check_str(table_name, 'table_name')
CheckValue.check_str(compartment, 'compartment', True)
candidate = table_name.lower()
for index, table_config in enumerate(self._tables):
if self._matches_table(table_config, candidate):
if self._same_compartment(
table_config.get_compartment(), compartment):
return index
return -1
@staticmethod
def _matches_table(table_config, candidate):
table_name = table_config.get_table_name()
table_ocid = table_config.get_table_ocid()
if table_name is not None and table_name.lower() == candidate:
return True
return table_ocid is not None and table_ocid.lower() == candidate
@staticmethod
def _same_compartment(left, right):
if left is None:
return right is None
return right is not None and left.lower() == right.lower()
[docs]
class Consumer(object):
"""
Main object used to consume Change Streams messages.
Use :py:class:`ConsumerBuilder` to create instances of this class. The
:py:meth:`poll` method is not thread-safe.
"""
_POLL_INTERVAL_MS = 100
def __init__(self, builder):
if not isinstance(builder, ConsumerBuilder):
raise IllegalArgumentException(
'builder must be an instance of ConsumerBuilder.')
builder.validate()
self._builder = builder
self._closed = False
self._cursor = None
self._handle = builder.get_handle()
self._metadata = None
req = ChangeStreamConsumerRequest(
ChangeStreamConsumerRequest.RequestMode.CREATE).set_builder(
builder)
res = self._execute_request(req)
cursor = res.get_cursor()
if cursor is None:
raise NoSQLException(
'Server returned invalid consumer cursor.')
self._cursor = cursor
self._metadata = res.get_metadata()
[docs]
def poll(self, limit, wait_ms):
"""
Gets Change Streams messages for this consumer.
"""
self._check_open()
CheckValue.check_int_ge_zero(limit, 'limit')
CheckValue.check_int_ge_zero(wait_ms, 'wait_ms')
start_time_ms = int(round(time() * 1000))
while True:
bundle = self._poll_once(limit)
if not bundle.is_empty():
return bundle
now_ms = int(round(time() * 1000))
elapsed_ms = now_ms - start_time_ms
if elapsed_ms + self._POLL_INTERVAL_MS > wait_ms:
return bundle
sleep(float(self._POLL_INTERVAL_MS) / 1000.0)
[docs]
def commit(self, timeout_ms=None):
"""
Marks the messages from this consumer's latest poll as committed.
"""
self._check_open()
self._commit_internal(self._cursor, timeout_ms)
[docs]
def commit_bundle(self, bundle, timeout_ms=None):
"""
Marks the messages in the specified bundle as committed.
"""
self._check_open()
if not isinstance(bundle, MessageBundle):
raise IllegalArgumentException(
'bundle must be an instance of MessageBundle.')
cursor = bundle._get_cursor()
if cursor is None:
raise IllegalArgumentException(
'MessageBundle does not contain a cursor.')
self._commit_internal(cursor, timeout_ms)
[docs]
def close(self):
"""
Closes this consumer and releases server-side resources for it.
"""
if self._closed:
return
req = ChangeStreamConsumerRequest(
ChangeStreamConsumerRequest.RequestMode.CLOSE).set_cursor(
self._cursor)
res = self._execute_request(req)
if res.get_cursor() is not None:
raise NoSQLException('Consumer not closed on server side.')
self._cursor = None
self._closed = True
[docs]
def reset(self):
"""
Resets this consumer without committing poll results.
"""
self._check_open()
req = ChangeStreamConsumerRequest(
ChangeStreamConsumerRequest.RequestMode.RESET).set_cursor(
self._cursor)
res = self._execute_request(req)
cursor = res.get_cursor()
if cursor is None:
raise NoSQLException('Consumer not reset on server side.')
self._cursor = cursor
[docs]
def add_table(self, table_name, compartment=None, start_location=None):
"""
Adds a table to the current consumer group.
The table name may be a table OCID. The compartment is used to resolve
the table name to a table OCID. If compartment is not set, the
configured default compartment is used for that table lookup.
"""
self._check_open()
builder = ConsumerBuilder().set_handle(self._handle).add_table(
table_name, compartment, start_location)
builder.validate()
self._update_tables(builder)
[docs]
def remove_table(self, table_name, compartment=None):
"""
Removes a table from the current consumer group.
The table name may be a table OCID. The compartment is used to resolve
the table name to a table OCID. If compartment is not set, the
configured default compartment is used for that table lookup.
"""
self._check_open()
builder = ConsumerBuilder().set_handle(self._handle).remove_table(
table_name, compartment)
builder.validate()
self._update_tables(builder)
[docs]
@staticmethod
def delete_group(handle, group_id, compartment=None, force_stop=False):
"""
Deletes a Change Streams consumer group.
"""
Consumer._check_handle(handle)
CheckValue.check_str(group_id, 'group_id')
CheckValue.check_str(compartment, 'compartment', True)
CheckValue.check_boolean(force_stop, 'force_stop')
builder = ConsumerBuilder().set_group_id(group_id).set_compartment(
compartment)
if force_stop:
builder.set_force_reset_start_location()
req = ChangeStreamConsumerRequest(
ChangeStreamConsumerRequest.RequestMode.DELETE).set_builder(
builder)
Consumer._execute_request_with_handle(handle, req)
def _poll_once(self, limit):
req = ChangeStreamPollRequest(self._cursor, limit)
res = self._execute_request(req)
bundle = res.get_bundle()
cursor = res.get_cursor()
if cursor is None:
if bundle is not None:
raise NoSQLException('Poll returned invalid cursor.')
bundle = MessageBundle(None)
else:
self._cursor = cursor
bundle._set_cursor(self._cursor)
bundle._set_consumer(self)
bundle._set_events_remaining(res.get_events_remaining())
return bundle
def _commit_internal(self, cursor, timeout_ms):
req = ChangeStreamConsumerRequest(
ChangeStreamConsumerRequest.RequestMode.COMMIT).set_cursor(
cursor)
self._set_request_timeout(req, timeout_ms)
res = self._execute_request(req)
cursor = res.get_cursor()
if cursor is None:
raise NoSQLException('Consumer not committed on server side.')
self._cursor = cursor
def _update_tables(self, builder):
req = ChangeStreamConsumerRequest(
ChangeStreamConsumerRequest.RequestMode.UPDATE).set_builder(
builder).set_cursor(self._cursor)
res = self._execute_request(req)
cursor = res.get_cursor()
if cursor is None:
raise NoSQLException(
'Server returned invalid consumer cursor.')
self._cursor = cursor
if res.get_metadata() is not None:
self._metadata = res.get_metadata()
def _execute_request(self, request):
return self._execute_request_with_handle(self._handle, request)
@staticmethod
def _execute_request_with_handle(handle, request):
Consumer._check_handle(handle)
try:
if callable(getattr(handle, '_execute', None)):
return handle._execute(request)
return handle.get_client().execute(request)
except Exception as exc:
_raise_if_change_stream_not_supported(exc)
raise
@staticmethod
def _set_request_timeout(request, timeout_ms):
if timeout_ms is not None:
request.set_timeout(timeout_ms)
@staticmethod
def _check_handle(handle):
if handle is None:
raise IllegalArgumentException('handle must be not-none.')
if callable(getattr(handle, '_execute', None)):
return
get_client = getattr(handle, 'get_client', None)
if callable(get_client):
client = get_client()
if client is not None and callable(getattr(client, 'execute',
None)):
return
raise IllegalArgumentException(
'handle must be an instance of NoSQLHandle.')
def _check_open(self):
if self._closed:
raise IllegalStateException('Consumer has been closed.')