Overview
I work at a financial company where we are looking to adopt Airflow for internal and external ETL. This time I ran Airflow locally and, to test the ETL flow, implemented two simple sensor DAGs, one for a local file and one for SFTP, and I am writing the results up here.
Prerequisites
To deploy a newly developed DAG, set the dags_folder path in airflow.cfg and place the Python file in that directory.
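For reference, this is roughly what the relevant airflow.cfg entry looks like; the path below is only an example, so use whatever directory you actually point dags_folder at.

[core]
dags_folder = /Users/lks21c/airflow/dags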
File Sensor
This is the simplest form of a local file sensor. Task 1 senses (detects) the file /Users/lks21c/airflow/files/a. Once the file is detected, task 2 runs cat /Users/lks21c/airflow/files/a. To exercise this, I put the text this is a secret. into the a file beforehand. The code is as follows.
from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor

default_args = {
    'start_date': days_ago(1),
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'sensor_test3',
    default_args=default_args,
    schedule_interval="@once",
    catchup=False,  # catchup is a DAG-level argument, so it belongs here rather than in default_args
)

# t1: wait until the target file appears on the local filesystem
t1 = FileSensor(
    task_id='sensor_a',
    filepath='/Users/lks21c/airflow/files/a',
    dag=dag,
)

# t2: once the file exists, print its contents
t2 = BashOperator(
    task_id='cat_a',
    bash_command='cat /Users/lks21c/airflow/files/a',
    dag=dag,
)

t1 >> t2
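To try the DAG quickly without waiting on the scheduler (assuming the file above is already in the dags_folder and the metadata DB has been initialized with airflow db init), each task can be exercised individually with airflow tasks test:

airflow tasks test sensor_test3 sensor_a 2022-01-09
airflow tasks test sensor_test3 cat_a 2022-01-09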
The DAG layout (graph view) is simply sensor_a >> cat_a.
The t1 log (my favorite part) is below. Since the file being watched already exists, you can see that Success criteria met. Exiting. is printed.
*** Reading local file: /Users/lks21c/airflow/logs/sensor_test3/sensor_a/2022-01-09T01:12:14.377224+00:00/1.log
[2022-01-09, 01:12:16 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: sensor_test3.sensor_a manual__2022-01-09T01:12:14.377224+00:00 [queued]>
[2022-01-09, 01:12:16 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: sensor_test3.sensor_a manual__2022-01-09T01:12:14.377224+00:00 [queued]>
[2022-01-09, 01:12:16 UTC] {taskinstance.py:1238} INFO -
--------------------------------------------------------------------------------
[2022-01-09, 01:12:16 UTC] {taskinstance.py:1239} INFO - Starting attempt 1 of 1
[2022-01-09, 01:12:16 UTC] {taskinstance.py:1240} INFO -
--------------------------------------------------------------------------------
[2022-01-09, 01:12:16 UTC] {taskinstance.py:1259} INFO - Executing <Task(FileSensor): sensor_a> on 2022-01-09 01:12:14.377224+00:00
[2022-01-09, 01:12:16 UTC] {standard_task_runner.py:52} INFO - Started process 22750 to run task
[2022-01-09, 01:12:16 UTC] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'sensor_test3', 'sensor_a', 'manual__2022-01-09T01:12:14.377224+00:00', '--job-id', '69', '--raw', '--subdir', 'DAGS_FOLDER/file_sensor.py', '--cfg-path', '/var/folders/xz/1wy0fpgn5lv5q6rfj2cx_km80000gn/T/tmp81uvvcn_', '--error-file', '/var/folders/xz/1wy0fpgn5lv5q6rfj2cx_km80000gn/T/tmproifyjka']
[2022-01-09, 01:12:16 UTC] {standard_task_runner.py:77} INFO - Job 69: Subtask sensor_a
[2022-01-09, 01:12:16 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: sensor_test3.sensor_a manual__2022-01-09T01:12:14.377224+00:00 [running]> on host 251.1.168.192.in-addr.arpa
[2022-01-09, 01:12:16 UTC] {taskinstance.py:1424} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=sensor_test3
AIRFLOW_CTX_TASK_ID=sensor_a
AIRFLOW_CTX_EXECUTION_DATE=2022-01-09T01:12:14.377224+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-01-09T01:12:14.377224+00:00
[2022-01-09, 01:12:16 UTC] {filesystem.py:59} INFO - Poking for file /Users/lks21c/airflow/files/a
[2022-01-09, 01:12:16 UTC] {filesystem.py:65} INFO - Found File /Users/lks21c/airflow/files/a last modified: 20220109100100
[2022-01-09, 01:12:16 UTC] {base.py:249} INFO - Success criteria met. Exiting.
[2022-01-09, 01:12:16 UTC] {taskinstance.py:1267} INFO - Marking task as SUCCESS. dag_id=sensor_test3, task_id=sensor_a, execution_date=20220109T011214, start_date=20220109T011216, end_date=20220109T011216
[2022-01-09, 01:12:16 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-01-09, 01:12:16 UTC] {local_task_job.py:264} INFO - 1 downstream tasks scheduled from follow-on schedule check
Looking at the t2 log next, the bash command ran normally and its output, this is a secret., was printed.
*** Reading local file: /Users/lks21c/airflow/logs/sensor_test3/cat_a/2022-01-09T01:12:14.377224+00:00/1.log
[2022-01-09, 01:12:17 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: sensor_test3.cat_a manual__2022-01-09T01:12:14.377224+00:00 [queued]>
[2022-01-09, 01:12:17 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: sensor_test3.cat_a manual__2022-01-09T01:12:14.377224+00:00 [queued]>
[2022-01-09, 01:12:17 UTC] {taskinstance.py:1238} INFO -
--------------------------------------------------------------------------------
[2022-01-09, 01:12:17 UTC] {taskinstance.py:1239} INFO - Starting attempt 1 of 1
[2022-01-09, 01:12:17 UTC] {taskinstance.py:1240} INFO -
--------------------------------------------------------------------------------
[2022-01-09, 01:12:17 UTC] {taskinstance.py:1259} INFO - Executing <Task(BashOperator): cat_a> on 2022-01-09 01:12:14.377224+00:00
[2022-01-09, 01:12:17 UTC] {standard_task_runner.py:52} INFO - Started process 22752 to run task
[2022-01-09, 01:12:17 UTC] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'sensor_test3', 'cat_a', 'manual__2022-01-09T01:12:14.377224+00:00', '--job-id', '70', '--raw', '--subdir', 'DAGS_FOLDER/file_sensor.py', '--cfg-path', '/var/folders/xz/1wy0fpgn5lv5q6rfj2cx_km80000gn/T/tmpxe072ebc', '--error-file', '/var/folders/xz/1wy0fpgn5lv5q6rfj2cx_km80000gn/T/tmpcyw8pvyp']
[2022-01-09, 01:12:17 UTC] {standard_task_runner.py:77} INFO - Job 70: Subtask cat_a
[2022-01-09, 01:12:17 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: sensor_test3.cat_a manual__2022-01-09T01:12:14.377224+00:00 [running]> on host 251.1.168.192.in-addr.arpa
[2022-01-09, 01:12:17 UTC] {taskinstance.py:1424} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=sensor_test3
AIRFLOW_CTX_TASK_ID=cat_a
AIRFLOW_CTX_EXECUTION_DATE=2022-01-09T01:12:14.377224+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-01-09T01:12:14.377224+00:00
[2022-01-09, 01:12:17 UTC] {subprocess.py:62} INFO - Tmp dir root location:
/var/folders/xz/1wy0fpgn5lv5q6rfj2cx_km80000gn/T
[2022-01-09, 01:12:17 UTC] {subprocess.py:74} INFO - Running command: ['bash', '-c', 'cat /Users/lks21c/airflow/files/a']
[2022-01-09, 01:12:17 UTC] {subprocess.py:85} INFO - Output:
[2022-01-09, 01:12:17 UTC] {subprocess.py:89} INFO - this is a secret.
[2022-01-09, 01:12:17 UTC] {subprocess.py:93} INFO - Command exited with return code 0
[2022-01-09, 01:12:17 UTC] {taskinstance.py:1267} INFO - Marking task as SUCCESS. dag_id=sensor_test3, task_id=cat_a, execution_date=20220109T011214, start_date=20220109T011217, end_date=20220109T011217
[2022-01-09, 01:12:17 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-01-09, 01:12:17 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
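In this run the sensor succeeded immediately because the file was already there. When the file may arrive later, the usual BaseSensorOperator parameters control how the sensor waits; a minimal sketch with arbitrary values (not part of the original DAG):

t1 = FileSensor(
    task_id='sensor_a',
    filepath='/Users/lks21c/airflow/files/a',
    poke_interval=30,     # re-check every 30 seconds
    timeout=60 * 60,      # fail the task if nothing shows up within an hour
    mode='reschedule',    # release the worker slot between pokes
    dag=dag,
)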
SFTP Sensor
To run the SFTP sensor you first need to install the SFTP provider package.
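The original post does not reproduce the package name, but given the airflow.providers.sftp imports below it is presumably the SFTP provider:

pip install apache-airflow-providers-sftp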
Next, create a connection in the web UI under Admin -> Connections. The value that goes into the Extra field is shown below. One thing to watch out for: for host_key, take the entry for the SFTP host from ~/.ssh/known_hosts and enter it without the host part, i.e. take the matching known_hosts line and strip the leading host/domain information before pasting it in.
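The exact Extra value from the original setup is not reproduced here; as a rough sketch, the SSH/SFTP connection accepts extras along these lines, where every value is a placeholder you must replace:

{
    "key_file": "<path to the private key, e.g. ~/.ssh/id_rsa>",
    "no_host_key_check": "false",
    "host_key": "<the key string from the matching ~/.ssh/known_hosts line, without the leading host field>"
}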
Then write the code as below. The flow is to download a PDF file from the SFTP server, process it, and upload it back to the same SFTP server.
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.providers.sftp.operators.sftp import SFTPOperator


def process_file(**kwargs):
    # The templated paths arrive via templates_dict; for now just print them.
    templates_dict = kwargs.get("templates_dict")
    input_file = templates_dict.get("input_file")
    output_file = templates_dict.get("output_file")
    print(f'{templates_dict} {input_file} {output_file}')


with DAG("sftp_operators_workflow",
         schedule_interval=None,
         start_date=days_ago(2)) as dag:

    src_path = "/ebook/Java-8-In-Action.pdf"
    conn_id = 'hydra01_sftp'
    local_path = '/tmp/{{ run_id }}/sample.pdf'

    # Wait until the source file shows up on the SFTP server.
    wait_for_input_file = SFTPSensor(task_id="check-for-file",
                                     sftp_conn_id=conn_id,
                                     path=src_path,
                                     poke_interval=10)

    # Download the file into a run-specific local directory.
    download_file = SFTPOperator(
        task_id="get-file",
        ssh_conn_id=conn_id,
        remote_filepath=src_path,
        local_filepath=local_path,
        operation="get",
        create_intermediate_dirs=True
    )

    # Placeholder processing step on the downloaded file.
    process_file_task = PythonOperator(task_id="process-file",
                                       templates_dict={
                                           "input_file": local_path,
                                           "output_file": local_path
                                       },
                                       python_callable=process_file)

    # Upload the (processed) file back to the SFTP server.
    upload_file = SFTPOperator(
        task_id="put-file",
        ssh_conn_id=conn_id,
        remote_filepath="/download/sample.pdf",
        local_filepath=local_path,
        operation="put"
    )

    wait_for_input_file >> download_file >> process_file_task >> upload_file
The DAG layout (graph view) is check-for-file >> get-file >> process-file >> put-file.
Looking at part of the get-file task log, you can see the file being fetched normally, and the file was indeed downloaded to the local machine.
[2022-01-10, 05:40:57 UTC] {sftp.py:141} INFO - Starting to transfer from /ebook/Java-8-In-Action.pdf to /tmp/manual__2022-01-10T05:40:52.745608+00:00/sample.pdf
The put-file log likewise shows the upload completing normally.
[2022-01-10, 05:41:02 UTC] {sftp.py:151} INFO - Starting to transfer file from /tmp/manual__2022-01-10T05:40:52.745608+00:00/sample.pdf to /download/sample.pdf
Epilogue
When I first looked into this, I was going to use an FTP sensor instead of an SFTP sensor. On reflection, though, security matters here, so I plan to build the system on SFTP wherever possible.