Change Streams

Change Streams is a cloud service feature for consuming table changes in commit order. Applications create a consumer group, poll messages, process the records in those messages, and commit progress. Detailed interface descriptions are in API Reference.

Change Streams is available through the borneo.changestream submodule. The classes are not exported from the top-level borneo namespace.

Enable or Disable Change Streams

Change Streams must be enabled for a table before a consumer can read changes from that table. The convenience method borneo.NoSQLHandle.enable_change_streaming() performs the table operation and waits for it to complete.

result = handle.enable_change_streaming(
    'users',
    compartment='mycompartment',
    enabled=True,
    timeout_ms=60000,
    poll_interval_ms=1000)

To disable Change Streams for the same table:

handle.enable_change_streaming(
    'users',
    compartment='mycompartment',
    enabled=False,
    timeout_ms=60000,
    poll_interval_ms=1000)

Advanced callers can use borneo.TableRequest directly:

from borneo import TableRequest

request = TableRequest().set_table_name(
    'users').set_change_streaming_enabled(True)
request.set_compartment('mycompartment')
result = handle.table_request(request)
result.wait_for_completion(handle, 60000, 1000)

Create a Consumer

Use borneo.changestream.ConsumerBuilder to create a consumer. The builder resolves table names to table OCIDs using the configured handle. A table OCID can also be supplied directly.

from borneo.changestream import ConsumerBuilder, StartLocation

consumer = ConsumerBuilder().set_handle(handle).set_group_id(
    'users-group').set_compartment('groupcompartment').add_table(
    'users',
    compartment='tablecompartment',
    start_location=StartLocation.first_uncommitted()).build()

If no start location is supplied for a table, the default is borneo.changestream.StartLocation.first_uncommitted().

The builder compartment is the compartment used for the consumer group. Table names are resolved using the compartment supplied to borneo.changestream.ConsumerBuilder.add_table() or borneo.changestream.ConsumerBuilder.remove_table(). If no table compartment is supplied, the configured default compartment is used for that table lookup. Different tables in the same consumer group can use different table compartments.

Start Locations

Start locations control where the consumer starts reading when a table is added to a group.

consumer = ConsumerBuilder().set_handle(handle).set_group_id(
    'users-group').add_table(
    'users',
    compartment='mycompartment',
    start_location=StartLocation.at_time(start_time_ms)).build()

Poll and Process Events

Call borneo.changestream.Consumer.poll() to fetch a borneo.changestream.MessageBundle. A bundle contains messages, each message contains events, and each event contains records.

bundle = consumer.poll(limit=100, wait_ms=5000)

for message in bundle.get_messages() or []:
    table_name = message.get_table_name()
    for event in message.get_events() or []:
        for record in event.get_records():
            key = record.get_record_key()
            current = record.get_current_image()
            before = record.get_before_image()
            value = None if current is None else current.get_value()
            previous = None if before is None else before.get_value()
            # Process the table name, key, current value, and previous value.

The value, key, and metadata objects use the normal Python SDK value representation, such as dict, list, scalar values, bytes or bytearray, and Decimal.

Commit Processed Records

Consumer groups can use automatic or manual commit mode. Automatic commit mode is the default. Manual commit mode requires the application to commit after it has successfully processed messages.

consumer = ConsumerBuilder().set_handle(handle).set_group_id(
    'users-group').set_commit_manual().add_table('users').build()

bundle = consumer.poll(limit=100, wait_ms=5000)
if not bundle.is_empty():
    # Process all records first.
    bundle.commit(timeout_ms=30000)

The method borneo.changestream.MessageBundle.commit() commits the cursor associated with that bundle. borneo.changestream.Consumer.commit() commits the consumer’s latest cursor.

Multiple Consumers

Multiple consumers can use the same group ID to share work. Multiple groups can read the same table independently. A consumer group can also include multiple tables.

consumer = ConsumerBuilder().set_handle(handle).set_group_id(
    'orders-group').add_table('orders').add_table('order_items').build()

Tables can be added to or removed from an active group:

consumer.add_table('order_events', compartment='orderscompartment')
consumer.remove_table('order_items', compartment='orderscompartment')

Close, Reset, and Delete

Close a consumer when it is no longer needed:

consumer.close()

Resetting a consumer can cause messages to be delivered again, depending on the group state and start locations:

consumer.reset()

Deleting a group removes the group state. Use force_stop=True only when the group must be deleted while consumers may still be active.

from borneo.changestream import Consumer

Consumer.delete_group(
    handle,
    'users-group',
    compartment='mycompartment',
    force_stop=True)