Aura bridge response messages synchronization

This document describes how Aura Bridge synchronizes the user’s messages, using Redis as the database that stores the Aura Bridge cache.

Introduction

aura-bridge manages the synchronization of the user’s messages.

Requirements

Messages in aura-bridge are divided into two parts:

  • The entry of the message in the bridge (Request Message)
  • The message replies (Response Message)

Message order

Messages must be sent in order:

  • If a previous message has not yet received a reply, the current message must wait.
  • If there is an earlier response that has not been sent yet, the current response must wait for it.

Error returning messages to the client

  • If a response returns a non-recoverable error, all messages belonging to the same group, i.e., to the same request, must be cancelled.
  • If a response returns a recoverable error, the message must be included in messages to be resent, together with those of the same group.
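
The two error rules can be illustrated with a minimal TypeScript sketch. It assumes that a group is identified by requestId, that cancelled messages take the CANCEL status and that messages to be resent take the AWAITED status. The type and function names are hypothetical and are not part of aura-bridge.

```typescript
type ResponseStatus = "WAITING" | "PENDING" | "SENT" | "AWAITED" | "CANCEL";

interface GroupedResponse {
  requestId: string; // group identifier (assumption: one group per request)
  responseId: string;
  status: ResponseStatus;
}

// Applies the error rule to every message in the group of the failed message.
// Recoverable errors mark the group as AWAITED (to be resent);
// non-recoverable errors mark the group as CANCEL.
function applyErrorToGroup(
  messages: GroupedResponse[],
  failedResponseId: string,
  recoverable: boolean
): GroupedResponse[] {
  const failed = messages.find((m) => m.responseId === failedResponseId);
  if (failed === undefined) {
    return messages; // unknown message: nothing to change
  }
  const newStatus: ResponseStatus = recoverable ? "AWAITED" : "CANCEL";
  return messages.map((m) =>
    m.requestId === failed.requestId ? { ...m, status: newStatus } : m
  );
}

const errorDemoGroup: GroupedResponse[] = [
  { requestId: "rqId-A", responseId: "rsId-A1", status: "PENDING" },
  { requestId: "rqId-A", responseId: "rsId-A2", status: "PENDING" },
  { requestId: "rqId-B", responseId: "rsId-B1", status: "PENDING" },
];
const cancelledDemo = applyErrorToGroup(errorDemoGroup, "rsId-A1", false);
const resendDemo = applyErrorToGroup(errorDemoGroup, "rsId-A2", true);
console.log(cancelledDemo.map((m) => m.status));
console.log(resendDemo.map((m) => m.status));
```

In both calls only the two responses of rqId-A change status; the response of rqId-B is left untouched.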

Flow of messages in the queue

Incoming message (request)

This is the input message, containing a unique identifier requestId and the conversation identifier.

If the message does not receive a direct response because the response is produced by an external service, the async callback must carry the same requestId. Otherwise the queue would block: a message cannot go out while a previous request is still unanswered, and the original request would never receive a response. When the callback carries this identifier, the bridge reuses the previous request, so the callback response unblocks the queue.

Before sending an async callback to aura-groot, set the messageId to the id of the activity:

messageId = context.activity.id

Output message (response)

Output messages can be one or several for each input message. They all contain a unique identifier responseId, the identifier to the input message to which they belong (requestId) and the conversation identifier.

Awaited messages

The outgoing messages can be in two states:

  • PENDING: the message is already in the queue and the bridge is currently trying to send it.
  • AWAITED: the bridge has tried to send the message, but for whatever reason it has not been possible.

AWAITED messages must be retried.

All these processes are managed by the message syncer.

The related aura-bridge environment variables are shown in the following table:

| Property | Type | Description | Default value |
|---|---|---|---|
| AURA_QUEUE_MANAGER_AWAITED_MESSAGE_TTL | number | In milliseconds, maximum lifetime of a message in aura-bridge. After this time, the system deletes the message. | 900000 (15 min) |
| AURA_QUEUE_MANAGER_AWAITED_MESSAGES_BY_TURN | number | Number of messages retrieved by the Awaited Message Agent in each turn. | 15 |
| AURA_QUEUE_MANAGER_AWAITED_MESSAGES_INTERVAL | number | In milliseconds, interval between turns of the Awaited Message Agent. | 20000 |
| AURA_QUEUE_MANAGER_AWAITED_RETRIES_FACTOR | number | Factor that manages the backoff of the Awaited Message Agent. The time after which an AWAITED message is considered is: currentRetry * MAX_PENDING_TIME * AWAITED_RETRIES_FACTOR | 1 |
| AURA_QUEUE_MANAGER_AWAITED_TIME_WHEN_RETRIES_END | number | In milliseconds, waiting time before retrying a message that has consumed all its retries in the queue and has gone to the AWAITED state. | 5000 |
| AURA_QUEUE_MANAGER_CHECK_QUEUE_INTERVAL | number | In milliseconds, interval used by the queue manager to check whether the queue system has pending messages. If not, it deactivates the service, which is reactivated when a new message comes in. | 1000 |
| AURA_QUEUE_MANAGER_CONCURRENT | number | Number of messages processed simultaneously by the memory queue. | 25 |
| AURA_QUEUE_MANAGER_CONCURRENT_FACTOR | number | Together with AURA_QUEUE_MANAGER_CONCURRENT, sets the maximum number of messages the message execution queue can hold: AURA_QUEUE_MANAGER_CONCURRENT * AURA_QUEUE_MANAGER_CONCURRENT_FACTOR | 2 |
| AURA_QUEUE_MANAGER_INTERVAL | number | In milliseconds, interval for processing messages in the queue. | 200 |
| AURA_QUEUE_MANAGER_MAX_PENDING_TTL | number | In milliseconds, time that a message stays in PENDING status before it is considered AWAITED. | 30000 |
| AURA_QUEUE_MANAGER_MAX_PENDING_TTL_MARGIN | number | In milliseconds, time margin used by the memory queue to manage the time that a PENDING message has before being discarded and returned to the AWAITED system. | 5000 |
| AURA_QUEUE_MANAGER_MAX_RETRIES | number | Number of validation attempts. Do not confuse with message sending retries: these retries count the validations that a message performs to check whether it can be sent or has to keep waiting. | 100 |
| AURA_QUEUE_MANAGER_MESSAGE_TTL | number | In milliseconds, lifetime of an incoming message. | 10000 |
| AURA_QUEUE_MANAGER_SENT_MESSAGE_TTL | number | In milliseconds, lifetime of a message with status SENT. If the ACK does not arrive from the client, the message is considered sent and is deleted from the system, making way for the next messages that the user has pending. | 5000 |
| AURA_QUEUE_MANAGER_DATABASE | string | Default database engine used to store the queue information. Values: REDIS, MONGODB. | MONGODB |
| AURA_QUEUE_MANAGER_METRICS_PARTIAL_INTERVAL | number | In milliseconds, interval used to obtain partial values of the queue metrics. These metrics are internal and are used in performance testing to observe the behavior of the message queue. To enable them, set the environment variable AURA_LOGGING_EXTRA_TIME_METRICS to true. | 30000 |
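
As a worked example of the two formulas in the table, the following sketch computes the awaited backoff and the maximum size of the execution queue from the default values. It assumes that MAX_PENDING_TIME in the backoff formula refers to AURA_QUEUE_MANAGER_MAX_PENDING_TTL. The object and function names are illustrative only.

```typescript
// Default values taken from the environment variable table.
const queueDefaults = {
  maxPendingTtlMs: 30000, // AURA_QUEUE_MANAGER_MAX_PENDING_TTL (assumed to be MAX_PENDING_TIME)
  awaitedRetriesFactor: 1, // AURA_QUEUE_MANAGER_AWAITED_RETRIES_FACTOR
  concurrent: 25, // AURA_QUEUE_MANAGER_CONCURRENT
  concurrentFactor: 2, // AURA_QUEUE_MANAGER_CONCURRENT_FACTOR
};

// currentRetry * MAX_PENDING_TIME * AWAITED_RETRIES_FACTOR
function awaitedDelayMs(currentRetry: number, cfg = queueDefaults): number {
  return currentRetry * cfg.maxPendingTtlMs * cfg.awaitedRetriesFactor;
}

// AURA_QUEUE_MANAGER_CONCURRENT * AURA_QUEUE_MANAGER_CONCURRENT_FACTOR
function maxExecutionQueueSize(cfg = queueDefaults): number {
  return cfg.concurrent * cfg.concurrentFactor;
}

console.log(awaitedDelayMs(1)); // 30000
console.log(awaitedDelayMs(3)); // 90000
console.log(maxExecutionQueueSize()); // 50
```

With a factor of 1 the backoff grows linearly with the retry number; a larger factor stretches every step by the same proportion.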

Message Model

{
    _id?;                       // Mongo Id. Not used in Redis
    conversationId: string;     // Conversation Identifier
    origin: string;             // Name of Client
    requestId: string;          // Request Identifier
    taskItem?: TaskItem;        // Task Item Model
    requestTimestamp?: number;  // Date of request
    responseId?: string;        // Response Identifier
    responseTimestamp?: number; // Date of response
    asyncResponseId?: string;   // Identifier of Async response
    status?: MessageSyncStatus; // Status of Message
    lastAccess?: Date;          // Date of last Access
    sentAttempt?: number;       // Number of sent attempts
    awaitedRetries: number;     // Number of retries by Awaited Agent module
    awaitedToDate: Date;        // Next Date to retry to send the Message
    expiresAt?: Date;           // Message expiration Date
    requestIdCounter?: number;  // Number of response messages
    expiresAtTS?: number;       // Expiration Date in Unix Timestamp format (seconds)
    awaitedToDateTS?: number;   // Same as awaitedToDate, but in Unix timestamp format (seconds)
}

Redis Message Models

Several data structures designed to prioritize performance are required to manage messages in the queue with Redis.

These data structures will be responsible for storing the key data of a conversation: ConversationId, Request Message, Response Message, ACK management, Awaited Messages.

MinMessageInfo String

Structure containing the minimum information of a message: requestId, responseId and conversationId.

It is used as a member of a list and must be passed as a string in a certain order to serve as an internal key and not generate duplicates.

Example:

{ rqId: [requestId], rsId: [responseId], cnId: [ConversationId] }
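
A minimal sketch of how such a string could be built so that the same message always yields the same list member. It assumes plain JSON serialization with the fields written in the fixed order rqId, rsId, cnId. The function name is hypothetical and the exact format used by aura-bridge may differ.

```typescript
interface MinMessageInfo {
  rqId: string; // requestId
  rsId: string; // responseId
  cnId: string; // conversationId
}

// Builds the string member in a fixed field order (rqId, rsId, cnId).
// JSON.stringify keeps the insertion order of string keys, so creating a
// fresh object literal makes the output independent of the input's key order.
function toMinInfoString(info: MinMessageInfo): string {
  return JSON.stringify({ rqId: info.rqId, rsId: info.rsId, cnId: info.cnId });
}

const minInfoA = toMinInfoString({ rqId: "rqId-A", rsId: "rsId-A1", cnId: "cnId-1" });
const minInfoB = toMinInfoString({ cnId: "cnId-1", rsId: "rsId-A1", rqId: "rqId-A" });
console.log(minInfoA === minInfoB); // true
```

Because sorted-list members are compared as plain strings, two serializations of the same message with a different field order would otherwise be stored as two distinct members.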

Redis KEYS

 Request Message:            bridge:queue:req:[ requestId ]                  EXPIRES => AURA_QUEUE_MANAGER_MESSAGE_TTL
 Response Message:           bridge:queue:res:[ responseId ]                 EXPIRES => AURA_QUEUE_MANAGER_AWAITED_MESSAGE_TTL
 Conversation Request List:  bridge:queue:[ conversationId ]:req             EXPIRES => AURA_QUEUE_MANAGER_MESSAGE_TTL
 Conversation Response List: bridge:queue:[ conversationId ]                 EXPIRES => AURA_QUEUE_MANAGER_MESSAGE_TTL
 Pending Message List:       bridge:queue:pendingMessages                    EXPIRES => AURA_QUEUE_MANAGER_MESSAGE_TTL
 ACK Identifier:             bridge:queue:asyncRequest:[ asyncRequestId ]    EXPIRES => AURA_QUEUE_MANAGER_SENT_MESSAGE_TTL
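
The key patterns listed here can be gathered in a small helper so that every key is built in one place. This is only a sketch: the object and function names are hypothetical, and only the key patterns themselves come from the list.

```typescript
const KEY_PREFIX = "bridge:queue";

// One builder per Redis key pattern.
const redisKeys = {
  request: (requestId: string): string => `${KEY_PREFIX}:req:${requestId}`,
  response: (responseId: string): string => `${KEY_PREFIX}:res:${responseId}`,
  conversationRequests: (conversationId: string): string =>
    `${KEY_PREFIX}:${conversationId}:req`,
  conversationResponses: (conversationId: string): string =>
    `${KEY_PREFIX}:${conversationId}`,
  pendingMessages: (): string => `${KEY_PREFIX}:pendingMessages`,
  ack: (asyncRequestId: string): string =>
    `${KEY_PREFIX}:asyncRequest:${asyncRequestId}`,
};

console.log(redisKeys.request("rqId-A")); // bridge:queue:req:rqId-A
console.log(redisKeys.conversationRequests("cnId-1")); // bridge:queue:cnId-1:req
```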

Request Message

Request messages are stored in a HASH structure:

KEY: bridge:queue:req:[ requestId ] EXPIRES: AURA_QUEUE_MANAGER_MESSAGE_TTL
FIELDS:

| Field | Description |
|---|---|
| conversationId | Conversation identifier |
| origin | Sender client of the message |
| requestId | Request identifier |
| requestTimestamp | Timestamp with the date of the request |
| status | Status of the message. Values: WAITING, PENDING |
| responseIdCounter | Number of replies that have arrived for this request message |
| expiresAtTS | Timestamp with the expiration date of the message |

Response Message

Response messages are stored in a HASH structure:

KEY: bridge:queue:res:[ responseId ] EXPIRES: AURA_QUEUE_MANAGER_AWAITED_MESSAGE_TTL
FIELDS:

| Field | Description |
|---|---|
| conversationId | Conversation identifier |
| origin | Sender client of the message |
| requestId | Request identifier |
| responseId | Response identifier |
| requestTimestamp | Timestamp with the date of the request |
| responseTimestamp | Timestamp with the date of the response |
| status | Status of the message. Values: WAITING, PENDING, SENT, AWAITED, CANCEL |
| expiresAtTS | Timestamp with the expiration date of the message |
| awaitedToDateTS | Timestamp with the next date to retry sending the message |
| awaitedRetries | Number of awaited retries |
| taskItem | TaskItem model |

Conversation Structures

Request List

Queue-structured list of incoming messages in a conversation. It is necessary to maintain the order of the messages. If a reply is going to be sent, make sure that there are no pending incoming messages.

KEY: bridge:queue:[ conversationId ]:req EXPIRES: AURA_QUEUE_MANAGER_MESSAGE_TTL
VALUES: Strings with the key of the requests.

bridge:queue:cnId-1:req

    bridge:queue:req:rqId-A (older)
    bridge:queue:req:rqId-B

Response List

Sorted list that uses [ requestTimestamp ].([ responseTimestamp ] % 10000) as the score. There is one list for each conversation.

This list determines the order in which the messages are sent. Each member is a MinMessageInfo.

It must be used together with the request list, because a previous input message may not have received a response yet and therefore will not appear in this list.

KEY: bridge:queue:[ conversationId ] EXPIRES: AURA_QUEUE_MANAGER_MESSAGE_TTL
VALUES: MinMessageInfo [requestTimestamp].([responseTimestamp] % 10000)

bridge:queue:cnId-1

    10000000.00001  { rqId: rqId-A, rsId: rsId-A1, cnId: cnId-1 } 
    10000000.00009  { rqId: rqId-A, rsId: rsId-A2, cnId: cnId-1 } 
    10000001.00007  { rqId: rqId-B, rsId: rsId-B1, cnId: cnId-1 } 
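
The score can be illustrated with a short sketch. It assumes that the remainder of the response timestamp is placed in the first four decimal places (remainder divided by 10000), which is one possible reading of the formula above; the function name is hypothetical. Scores are double-precision numbers, which hold roughly 15 to 16 significant digits, so very large timestamps may lose part of the fractional component.

```typescript
// Integer part: request timestamp. Fractional part: response timestamp modulo 10000.
// Responses to an older request always sort before responses to a newer one,
// and responses to the same request sort by their own arrival time.
function responseScore(requestTimestamp: number, responseTimestamp: number): number {
  return requestTimestamp + (responseTimestamp % 10000) / 10000;
}

const scoredDemo = [
  { score: responseScore(10000001, 7), member: "rsId-B1" },
  { score: responseScore(10000000, 9), member: "rsId-A2" },
  { score: responseScore(10000000, 1), member: "rsId-A1" },
].sort((a, b) => a.score - b.score);

console.log(scoredDemo.map((e) => e.member)); // rsId-A1, rsId-A2, rsId-B1
```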

Management of awaited messages

When a message has consumed all its retries, it goes to the AWAITED state so that it can be retried later. The information about when each message can be retried is stored in a sorted list: the score is the date from which the message can be retried, and the member is the message data as a MinMessageInfo.

KEY: bridge:queue:pendingMessages EXPIRES: AURA_QUEUE_MANAGER_MESSAGE_TTL
VALUES: MinMessageInfo [Timestamp]

bridge:queue:pendingMessages

    10000000  { rqId: rqId-A, rsId: rsId-A1, cnId: cnId-1 } 
    10000002  { rqId: rqId-A, rsId: rsId-A2, cnId: cnId-1 } 
    10000003  { rqId: rqId-B, rsId: rsId-B1, cnId: cnId-1 } 

ACK Management

When a message is returned to the client, an identifier (asyncResponseId) is received, which is used to later identify to which message an ACK belongs. This identifier is stored in a key-value structure (asyncRequestId, MinMessageInfo).

KEY: bridge:queue:asyncRequest:[ asyncRequestId ] EXPIRES: AURA_QUEUE_MANAGER_SENT_MESSAGE_TTL
VALUE: MinMessageInfo

  bridge:queue:asyncRequest:rA1  -> { rqId: rqId-A, rsId: rsId-A1, cnId: cnId-1 } 
  bridge:queue:asyncRequest:rA2  -> { rqId: rqId-A, rsId: rsId-A2, cnId: cnId-1 } 
  bridge:queue:asyncRequest:rB1  -> { rqId: rqId-B, rsId: rsId-B1, cnId: cnId-1 }

Message flow and operations

Request Message

  • The message information is stored in the request message structure.
  • Its expiration is set to AURA_QUEUE_MANAGER_MESSAGE_TTL.
  • It is added to the request list of the conversation.
  • The expiration of the list is set to AURA_QUEUE_MANAGER_MESSAGE_TTL.

Example:

conversationId: conv-01
requestId:      rq-message-1


[Request Message]
Key
 bridge:queue:req:rq-message-1
Properties
 conversationId: "conv-01"
 origin: "whatsapp"  
 requestId: "rq-message-1"
 requestTimestamp: 0000000001
 status: "WAITING"
 responseIdCounter: 0
 expiresAtTS: 000060001

[Conversation conv-01 List]
Key
 bridge:queue:conv-01:req
Value
 0: bridge:queue:req:rq-message-1

If another message from the same user comes in, its request message record is created and added to the end of the list.

In this way, the conversation list has the items sorted in order of arrival, with the oldest being the first. This is useful to know if a message has any unanswered incoming messages.

Example:

conversationId: conv-01
requestId:      rq-message-2

[Request Messages]
.....
Key
 bridge:queue:req:rq-message-1
Properties
 .....
Key
 bridge:queue:req:rq-message-2
Properties
 .....
....

[Conversation conv-01 List]
Key
 bridge:queue:conv-01:req
Value
 0: bridge:queue:req:rq-message-1
 1: bridge:queue:req:rq-message-2

If a message from another user arrives, we will have:

conversationId: conv-02
requestId:      rq-message-3

[Request Messages]
.....
Key
 bridge:queue:req:rq-message-1
Properties
 .....
Key
 bridge:queue:req:rq-message-2
Properties
 .....
Key
 bridge:queue:req:rq-message-3
Properties
 .....
....

[Conversations Lists]

Key
 bridge:queue:conv-01:req
Value
 0: bridge:queue:req:rq-message-1
 1: bridge:queue:req:rq-message-2


Key
 bridge:queue:conv-02:req
Value
 0: bridge:queue:req:rq-message-3
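
The request flow described in this section can be sketched with in-memory structures standing in for Redis. Two Map objects play the role of the HASH records and the conversation lists. Key expiration is not simulated, and the class and method names are hypothetical.

```typescript
interface RequestRecord {
  conversationId: string;
  origin: string;
  requestId: string;
  requestTimestamp: number;
  status: "WAITING" | "PENDING";
  responseIdCounter: number;
  expiresAtTS: number;
}

class RequestStoreSketch {
  readonly hashes = new Map<string, RequestRecord>(); // bridge:queue:req:[ requestId ]
  readonly lists = new Map<string, string[]>(); // bridge:queue:[ conversationId ]:req

  register(origin: string, conversationId: string, requestId: string, now: number, ttl: number): void {
    // 1. Store the message information in the request message structure.
    const key = `bridge:queue:req:${requestId}`;
    this.hashes.set(key, {
      conversationId,
      origin,
      requestId,
      requestTimestamp: now,
      status: "WAITING",
      responseIdCounter: 0,
      expiresAtTS: now + ttl, // same time unit as `now` (assumption)
    });
    // 2. Append the request key to the end of the conversation list (oldest first).
    const listKey = `bridge:queue:${conversationId}:req`;
    const list = this.lists.get(listKey) ?? [];
    list.push(key);
    this.lists.set(listKey, list);
  }
}

const requestStoreDemo = new RequestStoreSketch();
requestStoreDemo.register("whatsapp", "conv-01", "rq-message-1", 1, 60000);
requestStoreDemo.register("whatsapp", "conv-01", "rq-message-2", 2, 60000);
requestStoreDemo.register("whatsapp", "conv-02", "rq-message-3", 3, 60000);
console.log(requestStoreDemo.lists.get("bridge:queue:conv-01:req"));
console.log(requestStoreDemo.lists.get("bridge:queue:conv-02:req"));
```

After the three calls, the list of conv-01 holds the keys of rq-message-1 and rq-message-2 in arrival order, and the list of conv-02 holds only the key of rq-message-3, matching the examples in this section.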

Response Message

  • Response messages carry the identifier of their request message. With this identifier, the properties of the input message are obtained and a new HASH record is created for the response data.

  • The expiration of the record is set to AURA_QUEUE_MANAGER_AWAITED_MESSAGE_TTL.

  • The message counter of the request (property requestIdCounter) is incremented by 1.

  • The message is added to the list of response messages of the conversation.

  • The expiration of the conversation response list is set to AURA_QUEUE_MANAGER_AWAITED_MESSAGE_TTL.

  • The message is added to the list of Pending Messages (awaited).

  • The expiration of the list of Pending Messages is set to AURA_QUEUE_MANAGER_AWAITED_MESSAGE_TTL.

    Example:

    conversationId: conv-01
    responseId:      res-message-1-1
    
    
    [Response Message]
    Key
     bridge:queue:res:res-message-1-1
    Properties
     conversationId: "conv-01"
     origin: "whatsapp"  
     requestId: "rq-message-1"
     responseId: "res-message-1-1"
     requestTimestamp: 0000000001
     responseTimestamp: 0000000012
     status: "PENDING"
     expiresAtTS: 000080001
     awaitedToDateTS: 000080001
     awaitedRetries: 0
     taskItem: [TaskItem data]
    
    [Conversation responses conv-01 Sorted List]
    Key
     bridge:queue:conv-01
    Value
     0:  10000000.00001  "{ rqId: 'rq-message-1', rsId: 'res-message-1-1', cnId: 'conv-01' }"
    
    [Pending messages Sorted List]
    Key
     bridge:queue:pendingMessages
    Value
     0:  000080001  "{ rqId: 'rq-message-1', rsId: 'res-message-1-1', cnId: 'conv-01' }"
    

    If another response to the same request arrives, we will have:

     conversationId: conv-01
     responseId:      res-message-1-2
    
    
    [Response Message]
    Key
      bridge:queue:res:res-message-1-2
    Properties
     ....
    
    [Conversation responses conv-01 Sorted List]
    Key
      bridge:queue:conv-01
    Value
      0:  10000000.00001  "{ rqId: 'rq-message-1', rsId: 'res-message-1-1', cnId: 'conv-01' }"
      1:  10000000.00002  "{ rqId: 'rq-message-1', rsId: 'res-message-1-2', cnId: 'conv-01' }"
    
    [Pending messages Sorted List]
    Key
      bridge:queue:pendingMessages
    Value
      0:  000080001  "{ rqId: 'rq-message-1', rsId: 'res-message-1-1', cnId: 'conv-01' }"
      1:  000080002  "{ rqId: 'rq-message-1', rsId: 'res-message-1-2', cnId: 'conv-01' }"
    

If a message arrives from another request, we will have:

 conversationId: conv-02
 responseId:      res-message-2-1

[Response Message]
Key
  bridge:queue:res:res-message-2-1
Properties
 ....

[Conversation responses conv-02 Sorted List]
Key
  bridge:queue:conv-02
Value
  0:  10000005.00003  "{ rqId: 'rq-message-2', rsId: 'res-message-2-1', cnId: 'conv-02' }"

[Pending messages Sorted List]
Key
  bridge:queue:pendingMessages
Value
  0:  000080001  "{ rqId: 'rq-message-1', rsId: 'res-message-1-1', cnId: 'conv-01' }"
  1:  000080002  "{ rqId: 'rq-message-1', rsId: 'res-message-1-2', cnId: 'conv-01' }"
  2:  000080003  "{ rqId: 'rq-message-2', rsId: 'res-message-2-1', cnId: 'conv-02' }"
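
The response flow can be sketched in the same in-memory style. Sorted arrays stand in for the Redis sorted lists, and expiration is not simulated. The score uses the request timestamp plus the response timestamp modulo 10000 as a fraction, which is an assumption about how the two parts are combined. All names are hypothetical.

```typescript
interface ScoredMember {
  score: number;
  member: string;
}

// Inserts a member and keeps the array ordered by ascending score.
function addSorted(list: ScoredMember[], score: number, member: string): void {
  list.push({ score, member });
  list.sort((a, b) => a.score - b.score);
}

class ResponseStoreSketch {
  readonly statuses = new Map<string, string>(); // bridge:queue:res:[ responseId ] (status only)
  readonly counters = new Map<string, number>(); // response counter per requestId
  readonly conversations = new Map<string, ScoredMember[]>(); // bridge:queue:[ conversationId ]
  readonly pendingList: ScoredMember[] = []; // bridge:queue:pendingMessages

  register(
    conversationId: string,
    requestId: string,
    responseId: string,
    requestTs: number,
    responseTs: number,
    awaitedToDateTS: number
  ): void {
    const member = JSON.stringify({ rqId: requestId, rsId: responseId, cnId: conversationId });
    // 1. Create the response record.
    this.statuses.set(`bridge:queue:res:${responseId}`, "PENDING");
    // 2. Increment the response counter of the request.
    this.counters.set(requestId, (this.counters.get(requestId) ?? 0) + 1);
    // 3. Add the message to the response list of the conversation.
    const conv = this.conversations.get(conversationId) ?? [];
    addSorted(conv, requestTs + (responseTs % 10000) / 10000, member);
    this.conversations.set(conversationId, conv);
    // 4. Add the message to the list of pending (awaited) messages.
    addSorted(this.pendingList, awaitedToDateTS, member);
  }
}

const responseStoreDemo = new ResponseStoreSketch();
responseStoreDemo.register("conv-01", "rq-message-1", "res-message-1-1", 10000000, 1, 80001);
responseStoreDemo.register("conv-01", "rq-message-1", "res-message-1-2", 10000000, 2, 80002);
console.log(responseStoreDemo.counters.get("rq-message-1")); // 2
```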

Validate if the response can be sent to the client

It is the operation that each message must perform before being sent to ensure the order of the messages.

Steps:

  • The response message has an associated request message. The first thing to validate is whether that request message is the first in the request list of the conversation.
    This is important because a previous request message may involve operations, such as attachments, whose response messages have not been received yet.

  • If the first request message in the list matches the request message of the current response message, the next check is whether this response message is the first in the response list of the conversation.
    Three things can happen:

    1. If the current response message is the first in the list, the message can go out.
    2. If the first message in the list is not the current response message and it exists, the current message must wait.
    3. If the first message in the list is not the current response message and it does not exist, it is removed from the list and the check is repeated.
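
The validation steps can be sketched as a single function over in-memory lists. The result values CAN_SEND and WAIT are hypothetical (the real values of CanSendStatus are not listed in this document), and returning WAIT when the response list ends up empty is an assumption.

```typescript
type CanSendSketch = "CAN_SEND" | "WAIT";

interface ConversationState {
  requestList: string[]; // request ids, oldest first
  responseList: { requestId: string; responseId: string }[]; // ordered by score
  existingResponses: Set<string>; // response ids whose record still exists
}

function canSendSketch(
  state: ConversationState,
  requestId: string,
  responseId: string
): CanSendSketch {
  // Step 1: the request must be the oldest one in the conversation.
  if (state.requestList[0] !== requestId) {
    return "WAIT";
  }
  // Step 2: inspect the head of the response list until a decision is reached.
  for (
    let first = state.responseList[0];
    first !== undefined;
    first = state.responseList[0]
  ) {
    if (first.responseId === responseId) {
      return "CAN_SEND"; // case 1: the current response is first
    }
    if (state.existingResponses.has(first.responseId)) {
      return "WAIT"; // case 2: another live response goes first
    }
    state.responseList.shift(); // case 3: stale entry, remove it and check again
  }
  return "WAIT"; // empty list: nothing to send yet (assumption)
}

const validationDemo: ConversationState = {
  requestList: ["rqId-A", "rqId-B"],
  responseList: [
    { requestId: "rqId-A", responseId: "rsId-A1" },
    { requestId: "rqId-A", responseId: "rsId-A2" },
    { requestId: "rqId-B", responseId: "rsId-B1" },
  ],
  existingResponses: new Set(["rsId-A1", "rsId-A2", "rsId-B1"]),
};
console.log(canSendSketch(validationDemo, "rqId-A", "rsId-A1")); // CAN_SEND
console.log(canSendSketch(validationDemo, "rqId-A", "rsId-A2")); // WAIT
console.log(canSendSketch(validationDemo, "rqId-B", "rsId-B1")); // WAIT
```

Note that case 3 mutates the list: stale heads are dropped as a side effect of the check, so later validations do not have to skip them again.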

Awaited Management

This is the management of messages that could not be sent because previous messages are still waiting to be sent and the message’s own sending attempts have been exhausted.

A message can also go to AWAITED status when a recoverable error occurs.

Steps:

  • As they are in an ordered list (bridge:queue:pendingMessages), simply collect the messages from that list.
  • Once the messages to be processed have been collected, they must be blocked so that other services do not process the same messages.
  • These collected messages are passed to the priority queue to be processed.
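
The collection steps can be sketched as follows. The sketch assumes that only entries whose retry date has been reached are collected and that a shared Set stands in for the blocking mechanism. With several aura-bridge instances, the real lock would have to be taken atomically in Redis, which this sketch does not attempt. All names are hypothetical.

```typescript
interface AwaitedEntry {
  retryAt: number; // score: date from which the message can be retried
  member: string; // MinMessageInfo string
}

// Collects up to `limit` due messages from a list ordered by retryAt,
// skipping entries that are already locked by another worker.
function collectAwaited(
  pendingEntries: AwaitedEntry[],
  lockedMembers: Set<string>,
  now: number,
  limit: number
): string[] {
  const collected: string[] = [];
  for (const entry of pendingEntries) {
    if (collected.length >= limit || entry.retryAt > now) {
      break; // enough messages, or the remaining entries are not due yet
    }
    if (lockedMembers.has(entry.member)) {
      continue; // another service is already processing this message
    }
    lockedMembers.add(entry.member); // block it before handing it over
    collected.push(entry.member);
  }
  return collected;
}

const awaitedDemo: AwaitedEntry[] = [
  { retryAt: 100, member: "msg-a" },
  { retryAt: 200, member: "msg-b" },
  { retryAt: 300, member: "msg-c" },
];
const awaitedLocks = new Set<string>();
const firstTurn = collectAwaited(awaitedDemo, awaitedLocks, 250, 15);
const secondTurn = collectAwaited(awaitedDemo, awaitedLocks, 250, 15);
console.log(firstTurn); // msg-a and msg-b
console.log(secondTurn); // empty: both are locked and msg-c is not due
```

The returned members are the ones that would be passed to the priority queue.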

Logical States

These are logical states since any message that passes to these states is removed from the queuing system.

[Figure: Message sync states]

Message synchronization modules

Message synchronization is managed at two levels:

  • The first level is the micro-service memory. A queue within the micro-service’s own memory is used to manage message synchronization locally. For the management of this level, a module called Memory Queue Manager is used, which is composed of two components:

    • Memory Queue Core that implements an in-memory queue manager.
    • Awaited Message Manager, which is responsible for collecting pending messages from Redis.
  • The second level is through Redis. It is used to synchronize messages between the different micro-services. For queue management at this level, the Message Sync Module is used, which is responsible, among other things, for managing the CRUD operations of the messages in the queue in Redis.

[Figure: Message sync modules]

Memory Queue Manager

Memory Queue Core

Message Task Model

{
  executionTask: QueueTask;                     // Task to execute when validation is OK
  validationTask?: QueueTask;                   // Task to validate if the main task can be executed
  errorTask?: QueueTask;                        // Task to execute when the main task fails
  taskId: string;                               // Task identifier
  taskGroup: string;                            // Task group
  tsInput: number;                              // Time when the input message was received
  tsOutput: number;                             // Time when the response message was received
  internalId?: number;                          // Used by the queue internally
  timeInQueue?: number;                         // Maximum time that a task can stay in the queue
  messageOptions: SendMessageOptions<any>;      // Message options
  corr: string;                                 // Task correlator
}

Architecture

[Figure: Message queue architecture]

Life Cycle

Enqueue Phase

Messages can be queued in the memory queue for two reasons:

  • A response arrives at aura-bridge and must be synchronized.
  • The agent that takes care of the pending messages has obtained messages from the Redis list bridge:queue:pendingMessages.

[Figure: Enqueue phase]

The aura-bridge environment variables that are associated with this agent are:

[Figure: awaited vars]

Select Priority Tasks

At this point, tasks are selected from the main queue in two steps: first, only tasks that have not expired are kept; second, for tasks that belong to a group, only the first task of the group is sent to the priority queue.
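
A minimal sketch of this selection. It assumes that each task carries an absolute expiry time (the hypothetical field expiresAt) and that, when the first task of a group has expired, the next non-expired task of that group is taken instead. The function name is also hypothetical.

```typescript
interface QueuedTask {
  taskId: string;
  taskGroup: string;
  expiresAt: number; // hypothetical: absolute time after which the task is discarded
}

// Keeps non-expired tasks and lets only the first task of each group through.
function selectPriorityTasks(mainQueue: QueuedTask[], now: number): QueuedTask[] {
  const seenGroups = new Set<string>();
  const selected: QueuedTask[] = [];
  for (const task of mainQueue) {
    if (task.expiresAt <= now) {
      continue; // expired tasks are never selected
    }
    if (seenGroups.has(task.taskGroup)) {
      continue; // a task of this group is already in the priority queue
    }
    seenGroups.add(task.taskGroup);
    selected.push(task);
  }
  return selected;
}

const mainQueueDemo: QueuedTask[] = [
  { taskId: "t1", taskGroup: "g1", expiresAt: 100 },
  { taskId: "t2", taskGroup: "g1", expiresAt: 500 },
  { taskId: "t3", taskGroup: "g1", expiresAt: 500 },
  { taskId: "t4", taskGroup: "g2", expiresAt: 500 },
  { taskId: "t5", taskGroup: "g3", expiresAt: 50 },
];
const priorityDemo = selectPriorityTasks(mainQueueDemo, 200);
console.log(priorityDemo.map((t) => t.taskId)); // t2 and t4
```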

[Figure: priority tasks]

The aura-bridge environment variables that are associated with this process are:

[Figure: priority tasks vars]

Execution Phase

In this phase, each task is validated to see whether it can be executed. The validation first checks that the task has not expired and has not exceeded the maximum number of validations, and then checks that there is no pending task for the user to whom the message is addressed.

[Figure: priority queue]

The flow within the execution phase is as follows:

[Figure: execution flow]

The aura-bridge environment variables that are associated with this process are:

Awaited Messages Agent

This module is in charge of collecting messages with AWAITED status.

The collection of these messages is done periodically:

  • The variable AURA_QUEUE_MANAGER_AWAITED_MESSAGES_INTERVAL sets the interval.
  • The environment variable AURA_QUEUE_MANAGER_AWAITED_MESSAGES_BY_TURN sets the number of messages to collect at each interval.

When the module starts for the first time, it picks a second within the minute, i.e., a value between 0 and 59, at which the first request for messages with AWAITED status is launched. From then on, the configured interval is followed. This reduces the chance that two instances of aura-bridge make the request at the same time.
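
The start-up offset can be sketched with two small functions: one picks the second within the minute, the other computes how long to wait until that second comes around. How aura-bridge actually picks the second is not stated here; a uniform random choice is assumed, and the function names are hypothetical.

```typescript
// Picks the second of the minute (0..59) for the first run.
function pickStartSecond(random: () => number = Math.random): number {
  return Math.floor(random() * 60);
}

// Milliseconds to wait from `now` until the next occurrence of `targetSecond`.
function delayUntilSecond(now: Date, targetSecond: number): number {
  const currentMs = now.getSeconds() * 1000 + now.getMilliseconds();
  const targetMs = targetSecond * 1000;
  return (targetMs - currentMs + 60000) % 60000;
}

// Example: the instance starts at second 10 of the current minute.
const startDemo = new Date(2024, 0, 1, 10, 0, 10, 0);
console.log(delayUntilSecond(startDemo, 30)); // 20000
console.log(delayUntilSecond(startDemo, 5)); // 55000
```

After this initial delay, the agent would simply repeat every AURA_QUEUE_MANAGER_AWAITED_MESSAGES_INTERVAL milliseconds, for example with setInterval.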

Message Sync Manager

This module is in charge of managing the synchronization between the different aura-bridge instances and their messages. This is performed through the MessageSyncManager class.

Methods

createConversationRegister

Method used when a message enters aura-bridge. It sets the message to WAITING status.
This method performs all the steps of the Request Message flow.

  • Input:

    • origin: Name of the client sending the message. Example: “whatsapp”.
    • conversationId: Conversation identifier.
    • requestId: Identifier of the incoming message.
    • corr: Correlator of the request.
    • requestTimestamp: Date on which the request is made.
  • Output:

getInitDefaultValues

This is an auxiliary method used to initialize the values to their default value.

  • Input:

    • origin: Name of the client sending the message.
    • conversationId: Conversation identifier.
    • requestId: Identifier of the incoming message.
    • requestTimestamp: Date on which the request is made.
  • Output

markPendingConversationRegister

This method is called when a response message has arrived. It performs all the steps of the Response Message flow.

  • Input:
    • origin: Name of the client sending the message.
    • conversationId: Conversation identifier.
    • requestId: Identifier of the incoming message.
    • taskItem : Message Task Model.
    • corr: Current correlator.
    • responseId: Unique identifier for the response message.

markSentConversationRegister

This method is used when a message is sent to the destination client. It sets the status of the message to SENT.

  • Input:
    • conversationId: Conversation identifier.
    • requestId: Identifier of the incoming message.
    • corr: Current correlator.
    • responseId: Unique identifier for the response message.
    • asyncResponseId: Identifier received when sending the message. This is used when sending messages against asynchronous clients. This identifier is necessary to manage the ACKs.

canSendConversationMessage

This is the method used by the default validator of a message to know whether or not the current message can be sent to the destination client.

The model used for the return value is:

CanSendResponse {
    canSendStatus: CanSendStatus;
    firstResponseId: string;
}

  • Input:

    • conversationId: Conversation identifier.
    • requestId: Identifier of the incoming message.
    • corr: Current correlator.
  • Output:

    • CanSendResponse: object whose canSendStatus field indicates whether or not the message can be sent.

getFirstConversationMessage

This method returns true if the current message is the first one, indicating that it can be sent immediately.

  • Input:

    • conversationId: Conversation identifier.
  • Output:

deleteConversationMessage

This method deletes all references in Redis of a message. To select the message to be deleted, it uses the model SyncMessageOptions.

SyncMessageOptions {
    responseId?: string;
    asyncResponseId?: string;
    corr: string;
}

  • Input:
    • options: SyncMessageOptions

deleteConversationRegister

This method deletes all messages with the same requestId.

  • Input:
    • allOriginExcept: Indicates that messages should not be deleted.
    • status: The status that the message must have to be deleted.
    • requestInfo: Object generated in all requests that contains the relevant information of a request.

getConversationMessage

This method returns a message based on its responseId or asyncResponseId.

  • Input:

    • options: SyncMessageOptions.
    • projection: A filter in JSON format to return only specific fields. Example: { name:1}.
  • Output:

setAwaitedByRequestId

This method sets the message to AWAITED status based on its requestId.

setAwaitedByResponseId

This method sets the message to AWAITED status based on its responseId.

  • Input:
    • options: SyncMessageOptions.

getAwaitedMessages

It returns a specified number of messages with AWAITED status.

  • Input:

    • nMessages: Number of messages to return.
    • corr: Current correlator.
  • Output:

setErrorById

Used when the current message and its siblings need to be removed.

  • Input:
    • options: SyncMessageOptions.

updateAwaitedExpires

Sets a new awaited date for a message.

  • Input:
    • responseId: Response id of message to update.
    • corr: Current correlator.

removeRequestMessage

Removes all references to a message in Redis.

  • Input:
    • syncOptions: SyncMessageOptions.
    • removeOwner: True if the current message must be deleted
    • corr: Current correlator.

getAllSiblings

Gets all siblings of a message.

  • Input:

    • responseIdKey: Response message Key in Redis
    • corr: Current correlator.
  • Output:

    • ResponsesIdByRequestId: model with info and siblings.

removeAllSiblings

Removes all siblings from a ResponsesIdByRequestId model.

  • Input:
    • ResponsesIdByRequestId: Model with info and siblings.
    • corr: Current correlator.

generateUniqueMinInfoKey

Generates the unique string key from a MinMessageInfo.

  • Input:

  • Output:

    • string: String with the data in JSON model: { rqId: minInfo.rqId, rsId: minInfo.rsId, cnId: minInfo.cnId }.

removeAllReferencesAtTime

Removes all references to a message in Redis after a delay, using a timer.

  • Input:
    • syncOptions: SyncMessageOptions.
    • delay: Delay, in seconds, to wait before removing the references.
    • corr: Current correlator.