ELT Pipeline Modification

  1. """
  2. ELT IoT Pipeline: Extract/Load raw data, Transform in SQL
  3. This pipeline demonstrates the ELT pattern where raw data is loaded first,
  4. then transformation happens in the data warehouse using SQL.
  5. """
  6. from airflow import DAG
  7. from airflow.operators.python import PythonOperator
  8. from airflow.providers.postgres.operators.postgres import PostgresOperator
  9. from airflow.providers.postgres.hooks.postgres import PostgresHook
  10. from datetime import datetime, timedelta
  11. import pandas as pd
  12. import os
  13.  
  14. # Configuration
  15. DATA_DIR = "/opt/airflow/data"
  16. POSTGRES_CONN_ID = "postgres_default"
  17.  
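# Note: these values are assumptions matching the stock Docker Compose Airflow
# layout (./data mounted at /opt/airflow/data) and the default
# 'postgres_default' connection id; adjust both to your deployment.
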
def load_sensors_to_staging(**context):
    """
    Load raw sensors data directly to staging table
    """
    print("πŸ”„ Loading sensors data to staging...")

    # Read CSV file
    sensors_path = os.path.join(DATA_DIR, "sensors.csv")
    sensors_df = pd.read_csv(sensors_path)

    print(f"πŸ“Š Loading {len(sensors_df)} sensor records")

    # Get PostgreSQL connection and load data
    postgres_hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
    engine = postgres_hook.get_sqlalchemy_engine()

    # Load raw data to staging table
    sensors_df.to_sql(
        'raw_sensors',
        engine,
        if_exists='replace',
        index=False,
        method='multi'
    )

    print("βœ… Sensors data loaded to raw_sensors staging table!")

def load_readings_to_staging(**context):
    """
    Load raw readings data directly to staging table
    """
    print("πŸ”„ Loading readings data to staging...")

    # Read CSV file
    readings_path = os.path.join(DATA_DIR, "readings.csv")
    readings_df = pd.read_csv(readings_path)

    print(f"πŸ“Š Loading {len(readings_df)} reading records")

    # Get PostgreSQL connection and load data
    postgres_hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
    engine = postgres_hook.get_sqlalchemy_engine()

    # Load raw data to staging table
    readings_df.to_sql(
        'raw_readings',
        engine,
        if_exists='replace',
        index=False,
        method='multi'
    )

    print("βœ… Readings data loaded to raw_readings staging table!")

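# --- Illustrative variant (not part of the original DAG) ---------------------
# A minimal sketch of streaming a large CSV in chunks instead of reading it all
# into memory at once; the function name and chunk size are assumptions for
# illustration, and nothing below wires this into the pipeline.
def load_readings_in_chunks(chunk_size=10_000, **context):
    readings_path = os.path.join(DATA_DIR, "readings.csv")
    postgres_hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
    engine = postgres_hook.get_sqlalchemy_engine()
    # Replace the table with the first chunk, then append the remaining ones.
    for i, chunk in enumerate(pd.read_csv(readings_path, chunksize=chunk_size)):
        chunk.to_sql(
            'raw_readings',
            engine,
            if_exists='replace' if i == 0 else 'append',
            index=False,
            method='multi',
        )
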
def load_weather_to_staging(**context):
    """
    Load raw weather data directly to staging table
    """
    print("πŸ”„ Loading weather data to staging...")
    weather_path = os.path.join(DATA_DIR, "weather.csv")
    weather_df = pd.read_csv(weather_path)

    # Convert the date column to datetime before loading
    weather_df['date'] = pd.to_datetime(weather_df['date'])

    print(f"πŸ“Š Loading {len(weather_df)} weather records")

    postgres_hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
    engine = postgres_hook.get_sqlalchemy_engine()

    weather_df.to_sql(
        'raw_weather',
        engine,
        if_exists='replace',
        index=False,
        method='multi'
    )
    print("βœ… Weather data loaded to raw_weather staging table!")

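# Note on load_weather_to_staging: the datetime coercion above could also be
# done at read time; an equivalent one-liner, assuming the 'date' column is in
# a format pandas can parse:
#   weather_df = pd.read_csv(weather_path, parse_dates=['date'])
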
# Default arguments
default_args = {
    'owner': 'data-engineer',
    'depends_on_past': False,
    'start_date': datetime(2025, 9, 24),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create DAG
with DAG(
    dag_id='elt_iot_pipeline',
    default_args=default_args,
    description='IoT ELT Pipeline - Transform in Database',
    schedule_interval=None,  # Manual trigger
    catchup=False,
    tags=['iot', 'elt', 'demo'],
) as dag:

    # Create staging tables
    create_staging_tables = PostgresOperator(
        task_id='create_staging_tables',
        postgres_conn_id=POSTGRES_CONN_ID,
        sql="""
        -- Drop existing staging tables
        DROP TABLE IF EXISTS raw_sensors CASCADE;
        DROP TABLE IF EXISTS raw_readings CASCADE;
        DROP TABLE IF EXISTS raw_weather CASCADE;

        -- Create sensors staging table
        CREATE TABLE raw_sensors (
            sensor_id INTEGER,
            location VARCHAR(100)
        );

        -- Create readings staging table
        CREATE TABLE raw_readings (
            reading_id INTEGER,
            sensor_id INTEGER,
            temperature DECIMAL(5,2),
            timestamp TIMESTAMP
        );

        -- Create weather staging table
        CREATE TABLE raw_weather (
            date DATE,
            temperature FLOAT,
            humidity INTEGER,
            wind_speed FLOAT
        );
        """,
    )

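    # Note: pandas to_sql(if_exists='replace') drops and re-creates each raw_*
    # table with inferred column types, so the DDL above mainly documents the
    # expected schema rather than enforcing it at load time.
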
    # Load tasks - Extract and Load in one step
    # (Airflow 2 passes context to the callables automatically)
    load_sensors_task = PythonOperator(
        task_id='load_sensors',
        python_callable=load_sensors_to_staging,
    )

    load_readings_task = PythonOperator(
        task_id='load_readings',
        python_callable=load_readings_to_staging,
    )

    load_weather_task = PythonOperator(
        task_id='load_weather',
        python_callable=load_weather_to_staging,
    )

    # Transform in SQL - This is where ELT differs from ETL
    transform_in_sql = PostgresOperator(
        task_id='transform_in_sql',
        postgres_conn_id=POSTGRES_CONN_ID,
        sql="""
        -- Create partitioned summary table if not exists
        DO $$
        BEGIN
            IF NOT EXISTS (
                SELECT FROM pg_tables WHERE tablename = 'daily_sensor_summary_elt'
            ) THEN
                CREATE TABLE daily_sensor_summary_elt (
                    location VARCHAR(100),
                    date DATE,
                    avg_temp DECIMAL(5,2),
                    min_temp DECIMAL(5,2),
                    max_temp DECIMAL(5,2),
                    reading_count INTEGER,
                    humidity INTEGER,
                    wind_speed FLOAT,
                    moving_avg_temp DECIMAL(5,2),
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                ) PARTITION BY RANGE (date);
                -- A partitioned table needs at least one partition before rows
                -- can be inserted; a DEFAULT partition accepts all dates.
                CREATE TABLE daily_sensor_summary_elt_default
                    PARTITION OF daily_sensor_summary_elt DEFAULT;
            END IF;
        END $$;

        -- Create temporary table to hold new data
        DROP TABLE IF EXISTS daily_sensor_summary_elt_temp;
        CREATE TEMP TABLE daily_sensor_summary_elt_temp AS
        WITH base AS (
            SELECT
                s.location,
                DATE(r.timestamp) AS date,
                r.temperature,
                w.humidity,
                w.wind_speed
            FROM raw_readings r
            JOIN raw_sensors s ON r.sensor_id = s.sensor_id
            LEFT JOIN raw_weather w ON DATE(r.timestamp) = w.date
            WHERE DATE(r.timestamp) > (
                SELECT COALESCE(MAX(date), '1900-01-01') FROM daily_sensor_summary_elt
            )
        ),
        aggregated AS (
            SELECT
                location,
                date,
                ROUND(AVG(temperature)::numeric, 2) AS avg_temp,
                ROUND(MIN(temperature)::numeric, 2) AS min_temp,
                ROUND(MAX(temperature)::numeric, 2) AS max_temp,
                COUNT(*) AS reading_count,
                MAX(humidity) AS humidity,
                MAX(wind_speed) AS wind_speed
            FROM base
            GROUP BY location, date
        ),
        moving_avg AS (
            SELECT *,
                ROUND(AVG(avg_temp) OVER (
                    PARTITION BY location ORDER BY date ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
                )::numeric, 2) AS moving_avg_temp
            FROM aggregated
        ),
        validated AS (
            SELECT *
            FROM moving_avg
            -- Keep rows without weather data (the LEFT JOIN leaves humidity NULL)
            WHERE avg_temp BETWEEN -10 AND 50
              AND (humidity IS NULL OR humidity BETWEEN 0 AND 100)
        )
        SELECT * FROM validated;

        -- Insert new data into partitioned table
        INSERT INTO daily_sensor_summary_elt (
            location, date, avg_temp, min_temp, max_temp,
            reading_count, humidity, wind_speed, moving_avg_temp, created_at
        )
        SELECT
            location, date, avg_temp, min_temp, max_temp,
            reading_count, humidity, wind_speed, moving_avg_temp, CURRENT_TIMESTAMP
        FROM daily_sensor_summary_elt_temp;

        -- Add indexes for performance (only once)
        DO $$
        BEGIN
            IF NOT EXISTS (
                SELECT FROM pg_indexes WHERE tablename = 'daily_sensor_summary_elt' AND indexname = 'idx_daily_summary_elt_location'
            ) THEN
                CREATE INDEX idx_daily_summary_elt_location ON daily_sensor_summary_elt(location);
            END IF;
            IF NOT EXISTS (
                SELECT FROM pg_indexes WHERE tablename = 'daily_sensor_summary_elt' AND indexname = 'idx_daily_summary_elt_date'
            ) THEN
                CREATE INDEX idx_daily_summary_elt_date ON daily_sensor_summary_elt(date);
            END IF;
        END $$;
        """,
    )

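    # Illustrative: instead of the single DEFAULT partition created above,
    # explicit range partitions could be attached per year; a sketch with
    # assumed bounds, not executed by this DAG:
    #   CREATE TABLE daily_sensor_summary_elt_2025
    #       PARTITION OF daily_sensor_summary_elt
    #       FOR VALUES FROM ('2025-01-01') TO ('2026-01-01');
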
    alert_extreme_temp = PostgresOperator(
        task_id='alert_extreme_temperature',
        postgres_conn_id=POSTGRES_CONN_ID,
        sql="""
        SELECT location, date, avg_temp
        FROM daily_sensor_summary_elt
        WHERE avg_temp > 40;
        """,
    )

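    # Illustrative alternative: to fail the run when extreme temperatures are
    # found (rather than just executing a SELECT), the check could use
    # SQLCheckOperator, assuming the common.sql provider is installed; a sketch
    # only, not wired into the dependencies below:
    #   from airflow.providers.common.sql.operators.sql import SQLCheckOperator
    #   alert_extreme_temp = SQLCheckOperator(
    #       task_id='alert_extreme_temperature',
    #       conn_id=POSTGRES_CONN_ID,
    #       sql="SELECT COUNT(*) = 0 FROM daily_sensor_summary_elt WHERE avg_temp > 40",
    #   )
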
    # Data quality checks
    quality_checks = PostgresOperator(
        task_id='data_quality_checks',
        postgres_conn_id=POSTGRES_CONN_ID,
        sql="""
        -- Check for data completeness
        DO $$
        DECLARE
            staging_sensors_count INTEGER;
            staging_readings_count INTEGER;
            summary_count INTEGER;
        BEGIN
            SELECT COUNT(*) INTO staging_sensors_count FROM raw_sensors;
            SELECT COUNT(*) INTO staging_readings_count FROM raw_readings;
            SELECT COUNT(*) INTO summary_count FROM daily_sensor_summary_elt;

            RAISE NOTICE 'Data Quality Report:';
            RAISE NOTICE '- Raw sensors: % records', staging_sensors_count;
            RAISE NOTICE '- Raw readings: % records', staging_readings_count;
            RAISE NOTICE '- Daily summaries: % records', summary_count;

            -- Basic validation
            IF staging_sensors_count = 0 THEN
                RAISE EXCEPTION 'No sensor data found in staging';
            END IF;

            IF staging_readings_count = 0 THEN
                RAISE EXCEPTION 'No readings data found in staging';
            END IF;

            IF summary_count = 0 THEN
                RAISE EXCEPTION 'No summary data generated';
            END IF;

            RAISE NOTICE 'All data quality checks passed!';
        END $$;
        """,
    )

    # Verify final results
    verify_results = PostgresOperator(
        task_id='verify_elt_results',
        postgres_conn_id=POSTGRES_CONN_ID,
        sql="""
        SELECT
            'ELT Pipeline Results' as pipeline,
            COUNT(*) as total_records,
            COUNT(DISTINCT location) as locations,
            COUNT(DISTINCT date) as dates,
            ROUND(AVG(avg_temp)::numeric, 2) as overall_avg_temp,
            MIN(date) as earliest_date,
            MAX(date) as latest_date
        FROM daily_sensor_summary_elt;
        """,
        autocommit=True,
    )

    # Task dependencies
    create_staging_tables >> [load_sensors_task, load_readings_task, load_weather_task]
    [load_sensors_task, load_readings_task, load_weather_task] >> transform_in_sql
    transform_in_sql >> alert_extreme_temp >> quality_checks >> verify_results
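
# A minimal usage sketch (commands assume a standard Airflow 2 installation):
# the DAG has no schedule, so trigger it manually from the CLI:
#   airflow dags trigger elt_iot_pipeline
# then inspect the summary table in Postgres, e.g.:
#   SELECT * FROM daily_sensor_summary_elt ORDER BY date, location;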