카테고리 없음

GCP Composer를 통해 DAG를 호출하는 의미

idea9329 2025. 1. 22. 10:30
반응형

 

Google Cloud Platform(GCP)의 Composer는 Apache Airflow 기반의 워크플로 자동화 및 스케줄링 서비스입니다. 이 문장에서 "GCP Composer를 통해 DAG를 호출한다"는 의미는 Composer에서 실행 중인 Airflow 환경을 사용하여 특정 워크플로(DAG)를 실행하거나 트리거한다는 뜻입니다.


용어 설명

  1. GCP Composer:
    • Google Cloud에서 제공하는 관리형 Apache Airflow 서비스입니다.
    • 데이터 처리, ETL 작업, 머신러닝 파이프라인 등의 워크플로를 정의하고 관리할 수 있습니다.
  2. DAG (Directed Acyclic Graph):
    • Airflow에서 워크플로를 정의하는 단위입니다.
    • 작업(Task)들의 순서를 표현하며, 각 작업은 의존성(Dependency)을 가질 수 있습니다.
    • DAG는 Airflow에서 Python 코드로 정의됩니다.
  3. DAG 호출:
    • 특정 시점에 DAG를 실행하거나, 외부에서 트리거하여 작업을 시작하는 행위입니다.

GCP Composer에서 DAG 호출 방식

  1. 스케줄 기반 호출:
    • DAG 파일에 설정된 스케줄에 따라 자동으로 실행됩니다.
    • from airflow import DAG from airflow.operators.dummy import DummyOperator from datetime import datetime, timedelta default_args = { 'owner': 'airflow', 'depends_on_past': False, 'email_on_failure': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } dag = DAG( 'example_dag', default_args=default_args, description='An example DAG', schedule_interval=timedelta(days=1), start_date=datetime(2025, 1, 1), catchup=False, ) task1 = DummyOperator( task_id='task1', dag=dag, )
  2. REST API를 통한 호출:
    • GCP Composer에서 제공하는 Airflow REST API를 사용해 DAG를 수동으로 트리거합니다.
      curl -X POST \
          -u "username:password" \
          -d '{"conf": {}}' \
          "http://<airflow-webserver-url>/api/v1/dags/<dag_id>/dagRuns"
  3. Python 클라이언트를 사용한 호출:
    • Airflow CLI 또는 Google Cloud SDK를 사용해 DAG 실행 명령을 내립니다.
      gcloud composer environments run <environment-name> \
          --location <region> \
          trigger_dag -- <dag-id>
  4. 이벤트 기반 호출:
    • Pub/Sub 메시지, Cloud Function, 또는 GCS 이벤트를 통해 DAG를 트리거합니다.
    • 예: 특정 데이터 파일이 업로드되면 DAG 실행.
    • from google.cloud import storage from airflow.providers.google.cloud.operators.pubsub import PubSubPublishMessageOperator def trigger_dag(event, context): client = storage.Client() bucket = client.get_bucket(event['bucket']) print(f"Triggered DAG because of file {event['name']} in {bucket.name}")

실무 활용 사례

  1. ETL 워크플로 실행:
    • 데이터를 GCS에서 읽고, 처리한 후, BigQuery에 저장.
  2. 데이터 파이프라인 자동화:
    • 매일 새벽 데이터를 수집하고 정리.
  3. 이벤트 기반 데이터 처리:
    • 특정 조건에 따라 DAG를 실행해 데이터를 처리하거나 알림 전송.

요약

GCP Composer를 통해 DAG를 호출한다는 것은 Apache Airflow의 DAG(워크플로)를 실행하거나 트리거하여, 데이터 처리 및 스케줄링 작업을 수행하는 것을 의미합니다. 호출 방식에는 스케줄 기반, API 트리거, CLI 또는 이벤트 기반 호출이 포함됩니다.

반응형