
Argo and Airflow DAG Examples

Directed Acyclic Graphs (DAGs) sit at the heart of workflow orchestration. We use them to tell our workflow tools how to process tasks. The graphs determine the order in which steps run, when they can be run in parallel, and when to end processing in the event of an error. While DAGs behave the same on different platforms, creating them is different depending on the workflow orchestrator you are using.

In this post, we'll look at both Airflow DAG examples and Argo DAG examples. Airflow and Argo are two of the more popular workflow orchestration tools, but they have very different user interfaces. In addition to using different methods for defining the graphs, they have different approaches to implementing the decisions inside a DAG. We're going to use Argo's YAML configuration files and Airflow's Python API to implement two example workflows.

This article assumes you are familiar with DAGs.


DAG 101

Let's start with a simple DAG that runs four steps, with the last one dependent on the previous two.

Diagram depicting a simple DAG
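Before looking at tool-specific syntax, the ordering rule this graph encodes can be sketched in a few lines of plain Python. The `dependencies` map and `ready` helper below are illustrative only; they are not part of Argo or Airflow.

```python
# The diamond DAG from the figure, as a plain adjacency map:
# each task lists the tasks it depends on.
dependencies = {
    "1": [],
    "2": ["1"],
    "3": ["1"],
    "4": ["2", "3"],
}

def ready(task, done):
    """A task may run once every one of its dependencies has finished."""
    return all(dep in done for dep in dependencies[task])
```

Tasks "2" and "3" become ready as soon as "1" finishes, so an orchestrator can run them in parallel, while "4" must wait for both. This is exactly the scheduling decision Argo and Airflow make for us.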

Argo DAG Example

This example is similar to the basic DAG example in Argo's documentation. It's an excellent example of a simple DAG workflow.

Here is the Argo YAML for the basic DAG workflow:
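The sketch below is patterned on the dag-diamond example in Argo's documentation; the `generateName` value and `alpine:3.7` image tag are assumptions, and line positions approximate the line numbers referenced in the walkthrough.

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: dag-diamond-
spec:
  entrypoint: diamond
  templates:
  - name: echo
    inputs:
      parameters:
      - name: message
    container:
      image: alpine:3.7
      command: [echo, "{{inputs.parameters.message}}"]
  - name: diamond
    dag:
      tasks:
      - name: "1"
        template: echo
        arguments:
          parameters: [{name: message, value: "1"}]
      - name: "2"
        dependencies: ["1"]
        template: echo
        arguments:
          parameters: [{name: message, value: "2"}]
      - name: "3"
        dependencies: ["1"]
        template: echo
        arguments:
          parameters: [{name: message, value: "3"}]
      - name: "4"
        dependencies: ["2", "3"]
        template: echo
        arguments:
          parameters: [{name: message, value: "4"}]
```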

The workflow specification starts on line #5, with the spec configuration value.

Line #6 tells Argo where to begin the pipeline. Then, lines #8 - #14 define the workflow's single template. Argo workflows support templates: reusable pieces of code, similar to objects, that you reference in each task definition.

The workflow is named "diamond," after the shape of its graphical representation. Each task has a name, template, and arguments. Three of them have dependencies.

  • Task "1" calls echo with its name.
  • Task "2" calls echo with its name. It has task "1" as its sole dependency.
  • Task "3" calls echo with its name. It has task "1" as its sole dependency.
  • Task "4" calls echo with its name. It has tasks "2" and "3" as its dependencies.

This configuration runs the tasks as depicted in the graph. Argo will execute task "1". When it finishes, it executes "2" and "3" in parallel. When both "2" and "3" are complete, it executes "4."

So, creating a DAG in Argo is straightforward:

  • Define your workflow as a dag
  • Add dependencies to the tasks that need them (omit them if they are not required)
  • Dependencies are lists

Airflow DAG Example

Airflow workflows are written in Python code. Here is an implementation of the same DAG for Airflow:
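The listing below is a reconstruction using Airflow 2.x import paths; the `dag_id`, task IDs, and `default_args` values are assumptions, and line positions approximate the line numbers referenced in the walkthrough.

```python
from datetime import datetime

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator

default_args = {
    "owner": "airflow",
    "start_date": datetime(2022, 1, 1),
}
with DAG(
    dag_id="diamond",
    default_args=default_args,
    schedule_interval=None,  # run manually only
) as dag:
    task_1 = BashOperator(
        task_id="1",
        bash_command="echo 1",
    )
    task_2 = BashOperator(
        task_id="2",
        bash_command="echo 2",
    )
    task_3 = BashOperator(
        task_id="3",
        bash_command="echo 3",
    )
    task_4 = BashOperator(
        task_id="4",
        bash_command="echo 4",
    )

    # Task 1 first, then 2 and 3 in parallel, then 4.
    chain(task_1, [task_2, task_3], task_4)
```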

After importing the required modules, this code defines default arguments for the Python DAG object. Defining the arguments at the top of your code is considered a best practice since it makes them easier to find and change, and makes the code more readable.

The second argument, start_date, is required, even though we don't need it for this example. All Airflow DAGs require a schedule, even if you only plan on running your workflow manually. Partly for legacy reasons, you must specify a start date that the Airflow scheduler uses to decide when your workflow can be started.

On line #11, the script creates a DAG using with, so everything inside the block belongs to the workflow. This makes the code compact and easy to read.

Airflow uses operators as reusable tasks, similar to Argo's templates. Airflow's BashOperator is the perfect operator for this example. Lines #16 - #31 create four jobs that call echo with the task name.

Finally, this workflow uses Airflow's chain helper to establish the dependencies between the four tasks. Airflow has several ways to define relationships between tasks. In this case, chain's syntax resembles the graph.

So creating a DAG in Airflow requires these steps:

  • Create a DAG object
  • Create your steps using operators
  • Place your steps in the DAG object's graph

DAG Branching

A DAG can run tasks in a specific order based on dependencies. It can also direct execution down a particular path or branch based on a result. Here’s what that looks like (below).

Diagram depicting DAG branching

This workflow uses the output of the start step to decide whether to run the success or the fail step.

Argo Branching DAG Example

Let's start with the Argo workflow again.
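The listing below is a reconstruction patterned on Argo's conditional examples; the image tags, task names, and the exact form of the when expressions are assumptions, and line positions approximate the line numbers referenced in the walkthrough.

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: dag-branch-
spec:
  entrypoint: branch
  templates:
  - name: random
    script:
      image: python:alpine
      command: [python]
      source: |
        import random
        result = random.randint(1, 5)
        print(result)

  - name: echo
    inputs:
      parameters:
      - name: message
    container:
      image: alpine:3.7
      command: [echo, "{{inputs.parameters.message}}"]

  - name: branch
    dag:
      tasks:
      - name: begin
        template: random
      - name: success
        dependencies: [begin]
        when: "{{tasks.begin.outputs.result}} > 3"
        template: echo
        arguments:
          parameters: [{name: message, value: success}]
      - name: fail
        dependencies: [begin]
        when: "{{tasks.begin.outputs.result}} <= 3"
        template: echo
        arguments:
          parameters: [{name: message, value: fail}]
```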

Argo uses conditional artifacts and parameters to implement branching DAGs. You can also use these tools outside of DAGs. But by placing them in a DAG workflow spec, you have more control over how Argo runs the tasks.

This time, there are two templates. The workflow reuses the echo template from the first example on line #17 and defines a new one on line #8 that generates a random number between one and five.

The new template is an excellent example of how easy it is to run Python code inside an Argo workflow. By using a script block, the task defines its code inline in the source field. Argo passes the code to the Python interpreter and executes it in the container. So, you don't have to create a new Docker container for every task: you can reuse a stock image like python:alpine to run simple scripts.

So, the begin task on line #28 uses the random template to generate a number. Then, Argo executes the next two tasks based on the results of their when fields (these are defined on lines #32 and #38).

These fields use the expr syntax to evaluate the random number from begin. It's a similar syntax to what we used to pass parameters to echo, but since the script block in the random template exports the outputs, we don't have to define them in advance.

Expr uses scalar values, so Argo implicitly casts result to an integer and compares it to 3.

Airflow Branching DAG Example

Here is the Python code for the same workflow in Airflow.
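The listing below is a reconstruction using Airflow 2.x import paths; the function names, task IDs, and `default_args` values are assumptions, and the line numbers in the walkthrough refer to the original listing, so they may not line up exactly here.

```python
import random
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator


def generate_number():
    return random.randint(1, 5)


def choose_branch(ti):
    # Pull start's return value from XCom and pick the next task.
    if ti.xcom_pull(task_ids="start") > 3:
        return "success"
    return "fail"


default_args = {
    "owner": "airflow",
    "start_date": datetime(2022, 1, 1),
}
with DAG(
    dag_id="branch",
    default_args=default_args,
    schedule_interval=None,  # run manually only
) as dag:
    start = PythonOperator(
        task_id="start",
        python_callable=generate_number,
    )
    decide = BranchPythonOperator(
        task_id="decide",
        python_callable=choose_branch,
    )
    success = BashOperator(
        task_id="success",
        bash_command="echo success",
    )
    fail = BashOperator(
        task_id="fail",
        bash_command="echo fail",
    )

    # Place both branch targets directly downstream of decide.
    start >> decide >> [success, fail]
```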

With Argo, we didn't have to define a branching step. Its ability to conditionally execute tasks took care of the branching for us. With Airflow, we have to write the branching logic with a special operator.

Let's go over the code step-by-step instead of top-to-bottom.

The start task is a PythonOperator which can call any Python callable. Rather than calling random.randint directly, the code has a function on line #10 to better illustrate the idea of calling code from outside the step. Of course, this is Python, and PythonOperator can call any code in scope.

The decide task is a BranchPythonOperator. It's a subclass of PythonOperator that returns the IDs of the next tasks to execute in the graph. So, on line #14, a function does exactly that. The returned tasks must be directly downstream of the branching task; line #51 places them in the graph, so that holds.

The branching function uses xcom_pull to extract the return value from start. Since a PythonOperator pushes its return value to XCom by default, the value is available to the next task without any extra work.

Finally, the code creates the dependency chain so the branching operation can find the next step to execute. For this workflow, we used the >> operator.

So, like the Argo example, the success or fail task executes based on the random number returned at the start of the workflow.


Argo and Airflow DAGs

We've looked at Airflow DAG examples and Argo DAG examples that illustrate how to build a simple graph and a graph that branches based on a task result. You can use this code to put together a DAG for nearly every situation.

DAGs are valuable frameworks for running workflows like MLOps and CI/CD pipelines. They make it easy to arrange work in logical steps based on dependencies and outcomes. Create some graphs today!
