IT/Airflow

[Airflow] PythonSensor에 pod override 옵션 적용하기

wookiist 2022. 9. 4. 20:32

PythonSensor & Kubernetes Excutor

최근에 Kubernetes Executor를 사용할 때 PythonSensor의 pod_override 옵션을 어떻게 적용해야 하나 하는 문제로 헤맸던 적이 있습니다. 이에 대해 간단하게 정리해보았는데요.

PythonOperator 문서를 가보면, pod spec을 변경하는 방법을 소개한 문서가 존재합니다. 거기다 이렇게 executor_config를 변경한 예제 코드도 있습니다.

문제는 PythonSensor 스펙을 소개한 문서 어딜 봐도 PythonOperator가 제공하는 executor_config가 없었습니다. 특히 PythonOperator가 상속한 BaseOperator를 보면 executor_config 필드가 있었지만, BaseSensor에는 없었습니다.

BaseOperator는 executor_config에 대해 다음의 문서를 통해 알아볼 수 있습니다.

그러나 BaseSensor는 executor_config 필드가 따로 정의된 바가 없어서 어지러웠습니다. 그럼에도 Sensor도 결국 Operator와는 본질적으로 같은 오브젝트이니, 써보면 되지 않을까 해서 써봤는데, 이게 웬걸! 잘 생성되는 걸 확인할 수 있었습니다.

다음은 PythonSensor를 커스텀하는 과정에서 executor_config 필드를 이용해 pod 명세를 정의한 코드입니다. 코드를 보면 kubernetes 오브젝트 기술 방법을 사용해서 pod 디스크립션을 정의한 것을 알 수 있습니다.

K8S_VOLUMES=[
    k8s.V1Volume(
        name='some_volume',
        persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='some_volume'),
    )
]

K8S_VOLUME_MOUNTS=[
    k8s.V1VolumeMount(name='some_volume', mount_path='/some_path/')
]

...
self.executor_config = {
    "pod_override": k8s.V1Pod(
        metadata=k8s.V1ObjectMeta(
            namespace='some_namespace'
        ),
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name='base',
                    volume_mounts=K8S_VOLUME_MOUNTS,
                    security_context=k8s.V1SecurityContext(
                        run_as_group=0
                    )
                )
            ],
            volumes=K8S_VOLUMES,
        )
    )
}

executor_configpod_override 필드와 KubernetesExecutor 필드를 인식합니다. 다만, KubernetesExecutor 는 예전에 사용되던 필드의 이름이고, 현재는 pod_override 필드를 사용하면 됩니다.

KubernetesExecutor 필드를 사용한다면 본 문서를 참조해보세요.

pod_override 필드는 이 문서를 참조해서 Pod 명세를 오브젝트로 표현해주면 됩니다.

만약 둘 다 사용하려고 하면, 하나만 쓰라는 에러가 발생합니다. 그러니 둘 중에 하나를 사용해주세요! 되도록이면 pod_override 필드를 사용하는 것을 추천드립니다.

그러나… 네임스페이스 버그가 있어요

그러나 현재 최신 버전까지도 이 override 옵션에 대해 버그가 존재하는데요.

metadata=k8s.V1ObjectMeta(
    namespace='some_namespace'
),

이 코드처럼 직접 정의한 네임스페이스가 적용되지 않는 버그입니다. (https://github.com/apache/airflow/pull/24342) 이 버그는 최근에 머지된 코드를 통해 해결할 수 있는데요.

airflow/kubernetes/pod_generator.py 코드를 수정해주면 됩니다.

try:
    final_namespace = pod_override_object.metadata.namespace  # type: ignore
    if not final_namespace:
        final_namespace = namespace
except Exception:
    final_namespace = namespace

해당 변경 내역을 보면, 이 코드처럼 네임스페이스 오버라이드를 하는 경우엔 최종 네임스페이스를 변경하려는 네임스페이스로 바꿔주고, 그렇지 않다면, 기본값 네임스페이스가 최종 네임스페이스가 되도록 반영해주는 매우.. 매우 단순한 코드가 추가 됩니다.

분명.. 잘 정의했는데.. 이게 왜 적용이 안 되나, 뭘 잘못 정의해서 그런건가.. 싶어서 헤매던 시간을 생각해보면 아직도 아찔합니다 ㅎㅎ

마무리

매우 짧은 글이지만, 단시간 안에 PythonSensor에 pod 수정 옵션을 주어야 했던 저에게는 귀중한 정보라서 이렇게 공유드리게 되었습니다. 순전히 제가 급하게 알아냈던 정보들이니 틀린 정보가 있을 수 있습니다. 😅

만약 이 글이 도움이 되셨다면 글 좌측 하단의 하트❤를 눌러주시면 감사하겠습니다.

혹시라도 글에 이상이 있거나, 오역, 이상한 번역이 있거나, 이해가 가지 않으시는 부분, 또는 추가적으로 궁금하신 내용이 있다면 주저 마시고 댓글💬을 남겨주세요! 빠른 시간 안에 답변을 드리겠습니다 😊

728x90
반응형