IT/Airflow 7

[Airflow] Airflow 2.4.0에선 무엇이 달라졌을까?

본 글은 Ash Berlin-Taylor가 작성한 ‘Apache Airflow 2.4.0: That Data Aware Release’ 글을 읽고 번역한 글입니다. 중간 중간에 필요한 제 사견이나 첨언도 들어가 있으니 참고 부탁드립니다. Apache Airflow 2.4.0에는 650개 이상의 유저 커밋, 그리고 총 870개 이상의 커밋이 포함되어 있습니다. 이번 버전에는 46개의 새로운 기능, 39개의 개선 사항 그리고 52개의 버그 픽스가 포함됩니다. Data-aware scheduling (AIP-48) 정말 대단한 기능입니다. 이제 Airflow는 데이터셋을 업데이트하는 다른 task를 기반으로 DAG을 스케줄링할 수 있게 되었습니다. 이게 정확히 무슨 뜻일까요? 이 기능으로 인해 DAG 작성자들..

IT/Airflow 2022.10.15

[Airflow] Backfill과 Clear를 정리해보자

Backfill 데이터 파이프라인을 운용하다보면, 이미 지난 날짜를 기준으로 재처리를 해야 하는 일이 왕왕 있습니다. 백필은 바로 이 재처리 작업을 의미합니다. 단어의 의미 그대로 ‘메우는 작업'이라고 생각하면 될 듯합니다. Backfill을 하게 되는 경우 백필 작업을 하는 경우는 나름 명확합니다. 다음과 같은 사례가 있습니다. 버그가 있거나 어떤 이유로 로직이 변경되었을 때 전체 데이터를 새로 말아주어야 할 때 컬럼 등의 메타 데이터가 변경되었을 때 이를 반영하기 위한 append 성의 작업이 필요할 때 이외에도 과거의 데이터를 재처리하고자 하는 니즈가 있다면 백필을 먼저 떠올리면 됩니다. Airflow Backfill Airflow를 이용하고 있다면 몇 가지 방법으로 백필 작업을 수행할 수 ..

IT/Airflow 2022.09.18

[Airflow] PythonSensor에 pod override 옵션 적용하기

PythonSensor & Kubernetes Excutor 최근에 Kubernetes Executor를 사용할 때 PythonSensor의 pod_override 옵션을 어떻게 적용해야 하나 하는 문제로 헤맸던 적이 있습니다. 이에 대해 간단하게 정리해보았는데요. PythonOperator 문서를 가보면, pod spec을 변경하는 방법을 소개한 문서가 존재합니다. 거기다 이렇게 executor_config를 변경한 예제 코드도 있습니다. 문제는 PythonSensor 스펙을 소개한 문서 어딜 봐도 PythonOperator가 제공하는 executor_config가 없었습니다. 특히 PythonOperator가 상속한 BaseOperator를 보면 executor_config 필드가 있었지만, Base..

IT/Airflow 2022.09.04

[Airflow] Pool

Pool Airflow가 동시에 실행하는 Task가 너무 많을 경우, 시스템에 부하를 줄 수 있습니다. Pool은 이러한 문제가 발생하는 것을 방지하기 위해, 해당 Pool을 사용하는 DAG들에서 병렬로 실행되는 Task의 개수를 제한하기 위해 도입되었습니다. Pool을 사용하면, Task Set에 대한 병렬 처리를 제한하여 각 Task가 실행되는 시기를 세밀하게 제어할 수 있습니다. 이 내용에 대해서는 마지막에 usecase를 보면서 설명드리겠습니다. 특히, 특정 Task를 수행하는 병렬 Task의 수를 제한하기 위해 자주 사용합니다. 예를 들어, 동일한 API Endpoint나 DB를 찌르는 작업일 때, 또는 Kubernetes 클러스터의 GPU 노드에서 GPU 할당을 제어하기 위해 사용합니다. 기본..

IT/Airflow 2022.07.09

[Airflow] Dynamic Task Mapping (동적 태스크 매핑)

Introduction 최근 사내에서 사용하는 몇몇 Airflow 클러스터 관리를 담당하게 되면서 Airflow 2.3.0으로의 버전업을 진행하게 되었습니다. 사실 이전에 다뤘던 두 포스트도 그 일환으로 작성했던 내용이네요 ㅎㅎ Airflow 2.3.0 업그레이드에는 나름 큰(?) 변경 사항과 추가 사항이 있습니다. 그동안 사용해왔던 TreeView를 대체하는 GridView의 도입, LocalKubernetesExecutor의 도입, 메타데이터 DB에 쌓여만 가던 오래된 데이터를 지워주는 airflow db clean 명령어 등, 각각 하나의 글로 작성해도 충분할만큼 중요한 신규 기능, 변경점들입니다. 그중에서도 가장 주목 받았던 신규 기능이라고 한다면, Dynamic Task Mapping 을 꼽을 ..

IT/Airflow 2022.06.06

[Airflow] Sensor를 정리해보자

Overview 최근 들어 Airflow를 적극적으로 다루는 일이 많아지다보니, 여러 요구사항을 만나게 되는데요. Airflow 자체적으로 제공하지 않는 스케줄링 처리, 특정 파일 및 조건을 만족할 때까지 대기하는 구간 등을 정의하게 되었습니다. 전자는 간단한 Operator를 구현해, 정공법은 아니지만 Workaround 느낌으로 파훼했고, 후자는 이번에 알아볼 Sensor로 해결할 수 있었습니다. Apache Airflow Sensor는 어떤 사건이 발생할 때 까지 기다리도록 설계된 특수한 종류의 오퍼레이터입니다. 실행된 Sensor는 특정 조건을 만족하면, 성공으로 마킹되며 이후 다운스트림 태스크를 실행합니다. 적절하게 사용되면, DAG를 좀 더 Event-Driven하게 작성할 수 있도록 도와줍니..

IT/Airflow 2022.05.22

[Airflow] Airflow DAG Serialization (직렬화)

직렬화(Serialization) Airflow의 DAG 직렬화를 알아보기 전에, 직렬화가 무엇인지부터 정리해보려고 합니다. 우리가 어떤 객체 데이터를 가지고 저장하거나 통신한다고 가정해보겠습니다. 실제 실행중인 프로세스상에서 데이터는 연속적이지 않게 메모리에 퍼져 있습니다. 물론 프로세스를 메모리에 연속된 주소로 할당하는 기법도 있긴 하지만, 단편화 문제도 있고, 가장 크게는 메모리 크기를 넘어서는 프로세스는 실행할 수 없다는 치명적인 문제가 있어서 사용하지 않고 있으니 요 친구는 논외로 하겠습니다. 그러니 앞서 말씀드린 것처럼, 객체 데이터는 비연속적으로 퍼져 있다고 볼 수 있습니다. 이런 객체 데이터를 가지고 통신하거나 저장하려면, 그 객체 데이터 그대로를 사용해서는 불가능합니다. 메모리 주솟값 등..

IT/Airflow 2022.05.08