How to Automate Python Workflows in Prefect (Step-by-Step Guide)

jim
9 min read · Oct 21, 2020


Originally published at https://lejimmy.com on October 21, 2020.

Earlier this year I spun up the website Courthouse Leads to look for niche real estate investment opportunities in Houston. Though I knew how to code, run, and monitor scripts manually, I could never wrap my head around how to automate Python workflows.

Traditionally, if you were looking to source your own off-market deals, you would buy marketing lists from data vendors like ListSource, PropStream, CoreLogic, etc.

Though these companies cast a wide net, I felt like they were missing some of the nuances in Houston.

For example, in Harris County, each foreclosure type is recorded through a separate process. The majority of preforeclosure notices are filed with the County Clerk (I created a step-by-step guide on how to do this here).

And if you wanted to uncover the remaining foreclosure types, you would have to dive into the District Clerk’s website and actually read the court filings.

Meaning, large data vendors were missing these opportunities.

With my novice Python skills in web scraping, natural language processing, and machine learning, I was confident I could build data pipelines that accounted for some of these nuances.

Ultimately, it would give me access to investment opportunities that weren’t widely available.

Since I needed to build a pipeline for each individual opportunity type (non-judicial, judicial, reverse mortgages, home equity loans, etc.), I knew I needed an easy way to manage all of my scripts.

Automate Python Workflows

In the tech industry, Apache Airflow is probably the most popular workflow automation tool. To be honest, there was quite a steep learning curve for me to pick up Airflow.

Thankfully, in the midst of my struggles, I discovered a library that would make the entire process ridiculously simple.

Enter, Prefect.

Prefect is a Python-native library used to orchestrate, monitor, and automate data workflows. It’s incredibly straightforward to implement. At a glance, I’ll be able to tell which scripts succeeded, which failed, and where I need to direct my attention for the day.

I wasn’t the only one who thought Prefect was amazing. In about a year, they’ve shot up to over 5k stars on GitHub!

Tutorial

In the following tutorial, I’ll demo a script that is tangentially related to real estate investing: generating a list of every broker and realtor in the state of Texas.

The rest of the tutorial will focus more on implementing Prefect and not the script itself.

If you’re interested in the source code, you can view it on my GitHub here.

Installation

The easiest way to install prefect is through the Python package installer. Open a terminal and enter:

pip install prefect

We’ll be using pandas to clean our data and sqlalchemy to store our cleaned data.

pip install pandas
pip install sqlalchemy

Next, we’ll write our script using Prefect’s Core Engine.

Defining tasks

We’ll create a pseudo code outline of our functions with the @task decorator so Prefect understands it will be a part of our workflow.

from prefect import task

@task
def extract():
    # download and return a list of all Texas realtors
    all_data = ...  # placeholder for the download logic
    return all_data

@task
def transform(all_data):
    # filter list to Houston realtors
    filtered_data = ...  # placeholder for the filtering logic
    return filtered_data

@task
def load(filtered_data):
    # store data into database
    ...

In our extract step, we’ll download the broker and realtor data from the Texas Real Estate Commission (TREC) website.

Afterwards, we filter only for those active in the Greater Houston area.

Finally, we’ll store the filtered data in a database.
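To make the three steps concrete, here is a minimal sketch in plain Python using only the standard library. The sample records, the column names (license_number, name, county), the county set, and the table name are all hypothetical placeholders; the real script downloads its data from TREC and uses pandas and sqlalchemy instead.

```python
import sqlite3

# Hypothetical counties standing in for "Greater Houston" (an assumption).
HOUSTON_COUNTIES = {"HARRIS", "FORT BEND", "MONTGOMERY"}


def extract():
    # Stand-in for the TREC download: a few made-up records.
    return [
        {"license_number": "1001", "name": "A. Agent", "county": "HARRIS"},
        {"license_number": "1002", "name": "B. Broker", "county": "TRAVIS"},
        {"license_number": "1003", "name": "C. Closer", "county": "FORT BEND"},
    ]


def transform(all_data):
    # Keep only the records whose county is in the Houston-area set.
    return [row for row in all_data if row["county"] in HOUSTON_COUNTIES]


def load(filtered_data, connection):
    # Store the filtered records, replacing any previous snapshot.
    with connection:
        connection.execute("DROP TABLE IF EXISTS realtors")
        connection.execute(
            "CREATE TABLE realtors (license_number TEXT, name TEXT, county TEXT)"
        )
        connection.executemany(
            "INSERT INTO realtors VALUES (:license_number, :name, :county)",
            filtered_data,
        )


connection = sqlite3.connect(":memory:")
load(transform(extract()), connection)
stored = connection.execute("SELECT name FROM realtors ORDER BY name").fetchall()
print(stored)
```

Because each function takes the previous step's output as its input, the same three functions can later be wrapped with @task without changing their bodies.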

Defining a flow

Now that we have our tasks defined, we will use a context manager to specify how data should flow between the tasks.

from datetime import datetime, timedelta

from prefect import Flow
from prefect.schedules import IntervalSchedule

# schedule to run every 12 hours
schedule = IntervalSchedule(
    start_date=datetime.utcnow() + timedelta(seconds=1),
    interval=timedelta(hours=12),
)

# define Prefect flow
with Flow("TREC-Realtors", schedule=schedule) as flow:
    realtor_data = extract()
    houston_realtor_data = transform(realtor_data)
    load_to_database = load(houston_realtor_data)

With the flow defined, Prefect understands what order to extract the data, clean it, and finally store it.
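One way to picture how the order falls out of the data dependencies is a topological sort. The sketch below uses Python's standard graphlib module (Python 3.9+). It only illustrates the idea and makes no claim about how Prefect resolves the order internally.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks whose output it consumes.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

# static_order() yields every task after all of its dependencies.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
```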

Additionally, we define an IntervalSchedule to update once every 12 hours.
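As a rough illustration of what an interval schedule means, this plain-Python sketch lists the first few run times for a 12-hour interval. The helper name upcoming_runs is hypothetical and the start time is arbitrary; Prefect computes its own schedule from the IntervalSchedule object.

```python
from datetime import datetime, timedelta


def upcoming_runs(start_date, interval, count):
    # Run n happens at start_date + n * interval.
    return [start_date + n * interval for n in range(count)]


# Arbitrary example start time with the 12-hour interval from the flow.
runs = upcoming_runs(datetime(2020, 10, 20, 12, 0), timedelta(hours=12), 3)
for run in runs:
    print(run.isoformat())
```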

Handling failures

Since our extraction step is connecting to the internet, there can be moments where our script fails. There could be a power outage, our internet might go down, the site may be under maintenance, etc.

To account for these potential failures, we’ll add arguments to our previous outline to tell Prefect to retry our tasks in the event of a failure.

from datetime import timedelta

from prefect import task

@task(max_retries=3, retry_delay=timedelta(minutes=1))
def extract():
    # download and return a list of all Texas realtors
    all_data = ...  # placeholder for the download logic
    return all_data

@task
def transform(all_data):
    # filter list to Houston realtors
    filtered_data = ...  # placeholder for the filtering logic
    return filtered_data

@task(max_retries=3, retry_delay=timedelta(minutes=1))
def load(filtered_data):
    # store data into database
    ...

With these arguments, Prefect will wait 1 minute before retrying the failed tasks up to 3 times.
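The retry behavior can be pictured with a small plain-Python decorator. This is a sketch of the general pattern, not Prefect's implementation; the names with_retries and flaky_download are hypothetical, and the delay is set to zero so the example finishes immediately.

```python
import time
from datetime import timedelta
from functools import wraps


def with_retries(max_retries, retry_delay):
    # Re-run the wrapped function after a failure, up to max_retries times.
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries:
                        raise  # out of retries: surface the failure
                    time.sleep(retry_delay.total_seconds())
        return wrapper
    return decorator


attempts = []


@with_retries(max_retries=3, retry_delay=timedelta(seconds=0))
def flaky_download():
    # Fails twice to simulate a site outage, then succeeds.
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("site unavailable")
    return "data"


result = flaky_download()
print(result, len(attempts))
```

Here the simulated download succeeds on the third attempt, so only two of the three allowed retries are used.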

Test run

To test our workflow, we add the following line to the end of our script:

flow.run()

Then, in a terminal, we run our Python script. If we did everything correctly, we should see the following output:

[2020-10-20 14:40:48] INFO - prefect.FlowRunner | Beginning Flow run for 'TREC-Realtors'
[2020-10-20 14:40:48] INFO - prefect.TaskRunner | Task 'extract': Starting task run...
[2020-10-20 14:41:04] INFO - prefect.TaskRunner | Task 'extract': finished task run for task with final state: 'Success'
[2020-10-20 14:41:05] INFO - prefect.TaskRunner | Task 'transform': Starting task run...
[2020-10-20 14:41:05] INFO - prefect.TaskRunner | Task 'transform': finished task run for task with final state: 'Success'
[2020-10-20 14:41:05] INFO - prefect.TaskRunner | Task 'load': Starting task run...
[2020-10-20 14:41:05] INFO - prefect.TaskRunner | Task 'load': finished task run for task with final state: 'Success'
[2020-10-20 14:41:05] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-10-20 14:41:05] INFO - prefect.TREC-Realtors | Waiting for next scheduled run at 2020-10-21T02:40:48.757797+00:00

Once we’ve made sure everything is working locally, we’ll deploy our flow to Prefect Cloud to keep our scripts running 24/7.

Next Steps

Up until now, we’ve defined what the work is and how it should be executed.

You can imagine the Prefect Cloud Server as a manager who sends you reports on how things are going and makes sure the work is getting assigned and completed.

Sign Up for Prefect Cloud

To get started, we’ll sign up for a free account at https://www.prefect.io/get-prefect.

Once we sign up we’ll be greeted with the following empty dashboard.

Creating a Personal Access Token

In the menu, we’ll navigate to the Personal Access Tokens to create a token. This will allow us to connect our local computer with Prefect’s Cloud service.

I’ll give my new token a quick name.

Make sure to copy your token as it won’t be shown again.

Back in our terminal, we’ll let Prefect know we’re using their cloud service and log in using our Personal Access Token.

(venv) PS > prefect backend cloud
Backend switched to cloud
(venv) PS > prefect auth login -t vPv4ThLuswV6BT6IUfBMUw
Login successful!

Create a project

To keep flows organized, Prefect categorizes workflows into projects.

From the dashboard, near the top right, select All Projects, then New Project.

Give the project a name to register and organize our flow under later.

Create an agent

Finally, we need to create an agent to actually do the work.

The agent will request work from the Prefect Cloud API. If there are any scheduled flows, the agent will execute them.

To continue our analogy, imagine the agent as an employee that is constantly asking the manager, “Is there anything I should be doing right now?”
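The manager and employee analogy can be sketched as a simple polling loop. Everything here is a toy stand-in: the queue plays the role of the Prefect Cloud API, and the names scheduled_runs and poll_for_work are hypothetical. A real agent keeps polling indefinitely rather than stopping after a fixed number of polls.

```python
from collections import deque

# Toy stand-in for the scheduled flow runs held by the server.
scheduled_runs = deque(["TREC-Realtors run 1", "TREC-Realtors run 2"])
completed = []


def poll_for_work(max_polls):
    # Ask for work on each poll and execute whatever is waiting.
    for _ in range(max_polls):
        if scheduled_runs:
            flow_run = scheduled_runs.popleft()
            completed.append(flow_run)  # "executing" the run in this toy
        # A real agent would sleep here before polling again.


poll_for_work(max_polls=5)
print(completed)
```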

We’ll head over to the Team settings, then to API Tokens, then click on Create Token. We’ll give the agent a name and select RUNNER as its scope.

Similarly, copy this token as it won’t be shown again.

Now, we’ll modify our Python flow from earlier to register our flow underneath the project we made earlier, and to run our agent.

from datetime import datetime, timedelta

from prefect import task, Flow
from prefect.schedules import IntervalSchedule

# schedule to run every 12 hours
schedule = IntervalSchedule(
    start_date=datetime.utcnow() + timedelta(seconds=1),
    interval=timedelta(hours=12),
)

# define Prefect flow
with Flow("TREC-Realtors", schedule=schedule) as flow:
    realtor_data = extract()
    houston_realtor_data = transform(realtor_data)
    load_to_database = load(houston_realtor_data)

# register flow with Prefect Cloud
flow.register(project_name="trec-high-value-data-sets")

# start the agent
flow.run_agent(token="TDTm8O1dJK_ur5w7rkK_lw")

If everything worked properly you should see similar output to the following:

(venv) PS > python scrape.py
Result check: OK
Flow URL: https://cloud.prefect.io/strobila-o0i-es-s-account/flow/9d3c81db-f0d8-4d31-99af-91e352ae4f9a
└── ID: b5d9f8df-74ee-4d30-a4e9-514e0b4dcc07
└── Project: trec-high-value-data-sets
└── Labels: ['DESKTOP-15TEOQB']

 ____            __           _        _                    _
|  _ \ _ __ ___ / _| ___  ___| |_     / \   __ _  ___ _ __ | |_
| |_) | '__/ _ \ |_ / _ \/ __| __|   / _ \ / _` |/ _ \ '_ \| __|
|  __/| | |  __/  _|  __/ (__| |_   / ___ \ (_| |  __/ | | | |_
|_|   |_|  \___|_|  \___|\___|\__| /_/   \_\__, |\___|_| |_|\__|
                                           |___/

[2020-10-20 22:51:18,723] INFO - agent | Starting LocalAgent with labels ['DESKTOP-15TEOQB', 'azure-flow-storage', 'gcs-flow-storage', 's3-flow-storage', 'github-flow-storage', 'webhook-flow-storage', 'gitlab-flow-storage']
[2020-10-20 22:51:18,723] INFO - agent | Agent documentation can be found at https://docs.prefect.io/orchestration/
[2020-10-20 22:51:18,724] INFO - agent | Agent connecting to the Prefect API at https://api.prefect.io
[2020-10-20 22:51:18,821] INFO - agent | Waiting for flow runs...
[2020-10-20 22:51:19,861] INFO - agent | Found 1 flow run(s) to submit for execution.
[2020-10-20 22:51:20,007] INFO - agent | Deploying flow run 9e46cb7d-e586-4334-befd-fb6852cbac60

This terminal window should stay open on your machine to execute any upcoming runs.

Back in our Prefect dashboard, we’re greeted with the following:

At a glance, you can see that our flow succeeded, we have 10 upcoming runs scheduled, and we have one agent that is currently polling for work.

Congrats! You’ve successfully created a Prefect Cloud account, created a flow, and have an agent persistently executing your work!

Troubleshooting

Jeremiah Lowin, the founder of Prefect, elegantly states that “workflows are only interesting when things fail; they’re kind of like insurance”.

To demo this, here is the same flow that has been running for a few days. You can see the screenshot of my dashboard below.

I woke up one morning to a couple of failed flows shown in red. The dashboard allows you to zoom into the failed runs and check what went wrong.

In my particular case, I had a sqlite3.DatabaseError. After some quick Google searches, I was able to make a hotfix.

I rescheduled the flows for a quick run and everything has been dandy since.

Conclusion

There you have it! A simple implementation of Prefect Cloud with an agent running on your local machine. To build on this, you can add as many flows and agents as you need to automate your particular Python workflows.

The team at Prefect is shipping new features quickly. I encourage you to check out the Prefect Docs in the resources below to explore everything they have to offer and decide whether it is worth implementing for your project.

When you signed up for your account, you should have also received an invitation to their Slack channel. Come hang out and see what the community is building.

If you have any questions, feel free to connect with me on Twitter. My DMs are open.

Resources

  1. GitHub Repo: https://github.com/lejimmy/trec-high-value-data-sets/blob/master/scrape.py
  2. Prefect’s Getting Started Guide: docs.prefect.io/
