Extract multiple tables from Postgres using SQL queries and process them as Pandas dataframes on a schedule
About this blueprint
Parallel Postgres Python SQL
This flow uses the Parallel task to run multiple tasks concurrently. The concurrent property sets the maximum number of tasks that run at the same time.
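As a rough analogy for what a concurrency limit does, the sketch below runs several stand-in extract jobs through a thread pool capped at two workers. It is not how Kestra implements the Parallel task; the `extract` function, the `MAX_CONCURRENT` name, and the extra table names are hypothetical and exist only for illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENT = 2  # plays the role of `concurrent: 2` in the flow

lock = threading.Lock()
running = 0  # jobs in flight right now
peak = 0     # highest number of jobs observed in flight


def extract(table: str) -> str:
    """Stand-in for a table extract; it only tracks how many jobs overlap."""
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.05)  # simulate query time
    with lock:
        running -= 1
    return f"{table}.csv"


# "customers" and "payments" are hypothetical; the flow only extracts
# products and orders.
tables = ["products", "orders", "customers", "payments"]

with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
    # map() preserves input order even though jobs overlap in time
    results = list(pool.map(extract, tables))

print(results, "peak concurrency:", peak)
```

With four jobs and a cap of two, the jobs finish in roughly two batches, and the observed peak never exceeds the cap.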
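The flow extracts the products and orders tables from a Postgres database as CSV files. Those files are then passed to a Python task using inputFiles. The Python task reads the input files, joins the two tables with Pandas, and writes the ten products with the highest order totals to bestsellers_pandas.json.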
yaml
id: postgres_to_pandas_dataframes
namespace: company.team

variables:
  db_host: host.docker.internal

tasks:
  - id: get_tables
    type: io.kestra.plugin.core.flow.Parallel
    concurrent: 2
    tasks:
      - id: products
        type: io.kestra.plugin.jdbc.postgresql.CopyOut
        sql: SELECT * FROM products

      - id: orders
        type: io.kestra.plugin.jdbc.postgresql.CopyOut
        sql: SELECT * FROM orders

  - id: pandas
    type: io.kestra.plugin.scripts.python.Script
    warningOnStdErr: false
    inputFiles:
      products.csv: "{{ outputs.products.uri }}"
      orders.csv: "{{ outputs.orders.uri }}"
    outputFiles:
      - bestsellers_pandas.json
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: ghcr.io/kestra-io/pydata:latest
    script: |
      import pandas as pd
      products = pd.read_csv("products.csv")
      orders = pd.read_csv("orders.csv")
      df = orders.merge(products, on="product_id", how="left")
      top = (
          df.groupby("product_name", as_index=False)["total"]
          .sum()
          .sort_values("total", ascending=False)
          .head(10)
      )
      top.to_json("bestsellers_pandas.json", orient="records")

pluginDefaults:
  - type: io.kestra.plugin.jdbc.postgresql.CopyOut
    values:
      url: jdbc:postgresql://{{ vars.db_host }}:5432/
      username: postgres
      password: "{{ secret('DB_PASSWORD') }}"
      format: CSV
      header: true
      delimiter: ","

triggers:
  - id: every_morning
    type: io.kestra.plugin.core.trigger.Schedule
    cron: 0 9 * * *
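The Pandas logic in the script can be tried locally before it runs inside the flow. The sketch below wraps the same merge, group, and sort steps in a function and feeds it a few in-memory rows. The sample data, the `order_id` column, and the `top_sellers` helper are assumptions made for illustration; only the `product_id`, `product_name`, and `total` columns come from the script above.

```python
import io

import pandas as pd

# Hypothetical sample rows; in the flow these files come from the CopyOut tasks.
PRODUCTS_CSV = """product_id,product_name
1,Keyboard
2,Mouse
3,Monitor
"""

ORDERS_CSV = """order_id,product_id,total
100,1,50.0
101,2,20.0
102,1,25.0
103,3,300.0
"""


def top_sellers(products: pd.DataFrame, orders: pd.DataFrame, n: int = 10) -> pd.DataFrame:
    """Join orders to products and rank products by summed order total."""
    df = orders.merge(products, on="product_id", how="left")
    return (
        df.groupby("product_name", as_index=False)["total"]
        .sum()
        .sort_values("total", ascending=False)
        .head(n)
    )


products = pd.read_csv(io.StringIO(PRODUCTS_CSV))
orders = pd.read_csv(io.StringIO(ORDERS_CSV))

top = top_sellers(products, orders)
records = top.to_dict(orient="records")
print(records)
```

Keeping the aggregation in a function makes it easy to check against small fixtures, while the flow's script can stay a thin wrapper that reads the CSV files and writes the JSON output.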