Synchronization by events

Description of the synchronization by events process

Introduction

Event-driven synchronization is a system that allows updating the configuration of any module in the components in real time.

This is done through events, which are emitted when a certain module modifies its configuration data, and allows the Aura components that are subscribed to the events to update their configuration in real time.

The synchronization by events operates as follows:


Event-driven synchronization


  1. A service updates a certain configuration element.
  2. The new updated data is stored in MongoDB.
  3. An Event is published by the Event Manager. The current system supports two Event Managers, Redis Pub/Sub and DAPR.
    When the event is issued, the services subscribed to the configuration events receive the Event.
  4. The changes are requested through the aura-configuration-api.

Event management can be activated or deactivated in both the event transmission and reception parts. The environment variable that has the aura-configuration-api and the components that support this system is called AURA_CONFIGURATION_EVENTS_ENABLED. The environment variable AURA_CONFIGURATION_EVENTS_MANAGER determines the engine used for event handling, possible values are “REDIS” or “DAPR”.

Using Redis as Event Manager

Publishers and subscribers

In order for a component to update the configuration data that has been modified by another service, two types of elements are required:

  • Publishers, which are the ones that notify of the change
  • Subscribers, which are the ones that will receive an event indicating that a change has occurred and where it has taken place.

All this is managed through the broadcasting of events in Redis through a specific channel using the Pub/Sub module.

Channel

The name of the channel has the following structure: [AURA_REDIS_DATABASE]__:[AURA_REDIS_CONFIGURATION_PREFIX][ConfigCollectionName]*

This pattern will be used by most of the current components. If any of them need to manage identifiers within the channel, such as Agents, they will be subscribed to the channel: [AURA_REDIS_DATABASE]__:[AURA_REDIS_CONFIGURATION_PREFIX][ConfigCollectionName]:[Id]

Being:

  • AURA_REDIS_DATABASE: Environment variable with the number of database in Redis.
  • AURA_REDIS_CONFIGURATION_PREFIX: Environment variable whose default value is aura-config.
  • ConfigCollectionName: One of the following values: ChannelConfiguration, ApplicationConfiguration, SkillConfiguration, TvSectionsConfiguration, PresetConfiguration, AgentConfiguration.

Example:
1__:aura-config:ChannelConfiguration

Event Message

When the event is sent, it has the following format:

{ 
    collection: [ConfigCollectionName]  
    op:         [OPERATION];
    id:         [IDENTIFIER];
    configData: [CONFIGURATION_DATA]; 
    metadata:   [EVENT_METADATA]                (Dapr)
    pubSubName: [DAPR_PUBSUB_NAME];             (Dapr)
    crc:        [INTEGRITY];                    (Not implemented)
}  

The different types of operations are (EventConfigOperation):

  • Create: When a record is created in the configuration. It will carry the identifier of the new record.
  • Update: When a record is modified in the configuration. It will carry the identifier of the updated record.
  • Delete: When a record is deleted in the configuration. It will carry the identifier of the deleted record.

The configuration data contains the updated or new configuration data. This field is not mandatory, and if the configuration data is empty the subscriptor refreshes its configuration calling directly the configuration API.

Examples:

Without configuration data

{
    "op":Create",
    "id":"88834434"
}
With configuration data
{
    "op":Create",
    "id":"88834434", 
    "configData": 
        {
            "name":"Expample1",
             "id":"88834434",
             .... 
         }
}

Publishers

The publishers are inside the aura-configuration-api.

When one of the configuration models writes to the MongoDB database, the event must be generated through Redis. This is done by instantiating an element of the RedisEventPublisher class. The creation of these update records in Redis has two requirements that must be met:

  1. The API must contain an endpoint to obtain all the data from the model named GetAll.
  2. The API must contain an endpoint to obtain an element of the model selected by its identifier named GetById.

To write the records in Redis that will generate the events, we will use the DAO of the model, specifically the method writeConfigurationEvent that is in charge of generating the registry for the event in Redis.

    // aura-configuration-api/src/common/dao.ts
    /**
     * Write configuration event.
     * @param {EventConfigOperation} operation Operation type
     * @param {string} id Id of the configuration
     * @param {T} data Data to write
     * @param {KeyValueType} metadata Metadata to write
     */
    protected async writeConfigurationEvent(operation: EventConfigOperation, id?: string, data?: T, metadata?: KeyValueType) {
        this.configurationEventPublish.op = operation;
        this.configurationEventPublish.id = id;
        this.configurationEventPublish.configData = data;
        this.configurationEventPublish.metadata = Object.assign({}, this.configurationEventPublish.metadata, metadata);

        await this.eventPublisher.publish(this.configurationEventPublish);
    }}

The eventPublisher can be one of Redis or DAPR. For Redis, the class with the methods to publish the event is included in /aura-configuration-api/src/utils/event-publishers/redis-event-publisher.ts

Generate events

When an event is generated, the operation that has been performed is indicated with the identifier of the record affected by the change.

    /**
     * Update channel.
     *
     * @throws {NoDataError} When it hasn't been updated
     * @throws {Error} When channel is invalid
     * @param {ChannelConfiguration} channel Channel configuration
     */
    public async update(channel: ChannelConfiguration) {
        channel.metadata = {
            ...(await this.channelCollection.findOne<ChannelConfiguration>(
                { id: channel.id }, { projection: { metadata: 1 } })
            )?.metadata,
            ...channel.metadata
        };
        const updateResult = await this.channelCollection.replaceOne(
            { id: channel.id }, this.prepareChannelToInsert(channel));
        if (updateResult?.modifiedCount === 0) {
            throw new NoDataError(`Channel with id: ${channel.id} cannot be found`);
        } else {
            await this.writeConfigurationEvent(EventConfigOperation.Update, channel.id);
        }
    }

Subscribers

Subscribers are inside the aura-utilities/aura-configuration library, and are the clients of each of the models in the aura-configuration-api.

These subscribers inherit their class from a base class BaseConfiguration that contains the necessary methods to manage the data. Some methods are common to all and others are specific. They include methods to manage the data locally, that is, if we receive the content of the changes in the event itself, we should be able to apply them on the current module without having to call the aura-configuration-api* to obtain them.

An example for the method of point 3 could be the following:

    /**
      * Update channel configuration.
      *
      * @param {Configuration} configuration Configuration object.
      * @param {EventConfigModel} message Event configuration model.
      * @returns {Promise<void>} Promise.
      */
    public async updateConfiguration(message: EventConfigModel<T>): Promise<void> {
        try {
            this.logger.debug({
                msg: 'UpdateChannelConfiguration executed', message,
                corr: CorrelatorUtil.auraSystem
            });

            switch (message.op) {
                case EventConfigOperation.Create:
                    try {
                        await this.createConfigurationById(message);
                    } catch (error) {
                        // if there is an error updating a single channel, update all the config with the current values
                        await this.load();
                    }
                    break;
                case EventConfigOperation.Update: {
                    try {
                        await this.updateConfigurationById(message);
                    } catch (error) {
                        // if there is an error updating a single channel, update all the config with the current values
                        await this.load();
                    }
                }
                    break;
                case EventConfigOperation.Delete:
                    if (this.removeLocal(message.id)) {
                        this.logger.debug({
                            msg: `Channel removed with id: ${message.id}`, corr: CorrelatorUtil.auraSystem
                        });
                    } else {
                        this.logger.warning({
                            msg: `Channel not found to remove with id: ${message.id}`, corr: CorrelatorUtil.auraSystem
                        });
                        await this.load();
                    }
                    break;

                default:
                    this.logger.warning({
                        msg: `Unknown operation: ${message.op} for configuration update: ${message.id}`,
                        corr: CorrelatorUtil.auraSystem
                    });
                    break;

            }
        } catch (error) {
            this.logger.error({
                msg: 'updateChannelConfiguration failed.', error: error.message, stck: error,
                corr: CorrelatorUtil.auraSystem
            });
        }
    }

      

The methods createConfigurationById and updateConfigurationById contain the logic to apply the changes locally or through the aura-configuration-api*.

    /**
     * Create channel configuration by id.
     * @param {Configuration} configuration Configuration object.
     * @param {EventConfigModel<ChannelConfiguration>} message Event configuration model.
     * @returns {Promise<void>} Promise.
     */
    private async createConfigurationById(message: EventConfigModel<T>): Promise<void> {
        // if the message has the configuration data, create the channel directly.
        if (message.configData && this.addLocal(message.configData)) {
            this.logger.debug({
                msg: `Channel added with new configuration: ${message.id}`, corr: CorrelatorUtil.auraSystem
            });
        } else {
            await this.loadById(message.id);
        }
    }

Using DAPR as Event Manager

Publishers and subscribers

In order for a component to update the configuration data that has been modified by another service, two types of elements are required:

  • Publishers, which are the ones that notify of the change.
  • Subscribers, which are the ones that will receive an event indicating that a change has occurred and where it has taken place.

All this is managed through DAPR Pub/Sub block. The engine selected to manage the PubSub is Redis. Unlike the pure PubSub management in Redis, which has the problem that if a subscriber does not receive the event, it will not be resent, with DAPR this does not happen since it takes care of the Event lifetime internally and manages the retries if needed.

Channel

In the DAPR Pub/Sub block, the concept of a channel is not applicable. In order to subscribe to an event, the name of the Pub/Sub module and the topic to be subscribed are required.

pubsubname: AURA_DAPR_CONFIGURATION_PUBSUB_NAME
topic: ConfigCollectionName

Being:

  • AURA_DAPR_CONFIGURATION_PUBSUB_NAME: Name of PubSub module defined in DAPR for Configuration Events.
  • ConfigCollectionName: One of the following values: ChannelConfiguration, ApplicationConfiguration, SkillConfiguration, TvSectionsConfiguration, PresetConfiguration, AgentConfiguration.

Event Message

DAPR uses a global event management model called CloudEvents. CloudEvents is a specification for describing event data in common formats to provide interoperability across services, platforms and systems.

For compatibility reasons with the Event Managr in Redis, the name of the model used for Redis has been included within the field data of CloudEvents.

Example CloudEvent
{
  "data": {
    "collection": "ChannelConfiguration",
    "id": "c71dc728-5fe2-4735-927d-0c419b35ec59",
    "metadata": {
      "ttlInSeconds": "60"
    },
    "op": "Update",
    "pubSubName": "pubsub-config-ap-four"
  },
  "datacontenttype": "application/json",
  "expiration": "2025-06-30T08:16:05Z",
  "id": "a8340fb8-9071-4da9-91d6-7a14a409d459",
  "pubsubname": "pubsub-config-ap-four",
  "source": "aura-configuration-api",
  "specversion": "1.0",
  "time": "2025-06-30T08:15:05Z",
  "topic": "ChannelConfiguration",
  "traceid": "00-00000000000000000000000000000000-0000000000000000-00",
  "traceparent": "00-00000000000000000000000000000000-0000000000000000-00",
  "tracestate": "",
  "type": "com.dapr.event.sent"
}

Inside the data property, it has the following format:

{ 
    collection: [ConfigCollectionName]  
    op:         [OPERATION];
    id:         [IDENTIFIER];
    configData: [CONFIGURATION_DATA]; 
    metadata:   [EVENT_METADATA]                (Dapr)
    pubSubName: [DAPR_PUBSUB_NAME];             (Dapr)
    crc:        [INTEGRITY];                    (Not implemented)
}  

The different types of operations are (EventConfigOperation):

  • Create: When a record is created in the configuration. It will carry the identifier of the new record.
  • Update: When a record is modified in the configuration. It will carry the identifier of the updated record.
  • Delete: When a record is deleted in the configuration. It will carry the identifier of the deleted record.

The configuration data contains the updated or new configuration data. This field is not mandatory and if the configuration data is empty, the subscriptor refreshes its configuration calling the aura-configuration-api.

The metadata is used to manage the behavior of the event. In this case it uses a { "ttlInSeconds": "60" } to set the expiration time of the event. The pubSubName contains the AURA_DAPR_CONFIGURATION_PUBSUB_NAME.

Examples:

Without configuration data:


{
    "collection": "ChannelConfiguration",
    "id": "c71dc728-5fe2-4735-927d-0c419b35ec59",
    "metadata": {
      "ttlInSeconds": "60"
    },
    "op": "Update",
    "pubSubName": "pubsub-config-ap-four"
  }

With configuration data:

{
    "collection": "ChannelConfiguration",
    "id": "c71dc728-5fe2-4735-927d-0c419b35ec59",
    "metadata": {
      "ttlInSeconds": "60"
    },
    "op": "Update",
    "pubSubName": "pubsub-config-ap-four",
    "configData": 
        {
            "name":"Expample1",
             "id":"88834434",
             .... 
         }
}

Publishers

The publishers are inside the aura-configuration-api.

When one of the configuration models writes to the MongoDB database, the event must be generated through Redis. This is done by instantiating an element of the RedisEventPublisher class. The creation of these update records in Redis has two requirements that must be met:

  1. The API must contain an endpoint to obtain all the data from the model named GetAll.
  2. The API must contain an endpoint to obtain an element of the model selected by its identifier named GetById.

To write the records in Redis that will generate the events, we will use the DAO of the model. The method writeConfigurationEvent generates the registry for the event in Redis.

    // aura-configuration-api/src/common/dao.ts
    /**
     * Write configuration event.
     * @param {EventConfigOperation} operation Operation type
     * @param {string} id Id of the configuration
     * @param {T} data Data to write
     * @param {KeyValueType} metadata Metadata to write
     */
    protected async writeConfigurationEvent(operation: EventConfigOperation, id?: string, data?: T, metadata?: KeyValueType) {
        this.configurationEventPublish.op = operation;
        this.configurationEventPublish.id = id;
        this.configurationEventPublish.configData = data;
        this.configurationEventPublish.metadata = Object.assign({}, this.configurationEventPublish.metadata, metadata);

        await this.eventPublisher.publish(this.configurationEventPublish);
    }

The eventPublisher can be Redis or DAPR. For DAPR, the class with the methods to publish the event is in /aura-configuration-api/src/utils/event-publishers/dapr-event-publisher.ts

Generate events

When an event is generated, the operation that has been performed is indicated with the identifier of the record affected by the change.

    /**
     * Update channel.
     *
     * @throws {NoDataError} When it hasn't been updated
     * @throws {Error} When channel is invalid
     * @param {ChannelConfiguration} channel Channel configuration
     */
    public async update(channel: ChannelConfiguration) {
        channel.metadata = {
            ...(await this.channelCollection.findOne<ChannelConfiguration>(
                { id: channel.id }, { projection: { metadata: 1 } })
            )?.metadata,
            ...channel.metadata
        };
        const updateResult = await this.channelCollection.replaceOne(
            { id: channel.id }, this.prepareChannelToInsert(channel));
        if (updateResult?.modifiedCount === 0) {
            throw new NoDataError(`Channel with id: ${channel.id} cannot be found`);
        } else {
            await this.writeConfigurationEvent(EventConfigOperation.Update, channel.id);
        }
    }

Subscribers

Subscribers in DAPR can be created in several ways. In this case, the “Programatic” type is used, that is, we are going to include an endpoint in our components so that DAPR can subscribe them to the PubSub. The endpoint (/dapr/subscribe) must respond with a subscription model that contains the pubSubName, the topic and the endpoint that is going to be in charge of processing the event. This type of subscription forces us to define 2 methods for DAPR.

Example:

    /**
     * Dapr Subscriptions.
     *
     */
    public daprSubscriptions() {
        this.app.get('/dapr/subscribe', (request: express.Request, response: express.Response) => {
            const subscriptions =
                [
                    {
                        pubsubname: ConfigurationManager.instance.environmentConfiguration.AURA_DAPR_CONFIGURATION_PUBSUB_NAME,
                        topic: ConfigCollectionName.ChannelConfiguration,
                        routes: {
                            default: '/dapr/processConfigurationEvents',
                        }
                    }
                ];

            response.status(200).send(subscriptions);
        });
    }

In the previous example, we are going to subscribe to one topic, that will receive the events about the channel configuration collection.

Once the event is received, we can use the same library that we use in Redis, aura-utilities/aura-configuration, since we share the data model.

Example:

    /**
     * Process configuration events.
     *
     */
    public processConfigurationEvents() {
        this.app.post('/dapr/processConfigurationEvents', async (request: express.Request, response: express.Response) => {
            try {
                const event = HTTP.toEvent({ headers: request.headers, body: request.body.toString() });
                const cloudEvent = (event || {}) as CloudEvent;
                const configModel = (cloudEvent.data || {}) as EventConfigModel<ChannelConfiguration>;
                switch (configModel.collection) {
                    case ConfigCollectionName.ChannelConfiguration:
                        // Update current channels configuration
                        this.logger.debug({
                            msg: 'Processing channel configuration events',
                            corr: CorrelatorUtil.auraSystem
                        });
                        await AuraCurrentChannelsConfiguration.instance
                            .updateConfiguration(configModel as EventConfigModel<ChannelConfiguration>);
                        break;
                    default:
                        this.logger.warning({
                            msg: `Unknown collection in configuration event: ${configModel.collection}`,
                            corr: CorrelatorUtil.auraSystem
                        });
                }
                response.status(200).send({ status: DaprResponseStatus.SUCCESS });
            } catch (error) {
                this.logger.error({
                    msg: 'Error processing configuration events',
                    error: error.message,
                    corr: CorrelatorUtil.auraSystem
                });
                response.status(500).json({ status: DaprResponseStatus.DROP, error: 'Internal Server Error' });
            }
        });
    }

It is important to indicate in the reponse that the event has been successfully processed.