Aura Databricks Jobs
aura-databricks-jobs is a component based on Databricks. This section covers its technical description and main components.
Introduction
aura-databricks-jobs is a component based on Databricks for the optimization of data processing and the training of ML-based models.
Currently, its primary function is to import Avro-formatted files into Kernel datasets. To do so, a job must be configured in the Databricks environment, as described later in this section. The job logic is implemented in avro_to_dataset_job.py.
aura-to-dataset-job-cli is an executable script that imports Avro KPIs into the storage location indicated in the Kernel dataset destination config. It is configured as a job on a Databricks cluster and runs every day by default (the frequency can be changed in the job schedule). It is developed in Python and uses the Kernel Spark SDK to read the Avro files and write to Kernel datasets.
Detailed information regarding aura-databricks-jobs is found in the following documents:
. Architecture and main components
. How does aura-databricks-jobs work?
. aura-databricks-jobs configuration
. How to use aura-databricks-jobs?
. Environment variables
. Troubleshooting
Aura Databricks Jobs architecture
In the following diagram, the architecture of aura-to-dataset-job-cli is represented, including its main components, which are described in the following sections.
Avro to Dataset Job components
ConfigManager
ConfigManager is a configuration handler: it gathers the values from the input config_dict to populate the variables needed by the import process, and it validates them. If the configuration is invalid, the process is not executed.
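As an illustration of this validate-before-run behavior, here is a minimal sketch. It is not the component's actual code: the names `ConfigError`, `JobConfig`, `load_config` and the choice of `REQUIRED_KEYS` are assumptions made for the example; only the variable names themselves appear in this documentation.

```python
from dataclasses import dataclass


class ConfigError(ValueError):
    """Raised when the input configuration is incomplete (hypothetical name)."""


# Assumed subset of required keys; the real list lives in the component.
REQUIRED_KEYS = (
    "AURA_MICROSOFT_AZURE_STORAGE_KPIS_CONTAINER_NAME",
    "AURA_KPI_AVRO_ADAPTER_CONFIG_PATH",
    "AURA_KPI_AVRO_SOURCE_PATH",
)


@dataclass(frozen=True)
class JobConfig:
    container_name: str
    adapter_config_path: str
    source_path: str


def load_config(config_dict: dict) -> JobConfig:
    """Validate config_dict and return a typed view; raise before any work starts."""
    missing = [
        key for key in REQUIRED_KEYS
        if not str(config_dict.get(key) or "").strip()
    ]
    if missing:
        raise ConfigError("Missing or empty configuration keys: " + ", ".join(missing))
    return JobConfig(
        container_name=config_dict["AURA_MICROSOFT_AZURE_STORAGE_KPIS_CONTAINER_NAME"],
        adapter_config_path=config_dict["AURA_KPI_AVRO_ADAPTER_CONFIG_PATH"],
        source_path=config_dict["AURA_KPI_AVRO_SOURCE_PATH"],
    )
```

Raising at load time means a misconfigured job stops before it touches any storage.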
AuraLogging
AuraLogging is a wrapper of the LoggerWrapper class imported from the aura-pytraces library. It is used to register logs, adding the required items such as version, app, stck, etc.
Logging behavior is configured internally in the file logging.cfg, following the format established by the aura-pytraces library.
This configuration may be overridden:
- level of the handler config, by the environment variable AURA_LOGGING_LEVEL. Default value: INFO.
- formatter of the handler config, by the environment variable AURA_LOGGING_FORMAT. Default value: simple.
- version, by the environment variable AURA_VERSION. Default value: not-reachable.
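The override rules can be sketched in a few lines of Python. This example does not use aura-pytraces; the function name `resolve_logging_overrides` is hypothetical, and only the environment variable names and default values are taken from the text above.

```python
import os

# Defaults as documented above.
LOGGING_DEFAULTS = {
    "AURA_LOGGING_LEVEL": "INFO",
    "AURA_LOGGING_FORMAT": "simple",
    "AURA_VERSION": "not-reachable",
}


def resolve_logging_overrides(environ=None) -> dict:
    """Return the effective level, formatter and version for the logger."""
    env = os.environ if environ is None else environ
    return {
        "level": env.get("AURA_LOGGING_LEVEL", LOGGING_DEFAULTS["AURA_LOGGING_LEVEL"]),
        "formatter": env.get("AURA_LOGGING_FORMAT", LOGGING_DEFAULTS["AURA_LOGGING_FORMAT"]),
        "version": env.get("AURA_VERSION", LOGGING_DEFAULTS["AURA_VERSION"]),
    }
```

Passing an explicit mapping instead of reading `os.environ` directly makes the behavior easy to check in isolation.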
Avro to Dataset Job
This is the process that a cron job executes in Databricks.
It contains the logic to set up coroutines, using the asyncio library, that import Avro files by type of dataset.
The result of each coroutine is a report. When all the coroutines have finished, the reports are merged into a single report covering the whole import process, including Spark processing info.
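The fan-out and merge pattern described here can be sketched with `asyncio.gather`. This is an illustration under assumptions, not the job's code: `import_dataset_type`, `merge_reports`, `run_job` and the report fields are hypothetical, and the real coroutine would perform Spark and storage operations where the placeholder sleeps.

```python
import asyncio


async def import_dataset_type(dataset_id: str, files: list) -> dict:
    """Stand-in for one import coroutine; returns a per-dataset report."""
    await asyncio.sleep(0)  # placeholder for the real I/O
    return {"dataset": dataset_id, "uploaded": len(files), "errors": []}


def merge_reports(reports: list) -> dict:
    """Combine the per-dataset reports into a single summary report."""
    return {
        "datasets": {report["dataset"]: report for report in reports},
        "total_uploaded": sum(report["uploaded"] for report in reports),
        "has_errors": any(report["errors"] for report in reports),
    }


async def run_job(pending: dict) -> dict:
    """Run one coroutine per dataset type and merge their reports."""
    reports = await asyncio.gather(
        *(import_dataset_type(ds, files) for ds, files in pending.items())
    )
    return merge_reports(list(reports))
```

A call such as `asyncio.run(run_job({"D_Aura_Channel": ["a.avro"]}))` returns the merged report once every coroutine has finished.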
Avro KPI importer
It contains the logic to import Avro-formatted files by type of dataset. If there are no Avro-formatted files for a given type of dataset, the coroutine finishes.
The result of each coroutine is the report of the import process for that specific type of dataset.
Azure Storage Manager
This module is used to download and upload files from and to Azure Storage.
Spark SDK Manager
This module is used to load data as a DataFrame from Azure Storage and write it to a dataset in the Kernel Datalake.
Aura Databricks Job operation
The execution flowchart of avro-to-dataset-job-cli is shown in the following image:
avro-to-dataset-job-cli
It is responsible for importing the Avro-formatted files in the Aura KPIs container (job’s variable: AURA_MICROSOFT_AZURE_STORAGE_KPIS_CONTAINER_NAME) into the corresponding dataset in Kernel.
The information necessary to import the Avro-formatted files with the same Avro schema to their corresponding dataset is obtained from the configuration file stored in the Azure KPIs container, specifically the file path configured in the job’s variable: AURA_KPI_AVRO_ADAPTER_CONFIG_PATH.
In addition, there is a file that will provide us with the average size of the files by type of dataset, specifically the file path configured in the job’s variable: AURA_KPI_AVRO_SOURCE_SIZE_REPORT_PATH. This information will be useful when writing in Kernel datasets with the Spark tool to correctly indicate how the data should be partitioned to improve performance.
All the schemas to import are obtained from the adapter configuration file (AURA_KPI_AVRO_ADAPTER_CONFIG_PATH). For a schema to be imported, its item must have targetType set to avro.
The job gathers the following information for each Avro schema:
- name: dataset_id used to import into Kernel. For example, D_Aura_Channel.
- schema: type of schema. For example, dimensional or entity.
- versionSchema: version of avroSchema. For example, 6.0.0. The major version will be used in the Spark stage to write to the Kernel dataset.
- avroSchema: name of the schema file stored in the container within the folder configured in the AURA_KPI_AVRO_SCHEMAS_PATH variable. The Avro schema needed to read the files in Spark is obtained from the path built from that variable and the fields above: $AURA_KPI_AVRO_SCHEMAS_PATH/$schema/$versionSchema/$avroSchema. Example: schemas/dimensional/6.0.0/aura-channel-asvc.json.
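The schema path convention and the use of the major version can be illustrated as follows. The helper names `avro_schema_path` and `major_version` are hypothetical; the path layout and the example values come from the description above.

```python
import posixpath


def avro_schema_path(schemas_path: str, entry: dict) -> str:
    """Build $AURA_KPI_AVRO_SCHEMAS_PATH/$schema/$versionSchema/$avroSchema."""
    return posixpath.join(
        schemas_path,
        entry["schema"],
        entry["versionSchema"],
        entry["avroSchema"],
    )


def major_version(version_schema: str) -> int:
    """Extract the major component of a version such as '6.0.0'."""
    return int(version_schema.split(".")[0])


entry = {
    "name": "D_Aura_Channel",
    "schema": "dimensional",
    "versionSchema": "6.0.0",
    "avroSchema": "aura-channel-asvc.json",
}
print(avro_schema_path("schemas", entry))
# prints: schemas/dimensional/6.0.0/aura-channel-asvc.json
```

`posixpath` is used instead of `os.path` because blob paths use forward slashes regardless of the operating system running the job.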
Sample of Aura Avro adapter file:
[
  {
    "name": "D_Aura_Channel",
    "schema": "dimensional",
    "avroSchema": "aura-channel-asvc.json",
    "versionSchema": "6.0.0",
    "source": {
      "data": "object",
      "id": "CHANNEL"
    },
    "targetType": "avro",
    "fields": {
      "AURA_CHANNEL_ID": {
        "sourceName": "id",
        "targetType": "string"
      },
      "AURA_CHANNEL_NAME": {
        "sourceName": "name",
        "targetType": "string"
      },
      "AURA_CHANNEL_SHORT_NAME": {
        "sourceName": "prefix",
        "targetType": "string"
      }
    }
  },
  {
    "name": "D_Aura_Recognizer",
    "schema": "dimensional",
    "avroSchema": "aura-recognizer-asvc.json",
    "versionSchema": "6.0.0",
    "source": {
      "data": "object",
      "id": "RECOGNIZER"
    },
    "targetType": "avro",
    "fields": {
      "AURA_RECOGNIZER_ID": {
        "sourceName": "id",
        "targetType": "string"
      },
      "AURA_RECOGNIZER_NAME": {
        "sourceName": "name",
        "targetType": "string"
      },
      "EXTRACTION_TM": {
        "sourceName": "EXTRACTION_TM",
        "targetType": "string",
        "preCalculated": "DATE_ISO_8691"
      }
    }
  }
]
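A minimal sketch of how the entries to import might be selected from an adapter file like the sample above. The function name `select_avro_entries` is hypothetical, and the second entry in the inline sample (`D_Example_Other` with a `csv` target) is invented purely to show the filter; it is not a real dataset.

```python
import json

# Trimmed sample; the non-avro entry is an assumption for illustration only.
SAMPLE_ADAPTER = """
[
  {"name": "D_Aura_Channel", "schema": "dimensional",
   "avroSchema": "aura-channel-asvc.json", "versionSchema": "6.0.0",
   "targetType": "avro"},
  {"name": "D_Example_Other", "schema": "dimensional",
   "targetType": "csv"}
]
"""


def select_avro_entries(adapter_json: str) -> list:
    """Return only the adapter items whose targetType is 'avro'."""
    entries = json.loads(adapter_json)
    if not isinstance(entries, list):
        raise ValueError("Adapter file must contain a JSON array of items")
    return [entry for entry in entries if entry.get("targetType") == "avro"]


for entry in select_avro_entries(SAMPLE_ADAPTER):
    print(entry["name"], entry["schema"], entry["versionSchema"])
```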
The job will run the import process for each schema type, running in coroutines and using the asyncio library.
The following process is carried out for each type of schema:
- Check whether any schemas are configured not to be loaded. The job variable that holds this configuration is AURA_KPI_AVRO_SCHEMAS_NOT_TO_UPLOAD. Its value is a comma-separated list with the format schema_1:dataset_id_1,schema_1:dataset_id_2,schema_2:dataset_id_3. Example: dimensional:D_Aura_Channel,entity:E_Aura_GROOT. The number of files skipped for that type is recorded in a report.
- Check whether there are files of that type to import in the corresponding folder. The path where the Avro-formatted files are stored is AURA_KPI_AVRO_SOURCE_PATH. Within this path, the files are stored by their corresponding $schema/$dataset/$version. Example: dimensional/6.0.0/D_Aura_Channel. If there are no files, the coroutine ends by generating a report without uploaded files.
- If there are files, they are read with Spark, indicating the Azure Blob location where the files with the same Avro schema are stored, and then written to the corresponding dataset of the Kernel Datalake. This step is guarded by an asyncio.Lock() so that read and write operations on a DataFrame do not run concurrently.
- Once the files are imported, the local copy is moved to a folder inside the container (job’s variables: AURA_MICROSOFT_AZURE_STORAGE_KPIS_CONTAINER_NAME/AURA_KPI_AVRO_PROCESSED_FOLDER_PATH) and kept there for a fixed time, for recovery purposes.
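The skip check and the lock around the Spark step can be sketched as below. This is an assumption-laden illustration: `parse_schemas_not_to_upload`, `import_one`, the `load_and_write` callback and the stage labels `SKIPPED`, `NO_FILES` and `DONE` are all hypothetical, and the sketch assumes entries in the skip list use `:` as separator, as in the documented example.

```python
import asyncio


def parse_schemas_not_to_upload(raw: str) -> set:
    """Parse 'schema:dataset_id,schema:dataset_id' into (schema, dataset_id) pairs."""
    skipped = set()
    for item in raw.split(","):
        item = item.strip()
        if not item:
            continue
        schema, sep, dataset_id = (part.strip() for part in item.partition(":"))
        if not sep or not schema or not dataset_id:
            raise ValueError(f"Malformed entry in skip list: {item!r}")
        skipped.add((schema, dataset_id))
    return skipped


async def import_one(schema, dataset_id, files, skipped, spark_lock, load_and_write):
    """Run the per-dataset steps; stage labels here are illustrative only."""
    if (schema, dataset_id) in skipped:
        return {"dataset": dataset_id, "stage": "SKIPPED", "skipped_files": len(files)}
    if not files:
        return {"dataset": dataset_id, "stage": "NO_FILES", "uploaded": 0}
    # Only one coroutine at a time performs the Spark read and write.
    async with spark_lock:
        await load_and_write(dataset_id, files)
    return {"dataset": dataset_id, "stage": "DONE", "uploaded": len(files)}
```

The lock should be created inside the running event loop (for example, at the start of the main coroutine) and shared by all the import coroutines.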
All the details of the process are recorded in a report that is stored under the path configured in the job variable AURA_KPI_AVRO_REPORTS_DESTINATION_PATH, with the name aura-avro-kpis-report-{iso-date}.json.
Depending on the report mode configured in AURA_KPI_AVRO_REPORTS_MODE, the report is generated only when errors occur, always, or never.
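The report naming and the three report modes can be illustrated with the sketch below. The literal mode values (`always`, `on_error`, `never`), the function names, and the exact rendering of `{iso-date}` (here, `datetime.isoformat()` in UTC) are assumptions; the documentation only states that the three behaviors exist and that the file name contains an ISO date.

```python
from datetime import datetime, timezone

# Assumed literal values for AURA_KPI_AVRO_REPORTS_MODE.
REPORT_MODES = ("always", "on_error", "never")


def should_write_report(mode: str, has_errors: bool) -> bool:
    """Decide whether the report is generated for this run."""
    if mode not in REPORT_MODES:
        raise ValueError(f"Unknown report mode: {mode!r}")
    if mode == "always":
        return True
    if mode == "never":
        return False
    return has_errors  # on_error


def report_blob_path(destination_path: str, now=None) -> str:
    """Build <destination>/aura-avro-kpis-report-{iso-date}.json."""
    moment = now if now is not None else datetime.now(timezone.utc)
    return f"{destination_path.rstrip('/')}/aura-avro-kpis-report-{moment.isoformat()}.json"
```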
Independently of when it runs, avro-to-dataset-job-cli always performs the same process: it gets all the Avro-formatted files in KPIs container (job variable: AURA_MICROSOFT_AZURE_STORAGE_KPIS_CONTAINER_NAME) from the last upload executed by the aura-kpis-uploader component.
When running independently on the Databricks cluster, Prometheus alerts cannot be configured. Therefore, the process information will be obtained from the report generated along with the following generated files:
- If the process has ended with errors:
  - A file with the name set in the variable AURA_KPI_AVRO_PROCESS_ERROR_FILENAME will be generated containing the execution date.
  - Additionally, if the report has been generated in Azure Storage, the link to it will be included, valid for the time configured in the variable AURA_KPI_AVRO_REPORTS_SAS_EXPIRATION.
  - If the report cannot be recorded, the error will appear in the file.
- If the process terminates abruptly due to a timeout and the Databricks manager kills the process:
  - A report will be generated, showing each dataset in its corresponding stage.
  - The stages of each dataset can be completed later: when the job runs again, it obtains the last report generated and uses it to identify the stage from which to continue.
  - If the process remained in the stage WRITING_DATASET_OK, the files from the last execution will be moved to the processed folder and deleted from the avro folder.
  - If the process remained in the stage READING_BLOBS or WRITING_DATASET, the files will be loaded together with the rest of the files that have been generated since, without making distinctions.
  - If the process remained in the stage MOVING_BLOBS_TO_PROCESSED, the files will be moved to the processed folder. If this second attempt also fails, the stage will be set to NOT_PROCESSED_PREVIOUS_ERRORS to indicate that it is not recoverable and that a manual review must be carried out in case there is a corrupt Avro file.
  - If the process remained in the stage REMOVING_BLOBS, the files will be deleted from the avro folder. If this second deletion attempt also fails, the stage will be set to NOT_PROCESSED_PREVIOUS_ERRORS to indicate that it is not recoverable and that a manual review must be carried out in case there is a corrupt Avro file.
  - If the process remained in the stage WRITING_DATASET_ERROR_NOT_RECOVERABLE, neither the files of the last execution nor any files added since the last run will be loaded, because there are unrecoverable errors that must be verified and resolved manually. This happens when datasets are written with malformed or discarded records. For that dataset, the stage is recorded as NOT_PROCESSED_PREVIOUS_ERRORS to avoid loading it.
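The recovery rules above amount to a lookup from the stage found in the last report to the action taken on the next run. The sketch below is one way to express that mapping; the stage names come from the text, while the action labels, the function names, and the fallback `process_normally` for stages not listed are assumptions.

```python
from typing import Optional

NOT_RECOVERABLE_STAGE = "NOT_PROCESSED_PREVIOUS_ERRORS"

# Stage names are documented above; action labels are hypothetical.
RECOVERY_ACTIONS = {
    "WRITING_DATASET_OK": "move_to_processed_and_remove",
    "READING_BLOBS": "reload_with_new_files",
    "WRITING_DATASET": "reload_with_new_files",
    "MOVING_BLOBS_TO_PROCESSED": "retry_move_to_processed",
    "REMOVING_BLOBS": "retry_remove",
    "WRITING_DATASET_ERROR_NOT_RECOVERABLE": "skip_dataset",
}

# Stages that get exactly one more attempt before being marked unrecoverable.
RETRY_ONCE_STAGES = {"MOVING_BLOBS_TO_PROCESSED", "REMOVING_BLOBS"}


def recovery_action(previous_stage: str) -> str:
    """Map the stage found in the last report to the action for this run."""
    return RECOVERY_ACTIONS.get(previous_stage, "process_normally")


def stage_to_record(previous_stage: str, retry_failed: bool = False) -> Optional[str]:
    """Return NOT_PROCESSED_PREVIOUS_ERRORS when manual review is needed, else None."""
    if previous_stage == "WRITING_DATASET_ERROR_NOT_RECOVERABLE":
        return NOT_RECOVERABLE_STAGE
    if previous_stage in RETRY_ONCE_STAGES and retry_failed:
        return NOT_RECOVERABLE_STAGE
    return None
```

Keeping the rules in a table makes it easy to see that every documented stage has exactly one recovery path.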