반응형
Google Cloud Platform(GCP)의 Composer는 Apache Airflow 기반의 워크플로 자동화 및 스케줄링 서비스입니다. 이 문장에서 "GCP Composer를 통해 DAG를 호출한다"는 의미는 Composer에서 실행 중인 Airflow 환경을 사용하여 특정 워크플로(DAG)를 실행하거나 트리거한다는 뜻입니다.
용어 설명
- GCP Composer:
- Google Cloud에서 제공하는 관리형 Apache Airflow 서비스입니다.
- 데이터 처리, ETL 작업, 머신러닝 파이프라인 등의 워크플로를 정의하고 관리할 수 있습니다.
- DAG (Directed Acyclic Graph):
- Airflow에서 워크플로를 정의하는 단위입니다.
- 작업(Task)들의 순서를 표현하며, 각 작업은 의존성(Dependency)을 가질 수 있습니다.
- DAG는 Airflow에서 Python 코드로 정의됩니다.
- DAG 호출:
- 특정 시점에 DAG를 실행하거나, 외부에서 트리거하여 작업을 시작하는 행위입니다.
GCP Composer에서 DAG 호출 방식
- 스케줄 기반 호출:
- DAG 파일에 설정된 스케줄에 따라 자동으로 실행됩니다.
- from airflow import DAG from airflow.operators.dummy import DummyOperator from datetime import datetime, timedelta default_args = { 'owner': 'airflow', 'depends_on_past': False, 'email_on_failure': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } dag = DAG( 'example_dag', default_args=default_args, description='An example DAG', schedule_interval=timedelta(days=1), start_date=datetime(2025, 1, 1), catchup=False, ) task1 = DummyOperator( task_id='task1', dag=dag, )
- REST API를 통한 호출:
- GCP Composer에서 제공하는 Airflow REST API를 사용해 DAG를 수동으로 트리거합니다.
curl -X POST \ -u "username:password" \ -d '{"conf": {}}' \ "http://<airflow-webserver-url>/api/v1/dags/<dag_id>/dagRuns"
- GCP Composer에서 제공하는 Airflow REST API를 사용해 DAG를 수동으로 트리거합니다.
- Python 클라이언트를 사용한 호출:
- Airflow CLI 또는 Google Cloud SDK를 사용해 DAG 실행 명령을 내립니다.
gcloud composer environments run <environment-name> \ --location <region> \ trigger_dag -- <dag-id>
- Airflow CLI 또는 Google Cloud SDK를 사용해 DAG 실행 명령을 내립니다.
- 이벤트 기반 호출:
- Pub/Sub 메시지, Cloud Function, 또는 GCS 이벤트를 통해 DAG를 트리거합니다.
- 예: 특정 데이터 파일이 업로드되면 DAG 실행.
- from google.cloud import storage from airflow.providers.google.cloud.operators.pubsub import PubSubPublishMessageOperator def trigger_dag(event, context): client = storage.Client() bucket = client.get_bucket(event['bucket']) print(f"Triggered DAG because of file {event['name']} in {bucket.name}")
실무 활용 사례
- ETL 워크플로 실행:
- 데이터를 GCS에서 읽고, 처리한 후, BigQuery에 저장.
- 데이터 파이프라인 자동화:
- 매일 새벽 데이터를 수집하고 정리.
- 이벤트 기반 데이터 처리:
- 특정 조건에 따라 DAG를 실행해 데이터를 처리하거나 알림 전송.
요약
GCP Composer를 통해 DAG를 호출한다는 것은 Apache Airflow의 DAG(워크플로)를 실행하거나 트리거하여, 데이터 처리 및 스케줄링 작업을 수행하는 것을 의미합니다. 호출 방식에는 스케줄 기반, API 트리거, CLI 또는 이벤트 기반 호출이 포함됩니다.
반응형