
Improve observability across Amazon MWAA tasks

Written by admin


Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that makes it simple to set up and operate end-to-end data pipelines in the cloud at scale. A data pipeline is a set of tasks and processes used to automate the movement and transformation of data between different systems. The Apache Airflow open-source community provides over 1,000 pre-built operators (plugins that simplify connections to services) for Apache Airflow to build data pipelines. The Amazon provider package for Apache Airflow comes with integrations for over 31 AWS services, such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon EMR, AWS Glue, Amazon SageMaker, and more.

The most common use case for Airflow is ETL (extract, transform, and load). Nearly all Airflow users implement ETL pipelines ranging from simple to complex. Operationalizing machine learning (ML) is another growing use case, where data has to be transformed and normalized before it can be loaded into an ML model. In both use cases, the data pipeline prepares the data for consumption by ingesting data from different sources and transforming it through a series of steps.

Observability across the different processes within the data pipeline is a key component for monitoring the success or failure of the pipeline. Although Airflow controls the scheduling of task runs within the data pipeline, the run of the task itself (transforming, normalizing, and aggregating data) is done by different services based on the use case. Having an end-to-end view of the data flow is a challenge because of the multiple touch points in the data pipeline.

In this post, we provide an overview of logging enhancements when working with Amazon MWAA, which is one of the pillars of observability. We then discuss a solution to further enhance end-to-end observability by modifying the task definitions that make up the data pipeline. For this post, we focus on task definitions for two services, AWS Glue and Amazon EMR; however, the same method can be applied across different services.

Challenge

Many customers' data pipelines start simple, orchestrating a few tasks, and over time grow more complex, consisting of a large number of tasks and dependencies between them. As the complexity increases, it becomes increasingly hard to operate and debug in case of failure, which creates a need for a single pane of glass that provides end-to-end data pipeline orchestration and health management. For data pipeline orchestration, the Apache Airflow UI is a user-friendly tool that provides detailed views into your data pipeline. When it comes to pipeline health management, each service that your tasks interact with could be storing or publishing logs to different locations, such as an S3 bucket or Amazon CloudWatch logs. As the number of integration touch points increases, stitching together the distributed logs generated by different services in various locations can be challenging.

One solution provided by Amazon MWAA to consolidate the Airflow and task logs within the directed acyclic graph (DAG) is to forward the logs to CloudWatch log groups. A separate log group is created for each enabled Airflow logging option (for example, DAGProcessing, Scheduler, Task, WebServer, and Worker). These logs can be queried across log groups using CloudWatch Logs Insights.

A common approach in distributed tracing is to use a correlation ID to stitch and query distributed logs. A correlation ID is a unique identifier that is passed through a request flow to track a sequence of activities throughout the lifetime of the workflow. When each service in the workflow needs to log information, it can include this correlation ID, which makes it possible to track a full request from start to finish.
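To make the idea concrete, here is a minimal sketch using only Python's standard logging module. The service names (ingest, transform) and the make_logger helper are hypothetical and exist only for illustration; they are not part of Airflow or of the solution in this post. Two loggers write to separate streams, every record is prefixed with the same ID, and the records are later stitched together by filtering on that ID.

```python
import io
import logging


def make_logger(name, correlation_id, stream):
    """Return a logger whose every record carries the given correlation ID."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(correlation_id)s %(name)s %(message)s"))
    logger = logging.getLogger(name)
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False
    # LoggerAdapter merges the extra dict into each log record.
    return logging.LoggerAdapter(logger, {"correlation_id": correlation_id})


# Two hypothetical services write to separate streams but share one ID.
run_id = "manual__2022-07-12T00:22:36.111190+00:00"
ingest_out, transform_out = io.StringIO(), io.StringIO()
make_logger("ingest", run_id, ingest_out).info("read 3 files")
make_logger("transform", run_id, transform_out).info("wrote 1 table")

# Stitch the distributed logs back together by filtering on the ID.
all_lines = (ingest_out.getvalue() + transform_out.getvalue()).splitlines()
stitched = [line for line in all_lines if line.startswith(run_id)]
```

The same principle applies when the streams are CloudWatch log groups: as long as every service writes the shared ID into its messages, one filter recovers the whole run.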

The Airflow engine passes a few variables by default that are accessible to all templates. run_id is one such variable, and it is a unique identifier for a DAG run. The run_id can be used as the correlation ID to query different log groups within CloudWatch and capture all the logs for a particular DAG run.

However, be aware that the services your tasks interact with use a separate log group and won't log the run_id as part of their output. This prevents you from getting an end-to-end view across the DAG run.

For example, if your data pipeline includes an AWS Glue task running a Spark job, then the Airflow task logs will be available in one CloudWatch log group and the AWS Glue job logs will be in a different CloudWatch log group. However, the Spark job that is run as part of the AWS Glue job doesn't have access to the correlation ID and can't be tied back to a particular DAG run. So even if you use the correlation ID to query the different CloudWatch log groups, you won't get any information about the run of the Spark job.

Solution overview

As you now know, run_id is a variable that uniquely identifies a DAG run. The run_id is present in the Airflow task logs. To use the run_id effectively and increase observability across the DAG run, we use run_id as the correlation ID and pass it to the different tasks within the DAG. The correlation ID is then consumed by the scripts used within the tasks.

The following diagram illustrates the solution architecture.

Architecture Diagram

The data pipeline that we focus on consists of the following components:

  • An S3 bucket that contains the source data
  • An AWS Glue crawler that creates the table metadata in the Data Catalog from the source data
  • An AWS Glue job that transforms the raw data into a processed data format while performing file format conversions
  • An EMR job that generates reporting datasets

For details on the architecture and complete steps on how to run the DAG, refer to the Amazon MWAA for Analytics Workshop.

In the next sections, we explore the following topics:

  • The DAG file, in order to understand how to define and then pass the correlation ID in the AWS Glue and EMR tasks
  • The code needed in the Python scripts to output information based on the correlation ID

Refer to the GitHub repo for the detailed DAG definition and Spark scripts. To run the scripts, refer to the Amazon MWAA analytics workshop.

DAG definitions

In this section, we look at snippets of the additions needed in the DAG file. We also discuss how to pass the correlation ID to the AWS Glue and EMR jobs. Refer to the GitHub repo for the complete DAG code.

The DAG file begins by defining the variables:

# Variables

correlation_id = "{{ run_id }}"
dag_name = "data_pipeline"
S3_BUCKET_NAME = "airflow_data_pipeline_bucket"
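Note that correlation_id holds the literal template string at this point; Airflow substitutes the real run_id only when the value reaches a templated operator field at run time. The sketch below imitates that substitution with a small regular expression so the behavior can be seen without an Airflow installation. The render function is a hypothetical stand-in written for this illustration, not Airflow's Jinja-based implementation.

```python
import re


def render(template, context):
    """Replace each {{ name }} placeholder with its value from context."""
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", lambda m: str(context[m.group(1)]), template)


correlation_id = "{{ run_id }}"  # what the DAG file stores at parse time
script_args = {"--correlation_id": correlation_id}

# At run time the engine supplies the context for the current DAG run
# (the run_id value here is the example used elsewhere in this post).
context = {"run_id": "manual__2022-07-12T00:22:36.111190+00:00"}
rendered = {key: render(value, context) for key, value in script_args.items()}
```

Because the placeholder is resolved per run, the same DAG file produces a different correlation ID for every DAG run without any code change.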

Next, let's look at how to pass the correlation ID to the AWS Glue job using the AWS Glue operator. Operators are the building blocks of Airflow DAGs. They contain the logic of how data is processed in the data pipeline. Each task in a DAG is defined by instantiating an operator.

Airflow provides operators for different tasks. For this post, we use the AWS Glue operator.

The AWS Glue task definition contains the following:

  • The Python Spark job script (raw_to_transform.py) to run the job
  • The DAG name, task ID, and correlation ID, which are passed as arguments
  • The AWS Glue service role assigned, which has permissions to run the crawler and the jobs

See the next code:

# Glue Job definition

glue_task = AwsGlueJobOperator(
    task_id='glue_task',
    job_name='raw_to_transform',
    iam_role_name='AWSGlueServiceRoleDefault',
    script_args={'--dag_name': dag_name,
                 '--task_id': 'glue_task',
                 '--correlation_id': correlation_id},
)

Next, we pass the correlation ID to the EMR job using the EMR operator. This includes the following steps:

  1. Define the configuration of an EMR cluster.
  2. Create the EMR cluster.
  3. Define the steps to be run by the EMR job.
  4. Run the EMR job:
    1. We use the Python Spark job script aggregations.py.
    2. We pass the DAG name, task ID, and correlation ID as arguments to the steps for the EMR job.

Let's start by defining the configuration for the EMR cluster. The correlation_id is passed in the name of the cluster to easily identify the cluster corresponding to a DAG run. The logs generated by EMR jobs are published to an S3 bucket; the correlation_id is part of the LogUri as well. See the following code:

# Define the EMR cluster configuration

emr_task_id = 'create_emr_cluster'
JOB_FLOW_OVERRIDES = {
    "Name": dag_name + "." + emr_task_id + "-" + correlation_id,
    "ReleaseLabel": "emr-5.29.0",
    "LogUri": "s3://{}/logs/emr/{}/{}/{}".format(S3_BUCKET_NAME, dag_name, emr_task_id, correlation_id),
    "Instances": {
      "InstanceGroups": [{
         "Name": "Master nodes",
         "Market": "ON_DEMAND",
         "InstanceRole": "MASTER",
         "InstanceType": "m5.xlarge",
         "InstanceCount": 1
       },{
         "Name": "Slave nodes",
         "Market": "ON_DEMAND",
         "InstanceRole": "CORE",
         "InstanceType": "m5.xlarge",
         "InstanceCount": 2
       }],
       "TerminationProtected": False,
       "KeepJobFlowAliveWhenNoSteps": True
}}
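To see what these two fields look like once the template has been rendered, the following sketch rebuilds them with the same string expressions used in the configuration. The emr_identifiers helper is a hypothetical name introduced only for this illustration, and the run_id is the example value that appears later in this post.

```python
def emr_identifiers(bucket, dag_name, task_id, correlation_id):
    """Build the cluster name and log location the same way the configuration does."""
    return {
        "Name": dag_name + "." + task_id + "-" + correlation_id,
        "LogUri": "s3://{}/logs/emr/{}/{}/{}".format(bucket, dag_name, task_id, correlation_id),
    }


ids = emr_identifiers(
    "airflow_data_pipeline_bucket",
    "data_pipeline",
    "create_emr_cluster",
    "manual__2022-07-12T00:22:36.111190+00:00",  # example run_id
)
```

With these inputs, the cluster name and the S3 log prefix both end in the run_id, so the cluster and its logs can be located directly from the DAG run shown in the Airflow UI.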

Now let's define the task to create the EMR cluster based on the configuration:

# Create the EMR cluster

cluster_creator = EmrCreateJobFlowOperator(
    task_id=emr_task_id,
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    dag=dag
)

Next, let's define the steps to run as part of the EMR job. The input and output data processed by the EMR job is stored in an S3 bucket, and its locations are passed as arguments. The dag_name, task_id, and correlation_id values are also passed in as arguments. The task_id used can be a name of your choice; here we use add_steps:

# EMR steps to be executed by EMR cluster

SPARK_TEST_STEPS = [{
    'Name': 'Run Spark',
    'ActionOnFailure': 'CANCEL_AND_WAIT',
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        'Args': ['spark-submit',
        '/home/hadoop/aggregations.py',
            's3://{}/data/transformed/green'.format(S3_BUCKET_NAME),
            's3://{}/data/aggregated/green'.format(S3_BUCKET_NAME),
             dag_name,
             'add_steps',
             correlation_id]
    }
}]
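The order of the entries in Args matters, because the Spark script reads them by position. The sketch below rebuilds the argument list and shows how it lines up with sys.argv inside the script. The build_spark_args helper is a hypothetical name used only for this illustration.

```python
def build_spark_args(bucket, dag_name, task_id, correlation_id):
    """Return the argument list for the step, in the order the script reads it."""
    return [
        "spark-submit",
        "/home/hadoop/aggregations.py",
        "s3://{}/data/transformed/green".format(bucket),
        "s3://{}/data/aggregated/green".format(bucket),
        dag_name,
        task_id,
        correlation_id,
    ]


args = build_spark_args("airflow_data_pipeline_bucket", "data_pipeline",
                        "add_steps", "{{ run_id }}")

# spark-submit hands everything after its own name to the script, so the
# script path becomes sys.argv[0] and the five values follow it.
script_argv = args[1:]
```

This is why the EMR script expects exactly six entries in sys.argv and reads the DAG name, task ID, and correlation ID from positions 3, 4, and 5.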

Next, let's add a task to run the steps on the EMR cluster. The job_flow_id is the ID of the JobFlow, which is passed down from the EMR create task described earlier using Airflow XComs. See the following code:

#Run the EMR job

step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",      
    aws_conn_id='aws_default',
    steps=SPARK_TEST_STEPS,
)

This completes the steps needed to pass the correlation ID within the DAG task definition.

In the next section, we use this ID within the script run to log details.

Job script definitions

In this section, we review the changes required to log information based on the correlation_id. Let's start with the AWS Glue job script (for the complete code, refer to the following file in GitHub):

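# Script changes to file 'raw_to_transform'

import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext

## @params: [JOB_NAME, dag_name, task_id, correlation_id]
args = getResolvedOptions(sys.argv, ['JOB_NAME','dag_name','task_id','correlation_id'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
logger = glueContext.get_logger()
# Prefix the run_id with <dag_name>.<task_id> to allow task-level queries
correlation_id = args['dag_name'] + "." + args['task_id'] + " " + args['correlation_id']
logger.info("Correlation ID from GLUE job: " + correlation_id)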
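The awsglue library is only available inside the AWS Glue runtime, so the argument handling above is awkward to try locally. As a rough local stand-in, the sketch below uses argparse from the standard library to parse the same --name value pairs and compose the same log prefix. The resolve_options function is a hypothetical substitute written for this illustration and is not the behavior of getResolvedOptions in every detail.

```python
import argparse


def resolve_options(argv, names):
    """Parse --name value pairs for the given names into a dict."""
    parser = argparse.ArgumentParser()
    for name in names:
        parser.add_argument("--" + name, required=True)
    # parse_known_args ignores any extra arguments the runtime may add.
    known, _unknown = parser.parse_known_args(argv)
    return vars(known)


argv = ["--dag_name", "data_pipeline", "--task_id", "glue_task",
        "--correlation_id", "manual__2022-07-12T00:22:36.111190+00:00"]
args = resolve_options(argv, ["dag_name", "task_id", "correlation_id"])
correlation_id = args["dag_name"] + "." + args["task_id"] + " " + args["correlation_id"]
```

The composed string has the form <dag_name>.<task_id> <run_id>, which is the format used for task-level queries later in this post.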

Next, we focus on the EMR job script (for the complete code, refer to the file in GitHub):

# Script changes to file 'nyc_aggregations'

from __future__ import print_function
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum

if __name__ == "__main__":
    if len(sys.argv) != 6:
        print("""
        Usage: nyc_aggregations.py <s3_input_path> <s3_output_path> <dag_name> <task_id> <correlation_id>
        """, file=sys.stderr)
        sys.exit(-1)
    input_path = sys.argv[1]
    output_path = sys.argv[2]
    dag_task_name = sys.argv[3] + "." + sys.argv[4]
    correlation_id = dag_task_name + " " + sys.argv[5]
    # The correlation ID doubles as the Spark application name
    spark = (SparkSession
        .builder
        .appName(correlation_id)
        .getOrCreate())
    sc = spark.sparkContext
    # Log through the JVM's log4j, using <dag_name>.<task_id> as the logger name
    log4jLogger = sc._jvm.org.apache.log4j
    logger = log4jLogger.LogManager.getLogger(dag_task_name)
    logger.info("Spark session started: " + correlation_id)

This completes the steps for passing the correlation ID to the script run.

After we complete the DAG definitions and script additions, we can run the DAG. Logs for a particular DAG run can be queried using the correlation ID. The correlation ID for a DAG run can be found in the Airflow UI. An example of a correlation ID is manual__2022-07-12T00:22:36.111190+00:00. With this unique string, we can run queries on the relevant CloudWatch log groups using CloudWatch Logs Insights. The result of the query includes the logging provided by the AWS Glue and EMR scripts, along with other logs associated with the correlation ID.

Example query for DAG-level logs: manual__2022-07-12T00:22:36.111190+00:00

We can also obtain task-level logs by using the format <dag_name.task_id correlation_id>:

Example query: data_pipeline.glue_task manual__2022-07-12T00:22:36.111190+00:00
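The search strings above can be wrapped in a full CloudWatch Logs Insights query. The following sketch builds one such query as plain text, which can then be pasted into the Logs Insights console with the relevant log groups selected. The insights_query helper, the chosen fields, and the default limit are assumptions made for this illustration, not part of the original solution.

```python
def insights_query(search_text, limit=100):
    """Return a Logs Insights query that matches messages containing the text."""
    escaped = search_text.replace('"', '\\"')
    return (
        "fields @timestamp, @log, @message\n"
        '| filter @message like "{}"\n'
        "| sort @timestamp asc\n"
        "| limit {}"
    ).format(escaped, limit)


run_id = "manual__2022-07-12T00:22:36.111190+00:00"
dag_level = insights_query(run_id)
task_level = insights_query("data_pipeline.glue_task " + run_id)
```

Including the @log field in the output shows which log group each matching line came from, which helps when the query spans several groups.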

Clean up

If you created the setup to run and test the scripts using the Amazon MWAA analytics workshop, perform the cleanup steps to avoid incurring charges.

Conclusion

In this post, we showed how to send Amazon MWAA logs to CloudWatch log groups. We then discussed how to tie together logs from different tasks within a DAG using the unique correlation ID. The correlation ID can be output with as much or as little information as your job needs to provide more details across your entire DAG run. You can then use CloudWatch Logs Insights to query the logs.

With this solution, you can use Amazon MWAA as a single pane of glass for data pipeline orchestration and CloudWatch logs for data pipeline health management. The unique identifier improves the end-to-end observability for a DAG run and helps reduce the time needed for troubleshooting.

To learn more and get hands-on experience, start with the Amazon MWAA analytics workshop and then use the scripts in the GitHub repo to gain more observability of your DAG run.


About the Author

Payal Singh is a Partner Solutions Architect at Amazon Web Services, focused on the Serverless platform. She is responsible for helping partners and customers modernize and migrate their applications to AWS.
