Module 2: From Bash Scripts to Actual Orchestration
This post documents my experience with Module 2 of the Data Engineering Zoomcamp by DataTalks.Club — a free 9-week course covering the modern data engineering stack end-to-end. The module covers workflow orchestration with Kestra, building ETL pipelines for NYC taxi data locally with PostgreSQL, and deploying the same pipelines to Google Cloud Platform using GCS and BigQuery.
Stack: Kestra (latest), PostgreSQL 15, Docker Compose, Python 3.11, Google Cloud Storage, BigQuery. Skills Gained: Declarative workflow orchestration in YAML, ETL pipeline design with staging/final table patterns, deduplication strategies, scheduled and backfill-capable pipelines, cloud data lake and warehouse integration.
Last Week I Used Cron. This Week I Feel Bad About It.
After Module 1 I felt reasonably competent. I could spin up PostgreSQL in Docker, ingest a million rows without killing my RAM, and provision GCP infrastructure with three Terraform commands. Solid foundation.
Module 2 started by pointing out that everything I’d been doing to run pipelines — cron jobs, bash scripts, “just run the Python file manually” — is held together with tape. And the tape gets worse the more pipelines you add.
The module introduces Kestra as the fix. I was skeptical at first. Another tool to learn. Another layer of abstraction between me and the actual data. But by the end of the week I understood why orchestration is listed as module two, not module seven. It’s not an add-on. It’s the thing everything else sits on top of.
What Orchestration Actually Means
Before this module, I had a mental model of orchestration as “a fancier cron.” Schedule a script, run it at 3am, check the logs in the morning. Job done.
That model breaks the moment you have more than a handful of pipelines. Here’s a scenario that happened to me literally last month: a data pull script failed silently at 2am because an upstream API returned a 429. No retry logic. No alert. I found out the next day when someone asked why the dashboard was showing yesterday’s numbers.
With an orchestrator, that’s a solved problem. Kestra handles retries, logs every execution, alerts on failure, and keeps a full history of what ran, when, and with what inputs. It’s not complexity for complexity’s sake — it’s the same reason you use version control instead of saving files as final_v2_REAL_final.py.
The module uses an analogy I liked: an orchestra. Individual musicians are skilled, but without a conductor telling each section when to come in, you get noise instead of music. An orchestrator is the conductor for your data pipelines.
Kestra: YAML as a First-Class Citizen
Kestra’s design decision is to define workflows in YAML rather than Python. Coming from Airflow’s world (DAGs as Python classes with decorators), this feels almost too simple. A flow looks like this:
{% highlight yaml %}
id: my_first_pipeline
namespace: zoomcamp

tasks:
  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: https://api.example.com/data

  - id: transform
    type: io.kestra.plugin.scripts.python.Script
    containerImage: python:3.11-alpine
    script: |
      import json
      ...
{% endhighlight %}

That's a real, working ETL pipeline. Extract from an HTTP endpoint, transform with a Python script running in an isolated container, all orchestrated through YAML.
The key insight is that tasks run in containers. Kestra doesn’t care what language you use — Python, R, Bash, Node.js, whatever — as long as there’s a Docker image for it. You don’t install anything on the host. You don’t manage virtual environments. The image is the environment.
This matters a lot for reproducibility. The same YAML file runs identically on my laptop and on a cloud VM. That’s the Infrastructure as Code principle applied to data pipelines, not just to servers.
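As a sketch of what "the image is the environment" looks like in practice: swapping Python for plain Bash only changes the task type and the container image (the image tag here is arbitrary, and the task assumes Kestra's shell Script plugin):

{% highlight yaml %}
tasks:
  # Same pattern, different language: only the task type and image change.
  - id: transform_bash
    type: io.kestra.plugin.scripts.shell.Script
    containerImage: alpine:3.19 # any image with the tools you need
    script: |
      echo "running inside the container"
{% endhighlight %}

Nothing gets installed on the host either way; pinning the image tag is what makes the run reproducible.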
The Anatomy of a Flow
The module spends time on flow structure, and it’s worth understanding properly because every concept builds on it.
{% highlight yaml %}
id: taxi_pipeline        # unique within namespace
namespace: zoomcamp.prod # like a package name

inputs:
  - id: taxi
    type: SELECT
    values: [yellow, green]
    defaults: yellow
  - id: year              # referenced in variables below, so it must be declared
    type: SELECT
    values: ["2019", "2020"]
    defaults: "2019"

variables:
  file: "{{inputs.taxi}}_tripdata_{{inputs.year}}.csv"
  table: "{{inputs.taxi}}_tripdata"

tasks:
  - id: extract
    type: io.kestra.plugin.scripts.shell.Commands
    commands:
      - wget -qO- https://example.com/{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}
    outputFiles:
      - "*.csv"

  - id: load
    type: io.kestra.plugin.jdbc.postgresql.CopyIn
    from: "{{outputs.extract.outputFiles[render(vars.file)]}}"
    table: "{{render(vars.table)}}"

errors:
  - id: notify
    type: io.kestra.plugin.core.log.Log
    message: "Pipeline failed: {{error.message}}"

triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 1 * *"

pluginDefaults:
  - type: io.kestra.plugin.jdbc.postgresql
    values:
      url: jdbc:postgresql://postgres:5432/ny_taxi
      username: postgres
      password: "{{secret('PG_PASSWORD')}}"
{% endhighlight %}
A few things worth pointing out:
pluginDefaults is the one I found most useful immediately. Instead of repeating the database connection string in every task, you define it once and it applies to all tasks of that plugin type. When you have a dozen SQL tasks in a flow, this is the difference between clean YAML and a maintenance nightmare.
The render() function is a gotcha that gets everyone at least once. If a variable contains a Pebble expression — like file: "{{inputs.taxi}}_data.csv" — you can’t just write {{vars.file}} in a task. You have to write {{render(vars.file)}}. Without render(), Kestra returns the literal string with the unresolved expression inside it. This produces no error, just wrong filenames that don’t match anything.
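A minimal illustration of the difference (variable and task names here are made up):

{% highlight yaml %}
variables:
  file: "{{inputs.taxi}}_data.csv" # contains a Pebble expression

tasks:
  - id: wrong
    type: io.kestra.plugin.core.log.Log
    message: "{{vars.file}}"         # logs the literal string {{inputs.taxi}}_data.csv
  - id: right
    type: io.kestra.plugin.core.log.Log
    message: "{{render(vars.file)}}" # logs the resolved name, e.g. yellow_data.csv
{% endhighlight %}

Both tasks succeed, which is exactly why the bug is easy to miss.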
Outputs flow between tasks automatically. The file that extract downloads is referenced in load as {{outputs.extract.outputFiles[...]}}. Kestra manages the internal storage; you just reference the output by task ID.
The NYC Taxi ETL: Staging, Merging, Not Losing Your Mind
The actual project in Module 2 is building an ETL pipeline for NYC taxi data — yellow and green cabs, 2019–2020, millions of rows. Same dataset as Module 1, but now orchestrated properly.
The pipeline architecture has three phases:
- Extract: download and decompress the monthly CSV from GitHub Releases
- Load to staging: create a temporary table and copy the data into it
- Merge to final: move data from staging to the permanent table, with deduplication
The staging step is what separates production data engineering from “just load the CSV.” If you run the pipeline twice for the same month (which happens during testing, during backfills, during incident recovery), you want exactly one copy of each record in the final table. The staging table plus a MERGE operation gives you that guarantee.
{% highlight sql %}
INSERT INTO {{render(vars.table)}}
SELECT * FROM {{render(vars.staging_table)}}
WHERE unique_row_id NOT IN (
  SELECT unique_row_id FROM {{render(vars.table)}}
);

DROP TABLE IF EXISTS {{render(vars.staging_table)}};
{% endhighlight %}

The unique_row_id column is generated from an MD5 hash of the fields that together uniquely identify a trip. It's added during the staging load. No matter how many times you run the pipeline for January 2020, you end up with exactly the records that belong there.
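The hash step itself looks roughly like this (a sketch only: the column names below are illustrative, not the exact taxi schema):

{% highlight yaml %}
- id: add_unique_id
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    ALTER TABLE {{render(vars.staging_table)}}
      ADD COLUMN IF NOT EXISTS unique_row_id TEXT;
    -- Hash the fields that jointly identify a trip.
    -- COALESCE guards against NULLs breaking the concatenation.
    UPDATE {{render(vars.staging_table)}}
    SET unique_row_id = md5(
      COALESCE(CAST(pickup_datetime  AS TEXT), '') ||
      COALESCE(CAST(dropoff_datetime AS TEXT), '') ||
      COALESCE(CAST(pu_location_id   AS TEXT), '') ||
      COALESCE(CAST(do_location_id   AS TEXT), '') ||
      COALESCE(CAST(fare_amount      AS TEXT), '')
    );
{% endhighlight %}

The important property is determinism: the same input row always hashes to the same ID, so reruns can be filtered out in the merge.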
I’ll be using this same pattern for EnergyLens. Each hourly electricity price record is uniquely identified by country + datetime + price type. The staging/merge pattern means I can reprocess historical data without worrying about duplicates piling up in BigQuery.
One Flow, Two Taxi Types
The module’s elegant move is building a single parametric flow that handles both yellow and green taxis. You don’t duplicate the pipeline — you pass taxi: yellow or taxi: green as an input.
But yellow and green taxi CSVs have slightly different schemas (different column names for pickup and dropoff time). So you need to create different tables for each. The solution is runIf:
{% highlight yaml %}
- id: create_yellow_table
  runIf: "{{inputs.taxi == 'yellow'}}"
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    CREATE TABLE IF NOT EXISTS {{render(vars.staging_table)}} (
      tpep_pickup_datetime TEXT,
      tpep_dropoff_datetime TEXT
      -- ... yellow-specific columns
    );

- id: create_green_table
  runIf: "{{inputs.taxi == 'green'}}"
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    CREATE TABLE IF NOT EXISTS {{render(vars.staging_table)}} (
      lpep_pickup_datetime TEXT,
      lpep_dropoff_datetime TEXT
      -- ... green-specific columns
    );
{% endhighlight %}
One flow, conditional branches, no duplication. This is the kind of thing that looks obvious written down but requires someone to explicitly point it out when you’re building your first real pipeline.
Scheduling and Backfilling: The Part That Actually Matters
Running a pipeline manually is fine for development. In production, pipelines need to run themselves. And at some point, you need to load data you missed — either because the pipeline didn’t exist yet, or because something broke.
Kestra’s scheduled trigger passes the execution date into the flow automatically:
{% highlight yaml %}
triggers:
  - id: monthly
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 1 * *"
    inputs:
      taxi: yellow
      year: "{{trigger.date | date('yyyy')}}"
      month: "{{trigger.date | date('M')}}"
{% endhighlight %}
The flow knows it’s running for March 2020 because the trigger date says March 1 2020. No hardcoded values. No manual intervention.
Backfilling is where this design really pays off. Say you want to load two years of historical data — 24 months. With a cron job, that’s 24 manual runs. With Kestra, you click “Backfill” in the UI, set a start and end date, and Kestra runs the flow for each scheduled interval in that range automatically.
One thing the module is clear about: set a concurrency limit of 1 before backfilling a large range. Without it, Kestra launches all 24 runs simultaneously. Your local PostgreSQL will not enjoy this.
{% highlight yaml %}
concurrency:
  limit: 1
{% endhighlight %}

Two lines. Add it before you trigger the backfill, remove it (or raise it) after. Cheap insurance.
Moving to the Cloud: GCS + BigQuery
The second half of the module migrates the same pipeline to GCP. Local PostgreSQL becomes Google Cloud Storage as the data lake and BigQuery as the data warehouse. The pipeline logic stays almost identical — the destination just changes.
The architecture adds one step: upload to GCS before loading to BigQuery.
wget + gunzip → Upload to GCS → Create External Table in BQ → Load to Staging → Merge to Final

The BigQuery external table is an interesting pattern. It's a table definition that points to a file in GCS — BigQuery can query it directly without copying the data. You use it as the source for the staging load, then discard it. It costs nothing to create and saves you from having to figure out BigQuery's bulk load API from scratch.
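A sketch of that external-table step, assuming Kestra's BigQuery Query task and a made-up dataset name (`zoomcamp`):

{% highlight yaml %}
- id: create_external_table
  type: io.kestra.plugin.gcp.bigquery.Query
  sql: |
    -- Points BigQuery at the CSV sitting in GCS; no data is copied.
    CREATE OR REPLACE EXTERNAL TABLE
      `{{kv('GCP_PROJECT_ID')}}.zoomcamp.{{render(vars.table)}}_ext`
    OPTIONS (
      format = 'CSV',
      uris = ['gs://{{kv('GCP_BUCKET_NAME')}}/{{render(vars.file)}}'],
      skip_leading_rows = 1
    );
{% endhighlight %}

The staging load then selects from the `_ext` table, after which it can be dropped.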
One thing that changes significantly is how you pass configuration. In the local setup, database credentials went into pluginDefaults. In the GCP setup, you have GCP project IDs, bucket names, dataset names, service account credentials — all of which you don’t want hardcoded in YAML files that live in a Git repository.
Kestra’s answer is two-tiered:
- KV Store for non-sensitive config: project ID, bucket name, region. Accessible as {{kv('GCP_PROJECT_ID')}}. Change it once, all flows pick it up.
- Secrets for credentials: the service account JSON. Accessible as {{secret('GCP_SERVICE_ACCOUNT')}}. Encrypted at rest, never visible in logs.
{% highlight yaml %}
pluginDefaults:
  - type: io.kestra.plugin.gcp
    values:
      serviceAccount: "{{secret('GCP_SERVICE_ACCOUNT')}}"
      projectId: "{{kv('GCP_PROJECT_ID')}}"
      location: "{{kv('GCP_LOCATION')}}"
      bucket: "{{kv('GCP_BUCKET_NAME')}}"
{% endhighlight %}
Same pluginDefaults pattern as before — but now it applies to every GCP plugin in the flow. Define it once, every GCS upload, every BigQuery query, every dataset operation uses it automatically.
Things That Will Bite You
A few issues that cost me time and that I didn’t see mentioned prominently in the course material:
Port conflict between Kestra and pgAdmin. Both default to port 8080. If you run them together without changing pgAdmin’s port, one of them silently doesn’t start. Change pgAdmin to 8090 in the Compose file:
{% highlight yaml %}
pgadmin:
  ports:
    - "8090:80" # not 8080:80
{% endhighlight %}

host.docker.internal on Linux. On Mac and Windows, this hostname resolves to the host machine's IP automatically. On Linux it doesn't exist by default. If you're connecting from Kestra to a locally-running PostgreSQL, you'll get "Connection Refused" and spend twenty minutes ruling out every other possible cause first. Fix: use the PostgreSQL service name from docker-compose (postgres) instead of localhost or host.docker.internal.
The render() function. Already mentioned above but worth repeating: any variable that contains a Pebble expression must be referenced with {{render(vars.variable_name)}}, not {{vars.variable_name}}. The error it produces — a filename that looks exactly right except it contains the literal string {{inputs.taxi}} — is the kind of thing that makes you question your reading comprehension for a while.
Service account JSON format. When adding the GCP service account to Kestra secrets, paste the minified single-line JSON, not the formatted multi-line version. Technically Kestra parses both, but some versions have trouble with line breaks in secret values.