Overview

I work at a financial company, where we are looking to adopt Airflow for internal and external ETL.

This time, to test ETL with Airflow running locally, I implemented simple sensors for a local file and for SFTP as DAGs, and I am writing up the results here.

Prerequisites

For a newly developed DAG, set the dags_folder path in airflow.cfg and then drop the Python file into that directory, as shown below.
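
For reference, that setting lives under the [core] section of airflow.cfg; the path below is only an example, matching the Airflow home directory used throughout this post:

[core]
dags_folder = /Users/lks21c/airflow/dags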

File Sensor

This is the simplest form of local file sensor.

The way it works is that task 1 senses (detects) the file /Users/lks21c/airflow/files/a.

Once the file is detected, task 2 runs cat /Users/lks21c/airflow/files/a.

To exercise these tasks, I put the text this is a secret. into the file a beforehand.

The code is as follows.

from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor

default_args = {
    'start_date': days_ago(1),
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'sensor_test3',
    default_args=default_args,
    schedule_interval="@once",
    catchup=False,  # catchup is a DAG-level argument, not a task-level default_arg
)

t1 = FileSensor(
    task_id='sensor_a',
    # poke until the file below exists
    filepath='/Users/lks21c/airflow/files/a',
    dag=dag,
)

t2 = BashOperator(
    task_id='cat_a',
    # print the file contents once the sensor has found it
    bash_command='cat /Users/lks21c/airflow/files/a',
    dag=dag,
)

t1 >> t2
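
By default the sensor pokes for the file every 60 seconds until it appears. If you want to tune that behavior, FileSensor also takes the usual sensor arguments; the sketch below is illustrative only, with fs_conn_id spelled out even though 'fs_default' is already the default:

t1 = FileSensor(
    task_id='sensor_a',
    fs_conn_id='fs_default',   # filesystem connection (this is the default)
    filepath='/Users/lks21c/airflow/files/a',
    poke_interval=30,          # check every 30 seconds (illustrative value)
    timeout=60 * 10,           # give up after 10 minutes (illustrative value)
    dag=dag,
)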

The DAG layout is as follows.

The t1 log (which I always enjoy reading) is as follows.

Since the file being sensed exists, you can see that Success criteria met. Exiting. is printed.

*** Reading local file: /Users/lks21c/airflow/logs/sensor_test3/sensor_a/2022-01-09T01:12:14.377224+00:00/1.log
[2022-01-09, 01:12:16 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: sensor_test3.sensor_a manual__2022-01-09T01:12:14.377224+00:00 [queued]>
[2022-01-09, 01:12:16 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: sensor_test3.sensor_a manual__2022-01-09T01:12:14.377224+00:00 [queued]>
[2022-01-09, 01:12:16 UTC] {taskinstance.py:1238} INFO -
--------------------------------------------------------------------------------
[2022-01-09, 01:12:16 UTC] {taskinstance.py:1239} INFO - Starting attempt 1 of 1
[2022-01-09, 01:12:16 UTC] {taskinstance.py:1240} INFO -
--------------------------------------------------------------------------------
[2022-01-09, 01:12:16 UTC] {taskinstance.py:1259} INFO - Executing <Task(FileSensor): sensor_a> on 2022-01-09 01:12:14.377224+00:00
[2022-01-09, 01:12:16 UTC] {standard_task_runner.py:52} INFO - Started process 22750 to run task
[2022-01-09, 01:12:16 UTC] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'sensor_test3', 'sensor_a', 'manual__2022-01-09T01:12:14.377224+00:00', '--job-id', '69', '--raw', '--subdir', 'DAGS_FOLDER/file_sensor.py', '--cfg-path', '/var/folders/xz/1wy0fpgn5lv5q6rfj2cx_km80000gn/T/tmp81uvvcn_', '--error-file', '/var/folders/xz/1wy0fpgn5lv5q6rfj2cx_km80000gn/T/tmproifyjka']
[2022-01-09, 01:12:16 UTC] {standard_task_runner.py:77} INFO - Job 69: Subtask sensor_a
[2022-01-09, 01:12:16 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: sensor_test3.sensor_a manual__2022-01-09T01:12:14.377224+00:00 [running]> on host 251.1.168.192.in-addr.arpa
[2022-01-09, 01:12:16 UTC] {taskinstance.py:1424} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=sensor_test3
AIRFLOW_CTX_TASK_ID=sensor_a
AIRFLOW_CTX_EXECUTION_DATE=2022-01-09T01:12:14.377224+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-01-09T01:12:14.377224+00:00
[2022-01-09, 01:12:16 UTC] {filesystem.py:59} INFO - Poking for file /Users/lks21c/airflow/files/a
[2022-01-09, 01:12:16 UTC] {filesystem.py:65} INFO - Found File /Users/lks21c/airflow/files/a last modified: 20220109100100
[2022-01-09, 01:12:16 UTC] {base.py:249} INFO - Success criteria met. Exiting.
[2022-01-09, 01:12:16 UTC] {taskinstance.py:1267} INFO - Marking task as SUCCESS. dag_id=sensor_test3, task_id=sensor_a, execution_date=20220109T011214, start_date=20220109T011216, end_date=20220109T011216
[2022-01-09, 01:12:16 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-01-09, 01:12:16 UTC] {local_task_job.py:264} INFO - 1 downstream tasks scheduled from follow-on schedule check

Looking at the t2 log afterwards, you can see that the bash command ran normally and printed its output, this is a secret.

*** Reading local file: /Users/lks21c/airflow/logs/sensor_test3/cat_a/2022-01-09T01:12:14.377224+00:00/1.log
[2022-01-09, 01:12:17 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: sensor_test3.cat_a manual__2022-01-09T01:12:14.377224+00:00 [queued]>
[2022-01-09, 01:12:17 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: sensor_test3.cat_a manual__2022-01-09T01:12:14.377224+00:00 [queued]>
[2022-01-09, 01:12:17 UTC] {taskinstance.py:1238} INFO -
--------------------------------------------------------------------------------
[2022-01-09, 01:12:17 UTC] {taskinstance.py:1239} INFO - Starting attempt 1 of 1
[2022-01-09, 01:12:17 UTC] {taskinstance.py:1240} INFO -
--------------------------------------------------------------------------------
[2022-01-09, 01:12:17 UTC] {taskinstance.py:1259} INFO - Executing <Task(BashOperator): cat_a> on 2022-01-09 01:12:14.377224+00:00
[2022-01-09, 01:12:17 UTC] {standard_task_runner.py:52} INFO - Started process 22752 to run task
[2022-01-09, 01:12:17 UTC] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'sensor_test3', 'cat_a', 'manual__2022-01-09T01:12:14.377224+00:00', '--job-id', '70', '--raw', '--subdir', 'DAGS_FOLDER/file_sensor.py', '--cfg-path', '/var/folders/xz/1wy0fpgn5lv5q6rfj2cx_km80000gn/T/tmpxe072ebc', '--error-file', '/var/folders/xz/1wy0fpgn5lv5q6rfj2cx_km80000gn/T/tmpcyw8pvyp']
[2022-01-09, 01:12:17 UTC] {standard_task_runner.py:77} INFO - Job 70: Subtask cat_a
[2022-01-09, 01:12:17 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: sensor_test3.cat_a manual__2022-01-09T01:12:14.377224+00:00 [running]> on host 251.1.168.192.in-addr.arpa
[2022-01-09, 01:12:17 UTC] {taskinstance.py:1424} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=sensor_test3
AIRFLOW_CTX_TASK_ID=cat_a
AIRFLOW_CTX_EXECUTION_DATE=2022-01-09T01:12:14.377224+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-01-09T01:12:14.377224+00:00
[2022-01-09, 01:12:17 UTC] {subprocess.py:62} INFO - Tmp dir root location:
 /var/folders/xz/1wy0fpgn5lv5q6rfj2cx_km80000gn/T
[2022-01-09, 01:12:17 UTC] {subprocess.py:74} INFO - Running command: ['bash', '-c', 'cat /Users/lks21c/airflow/files/a']
[2022-01-09, 01:12:17 UTC] {subprocess.py:85} INFO - Output:
[2022-01-09, 01:12:17 UTC] {subprocess.py:89} INFO - this is a secret.
[2022-01-09, 01:12:17 UTC] {subprocess.py:93} INFO - Command exited with return code 0
[2022-01-09, 01:12:17 UTC] {taskinstance.py:1267} INFO - Marking task as SUCCESS. dag_id=sensor_test3, task_id=cat_a, execution_date=20220109T011214, start_date=20220109T011217, end_date=20220109T011217
[2022-01-09, 01:12:17 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-01-09, 01:12:17 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check

SFTP Sensor

To run the SFTP sensor properly, the following packages need to be installed.

pip install apache-airflow-providers-sftp
pip install apache-airflow-providers-ssh

Then create an SFTP connection in the web UI under Admin -> Connections.

The value that goes into the Extra field is as follows.

{
  "key_file": "/Users/lks21c/.ssh/id_rsa",
  "no_host_key_check": "False",
  "host_key": "AAAAB3**"
}

One thing to note about host_key: take the entry for the SFTP host from ~/.ssh/known_hosts and enter it without the leading host information. For example, given a line like the one below, strip off the leading host part and enter only the key value.

hydra01.domain.com ssh-rsa AAAAB3**
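
If you prefer not to copy the key by hand, a minimal sketch like the one below can pull it out of known_hosts (the helper name is mine, and it assumes the entry is stored unhashed):

from pathlib import Path

def host_key_for(host: str, known_hosts: str = "~/.ssh/known_hosts") -> str:
    # known_hosts lines look like: "<host[,host2,...]> <key-type> <base64-key>"
    for line in Path(known_hosts).expanduser().read_text().splitlines():
        parts = line.split()
        if len(parts) >= 3 and host in parts[0].split(","):
            return parts[2]  # the value to paste into the "host_key" Extra field
    raise KeyError(f"{host} not found in {known_hosts}")

print(host_key_for("hydra01.domain.com"))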

Then write the code as below.

The flow is to download a PDF file from the SFTP server, process it, and then upload it back to the same SFTP server.

from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.providers.sftp.operators.sftp import SFTPOperator


def process_file(**kwargs):
    # Placeholder processing step: just print the templated input/output paths.
    templates_dict = kwargs.get("templates_dict")
    input_file = templates_dict.get("input_file")
    output_file = templates_dict.get("output_file")
    print(f'{templates_dict} {input_file} {output_file}')


with DAG("sftp_operators_workflow",
         schedule_interval=None,
         start_date=days_ago(2)) as dag:
    src_path = "/ebook/Java-8-In-Action.pdf"
    conn_id = 'hydra01_sftp'
    local_path = '/tmp/{{ run_id }}/sample.pdf'
    wait_for_input_file = SFTPSensor(task_id="check-for-file",
                                     sftp_conn_id=conn_id,
                                     path=src_path,
                                     poke_interval=10)

    download_file = SFTPOperator(
        task_id="get-file",
        ssh_conn_id=conn_id,
        remote_filepath=src_path,
        local_filepath=local_path,
        operation="get",
        create_intermediate_dirs=True
    )

    # process-file: run the Python callable on the file we just downloaded
    process_file_task = PythonOperator(task_id="process-file",
                                       templates_dict={
                                           "input_file": local_path,
                                           "output_file": local_path
                                       },
                                       python_callable=process_file)

    upload_file = SFTPOperator(
        task_id="put-file",
        ssh_conn_id=conn_id,
        remote_filepath="/download/sample.pdf",
        local_filepath=local_path,
        operation="put"
    )

    wait_for_input_file >> download_file >> process_file_task >> upload_file
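
One detail worth calling out: local_filepath on SFTPOperator and templates_dict on PythonOperator are templated fields, so the {{ run_id }} in local_path is rendered for each DAG run; that is why the logs below show a path like /tmp/manual__.../sample.pdf. A minimal illustration of the same rendering with plain Jinja (purely illustrative, not how Airflow wires it up internally):

from jinja2 import Template

# Illustrative only: Airflow renders templated operator fields from the task context.
local_path = "/tmp/{{ run_id }}/sample.pdf"
print(Template(local_path).render(run_id="manual__2022-01-10T05:40:52.745608+00:00"))
# -> /tmp/manual__2022-01-10T05:40:52.745608+00:00/sample.pdf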

The DAG layout is as follows.

Looking at part of the get-file task log, you can see the file being fetched normally, and I confirmed that it was downloaded correctly on the local machine as well.

[2022-01-10, 05:40:57 UTC] {sftp.py:141} INFO - Starting to transfer from /ebook/Java-8-In-Action.pdf to /tmp/manual__2022-01-10T05:40:52.745608+00:00/sample.pdf

Likewise, looking at put-file, you can confirm that the upload completed normally.

[2022-01-10, 05:41:02 UTC] {sftp.py:151} INFO - Starting to transfer file from /tmp/manual__2022-01-10T05:40:52.745608+00:00/sample.pdf to /download/sample.pdf

Epilogue

When I first looked into this, I was going to research and implement an FTP sensor instead of an SFTP sensor. Thinking it over, though, security matters here, so I plan to build the system on SFTP wherever possible.