개요

데이터 레이크에서 MWAA를 이용하여 Airflow를 구축하면서 수립한 아키텍처와 기술적 특징에 대해 정리해본다.

아키텍처

구축하는 데이터 레이크는 On-Prem에서 데이터를 옮긴 뒤 클라우드 S3를 data lake로 삼고 필요시 Glue ETL등으로 Athena 테이블로 만드는 과정이다.

기술 특징

CDK를 통한 Airflow 인프라 생성

Airflow는 private only 모드로 생성하였고 DAG가 저장되는 S3는 KMS로 암호화 되어 관리된다.

끝으로 Airflow 전용 permission을 만든 뒤 ENV를 생성하였다.

CDK로 직접 짜서 구축했는데 Cloud Formation 대비 중간 중간 파이썬 코드로 리소스 id를 불러오는 등의 하이브리드 구현이 가능해서 장점으로 생각되었다.

import os
import sys

from baram.s3_manager import S3Manager
from baram.ec2_manager import EC2Manager

from aws_cdk import App
from aws_cdk import Environment
from aws_cdk import DefaultStackSynthesizer

from core.kms import KMSStack
from core.s3 import S3Stack
from core.aflow_env import AflowEnvStack
from core.aflow_execution_iam import AFlowIAMStack

app = App()
em = EC2Manager()

dl_s3_bucket_name = f'sli-dst-dl{app.node.try_get_context("PHASE")}'
sm = S3Manager(dl_s3_bucket_name)

cdk_environment = Environment(
    account=app.node.try_get_context("ACCOUNT_ID"),
    region=app.node.try_get_context("REGION")
)

aflow_s3_kms = KMSStack(
    app,
    id=f'aflow{app.node.try_get_context("PHASE")}-s3-kms',
    env=cdk_environment,
    synthesizer=DefaultStackSynthesizer(file_assets_bucket_name=app.node.try_get_context("file_assets_bucket_name"))
)

aflow_s3 = S3Stack(
    app,
    id=f'sli-dst-aflow{app.node.try_get_context("PHASE")}',
    key=aflow_s3_kms.key,
    env=cdk_environment,
    synthesizer=DefaultStackSynthesizer(file_assets_bucket_name=app.node.try_get_context("file_assets_bucket_name"))
)

iam = AFlowIAMStack(app,
                    f'aflow{app.node.try_get_context("PHASE")}-execution-iam',
                    aflow_s3_bucket_arn=aflow_s3.bucket.bucket_arn,
                    glue_s3_bucket_arn=sm.get_s3_arn(dl_s3_bucket_name),
                    kms_arn=aflow_s3_kms.key.key_arn,
                    env=cdk_environment,
                    synthesizer=DefaultStackSynthesizer(
                        file_assets_bucket_name=app.node.try_get_context("file_assets_bucket_name")
                    ))

vpc_name = f'{app.node.try_get_context("PHASE")}-vpc'
vpc_id = em.get_vpc_id_with_vpc_name(vpc_name)
assert vpc_id
sg_id = em.get_sg_id_with_sg_name(f'{vpc_name}-default-sg')
assert sg_id

pri_sub1_id = em.get_subnet_id(vpc_id, f'{vpc_name}-pri-sub1')
assert pri_sub1_id
pri_sub2_id = em.get_subnet_id(vpc_id, f'{vpc_name}-pri-sub2')
assert pri_sub2_id

AflowEnvStack(app,
              f'aflow-{app.node.try_get_context("PHASE")}-env',
              aflow_bucket_arn=aflow_s3.bucket.bucket_arn,
              execution_role_arn=iam.role.role_arn,
              security_group_id=sg_id,
              pri_subnet_id1=pri_sub1_id,
              pri_subnet_id2=pri_sub2_id,
              kms_key_arn=aflow_s3_kms.key.key_arn,
              env=cdk_environment,
              synthesizer=DefaultStackSynthesizer(
                  file_assets_bucket_name=app.node.try_get_context("file_assets_bucket_name")
              ))

app.synth()

2개의 airflow 구조

아키텍처를 보면 Airflow가 2가지 영역에서 존재한다. 첫째는 MWAA로써 AWS 진영에서 필요한 Workflow를 수행한다. 두번째는 On-Prem에 추가로 Airflow를 설치하여 AWS로 데이터를 옮기는 스케쥴링을 수행한다.

설계 의도는 대부분의 일을 MWAA에서 수행하고 부득이 On-Prem에서 수행이 필요한 업무들만 On Prem Airflow에서 수행되게 만들었다. On Prem은 굳이 Airflow일 필요는 없으나 유지보수의 편의성 및 역량 향상 2가지 측면에서 모두 Airflow로 통일해 두는것이 효율적이라 판단했다.

MWAA의 DAG 단위 및 트리거 시점은?

마이데이터 등의 데이터가 적재되는 만큼 재처리 효율성을 고려하여 업권 단위로 DAG를 묶었다. 실제로 업권단위 데이터 누락, 재처리 등의 일이 많이 발생했기 때문이다.

MWAA DAG의 트리거 방식은 크게 2가지로 나뉘는데 새로운 파일이 생성되는것을 Sensor로 관측하여 트리거 시키거나 혹은 On Prem Airflow에서 DAG를 직접 원격 트리거 해주는 경우이다.

즉, MWAA 자체적으로 schedule_interval에 의해 트리거 되지 않고 On Prem에서 넘어오는 데이터나 On Prem의 DAG 실행에 이어 downstream으로 실행되는 구조이다. 이는 데이터 레이크 구조의 특성에 기인한다고 볼 수 있다.

framework 구조 및 코드 스니펫

Airflow로 DAG 개발은 ETL을 진행하는 사람들이 직접 만든다. 이들이 편리하게 Airflow DAG를 생성하고 Glue Job 생성으로 이어질 수 있게 Framework를 만들었다.

또한 기존에 없는 동작 방식이 필요한 경우 operator, sensor를 기존것을 일부 수정하거나 새로 만들어 활용하였다.

리니지 추출

데이터 거버넌스 툴로 Linked In Datahub를 설계부터 염두해 두었다. 따라서 datahub 리니지를 자동 추출할 수 있는 플러그인을 심어 dag 중심으로 진행되는 etl에서 추출되게 만들었다.

파라미터 자동 셋팅

MWAA->Glue ETL Job을 호출 할 때 처리되는 날짜 정보가 전달되게 만들어 두었다. Glue ETL Job입장에서는 넘어오는 날짜 파라미터 대로 ETL이 수행되고 MWAA에서는 날짜 선택권을 가지게 된다.

만든 데이터레이크 에서는 일단위 데이터 처리가 많아 MWAA에서는 기본으로는 어제 날짜 기준으로 날짜 파리미터를 설정해서 전달한다.

만약 재처리 등의 목적으로 날짜를 바꾸어야 하는 필요성이 생기는데 이때는 아래의 우선순위로 날짜값을 overriding 해서 쓰게 만들었다.

  • 1순위 : dag 코드에서 직접 설정
  • 2순위 : conf 입력
  • 3순위 : 자동 설정