Aura Databricks Jobs

aura-databricks-jobs is a component based on Databricks. This section describes its technical design and main components.

Introduction

aura-databricks-jobs is a Databricks-based component for optimizing data processing and training ML models.

Currently, its primary function is to import Avro-formatted files into Kernel datasets. To do so, a job must be configured in the Databricks environment, as described later in this section. The method that the job runs is located in avro_to_dataset_job.py.

avro-to-dataset-job-cli is an executable script that imports Avro KPIs into the storage location indicated in the destination config of the Kernel dataset. It runs as a job on a Databricks cluster, by default once a day (the frequency can be changed in the job schedule). It is written in Python and uses the Kernel Spark SDK to read the Avro files and write them to Kernel datasets.

Detailed information about aura-databricks-jobs is found in the following documents:

  • Architecture and main components
  • How does aura-databricks-jobs work?
  • aura-databricks-jobs configuration
  • How to use aura-databricks-jobs?
  • Environment variables
  • Troubleshooting

Aura Databricks Jobs architecture

The following diagram shows the architecture of avro-to-dataset-job-cli, including its main components, which are described in the sections below.

Components diagram

Avro to Dataset Job components

ConfigManager

ConfigManager handles the configuration gathered from the input config_dict and fills in the variables needed by the import process. It also validates the configuration; if any error is found, the process is not executed.
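The validation behavior described above could look roughly like the following sketch. It is illustrative only: the required keys are a hypothetical subset taken from job variables named elsewhere in this section, and ConfigError and run_import are invented names, not the component's real interface.

```python
from dataclasses import dataclass

# Hypothetical subset of required keys, borrowed from job variables in this document.
REQUIRED_KEYS = (
    "AURA_MICROSOFT_AZURE_STORAGE_KPIS_CONTAINER_NAME",
    "AURA_KPI_AVRO_ADAPTER_CONFIG_PATH",
    "AURA_KPI_AVRO_SOURCE_PATH",
)


class ConfigError(ValueError):
    """Raised when the input configuration is incomplete (hypothetical)."""


@dataclass(frozen=True)
class ConfigManager:
    """Sketch: gathers values from config_dict and validates them."""

    config_dict: dict

    def validate(self) -> None:
        # Collect every missing or empty key so the error reports all of them at once.
        missing = [key for key in REQUIRED_KEYS if not self.config_dict.get(key)]
        if missing:
            raise ConfigError(f"Missing configuration values: {', '.join(missing)}")

    def get(self, key: str) -> str:
        return self.config_dict[key]


def run_import(config_dict: dict) -> bool:
    """Return False without running the import when the configuration is invalid."""
    manager = ConfigManager(config_dict)
    try:
        manager.validate()
    except ConfigError:
        return False
    # The import process would start here, using manager.get(...) for its variables.
    return True
```

Failing fast on the whole list of missing keys, rather than on the first one, makes a misconfigured Databricks job easier to fix in a single pass.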

AuraLogging

AuraLogging is a wrapper around the LoggerWrapper class imported from the aura-pytraces library. It is used to register logs, adding the required fields such as version, app and stack.

Logging behavior is configured internally in the file logging.cfg, following the format established by the aura-pytraces library. The following settings can be overridden with environment variables:

  • Handler level: AURA_LOGGING_LEVEL. Default: INFO.
  • Handler formatter: AURA_LOGGING_FORMAT. Default: simple.
  • Version: AURA_VERSION. Default: not-reachable.
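A minimal sketch of how these overrides and their documented defaults resolve, assuming plain environment-variable lookups. It does not use the aura-pytraces API, and resolve_logging_overrides is a hypothetical helper.

```python
import os
from typing import Mapping, Optional

# Documented defaults for each overridable setting.
LOGGING_DEFAULTS = {
    "AURA_LOGGING_LEVEL": "INFO",
    "AURA_LOGGING_FORMAT": "simple",
    "AURA_VERSION": "not-reachable",
}


def resolve_logging_overrides(env: Optional[Mapping[str, str]] = None) -> dict:
    """Return the effective handler level, formatter and version."""
    env = os.environ if env is None else env
    values = {key: env.get(key, default) for key, default in LOGGING_DEFAULTS.items()}
    return {
        # Level names are normalized to upper case (assumption, matching logging conventions).
        "level": values["AURA_LOGGING_LEVEL"].upper(),
        "formatter": values["AURA_LOGGING_FORMAT"],
        "version": values["AURA_VERSION"],
    }
```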

Avro to Dataset Job

This is the process that a cron job executes in Databricks.

It contains the logic that sets up asyncio coroutines to import the Avro files, one coroutine per dataset type.

Each coroutine returns a report. When all the coroutines have finished, their reports are processed and merged into a single report covering the whole import process, including Spark processing information.
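The coroutine fan-out and report merge can be sketched with asyncio.gather. The function names, report fields and status values below are assumptions for illustration; the Spark work is replaced by a placeholder, and the shared asyncio.Lock mirrors the locking of the Spark read/write stage described in the import steps of this section.

```python
import asyncio
from typing import Dict, List


async def import_dataset_type(name: str, files: List[str], spark_lock: asyncio.Lock) -> Dict:
    """Hypothetical importer coroutine for one dataset type; returns its report."""
    if not files:
        # No Avro files of this type: finish early with an empty report.
        return {"dataset": name, "uploaded_files": 0, "status": "NO_FILES"}
    async with spark_lock:
        # Placeholder for the Spark read/write stage, serialized across coroutines.
        await asyncio.sleep(0)
    return {"dataset": name, "uploaded_files": len(files), "status": "OK"}


async def run_job(files_by_dataset: Dict[str, List[str]]) -> Dict:
    """Launch one coroutine per dataset type and merge their reports."""
    spark_lock = asyncio.Lock()
    reports = await asyncio.gather(
        *(import_dataset_type(name, files, spark_lock) for name, files in files_by_dataset.items())
    )
    # gather preserves input order, so reports line up with the dataset types.
    return {
        "datasets": list(reports),
        "total_uploaded_files": sum(report["uploaded_files"] for report in reports),
    }
```

For example, asyncio.run(run_job({"D_Aura_Channel": ["a.avro", "b.avro"], "D_Aura_Recognizer": []})) produces a merged report with two uploaded files, plus a NO_FILES entry for the recognizer dataset.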

Avro KPI importer

It contains the logic to import Avro-formatted files for one dataset type. If there are no Avro-formatted files of that type, the coroutine finishes.

Each coroutine returns a report of the import process for its dataset type.

Azure Storage Manager

This module is used to download files from and upload files to Azure Storage.

Spark SDK Manager

This module is used to load data from Azure Storage as a DataFrame and write it to a Kernel Datalake dataset.

Aura Databricks Job operation

The execution flowchart of avro-to-dataset-job-cli is shown in the following image:

Execution flowchart

avro-to-dataset-job-cli

It imports the Avro-formatted files stored in the Aura KPIs container (job variable: AURA_MICROSOFT_AZURE_STORAGE_KPIS_CONTAINER_NAME) into the corresponding Kernel datasets.

The information needed to import the Avro-formatted files that share an Avro schema into their corresponding dataset is read from a configuration file stored in the Azure KPIs container, at the path configured in the job variable AURA_KPI_AVRO_ADAPTER_CONFIG_PATH.

In addition, a second file, at the path configured in the job variable AURA_KPI_AVRO_SOURCE_SIZE_REPORT_PATH, provides the average file size for each dataset type. This information is used when writing to Kernel datasets with Spark, to indicate how the data should be partitioned and thereby improve performance.
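This section does not state how the average size is turned into a partitioning decision. A common approach, shown here purely as an assumption, is to choose enough partitions that each holds roughly a target number of bytes. The 128 MiB target and the estimate_partitions helper are hypothetical.

```python
import math

# Assumed target size per Spark partition (128 MiB); not taken from the job's configuration.
TARGET_PARTITION_BYTES = 128 * 1024 * 1024


def estimate_partitions(avg_file_bytes: int, file_count: int,
                        target_bytes: int = TARGET_PARTITION_BYTES) -> int:
    """Estimate the number of partitions for writing one dataset type."""
    total_bytes = avg_file_bytes * file_count
    # Always use at least one partition, even for empty or tiny inputs.
    return max(1, math.ceil(total_bytes / target_bytes))
```

In PySpark, the result could then be passed to DataFrame.repartition before the write.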

From the adapter configuration file, the job obtains all the schemas to import. Only items whose targetType is set to avro are imported.

For each Avro schema, the job gathers the following information:

  • name: dataset_id of the Kernel dataset to import into. For example, D_Aura_Channel.
  • schema: type of schema. For example, dimensional or entity.
  • versionSchema: version of the Avro schema. For example, 6.0.0. The major version is used in the Spark stage when writing to the Kernel dataset.
  • avroSchema: name of the schema file stored in the container, within the folder configured in the AURA_KPI_AVRO_SCHEMAS_PATH variable. The Avro schema needed to read the files in Spark is obtained from the path $AURA_KPI_AVRO_SCHEMAS_PATH/$schema/$versionSchema/$avroSchema. For example, schemas/dimensional/6.0.0/aura-channel-asvc.json.
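The schema path rule and the major-version extraction can be expressed as two small helpers. The function names are hypothetical; only the path layout and example values come from this section, and how the major version is then passed to the Spark stage is not shown. posixpath is used because blob paths always use forward slashes.

```python
import posixpath


def avro_schema_path(schemas_path: str, item: dict) -> str:
    """Build $AURA_KPI_AVRO_SCHEMAS_PATH/$schema/$versionSchema/$avroSchema for one adapter item."""
    return posixpath.join(schemas_path, item["schema"], item["versionSchema"], item["avroSchema"])


def major_version(version_schema: str) -> int:
    """Return the major component of a version string such as '6.0.0'."""
    return int(version_schema.split(".", 1)[0])
```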

Sample Aura Avro adapter file:

[
    {
        "name": "D_Aura_Channel",
        "schema": "dimensional",
        "avroSchema": "aura-channel-asvc.json",
        "versionSchema": "6.0.0",
        "source": {
            "data": "object",
            "id": "CHANNEL"
        },
        "targetType": "avro",
        "fields": {
            "AURA_CHANNEL_ID": {
                "sourceName": "id",
                "targetType": "string"
            },
            "AURA_CHANNEL_NAME": {
                "sourceName": "name",
                "targetType": "string"
            },
            "AURA_CHANNEL_SHORT_NAME": {
                "sourceName": "prefix",
                "targetType": "string"
            }
        }
    },
    {
        "name": "D_Aura_Recognizer",
        "schema": "dimensional",
        "avroSchema": "aura-recognizer-asvc.json",
        "versionSchema": "6.0.0",
        "source": {
            "data": "object",
            "id": "RECOGNIZER"
        },
        "targetType": "avro",
        "fields": {
            "AURA_RECOGNIZER_ID": {
                "sourceName": "id",
                "targetType": "string"
            },
            "AURA_RECOGNIZER_NAME": {
                "sourceName": "name",
                "targetType": "string"
            },
            "EXTRACTION_TM": {
                "sourceName": "EXTRACTION_TM",
                "targetType": "string",
                "preCalculated": "DATE_ISO_8691"
            }
        }
    }
]
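Selecting the schemas to import from an adapter file such as the sample above amounts to keeping the items whose targetType is avro and reading the four fields listed earlier. A minimal sketch, in which avro_items and gathered_info are hypothetical helper names and the adapter file is assumed to have already been downloaded as a JSON string:

```python
import json
from typing import List

# Fields the job gathers for each Avro schema, as listed in this section.
GATHERED_KEYS = ("name", "schema", "versionSchema", "avroSchema")


def avro_items(adapter_json: str) -> List[dict]:
    """Return the adapter items that must be imported (targetType == "avro")."""
    items = json.loads(adapter_json)
    return [item for item in items if item.get("targetType") == "avro"]


def gathered_info(item: dict) -> dict:
    """Keep only the fields used by the import process."""
    return {key: item[key] for key in GATHERED_KEYS}
```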

The job runs the import process for each schema type in its own coroutine, using the asyncio library.

For each schema type, the following steps are carried out:

  1. Check whether any schemas are configured not to be loaded. This is set in the job variable AURA_KPI_AVRO_SCHEMAS_NOT_TO_UPLOAD, as a comma-separated list of schema:dataset_id pairs: schema_1:dataset_id_1,schema_1:dataset_id_2,schema_2:dataset_id_3. Example: dimensional:D_Aura_Channel,entity:E_Aura_GROOT. The number of files skipped for that type is recorded in the report.

  2. Check whether there are files of that type to import in the corresponding folder. The Avro-formatted files are stored under the path AURA_KPI_AVRO_SOURCE_PATH, organized by $schema/$version/$dataset. Example: dimensional/6.0.0/D_Aura_Channel. If there are no files, the coroutine ends and generates a report with no uploaded files.

  3. If there are files, Spark reads them from the Azure Blob location that holds the files sharing the same Avro schema and writes them to the corresponding Kernel Datalake dataset. This step is guarded by an asyncio.Lock() so that the read and write operations on DataFrames are not executed concurrently by several coroutines.

  4. Once the files are imported, the local copy of the files is moved to a folder inside the container (job variables: AURA_MICROSOFT_AZURE_STORAGE_KPIS_CONTAINER_NAME/AURA_KPI_AVRO_PROCESSED_FOLDER_PATH) and kept there for a fixed period for recovery purposes.
    All the details of the process are recorded in a report stored at AURA_KPI_AVRO_REPORTS_DESTINATION_PATH/aura-avro-kpis-report-{iso-date}.json.
    Depending on the report mode configured in AURA_KPI_AVRO_REPORTS_MODE, the report is generated only when errors occur, always, or never.
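Steps 1 and 4 rely on a few small rules that can be sketched as standalone helpers: parsing AURA_KPI_AVRO_SCHEMAS_NOT_TO_UPLOAD, deciding whether to write the report, and building the report path. The helper names, the report mode values (always, errors, never) and the use of a UTC calendar date for {iso-date} are assumptions; this section does not give the exact values.

```python
from datetime import datetime, timezone
from typing import Optional, Set, Tuple


def parse_not_to_upload(value: str) -> Set[Tuple[str, str]]:
    """Parse 'schema_1:dataset_id_1,schema_2:dataset_id_2' into (schema, dataset_id) pairs."""
    pairs = set()
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate an empty variable or trailing commas
        schema, _, dataset_id = entry.partition(":")
        if not dataset_id:
            raise ValueError(f"Malformed entry (expected schema:dataset_id): {entry!r}")
        pairs.add((schema.strip(), dataset_id.strip()))
    return pairs


def should_write_report(mode: str, has_errors: bool) -> bool:
    """Decide whether to write the report; the mode names are assumed."""
    mode = mode.lower()
    if mode == "always":
        return True
    if mode == "errors":
        return has_errors
    if mode == "never":
        return False
    raise ValueError(f"Unknown report mode: {mode!r}")


def report_path(destination: str, now: Optional[datetime] = None) -> str:
    """Build <destination>/aura-avro-kpis-report-{iso-date}.json using the UTC date."""
    now = now or datetime.now(timezone.utc)
    return f"{destination.rstrip('/')}/aura-avro-kpis-report-{now.date().isoformat()}.json"
```

Rejecting malformed entries instead of silently ignoring them avoids typos (such as a semicolon instead of a colon) that would otherwise cause a dataset to be loaded unintentionally.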

Regardless of when it runs, avro-to-dataset-job-cli always performs the same process: it takes all the Avro-formatted files in the KPIs container (job variable: AURA_MICROSOFT_AZURE_STORAGE_KPIS_CONTAINER_NAME) produced by the last upload executed by the aura-kpis-uploader component.

Because the job runs standalone on the Databricks cluster, Prometheus alerts cannot be configured. Instead, information about the process is obtained from the generated report and from the following generated files:

  • If the process ends with errors:

    • A file named after the value of AURA_KPI_AVRO_PROCESS_ERROR_FILENAME is generated, containing the execution date.
    • If the report was written to Azure Storage, the file also includes a link to it, valid for the time configured in AURA_KPI_AVRO_REPORTS_SAS_EXPIRATION.
    • If the report cannot be written, the file contains the error.
  • If the process terminates abruptly because of a timeout and the Databricks manager kills it:

    • A report is generated showing the stage reached by each dataset.
    • Each dataset can resume from its stage: on the next run, the job reads the last generated report and identifies, for each dataset, the stage from which to continue.
    • If the process stopped at the WRITING_DATASET_OK stage, the files from the last execution are moved to the processed folder and deleted from the avro folder.
    • If the process stopped at the READING_BLOBS or WRITING_DATASET stage, the files are loaded again together with any files generated since then, without distinguishing between them.
    • If the process stopped at the MOVING_BLOBS_TO_PROCESSED stage, the files are moved to the processed folder. If this second attempt also fails, the stage is set to NOT_PROCESSED_PREVIOUS_ERRORS to indicate that the dataset is not recoverable and must be reviewed manually to check for a corrupt Avro file.
    • If the process stopped at the REMOVING_BLOBS stage, the files are deleted from the avro folder. If this second deletion attempt also fails, the stage is set to NOT_PROCESSED_PREVIOUS_ERRORS to indicate that the dataset is not recoverable and must be reviewed manually to check for a corrupt Avro file.
    • If the process stopped at the WRITING_DATASET_ERROR_NOT_RECOVERABLE stage, neither the files from the last execution nor any files added since the last run are loaded, because there are unrecoverable errors that must be verified and resolved manually. This stage corresponds to dataset writes that produced malformed or discarded records. For that dataset, the stage is recorded as NOT_PROCESSED_PREVIOUS_ERRORS so that it is not loaded.
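The recovery rules above can be summarized as a lookup from the stage recorded in the last report to the action taken on the next run. In this sketch the stage names come from this section, while the action labels and the stage_to_record helper are hypothetical. Treating a dataset already marked NOT_PROCESSED_PREVIOUS_ERRORS as skipped until a manual review is also an assumption.

```python
from enum import Enum
from typing import Optional


class Stage(str, Enum):
    READING_BLOBS = "READING_BLOBS"
    WRITING_DATASET = "WRITING_DATASET"
    WRITING_DATASET_OK = "WRITING_DATASET_OK"
    MOVING_BLOBS_TO_PROCESSED = "MOVING_BLOBS_TO_PROCESSED"
    REMOVING_BLOBS = "REMOVING_BLOBS"
    WRITING_DATASET_ERROR_NOT_RECOVERABLE = "WRITING_DATASET_ERROR_NOT_RECOVERABLE"
    NOT_PROCESSED_PREVIOUS_ERRORS = "NOT_PROCESSED_PREVIOUS_ERRORS"


# Hypothetical action labels, keyed by the stage found in the last report.
RESUME_ACTIONS = {
    Stage.WRITING_DATASET_OK: "move_to_processed_and_delete",
    Stage.READING_BLOBS: "reload_with_new_files",
    Stage.WRITING_DATASET: "reload_with_new_files",
    Stage.MOVING_BLOBS_TO_PROCESSED: "retry_move_to_processed",
    Stage.REMOVING_BLOBS: "retry_delete",
    Stage.WRITING_DATASET_ERROR_NOT_RECOVERABLE: "skip_dataset",
    Stage.NOT_PROCESSED_PREVIOUS_ERRORS: "skip_dataset",  # assumption
}

RETRYABLE_STAGES = (Stage.MOVING_BLOBS_TO_PROCESSED, Stage.REMOVING_BLOBS)
BLOCKED_STAGES = (Stage.WRITING_DATASET_ERROR_NOT_RECOVERABLE, Stage.NOT_PROCESSED_PREVIOUS_ERRORS)


def resume_action(last_stage: Stage) -> str:
    """Action to take on the next run for a dataset stopped at last_stage."""
    return RESUME_ACTIONS[last_stage]


def stage_to_record(last_stage: Stage, retry_failed: bool = False) -> Optional[Stage]:
    """Return NOT_PROCESSED_PREVIOUS_ERRORS when the dataset must be blocked, else None."""
    if last_stage in BLOCKED_STAGES:
        return Stage.NOT_PROCESSED_PREVIOUS_ERRORS
    if retry_failed and last_stage in RETRYABLE_STAGES:
        # A second failed move or delete is treated as not recoverable.
        return Stage.NOT_PROCESSED_PREVIOUS_ERRORS
    return None
```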