- """
- ELT IoT Pipeline: Extract/Load raw data, Transform in SQL
- This pipeline demonstrates the ELT pattern where raw data is loaded first,
- then transformation happens in the data warehouse using SQL.
- """
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime, timedelta
import pandas as pd
import os

# Configuration
DATA_DIR = "/opt/airflow/data"
POSTGRES_CONN_ID = "postgres_default"
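
# Expected input files (column names inferred from the staging DDL in create_staging_tables below):
#   sensors.csv  -> sensor_id, location
#   readings.csv -> reading_id, sensor_id, temperature, timestamp
#   weather.csv  -> date, temperature, humidity, wind_speed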


def load_sensors_to_staging(**context):
    """
    Load raw sensors data directly to staging table
    """
    print("Loading sensors data to staging...")

    # Read CSV file
    sensors_path = os.path.join(DATA_DIR, "sensors.csv")
    sensors_df = pd.read_csv(sensors_path)
    print(f"Loading {len(sensors_df)} sensor records")

    # Get PostgreSQL connection and load data
    postgres_hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
    engine = postgres_hook.get_sqlalchemy_engine()

    # Load raw data to staging table
    sensors_df.to_sql(
        'raw_sensors',
        engine,
        if_exists='replace',
        index=False,
        method='multi'
    )
    print("Sensors data loaded to raw_sensors staging table!")


def load_readings_to_staging(**context):
    """
    Load raw readings data directly to staging table
    """
    print("Loading readings data to staging...")

    # Read CSV file
    readings_path = os.path.join(DATA_DIR, "readings.csv")
    readings_df = pd.read_csv(readings_path)

    # Parse timestamps so the staging column is a real timestamp, not text
    # (the SQL transform later calls DATE(r.timestamp) on this column)
    readings_df['timestamp'] = pd.to_datetime(readings_df['timestamp'])
    print(f"Loading {len(readings_df)} reading records")

    # Get PostgreSQL connection and load data
    postgres_hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
    engine = postgres_hook.get_sqlalchemy_engine()

    # Load raw data to staging table
    readings_df.to_sql(
        'raw_readings',
        engine,
        if_exists='replace',
        index=False,
        method='multi'
    )
    print("Readings data loaded to raw_readings staging table!")


def load_weather_to_staging(**context):
    print("Loading weather data to staging...")
    weather_path = os.path.join(DATA_DIR, "weather.csv")
    weather_df = pd.read_csv(weather_path)

    # Convert the date column to datetime before loading
    weather_df['date'] = pd.to_datetime(weather_df['date'])
    print(f"Loading {len(weather_df)} weather records")

    postgres_hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
    engine = postgres_hook.get_sqlalchemy_engine()
    weather_df.to_sql(
        'raw_weather',
        engine,
        if_exists='replace',
        index=False,
        method='multi'
    )
    print("Weather data loaded to raw_weather staging table!")


# Default arguments
default_args = {
    'owner': 'data-engineer',
    'depends_on_past': False,
    'start_date': datetime(2025, 9, 24),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create DAG
with DAG(
    dag_id='elt_iot_pipeline',
    default_args=default_args,
    description='IoT ELT Pipeline - Transform in Database',
    schedule_interval=None,  # Manual trigger
    catchup=False,
    tags=['iot', 'elt', 'demo'],
) as dag:
    # Create staging tables
    create_staging_tables = PostgresOperator(
        task_id='create_staging_tables',
        postgres_conn_id=POSTGRES_CONN_ID,
        sql="""
            -- Drop existing staging tables
            DROP TABLE IF EXISTS raw_sensors CASCADE;
            DROP TABLE IF EXISTS raw_readings CASCADE;

            -- Create sensors staging table
            CREATE TABLE raw_sensors (
                sensor_id INTEGER,
                location VARCHAR(100)
            );

            -- Create readings staging table
            CREATE TABLE raw_readings (
                reading_id INTEGER,
                sensor_id INTEGER,
                temperature DECIMAL(5,2),
                timestamp TIMESTAMP
            );

            -- Create weather staging table
            CREATE TABLE IF NOT EXISTS raw_weather (
                date DATE,
                temperature FLOAT,
                humidity INTEGER,
                wind_speed FLOAT
            );
        """,
    )
    # Load tasks - Extract and Load in one step
    load_sensors_task = PythonOperator(
        task_id='load_sensors',
        python_callable=load_sensors_to_staging,
    )

    load_readings_task = PythonOperator(
        task_id='load_readings',
        python_callable=load_readings_to_staging,
    )

    load_weather_task = PythonOperator(
        task_id='load_weather',
        python_callable=load_weather_to_staging,
    )
    # Transform in SQL - This is where ELT differs from ETL
    transform_in_sql = PostgresOperator(
        task_id='transform_in_sql',
        postgres_conn_id=POSTGRES_CONN_ID,
        sql="""
            -- Create partitioned summary table (plus a default partition) if not exists
            DO $$
            BEGIN
                IF NOT EXISTS (
                    SELECT FROM pg_tables WHERE tablename = 'daily_sensor_summary_elt'
                ) THEN
                    CREATE TABLE daily_sensor_summary_elt (
                        location VARCHAR(100),
                        date DATE,
                        avg_temp DECIMAL(5,2),
                        min_temp DECIMAL(5,2),
                        max_temp DECIMAL(5,2),
                        reading_count INTEGER,
                        humidity INTEGER,
                        wind_speed FLOAT,
                        moving_avg_temp DECIMAL(5,2),
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    ) PARTITION BY RANGE (date);

                    -- A partitioned table needs at least one partition before rows can be inserted
                    CREATE TABLE daily_sensor_summary_elt_default
                        PARTITION OF daily_sensor_summary_elt DEFAULT;
                END IF;
            END $$;

            -- Create temporary table to hold new data
            DROP TABLE IF EXISTS daily_sensor_summary_elt_temp;
            CREATE TEMP TABLE daily_sensor_summary_elt_temp AS
            WITH base AS (
                SELECT
                    s.location,
                    DATE(r.timestamp) AS date,
                    r.temperature,
                    w.humidity,
                    w.wind_speed
                FROM raw_readings r
                JOIN raw_sensors s ON r.sensor_id = s.sensor_id
                LEFT JOIN raw_weather w ON DATE(r.timestamp) = w.date
                WHERE DATE(r.timestamp) > (
                    SELECT COALESCE(MAX(date), '1900-01-01') FROM daily_sensor_summary_elt
                )
            ),
            aggregated AS (
                SELECT
                    location,
                    date,
                    ROUND(AVG(temperature)::numeric, 2) AS avg_temp,
                    ROUND(MIN(temperature)::numeric, 2) AS min_temp,
                    ROUND(MAX(temperature)::numeric, 2) AS max_temp,
                    COUNT(*) AS reading_count,
                    MAX(humidity) AS humidity,
                    MAX(wind_speed) AS wind_speed
                FROM base
                GROUP BY location, date
            ),
            moving_avg AS (
                SELECT *,
                    ROUND(AVG(avg_temp) OVER (
                        PARTITION BY location ORDER BY date ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
                    )::numeric, 2) AS moving_avg_temp
                FROM aggregated
            ),
            validated AS (
                SELECT *
                FROM moving_avg
                WHERE avg_temp BETWEEN -10 AND 50 AND humidity BETWEEN 0 AND 100
            )
            SELECT * FROM validated;

            -- Insert new data into partitioned table
            INSERT INTO daily_sensor_summary_elt (
                location, date, avg_temp, min_temp, max_temp,
                reading_count, humidity, wind_speed, moving_avg_temp, created_at
            )
            SELECT
                location, date, avg_temp, min_temp, max_temp,
                reading_count, humidity, wind_speed, moving_avg_temp, CURRENT_TIMESTAMP
            FROM daily_sensor_summary_elt_temp;

            -- Add indexes for performance (only once)
            DO $$
            BEGIN
                IF NOT EXISTS (
                    SELECT FROM pg_indexes
                    WHERE tablename = 'daily_sensor_summary_elt'
                      AND indexname = 'idx_daily_summary_elt_location'
                ) THEN
                    CREATE INDEX idx_daily_summary_elt_location ON daily_sensor_summary_elt(location);
                END IF;

                IF NOT EXISTS (
                    SELECT FROM pg_indexes
                    WHERE tablename = 'daily_sensor_summary_elt'
                      AND indexname = 'idx_daily_summary_elt_date'
                ) THEN
                    CREATE INDEX idx_daily_summary_elt_date ON daily_sensor_summary_elt(date);
                END IF;
            END $$;
        """,
    )
    alert_extreme_temp = PostgresOperator(
        task_id='alert_extreme_temperature',
        postgres_conn_id=POSTGRES_CONN_ID,
        sql="""
            SELECT location, date, avg_temp
            FROM daily_sensor_summary_elt
            WHERE avg_temp > 40;
        """,
    )
    # Data quality checks
    quality_checks = PostgresOperator(
        task_id='data_quality_checks',
        postgres_conn_id=POSTGRES_CONN_ID,
        sql="""
            -- Check for data completeness
            DO $$
            DECLARE
                staging_sensors_count INTEGER;
                staging_readings_count INTEGER;
                summary_count INTEGER;
            BEGIN
                SELECT COUNT(*) INTO staging_sensors_count FROM raw_sensors;
                SELECT COUNT(*) INTO staging_readings_count FROM raw_readings;
                SELECT COUNT(*) INTO summary_count FROM daily_sensor_summary_elt;

                RAISE NOTICE 'Data Quality Report:';
                RAISE NOTICE '- Raw sensors: % records', staging_sensors_count;
                RAISE NOTICE '- Raw readings: % records', staging_readings_count;
                RAISE NOTICE '- Daily summaries: % records', summary_count;

                -- Basic validation
                IF staging_sensors_count = 0 THEN
                    RAISE EXCEPTION 'No sensor data found in staging';
                END IF;

                IF staging_readings_count = 0 THEN
                    RAISE EXCEPTION 'No readings data found in staging';
                END IF;

                IF summary_count = 0 THEN
                    RAISE EXCEPTION 'No summary data generated';
                END IF;

                RAISE NOTICE 'All data quality checks passed!';
            END $$;
        """,
    )
    # Verify final results
    verify_results = PostgresOperator(
        task_id='verify_elt_results',
        postgres_conn_id=POSTGRES_CONN_ID,
        sql="""
            SELECT
                'ELT Pipeline Results' AS pipeline,
                COUNT(*) AS total_records,
                COUNT(DISTINCT location) AS locations,
                COUNT(DISTINCT date) AS dates,
                ROUND(AVG(avg_temp)::numeric, 2) AS overall_avg_temp,
                MIN(date) AS earliest_date,
                MAX(date) AS latest_date
            FROM daily_sensor_summary_elt;
        """,
        autocommit=True,
    )
    # Task dependencies
    create_staging_tables >> [load_sensors_task, load_readings_task, load_weather_task]
    [load_sensors_task, load_readings_task, load_weather_task] >> transform_in_sql
    transform_in_sql >> alert_extreme_temp >> quality_checks >> verify_results
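
# Rough local usage, assuming a running Airflow 2.x environment with the Postgres
# provider installed, a 'postgres_default' connection configured, and the CSV files
# present under /opt/airflow/data (the connection id and paths defined above):
#   airflow dags trigger elt_iot_pipeline                          # manual run; no schedule is set
#   airflow tasks test elt_iot_pipeline load_sensors 2025-09-24    # exercise a single task locally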