Apache Airflow를 이용한 데이터 워크플로우 자동화
최유석
이 글에서는 데이터 워크플로우 관리도구인 Apache Airflow와 병렬 배치 데이터 전송 도구인 TreasureData의 Embulk를 이용해서 Cloud SQL에서 빅쿼리까지의 데이터 워크플로우를 구성하고 자동화하는 방법에 대해서 알아보겠다. Embulk에 대한 자세한 내용은 생략한다. 조대협님의 글 http://bcho.tistory.com/1126 에 Embulk에 대한 내용이 자세히 소개되어 있으니 참고하기 바란다..
본 글에서 구현할 아키텍처의 구성은 다음과 같으며, Data Pipeline을 구축하고 자동화 처리하는 게 주요목적이어서 Reporting Services부분에 대한 별도 설명은 생략한다.
일반적인 빅데이터 분석 과정
빅데이터를 분석하기 위해서는 먼저 어플리케이션에서 로그 및 사용자 정보, 등의 데이터를 수집하는 시스템이 있고 이렇게 수집된 원천데이터에서 데이터를 추출하고 처리되어야하는 데이터에 따라 BATCH/Streaming 형태로 변환 및 가공 처리해서 하둡과 같은 오픈소스 기반의 시스템 또는 클라우드 기반의 데이터 웨어하우스 서비스(CF. 구글 빅쿼리) 에 적재해서 이렇게 적재된 데이터를 가지고 쿼리를 실행해서 원하는 형태로 결과 데이터를 만들고 시각화 하기까지 일련의 과정을 거쳐서 빅데이터 분석을 하게 된다.
데이터 워크플로우 관리도구의 등장
앞에서 보았던 것처럼, 빅데이터 분석을 하기까지의 일련의 과정에서 데이터를 수집하고, ETL처리하고 쿼리하는 등의 여러 개의 구간으로 나뉘어서 각각의 태스크를 연결해서 실행되게 되는데 이렇게 실행되는 여러 개의 태스크들을 하나의 파이프라인으로 실행할 수 있게 해주고 스케줄링 기능을 통해서 자동화까지 가능하게 해주는 데이터 워크플로우 관리 도구가 등장하게 되었다. 사실 이러한 기능은 CRON과 쉘 스크립트를 이용하면 순차적인 태스크의 실행, 스케줄링, 등에 대한 구현은 가능했지만, 에러에 대한 디버깅, 재처리 작업등이 어려웠고 앞의 태스크의 실행 결과에 대한 분기, 분산 환경에 대한 지원, 등이 필요하게 되면서 좀 더 구조화 된 기능을 충족하는 데이터 워크 플로우 관리 도구의 필요성이 더 높아지게 되었다.
데이터 워크플로우 관리도구
앞에서 설명한 것처럼, 데이터를 분석 과정에서의 늘어나는 요구 사항을 충족시키기 위해서 오픈소스로 여러가지 데이터 워크플로우를 관리하기 위한 도구가 개발되었는데, 대표적인 도구로는 하둡 에코시스템(HADOOP ECOSYSTEM)의 우지(oozie) 등이 있다. 하둡의 여러가지 에코 시스템의 도구들을 유기적으로 조합하기 위해 개발된 도구로 다양한 하둡 에코시스템의 도구들을 연동하기 위한 아답터를 가지고 있다. 이 글에서 소개하고자 하는 데이터 워크플로우 관리 도구인 아파치 에어플로우(Apache Airflow)는 처음 에어비앤비(Airbnb) 엔지니어링팀에서 개발된 도구로 현재는 아파치 재단에서 인큐베이션 단계에 있는 소프트웨어이다. 이글에서 Airflow를 소개하려는 이유는 첫번째로 개발자, 데이터 분석 전문가(데이터 과학자, 엔지니어)에게 익숙한 파이썬을 기반으로 태스크들에 대한 코드를 작성할 수 있어 접근이 쉽다. 두번째로 HDFS와 같이 여러대의 노드가 동작해야 하는 환경에 대해서도 지원한다. 세번째로 웹 UI기반의 강력한 모니터링 기능을 제공하여 여러 개의 태스크가 합쳐져서 동작하는 데이터 파이프라인에서도 강력한 디버깅 기능을 사용할 수 있다. 마지막으로 GCP Operator를 제공해서 구글 클라우드의 여러가지 데이터 분석 서비스와 연계해서 사용이 쉽다 라는 장점을 가지고 있다.
Airflow의 구조
DAG (Directed Acyclic Graph)
Airflow에서는 DAG은 앞에서 설명한 것처럼, 여러 개의 테스트가 합쳐져서 동작하는 하나의 데이터 워크 플로우를 말한다. DAG은 파이썬 코드를 기반으로 사용자가 정의하고 실행된다. 예를 들면 위의 그림에서처럼 CloudSQL에서부터 데이터를 가져와서 빅쿼리까지 적재하는 모든 개별 구간(태스크)을 합쳐 하나로 만든 데이터 파이프라인이 DAG이 된다.
Operator and Task
Operator는 DAG안에서 정의되는 작업 함수(함수임을 주의하자) 이며, 데이터가 처리되고 이동하는 각각의 개별 태스크를 정의하는 함수이다.
이 Operator 가 DAG 이 실제로 데이터 워크플로우에 정의되고 실행되면서 호출 된 것이 Task 이다. 조금 더 이해를 돕기 위해 부연설명을 덧붙이자면, 객체지향 언어에서의 Operator가 class 라면, Task는 object 라고 보면 된다.
Airflow 데이터 워크플로우 만들기
처음에 소개한 것처럼. 에어플로우(Airflow)와 엠벌크(Embulk)를 이용해서 GCE VM인스턴스를 이용해서 ETL서버를 구축하고 구글 클라우드의 RDBMS 서비스인 CloudSQL에 있는 데이터를 수집해서 구글 클라우드의 데이터 웨어하우스 서비스인 빅쿼리까지 데이터를 적재하고 간단한 쿼리를 실행하기까지의 하나의 데이터 파이프라인(워크플로우)를 구성하는 과정에 대해 설명하겠다.
샘플 데이터 준비하기
먼저, 이 예제에서 사용할 데이터는 빅쿼리에서 공개 데이터셋으로 제공하는 이용하겠다.
빅쿼리에서 제공하는 테이블 데이터 내보내기 기능을 통해 구글 클라우드 스토리지(GCS)로 테이블 데이터를 내보내고 이 CSV데이터 파일을 Cloud SQL My SQL 인스턴스에 Import 하여 활용하였다. 또한, 이 예제를 통해 데이터 워크 플로우를 구성하는 방법에 대한 소개가 주요 목적이기 때문에 비교적 작은 크기의 테이블 데이터를 사용했다.
샘플 테이블 정보
샘플로 사용한 빅쿼리 공개 데이터셋의 테이블(311_service_requests)은 2008 년 7 월부터 현재까지의 모든 샌프란시스코 311전화번호 서비스에 대한 요청 정보가 포함되어 있다. 테이블의 크기는 총 767MB, 총 2,659,261개의 Row를 가지고 있다.
빅쿼리 테이블 데이터 export
아래와 같이, 빅쿼리에 공개데이셋 테이블(311_service_requests)을 export해서 저장시킬 GCS 버켓을 먼저 생성한다.
- Default storage class : Regional
- Regional location : us-central1
GCS버켓을 생성하였다면, 빅쿼리 웹 콘솔로 접속해서 311_service_requests 테이블을 csv형식으로 export한다.
- Google cloud Storage URI : gs://bigquery-public-dataset/311_service_requests.csv
- 나머지 값 : Default
Cloud SQL인스턴스 생성 및 데이터 Import
앞에서 설명한 것처럼, 본 예제에서 사용할 빅쿼리 공개 데이터셋의 테이블(311_service_requests)을 GCS로 export하였다. 이제 GCS에 저장된 csv형식의 데이터를 CloudSQL에 MySQL(2세대)인스턴스를 생성하여 Import하여 예제를 진행하기 위한 데이터 준비를 마무리 하겠다.
먼저, 클라우드 콘솔에 접속해서 CloudSQL메뉴로 진입하고 MySQL(2세대) 인스턴스 생성을 진행한다.
임의로 인스턴스의 이름과 패스워드를 지정하고 하단의 생성 메뉴를 확장하여, 4 Authorize networks 에서 DB접속을 허용할 네트워크 IP범위를 0.0.0.0/0으로 입력하고 MySQL 인스턴스를 생성한다.
MySQL 테이블 생성
앞에서 생성한 MySQL인스턴스에 테이블을 생성하기 위해서 CloudShell로 접속한다. CloudShell로 접속하면 다음의 명령어를 실행하고 앞에서 MySQL인스턴스 생성 시에 지정했던 패스워드를 입력하여 MySQL Shell로 접속한다..
$ gcloud sql connect airflow-test-db --user=root
(CF. $ gcloud sql connect "인스턴스명" --user=root)
MySQL Shell에 접속하였다면, 아래 쿼리를 실행해서 데이터 베이스와 테이블을 생성한다
CREATE DATABASE sanfrancisco; USE sanfrancisco; CREATE TABLE `311_service_requests` (`unique_key` INT(11) NULL DEFAULT NULL, `created_date` TIMESTAMP NULL DEFAULT NULL, `closed_date` TIMESTAMP NULL DEFAULT NULL, `resolution_action_updated_date` TIMESTAMP NULL DEFAULT NULL, `status` VARCHAR(255) NULL DEFAULT NULL, `status_notes` VARCHAR(255) NULL DEFAULT NULL, `agency_name` VARCHAR(255) NULL DEFAULT NULL, `category` VARCHAR(255) NULL DEFAULT NULL, `complaint_type` VARCHAR(255) NULL DEFAULT NULL, `descriptor` VARCHAR(255) NULL DEFAULT NULL, `incident_address` VARCHAR(255) NULL DEFAULT NULL, `supervisor_district` INT(11) NULL DEFAULT NULL, `neighborhood` VARCHAR(255) NULL DEFAULT NULL, `location` VARCHAR(255) NULL DEFAULT NULL, `source` VARCHAR(255) NULL DEFAULT NULL, `media_url` VARCHAR(255) NULL DEFAULT NULL, `latitude` FLOAT NULL DEFAULT NULL, `longitude` FLOAT NULL DEFAULT NULL, `police_district` VARCHAR(255) NULL DEFAULT NULL ) COLLATE='utf8_general_ci' ENGINE=InnoDB ; |
CSV데이터 Import
이제 MySQL인스턴스에 테이블 생성까지 끝났다면, 빅쿼리에서 Export하여 GCS에 저장한 CSV형식의 테이블 데이터 파일을 CloudSQL인스턴스(airflow-test-db)에 Import하기위해 Instance details페이지로 접속하여 상단의 IMPORT버튼을 클릭하고 아래와 같이 설정하여 CSV데이터 Import를 진행한다.
- Cloud Storage file : bigquery-public-dataset/311_service_requests.csv
- Format of import : CSV
- Database : sanfrancisco
- Table : 311_service_requests
데이터의 크기가 크지 않기 때문에 약간의 시간이 지나면 CloudSQL에 생성한 테이블에 CSV파일 Import가 완료된다.
ETL 처리 서버 구성하기
지금까지 본 예제에서 사용 할 데이터의 준비가 완료되었다. 이제 Compute Engine(GCE)에서 인스턴스(VM)를 하나 생성하고 이 VM에 Embulk를 설치하고 CloudSQL에서 빅쿼리까지의 데이터 전송과정에서의 ETL처리를 위한 서버 환경을 구성한다.
VM 생성하기
준비된 데이터의 ETL처리를 담당할 Compute Engine의 VM을 아래와 같이 생성한다.
- Name: etl-server
- Zone: us-central1-a
- Boot Disk: Ubuntu 14.04 LTS / 10 GB(Standard persistent disk)
- Identity and API access : Allow full access to all Cloud APIs
* 생성한 VM에서 CloudSQL, GCS, BigQuery 에 대한 액세스 권한을 가지고 있어야한다. 편의를 위해 모든 Cloud API에 대한 전체 액세스 허용하는 옵션으로 선택한다.
기타 팁: Embulk 는 배치 데이터 전송도구 이다. JVM위에서 동작하고 배치 처리가 주요동작이기 때문에 실행 시 많은 리소스가 필요할 수 있다. 이 글에서는 예제를 위해서 서버의 (CPU, Memory, Disk)크기를 작게 설정하였기 때문에 실제 동작이 느릴 수 있다.
Embulk 설치하기
VM생성이 완료 되었다면 SSH터미널로 접속하여 데이터 전송 및 ETL 작업을 수행할 Embulk와 사용할 패키지들을 설치한다.
자바 설치
Embulk는 설치 및 실행을 할 때, JVM(Java Virtual Machine)에 대한 의존성을 가지고 있다. 먼저, 다음의 명령어를 실행해서 우분투 apt패키지 설치 정보를 업데이트하고 JVM을 설치한다.
$ sudo apt-get update
$ sudo apt-get install default-jre
Embulk 설치
JVM설치가 완료되었다면, 아래 명령어를 실행해서 Embulk 설치를 진행한다.
$ curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc
Embulk MySQL Input, BigQuery Output, parser-none 플러그인 설치
Embulk 설치가 완료되었다면, 본 예제에서 사용하게 될 MySQL Input 플러그인과 BigQuery Output 플러그인, 그리고 Embulk 실행 속도를 빠르게 하기 위해서 parser-none플러그인을
아래 명령어를 실행해서 설치한다.
$ embulk gem install embulk-input-mysql
$ embulk gem install embulk-output-bigquery
$ embulk gem install embulk-parser-none
Embulk로 MySQL 데이터 가져오기
Embulk를 이용해서 앞에서 준비한 CloudSQL(MySQL)의 테이블 데이터를 가져오기 위해 311_service_requests_input.yml이라는 이름으로 Embulk 설정파일을 아래와 같이 생성한다.
in: type: mysql auth_method: compute_engine project: “프로젝트 ID” host: “CloudSQL 인스턴스 IP” port: 3306 user: root password: “CloudSQL 인스턴스의 root 계정 패스워드” database: sanfrancisco column_options: created_date: {type: string, timestamp_format: '%Y-%m-%d %H:%M:%S'} query: | SELECT created_date, status,source FROM 311_service_requests WHERE status="Closed" AND MONTH(created_date) = MONTH(now()) AND DAYOFMONTH(created_date) = DAYOFMONTH(now()) GROUP BY created_date, status, source ORDER BY created_date DESC out: type: file path_prefix: /데이터를 가져올 디렉토리/311_service_requests_ file_ext: CSV formatter: charset: UTF-8 newline: LF type: csv delimiter: ',' quote: '"' escape: '"' header_line: false columns: - {name: unique_key, type: long} - {name: created_date, type: timestamp, format: '%Y-%m-%d %H:%M:%S'} - {name: closed_date, type: timestamp, format: '%Y-%m-%d %H:%M:%S'} - {name: resolution_action_updated_date, type: timestamp, format: '%Y-%m-%d %H:%M:%S'} - {name: status, type: string} - {name: status_notes, type: string} - {name: agency_name, type: string} - {name: category, type: string} - {name: complaint_type, type: string} - {name: descriptor, type: string} - {name: incident_address, type: string} - {name: supervisor_district, type: long} - {name: neighborhood, type: string} - {name: location, type: string} - {name: source, type: string} - {name: media_url, type: string} - {name: latitude, type: double} - {name: longitude, type: double} - {name: police_district, type: string} |
- .yml형식의 설정파일을 보면 크게 in(input)과 out(output) 두가지 부분으로 나뉜다.
- in부분을 보면 mysql플러그인을 실행하기위해 type는 mysql로 인증 처리를 위한 옵션으로 auth_method는 compute_engine이라는 인증 방식을 사용하였다.(이 방식으로 인증하게 되면 VM생성 시에 다른 구글 클라우드 API에 대한 액세스 권한을 가지게 하였기 때문에 별도의 인증 절차를 생략할 수 있다.) 다음으로 MySQL의 테이블에 접근하기 위한 정보를 해당 옵션들(host, user, port, 등)을 사용해서 설정하고, column_options부분에 테이블에서 날짜 형식의 컬럼 데이터를 가져올 때 소수점 값 처리에서 에러가 날 수 있어 이를 방지하기 위해 string형식으로 가져오게 한다. 마지막으로 query부분에 MySQL에서 테이블에 쿼리를 실행하고 원하는 특정 결과 데이터만 가져올 수 있게 할 수 있도록 쿼리를 입력하였다. 이를 이용하면 사용자가 원하는 데이터만 가져올 수 있게 되어 별도의 프로그래밍 적인 ETL처리작업이 필요 없어져서 매우 유용하게 사용할 수 있는 옵션이다.
- MySQL에 저장된 테이블 데이터를 가져올 때 실행되는 쿼리는 311콜센터에 접수되어 있는 데이터 중에서 현재 상태가 “Closed”된 데이터들에 대해 언제 어떠한 매체(Web, Phone, ETC)를 통해 접수 되었는 지에 대한 정보를 현재 날짜와 같은 날짜의 데이터(예제를 위한 데이터로 이전의 데이터이기 때문에 년도는 무시한다.)로 가져오게 하였다.
- out부분을 보면 앞의 in부분 설정을 통해서 가져온 MySQL데이터를type을 file로 해서 path_prefix로 지정한 위치에 file_ext옵션으로 CSV파일로 생성하게 지정하고 formatter옵션을 이용해서 생성될 CSV 파일에 대한 상세 설정(컬럼, 캐릭터셋, 등)을 하였다.
MySQL을 가져오기 위한 설정파일을 생성하였다면, 아래의 Embulk명령어를 실행해서 실제로 MySQL인스턴스에서 데이터를 정상적으로 가져오는지 확인한다.
$ embulk run 311_service_requests_input.yml
Embulk로 빅쿼리에 데이터 적재하기
VM에 구성한 Embulk를 통해서 CloudSQL(MySQL)의 데이터를 정상적으로 가져오는 것을 확인하였다면, 가져온 CSV데이터 파일을 빅쿼리에 적재하는 과정을Embulk를 이용해서 진행한다.
Embulk를 이용해서 가져온 테이블 데이터 파일(CSV)을 빅쿼리에 적재하기 위해 311_service_requests_output.yml이라는 이름으로 Embulk설정파일을 아래와 같이 생성한다.
in: type: file path_prefix: /가져온 데이터 파일을 저장한 디렉토리/311_service_requests_ parser: type: none out: type: bigquery mode: append auth_method: compute_engine project: lab-test-01 dataset: embulk table: 311_service_requests_%Y%m%d_%H schema_file: /스키마파일을 저장한 디렉토리/311_service_requests.json auto_create_table: true |
- in부분을 보면 빅쿼리에 적재할 데이터에 대해 설정하고 none-parser 플러그인을 사용해서 파싱을 하지 않게 해서 적재 성능을 높였다.
- out부분을 보면 type을 bigquery로 설정하고 mode를 append로 해서 기존 테이블이 있다면 추가데이터는 덧붙이게 된다. auth_method의 인증방식은 MySQL에서 데이터를 가져올 때 생성한 방식과 동일하게 compute_engine으로 적용하고 빅쿼리에 저장할 데이터셋, 테이블 형식(Wild카드 형식으로 쿼리 사용이 가능하게 테이블을 생성) 파싱작업 생략을 위해 schema_file에 json형식의 스키마 파일을 생성해둔다. 테이블의 경우 없으면 자동으로 생성되도록 auto_create_table옵션을 true로 주었다.
- 스키마 파일의 내용은 아래와 같이 구성되어 있다.
[ { "name": "created_date", "type": "TIMESTAMP", "mode": "NULLABLE" }, { "name": "status", "type": "STRING", "mode": "NULLABLE" }, { "name": "source", "type": "STRING", "mode": "NULLABLE" } ] |
빅쿼리로 데이터를 적재하기위한 설정파일을 생성하였다면, 아래의 Embulk명령어를 실행해서 실제로 데이터를 정상적으로 빅쿼리에 적재되는지 확인한다.
$ embulk run 311_service_requests_output.yml
Airflow 서버 구성하기
지금까지 CloudSQL에서 빅쿼리까지 데이터를 적재하는 과정을 각각의 구간으로 나누어 실행해보았다. 이제 VM을 추가로 생성하여 앞에서 각각 구간 별로 나누어 실행하였던 데이터 파이프라인 구간을 하나의 구간의 데이터 워크플로우로 통합하고 자동화하여 실행할 수 있게 해주는 Airflow를 설치하고 환경을 구성해보도록 한다.
VM 생성하기
앞에서 ETL처리를 위한 VM생성할 때와 적용되는 옵션은 다르지 않다. 별도 설명은 생략한다.
방화벽 포트 오픈
Airflow 웹서버를 실행하여 액세스 하기 위해서 다음과 같이 현재 사용중인 프로젝트의 네트워크에 대한 방화벽 8080포트를 허용하는 새로운 규칙을 추가한다.
- Targets : All instances in the network
- Source filter : IP ranges
- Source IP ranges : 0.0.0.0/0
- Protocols and ports : Specified protocols and ports -> tcp:8080
- 나머지 값 : Default
Airflow 설치하기
Compute Engine에 VM생성 및 방화벽 8080포트에 대한 허용 규칙을 추가하였다면, 새롭게 생성한 VM의 SSH 터미널에 접속하여 다음의 명령어를 실행해서 Airflow를 설치한다.
환경 변수 설정 $ export AIRFLOW_HOME=~/airflow Airflow 설치 $ pip install airflow DB 초기화 $ airflow initdb 웹 서버 실행 $ airflow webserver -p 8080 |
위의 명령어 순서대로 Airflow를 실행하고 웹 서버를 실행한 상태에서 생성한 VM의 External IP의 8080(http://VM External IP:8080)로 접근하여 정상적으로 동작되는 지 확인한다.
Airflow 설정 파일 수정하기
Airflow의 설치가 완료되었다면, Airflow의 정상적인 동작을 위해 설정파일($AIRFLOW_HOME/airflow.cfg)을 수정하여야 한다. VI등의 편집기를 이용해서 설정 파일의 내용을 아래와 같이 수정한다.
... # Are DAGs paused by default at creation // True -> False로 변경 dags_are_paused_at_creation = False ... # Whether to load the examples that ship with Airflow. It's good to # get started, but you probably want to set this to False in a production # environment // True -> False로 변경 load_examples = False ... |
ETL 서버 SSH연결 설정
데이터 워크플로우의 동작을 위해서는 Airflow가 구성된 서버에서 ETL서버로의 SSH 원격 명령 실행이 필요하다. ETL서버에 대한 원격SSH연결 설정을 위해 아래와 같이 Airflow 웹 서버를 실행시킨 상태에서 웹 콘솔의 상단의 Admin -> Connections메뉴로 이동한다.
*Airflow에서는 웹 콘솔의 Connetions 메뉴를 통해 DB, file system, HDFS, 등의 데이터의 소스에 대한 커넥션 설정 뿐 만 아니라, SSH연결 설정 또한 쉽게 할 수 있는 기능을 제공하여 편리하다.
ETL서버에 대한 원격SSH연결 설정을 위해, [Create] 버튼을 클릭하고 다음과 같이 내용을 입력하여 새로운 커넥션 설정을 구성한다.
- Conn Id : etl-server-ssh(임의로 설정)
- Conn Type : SSH
- Host : etl-server (앞에서 생성한 ETL서버 VM의 이름)
- Port: 22
- 나머지 값 : 공란
Airflow DAG파일 작성
지금까지 Airflow를 실행하기 위한 환경 설정을 하였다. 이제 DAG을 구성(파이썬 코드로 작성)하여 앞에서 구성한 전체 구간에 대한 데이터 파이프라인을 각각의 태스크 단위로 작성하고 매시간 30분마다 자동으로 실행되도록 스케줄링을 적용하여 자동화 처리를 하도록 하겠다.
기본적으로 DAG(파이썬 실행 파일)은 $AIRFLOW_HOME/dags/에 저장하고 Airlfow 명령을 실행하여 동작하게 한다. 만약, 해당 폴더가 없으면 생성하고 DAG 파일을 작성하면 된다.
$AIRFLOW_HOME/dags/ 폴더에 data_workflow_test.py라는 이름으로 아래와 같은 내용으로 작성하여 저장한다.
from airflow import DAG from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator from airflow.contrib.hooks import SSHHook from datetime import datetime,timedelta sshHook = SSHHook(conn_id='etl-server-ssh') dag = DAG('airflow-test-sample',description='Data Load to GCP With airflow and Embulk', schedule_interval = '30 * * * *', start_date=datetime(2017, 7, 1),catchup=False) embulk_run = SSHExecuteOperator( task_id='mysql_down', bash_command='/home/yooseok_choi/.embulk/bin/embulk run /home/yooseok_choi/311_service_requests_input.yml', ssh_hook=sshHook, dag=dag) bq_run = SSHExecuteOperator( task_id='bq_load', bash_command='/home/yooseok_choi/.embulk/bin/embulk run /ETL서버에서 데이터를 가져올 디렉토리 /311_service_requests_output.yml', ssh_hook=sshHook, dag=dag) bq_run.set_upstream(embulk_run) rm_run = SSHExecuteOperator( task_id='local_file_delete', bash_command='/bin/rm / 가져온 데이터 파일을 저장한 디렉토리/311_service_requests_*', ssh_hook=sshHook, dag=dag) rm_run.set_upstream(bq_run) |
- DAG 정의 부분을 보면. 먼저, DAG 객체는 DAG에 대한 전체 컨택스트를 저장 및 유지 관리한다.
- DAG('airflow-test-sample',description='Data Load to GCP With airflow and Embulk', 에서 DAG를 이름을 ‘'airflow-test-sample’로 정의하고 description에 설명을 작성한다. 다음으로 이 DAG가 실행되는 주기를 정해야 하는데, schedule_interval = '30 * * * *', 부분에 cron 명령과 같은 노테이션으로 정의한다. 위 설정은 매 시간 30분마다 등록한 DAG이 실행되도록 한다. 마지막으로, start_date=datetime(2017,07,01), ,DAG를 언제부터 시작 할 것인지 지정한다. DAG는 반드시 전역 변수로 지정한다. DAG안에서 다른 DAG를 부르는 sub DAG의 경우에는 지역 변수로 지정이 가능하다.
- 다음 task에 사용할 operator를 정의하게 되는데, 웹 콘솔에서 정의한 ETL서버에 대한 SSH Connection설정 정보를 사용하기 위해 SSHHook과 이 SSHHook으로 정의한 원격 서버에 SSH 명령(쉘 커맨드)을 실행할 SSHExecuteOperator에 대한 내용을 정의한다. 먼저, 'mysql_down'이라는 이름의 task로 MySQL에서 데이터를 가져오는 부분을 다음으로 'bq_load'라는 이름의 task로 가져온 데이터를 빅쿼리에 적재하는 부분을 정의하고 마지막으로 'local_file_delete'이라는 이름으로 빅쿼리에 데이터 적재가 끝나고 ETL서버에 저장된 데이터를 삭제하는 task를 정의하였다.
DAG파일 작성 후 다음의 명령어로 작성한 DAG파일이 정상적으로 등록이 되었는지 확인한다. 정상적으로 등록되었다면, “airflow-test-sample”을 등록되어 있는 DAG의 리스트로 확인할 수 있다.
$ airflow list_dags
Airflow 재실행
이제 DAG 파일 구성 까지 완료되었다면, Airflow 웹서버를 중지하고 DB를 초기화하고 재시작을 해야 정상적으로 구성한 DAG파일이 동작하게 된다. 따라서 웹서버가 동작하고 있는 터미널에서는 Ctrl+C로 실행을 중지한 상태에서 터미널에서 다음의 명령을 실행한다.
DB초기화
$ airflow initdb
Airflow 웹 서버 재실행
$ airflow webserver -p 8080
*Airflow 웹 서버가 중지되지 않는다면 아래의 명령을 이용하면 해당 프로세스를 kill 할 수 있다.
$ cat $AIRFLOW_HOME/airflow-webserver.pid | xargs kill -9
Airflow DAG실행
이제 모든 구성이 완료하였다. 다른 SSH 터미널을 열고 아래의 명령어를 실행해서 등록한 DAG이 정상적으로 동작되는 지 확인해보자.
$ airflow scheduler
* 위의 명령어는 Airflow DAG에 등록된 스케줄에 따라서 태스크들을 지속적으로 실행하게 한다.
Airflow 웹서버를 이용한 모니터링
스케줄러에 등록한 DAG이 정상적으로 동작되는지 Airflow 웹 서버 콘솔로 접속해서 확인해보자.
먼저, 웹 서버 콘솔로 접속하면 아래와 같이 앞에서 작성해서 등록한 DAG이 실행되고 있는 것을 확인할 수 있다. DAG이 실행되면서 해당 DAG에서 작성된 task의 순서대로 실행되는 것을 확인할 수 있을 것이다.
Dag의 실행이 완료되고 빅쿼리 웹 콘솔에 CloudSQL에서 Embulk가 구성된 VM에서 가져온 데이터가 앞에서 구성한대로 아래와 같이 테이블로 저장되어있는 것을 확인할 수 있다.
이제 Airflow 스케줄러를 정지 시키지 않는 다면 매시간 30분에 한번씩 앞에서 구성한 데이터 워크플로우에 따라 빅쿼리에 테이블이 자동으로 생성될 것이다.
Airflow 웹 서버 콘솔의 주요기능
지금까지 Airflow와 Embulk를 이용해서 CloudSQL에서부터 빅쿼리까지 데이터를 적재하기 까지의 데이터 파이프라인을 구성하고 Airflow를 이용해서 하나의 자동화된 데이터워크플로우를 구성해보았다. 마지막으로 Airflow 웹 서버에서 제공하는 주요한 기능들에 대해 알아보고 마무리하겠다.
Graph View
Graph View는 DAG의 구조를 그래프 형태로 보여주는 뷰이다
복잡한 워크플로우의 경우 그 구조를 파악하는데 유용하다. 위의 그림을 보면 'airflow-test-sample'이라는 이름으로 정의한 DAG의 내부 task들이 mysql_down -> bq_load -> local_file_delete 순서대로 호출되어 실행되는 것을 볼 수 있다.
Tree View
트리뷰를 보면, DAG의 구조를 트리 형태로 보여주고, DAG의 태스크가 각각 성공했는지 실패 했는지를 우측 그래프처럼 표현해준다. 각 태스크의 실행 로그를 보려면 각 태스크 실행 결과 그래프를 클리하면 아래와 같이 세부 메뉴가 나온다.
여기서 View Log를 누르면 각 Task 별로 실행 당시의 로그를 볼 수 있다. 아래는 mysql_down태스크를 실행한 로그이다.
위의 그림처럼 각 task에 에러가 발생하여도 출력된 실행로그를 확인하여 디버깅이 가능하다..
Task Duration
Task duration은 DAG에서 수행된 각 태스크의 수행 시간을 아래와 같이 그래프 형태로 나타내준다.
Task Tries
Task Tries 에서는 각 수행 별로 각각의 태스크를 수행한 횟수를 그래프로 보여준다. 즉 재시도 (RETRY)횟수를 모니터링할 수 있다.
Gantt
Gantt 차트는 각 수행된 태스크들에 대해서 수행 순서에 따라서 소모된 시간과 함께 간트 차트로 표시해준다. 각 태스크의 수행 순서와 태스크당 시간을 한꺼번에 보여주기 때문에 병목 구간 파악이 쉬워진다.
마무리
지금까지 Apache Airflow를 이용하여 데이터 워크플로우를 구성 하는 방법에 대해서 알아보았다. 이미 링크드인의 Azkaban이나, 스포티파이의 Luigi, 하둡의 Oozie 등 여러가지 워크 플로우 관리 시스템이 있지만, 아직 인큐베이터 단계인 airflow를 주목하는 이유는 분산 환경 지원이 가능하고, 태스크에 대한 스크립트를 파이썬을 사용할 수 있기 때문에, 각종 빅데이타 분석 시스템이나 라이브러리 그리고 머신러닝 시스템과 연동이 쉽고, 파이썬 언어만 알면 쉽고 정교한 데이터 워크 플로우의 개발이 가능하기 때문에, ( XML등의 설정을 하지 않고도) 활용 가능성이 높다.
참고링크
http://airbnb.io/projects/airflow/
https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-mysql
https://github.com/embulk/embulk-output-bigquery
https://github.com/sonots/embulk-parser-none