Extract multiple tables from Postgres using SQL queries and process them as Pandas DataFrames on a schedule

About this blueprint

Parallel Postgres Python SQL

This flow uses the Parallel task to run multiple tasks concurrently. The concurrent property sets the maximum number of tasks that run at the same time; here it is set to 2.
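As a rough analogy for a concurrency cap, the sketch below runs a handful of placeholder tasks through a thread pool limited to two workers and records the highest number that ran at once. It only illustrates the idea of a limit; it is not how Kestra implements the Parallel task, and the function name `run_with_limit`, the task names, and the sleep duration are all made up for illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_with_limit(task_ids, limit):
    """Run placeholder tasks with at most `limit` running at the same time.

    Returns the task ids in submission order and the peak number of tasks
    that were observed running simultaneously.
    """
    lock = threading.Lock()
    state = {"running": 0, "peak": 0}

    def run_task(task_id):
        with lock:
            state["running"] += 1
            state["peak"] = max(state["peak"], state["running"])
        time.sleep(0.05)  # simulated work, e.g. a query export
        with lock:
            state["running"] -= 1
        return task_id

    # max_workers plays the role of the cap on simultaneous tasks
    with ThreadPoolExecutor(max_workers=limit) as pool:
        finished = list(pool.map(run_task, task_ids))
    return finished, state["peak"]


# Hypothetical task names; only "products" and "orders" appear in the flow
finished, peak = run_with_limit(["products", "orders", "customers", "invoices"], limit=2)
print(f"finished={finished}, peak concurrency={peak}")
```

With four tasks and a limit of 2, the remaining tasks wait until a worker is free, so the observed peak never exceeds the limit.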

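The flow extracts two tables, products and orders, from a Postgres database with the CopyOut task, which writes each query result to a CSV file. Those files are passed to a Python task through inputFiles. The Python script loads them with Pandas, joins orders to products, and writes the ten products with the highest summed total to bestsellers_pandas.json. A Schedule trigger runs the flow every day at 9:00 AM.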

yaml
id: postgres_to_pandas_dataframes
namespace: company.team

variables:
  db_host: host.docker.internal

tasks:
  - id: get_tables
    type: io.kestra.plugin.core.flow.Parallel
    concurrent: 2
    tasks:
      - id: products
        type: io.kestra.plugin.jdbc.postgresql.CopyOut
        sql: SELECT * FROM products
      
      - id: orders
        type: io.kestra.plugin.jdbc.postgresql.CopyOut
        sql: SELECT * FROM orders
  
  - id: pandas
    type: io.kestra.plugin.scripts.python.Script
    warningOnStdErr: false
    inputFiles:
      products.csv: "{{ outputs.products.uri }}"
      orders.csv: "{{ outputs.orders.uri }}"
    outputFiles:
      - bestsellers_pandas.json
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: ghcr.io/kestra-io/pydata:latest
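    script: |
      import pandas as pd

      # Read the CSV files that inputFiles placed in the working directory
      products = pd.read_csv("products.csv")
      orders = pd.read_csv("orders.csv")

      # Attach product details to every order row
      df = orders.merge(products, on="product_id", how="left")

      # Sum the order totals per product and keep the ten largest
      top = (
          df.groupby("product_name", as_index=False)["total"]
          .sum()
          .sort_values("total", ascending=False)
          .head(10)
      )

      # Write the result to the file declared in outputFiles
      top.to_json("bestsellers_pandas.json", orient="records")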

pluginDefaults:
  - type: io.kestra.plugin.jdbc.postgresql.CopyOut
    values:
      url: jdbc:postgresql://{{ vars.db_host }}:5432/
      username: postgres
      password: "{{ secret('DB_PASSWORD') }}"
      format: CSV
      header: true
      delimiter: ","

triggers:
  - id: every_morning
    type: io.kestra.plugin.core.trigger.Schedule
    cron: 0 9 * * *
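
To try the transformation from the pandas task without a database, the same merge and aggregation can be run against small in-memory CSV samples. This is a minimal sketch: the sample rows are invented, the helper name `top_products` is hypothetical, and the column names (product_id, product_name, total) are taken from the script in the flow rather than from a known schema.

```python
import io

import pandas as pd

# Invented sample data standing in for the CopyOut exports
PRODUCTS_CSV = """product_id,product_name
1,Keyboard
2,Mouse
3,Monitor
"""

ORDERS_CSV = """order_id,product_id,total
100,1,50.0
101,2,20.0
102,1,25.0
103,3,200.0
"""


def top_products(products: pd.DataFrame, orders: pd.DataFrame, n: int = 10) -> pd.DataFrame:
    """Return the n products with the highest summed order total."""
    # Left join keeps every order, even if its product is missing
    df = orders.merge(products, on="product_id", how="left")
    return (
        df.groupby("product_name", as_index=False)["total"]
        .sum()
        .sort_values("total", ascending=False)
        .head(n)
    )


products = pd.read_csv(io.StringIO(PRODUCTS_CSV))
orders = pd.read_csv(io.StringIO(ORDERS_CSV))
top = top_products(products, orders)

# Same serialization as the flow, printed instead of written to a file
print(top.to_json(orient="records"))
```

Because the join is a left join, orders whose product_id has no match get an empty product_name and are dropped by the groupby, which skips missing keys by default.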
