Backfill
데이터 파이프라인을 운용하다보면, 이미 지난 날짜를 기준으로 재처리를 해야 하는 일이 왕왕 있습니다. 백필은 바로 이 재처리 작업을 의미합니다. 단어의 의미 그대로 ‘메우는 작업'이라고 생각하면 될 듯합니다.
Backfill을 하게 되는 경우
백필 작업을 하는 경우는 나름 명확합니다. 다음과 같은 사례가 있습니다.
- 버그가 있거나 어떤 이유로 로직이 변경되었을 때 전체 데이터를 새로 말아주어야 할 때
- 컬럼 등의 메타 데이터가 변경되었을 때 이를 반영하기 위한 append 성의 작업이 필요할 때
이외에도 과거의 데이터를 재처리하고자 하는 니즈가 있다면 백필을 먼저 떠올리면 됩니다.
Airflow Backfill
Airflow를 이용하고 있다면 몇 가지 방법으로 백필 작업을 수행할 수 있습니다.
Backfill
Airflow에선 Backfill 커맨드를 제공하고 있습니다. 현재는 CLI로만 이 커맨드를 제공하고 있으며 Web UI에선 사용할 수 없습니다. (별도 플러그인을 제작해서 붙인 게 아니라면요)
Airflow의 Backfill 커맨드를 알아보겠습니다.
usage: airflow dags backfill [-h] [-c CONF] [--continue-on-failures] [--delay-on-limit DELAY_ON_LIMIT] [-x] [-n] [-e END_DATE] [-i] [-I] [-l] [-m] [--pool POOL] [--rerun-failed-tasks] [--reset-dagruns] [-B]
[-s START_DATE] [-S SUBDIR] [-t TASK_REGEX] [-v] [-y]
dag_id
Run subsections of a DAG for a specified date range. If reset_dag_run option is used, backfill will first prompt users whether airflow should clear all the previous dag_run and task_instances within the backfill date range. If rerun_failed_tasks is used, backfill will auto re-run the previous failed task instances within the backfill date range
positional arguments:
dag_id The id of the dag
optional arguments:
-h, --help show this help message and exit
-c CONF, --conf CONF JSON string that gets pickled into the DagRun's conf attribute
--continue-on-failures
if set, the backfill will keep going even if some of the tasks failed
--delay-on-limit DELAY_ON_LIMIT
Amount of time in seconds to wait when the limit on maximum active dag runs (max_active_runs) has been reached before trying to execute a dag run again
-x, --donot-pickle Do not attempt to pickle the DAG object to send over to the workers, just tell the workers to run their version of the code
-n, --dry-run Perform a dry run for each task. Only renders Template Fields for each task, nothing else
-e END_DATE, --end-date END_DATE
Override end_date YYYY-MM-DD
-i, --ignore-dependencies
Skip upstream tasks, run only the tasks matching the regexp. Only works in conjunction with task_regex
-I, --ignore-first-depends-on-past
Ignores depends_on_past dependencies for the first set of tasks only (subsequent executions in the backfill DO respect depends_on_past)
-l, --local Run the task using the LocalExecutor
-m, --mark-success Mark jobs as succeeded without running them
--pool POOL Resource pool to use
--rerun-failed-tasks if set, the backfill will auto-rerun all the failed tasks for the backfill date range instead of throwing exceptions
--reset-dagruns if set, the backfill will delete existing backfill-related DAG runs and start anew with fresh, running DAG runs
-B, --run-backwards if set, the backfill will run tasks from the most recent day first. if there are tasks that depend_on_past this option will throw an exception
-s START_DATE, --start-date START_DATE
Override start_date YYYY-MM-DD
-S SUBDIR, --subdir SUBDIR
File location or directory from which to look for the dag. Defaults to '[AIRFLOW_HOME]/dags' where [AIRFLOW_HOME] is the value you set for 'AIRFLOW_HOME' config you set in 'airflow.cfg'
-t TASK_REGEX, --task-regex TASK_REGEX
The regex to filter specific task_ids to backfill (optional)
-v, --verbose Make logging output more verbose
-y, --yes Do not prompt to confirm. Use with care!
-c CONF, --conf CONF
- 백필 작업을 수행할 때, 해당 DagRun에 넘겨주어야 하는 CONF를 JSON String 형태로 제공해줄 때 사용합니다.
--continue-on-failures
- 몇몇 Task가 실패하더라도 Backfill 자체는 실패하지 않고 계속 진행되도록 할 때 주는 옵션입니다.
—delay-on-limit DELAY_ON_LIMIT
- DagRun을 다시 수행하려고 할 때,
max_active_runs
값에 막혀 실행되지 못한 경우에 재수행까지 얼마나 기다리도록 할지 지연시간 값을 주는 옵션입니다.
- DagRun을 다시 수행하려고 할 때,
-x, --donot-pickle
- 수행할 DAG를 Worker에 넘겨줄 때 피클링하지 않고, Worker가 갖고 있는 코드를 그대로 수행하도록 하고자 할 때 사용합니다. (언제 쓸 수 있을까요? 흠..)
-e END_DATE, --end-date END_DATE
- 백필 하려는 마지막 DagRun의 data_interval 시작 시각을 의미합니다.
- YYYY-MM-DD 형식을 지원하며, 만약 시간을 붙이고 싶다면
YYYY-MM-DDTHH:mm:SS
형식으로 나타낼 수 있습니다. - timezone을 추가하고 싶다면
+
로 나타낼 수 있습니다. 예를 들어 KST는 다음처럼 표현합니다.2022-09-01T00:10:00+09:00
-i, --ignore-dependencies
- 기본적으로 상위 Task가 있는 경우, 상위 Task에 종속성이 있다고 판단해서 먼저 실행하는데, 관리자가 판단하기에 종속성을 무시해도 괜찮은 경우, 상위 Task를 실행하지 않게 하는 옵션입다.
-I, --ignore-first-depends-on-past
- 첫 번째 Task에 대해서
depends_on_past
의존성을 무시하도록 하게 합니다. (다만, 첫 번째 이후의 Task는depends_on_past
에 의존성이 있습니다.)
- 첫 번째 Task에 대해서
-l, --local
- 백필 작업을 LocalExecutor에서 수행합니다. 다른 Executor가 아닌 현재 Backfill 작업을 시작한 위치에서 작업이 수행됩니다.
-m, --mark-success
- 실제 백필 작업을 수행하는 대신, 모든 Task를 일괄
SUCCESS
상태로 처리합니다.
- 실제 백필 작업을 수행하는 대신, 모든 Task를 일괄
--pool POOL
- 백필 Task가 실행될 pool을 정해줄 수 있습니다.
--rerun-failed-tasks
- 백필 작업중에 실패한 Task가 있더라도, Exception을 던지는 것이 아니라, 재수행하도록 합니다.
--reset-dagruns
- DagRun을 초기화합니다.
-B, --run-backwards
- 이 옵션을 주면, 가장 최근의 Data_Interval 부터 실행합니다.
END_DATE
부터START_DATE
로 흘러갑니다.
- 이 옵션을 주면, 가장 최근의 Data_Interval 부터 실행합니다.
-s START_DATE, --start-date START_DATE
- 백필 하려는 시작 DagRun의 data_interval 시작 시각을 의미합니다.
- END_DATE 파라미터와 마찬가지로 YYYY-MM-DD 형식을 지원하며, 만약 시간을 붙이고 싶다면
YYYY-MM-DDTHH:mm:SS
형식으로 나타낼 수 있습니다.
-t TASK_REGEX, --task-regex TASK_REGEX
- DAG 내의 특정 Task만 백필하고자 할 때 사용합니다. 지정해주지 않을 경우, DAG 내의 전체 Task가 백필됩니다.
예를 들어 some_dag
의 some_task
를 2022-09-01T00:10:00
부터 2022-09-01T12:10:00
까지 백필하고 싶다면 다음처럼 실행하면 됩니다.
$ airflow dags backfill -t some_task some_dag -s 2022-09-01T00:10:00 -e 2022-09-01T12:10:00
버그
다만 현재 Airflow backfill 커맨드에는 버그가 있습니다. 백필 작업이 시작되면 현재 실행중인 DagRun의 모든 Task Instance를 Scheduled
상태로 설정하는데, 이게 dagrun.update_state
를 매번 호출하는데 RUNNING
상태이거나 Schedulable
상태인 Task Instance가 없을 때 Deadlock 상태에 빠집니다.
요약하자면, 실행할 Task Instance를 전부 예약을 걸어두었는데, 추가 예약이 없는 경우 더 진행되지 않고 예약 상태로만 남아있는 문제입니다.
2.4.0RC1에서 이를 패치한 코드가 있어서 체리픽해보았는데, 처음엔 잘 되는 거 같아 실제 클러스터에도 적용해봤는데 잘 안 되더군요 🥲 백필 코드만 바꾼게 아닐 수도 있을 거 같아서, 실제 2.4.0 버전이 릴리즈되면 그때 다시 적용해보려고 합니다.
Clear
Backfill과 더불어서 Clear도 백필 작업처럼 사용할 수 있습니다. Clear는 기존에 실행되었던 DagRun을 지워주는 역할을 합니다.
usage: airflow tasks clear [-h] [-R] [-d] [-e END_DATE] [-X] [-x] [-f] [-r] [-s START_DATE] [-S SUBDIR] [-t TASK_REGEX] [-u] [-y] dag_id
Clear a set of task instance, as if they never ran
positional arguments:
dag_id The id of the dag
optional arguments:
-h, --help show this help message and exit
-R, --dag-regex Search dag_id as regex instead of exact string
-d, --downstream Include downstream tasks
-e END_DATE, --end-date END_DATE
Override end_date YYYY-MM-DD
-X, --exclude-parentdag
Exclude ParentDAGS if the task cleared is a part of a SubDAG
-x, --exclude-subdags
Exclude subdags
-f, --only-failed Only failed jobs
-r, --only-running Only running jobs
-s START_DATE, --start-date START_DATE
Override start_date YYYY-MM-DD
-S SUBDIR, --subdir SUBDIR
File location or directory from which to look for the dag. Defaults to '[AIRFLOW_HOME]/dags' where [AIRFLOW_HOME] is the value you set for 'AIRFLOW_HOME' config you set in 'airflow.cfg'
-t TASK_REGEX, --task-regex TASK_REGEX
The regex to filter specific task_ids to backfill (optional)
-u, --upstream Include upstream tasks
-y, --yes Do not prompt to confirm. Use with care!
-R, --dag-regex
- DAG의 이름을 정확하게 지정하지 않는 대신, DAG_ID를 regex를 이용해 검색할 수 있게 합니다.
-d, --downstream
- 클리어 대상이 되는 태스크의 하위 태스크(현재 태스크에 의존성을 갖고 있는 태스크)를 포함해서 클리어합니다.
-e END_DATE, --end-date END_DATE
- Clear 할 마지막 DagRun의 data_interval 시작 시각을 의미합니다.
- YYYY-MM-DD 형식을 지원하며, 만약 시간을 붙이고 싶다면
YYYY-MM-DDTHH:mm:SS
형식으로 나타낼 수 있습니다. - timezone을 추가하고 싶다면
+
로 나타낼 수 있습니다. 예를 들어 KST는 다음처럼 표현합니다.2022-09-01T00:10:00+09:00
-X, --exclude-parentdag
- Clear 된 Task가 어떤 SubDAG의 일부일 때 ParentDAG을 Clear 대상에 포함하지 않습니다.
-x, --exclude-subdags
- SubDAG을 포함하지 않습니다.
-f, --only-failed
- FAILED 상태의 작업에 대해서만 Clear를 수행합니다.
-r, --only-running
- RUNNING 상태의 작업에 대해서만 Clear를 수행합니다.
-s START_DATE, --start-date START_DATE
- Clear 할 시작 DagRun의 data_interval 시작 시각을 의미합니다.
- END_DATE 파라미터와 마찬가지로 YYYY-MM-DD 형식을 지원하며, 만약 시간을 붙이고 싶다면
YYYY-MM-DDTHH:mm:SS
형식으로 나타낼 수 있습니다.
-S SUBDIR, --subdir SUBDIR
- DAG 파일이 위치한 디렉토리를 지정해줄 수 있습니다. 기본값으로
[AIRFLOW_HOME]/dags
디렉토리가 참조됩니다.
- DAG 파일이 위치한 디렉토리를 지정해줄 수 있습니다. 기본값으로
-t TASK_REGEX, --task-regex TASK_REGEX
- DAG 내의 특정 Task에 대해서만 Clear하고자 하는 경우 지정해주면 됩니다.
-u, --upstream
- 클리어 대상이 되는 태스크의 상위 태스크(현재 태스크가 의존성을 갖고 있는 태스크)를 포함해서 클리어합니다.
Backfill 커맨드와 같은 상황을 Clear 커맨드로 나타내보면 다음과 같습니다. (기본값만 나타내보면 크게 다르지 않습니다)
$ airflow tasks clear -t some_task some_dag -s 2022-09-01T00:10:00 -e 2022-09-01T12:10:00
Backfill 커맨드와 다르게 Web UI에서도 수행할 수 있습니다. 그리고 스케줄에 따라 스케줄러가 생성했던 DagRun이라면 Clear 되었을 때 자동으로 다시 Trigger 됩니다.
다만 Backfill 작업이 진행되었던 DagRun에 대해선 Clear 하더라도 다시 트리거가 되지 않는다는 점에 유의해야 합니다. (참고) Airflow 스케줄러는 수동으로 생성된 DagRun에 대해선 스케줄 큐에 추가하지 않기 때문입니다.
또한 이미 실행되었던 DagRun이 아니라면 당연하게도 Clear할 수 없습니다. 따라서 기록되지 않은 시각을 Trigger 하려면 Backfill을 사용하거나 날짜를 직접 주고 실행하는 Trigger Run을 이용해야 합니다. (Trigger DAG w/ config
)
마무리
그러나 백필의 핵심은 이런 기술적인 방법이 아니라, 보다 섬세한 로직과 쿼리에 있는 게 아닐까 합니다. 보통 백필을 하게 되면, 몇 개월 이상의 데이터를 백필하게 되는데, 이 경우 데이터를 읽고 처리하는 과정에서 오랜 시간이 소요됩니다. 데이터를 Write 하는 시간보다는 Read와 Process하는 쪽에서 대부분의 지연이 발생하기 때문입니다.
예를 들어, STRING 포맷으로 저장된 JSON 데이터를 처리해야 하는 경우, STRING 데이터를 읽어다가 JSON 오브젝트로 serialize 하는 과정이 수반됩니다. 로직을 적용하기엔 STRING 데이터만으로는 어렵기 때문입니다. 이러한 이유로 JSON 오브젝트로의 변환이 이뤄지는데, 이 과정에서 발생하는 자원 소모도 크고, 모든 STRING 데이터를 변환하는 데에 드는 시간도 무시할 수 없습니다.
따라서 재처리해야 하는 데이터를 최소화하고, 꼭 필요한 데이터에만 로직을 적용하려는 노력이 필요합니다. 잘 짠 로직도 중요하지만, 읽어야 할 데이터 자체가 적다면 그만큼 처리에 필요한 시간도 비약적으로 줄어들테니까요.
만약 이 글이 도움이 되셨다면 글 좌측 하단의 하트❤를 눌러주시면 감사하겠습니다.
혹시라도 글에 이상이 있거나, 오역, 이상한 번역이 있거나, 이해가 가지 않으시는 부분, 또는 추가적으로 궁금하신 내용이 있다면 주저 마시고 댓글💬을 남겨주세요! 빠른 시간 안에 답변을 드리겠습니다 😊
참고
- 백필 관련된 정보를 세세히 작성해주신 포스트
- backfill 관련 이슈를 해결한 패치
'IT > Airflow' 카테고리의 다른 글
[Airflow] Airflow 2.4.0에선 무엇이 달라졌을까? (0) | 2022.10.15 |
---|---|
[Airflow] PythonSensor에 pod override 옵션 적용하기 (0) | 2022.09.04 |
[Airflow] Pool (0) | 2022.07.09 |
[Airflow] Dynamic Task Mapping (동적 태스크 매핑) (0) | 2022.06.06 |
[Airflow] Sensor를 정리해보자 (2) | 2022.05.22 |