Apache Airflow 이용한 데이터 워크플로우 자동화

 최유석

글에서는 데이터 워크플로우 관리도구인 Apache Airflow 병렬 배치 데이터 전송 도구인 TreasureData Embulk 이용해서 Cloud SQL에서 빅쿼리까지의 데이터 워크플로우를 구성하고 자동화하는 방법에 대해서 알아보겠다. Embulk 대한 자세한 내용은 생략한다.  조대협님의 http://bcho.tistory.com/1126 Embulk 대한 내용이 자세히 소개되어 있으니 참고하기 바란다..

 

글에서 구현할 아키텍처의 구성은 다음과 같으며, Data Pipeline 구축하고 자동화 처리하는 주요목적이어서 Reporting Services부분에 대한 별도 설명은 생략한다.

 

일반적인 빅데이터 분석 과정

빅데이터를 분석하기 위해서는 먼저 어플리케이션에서 로그 사용자 정보, 등의 데이터를 수집하는 시스템이 있고 이렇게 수집된 원천데이터에서 데이터를 추출하고 처리되어야하는 데이터에 따라 BATCH/Streaming 형태로 변환 가공 처리해서 하둡과 같은 오픈소스 기반의 시스템 또는 클라우드 기반의 데이터 웨어하우스 서비스(CF. 구글 빅쿼리) 적재해서 이렇게 적재된 데이터를 가지고 쿼리를 실행해서 원하는 형태로 결과 데이터를 만들고 시각화 하기까지 일련의 과정을 거쳐서 빅데이터 분석을 하게 된다.

 

데이터 워크플로우 관리도구의 등장

앞에서 보았던 것처럼, 빅데이터 분석을 하기까지의 일련의 과정에서 데이터를 수집하고, ETL처리하고 쿼리하는 등의 여러 개의 구간으로 나뉘어서 각각의 태스크를 연결해서 실행되게 되는데 이렇게 실행되는 여러 개의 태스크들을 하나의 파이프라인으로 실행할 있게 해주고 스케줄링 기능을 통해서 자동화까지 가능하게 해주는 데이터 워크플로우 관리 도구가 등장하게 되었다. 사실 이러한 기능은 CRON 스크립트를 이용하면 순차적인 태스크의 실행, 스케줄링, 등에 대한 구현은 가능했지만, 에러에 대한 디버깅, 재처리 작업등이 어려웠고 앞의 태스크의 실행 결과에 대한 분기, 분산 환경에 대한 지원, 등이 필요하게 되면서 구조화 기능을 충족하는 데이터 워크 플로우 관리 도구의 필요성이 높아지게 되었다.

데이터 워크플로우 관리도구

앞에서 설명한 것처럼, 데이터를 분석 과정에서의 늘어나는 요구 사항을 충족시키기 위해서 오픈소스로 여러가지 데이터 워크플로우를 관리하기 위한 도구가 개발되었는데, 대표적인 도구로는 하둡 에코시스템(HADOOP ECOSYSTEM) 우지(oozie) 등이 있다. 하둡의 여러가지 에코 시스템의 도구들을 유기적으로 조합하기 위해 개발된 도구로 다양한 하둡 에코시스템의 도구들을 연동하기 위한 아답터를 가지고 있다. 글에서 소개하고자 하는 데이터 워크플로우 관리 도구인 아파치 에어플로우(Apache Airflow) 처음 에어비앤비(Airbnb) 엔지니어링팀에서 개발된 도구로 현재는 아파치 재단에서 인큐베이션 단계에 있는 소프트웨어이다. 이글에서 Airflow 소개하려는 이유는 첫번째로 개발자, 데이터 분석 전문가(데이터 과학자, 엔지니어)에게 익숙한 파이썬을 기반으로 태스크들에 대한 코드를 작성할 있어 접근이 쉽다. 두번째로 HDFS 같이 여러대의 노드가 동작해야 하는 환경에 대해서도 지원한다. 세번째로 UI기반의 강력한 모니터링 기능을 제공하여 여러 개의 태스크가 합쳐져서 동작하는 데이터 파이프라인에서도 강력한 디버깅 기능을 사용할 있다. 마지막으로 GCP Operator 제공해서 구글 클라우드의 여러가지 데이터 분석 서비스와 연계해서 사용이 쉽다 라는 장점을 가지고 있다.

 

Airflow 구조

DAG (Directed Acyclic Graph)

Airflow에서는 DAG 앞에서 설명한 것처럼, 여러 개의 테스트가 합쳐져서 동작하는 하나의 데이터 워크 플로우를 말한다. DAG 파이썬 코드를 기반으로 사용자가 정의하고 실행된다. 예를 들면 위의 그림에서처럼 CloudSQL에서부터 데이터를 가져와서 빅쿼리까지 적재하는 모든 개별 구간(태스크) 합쳐 하나로 만든 데이터 파이프라인이 DAG 된다.

Operator and Task

Operator DAG안에서 정의되는 작업 함수(함수임을 주의하자) 이며, 데이터가 처리되고 이동하는 각각의 개별 태스크를 정의하는 함수이다.

Operator DAG 실제로 데이터 워크플로우에 정의되고 실행되면서 호출 것이  Task 이다. 조금 이해를 돕기 위해 부연설명을 덧붙이자면, 객체지향 언어에서의 Operator class 라면, Task object 라고 보면 된다.

 

Airflow 데이터 워크플로우 만들기

처음에 소개한 것처럼. 에어플로우(Airflow) 엠벌크(Embulk) 이용해서 GCE VM인스턴스를 이용해서 ETL서버를 구축하고 구글 클라우드의 RDBMS 서비스인 CloudSQL 있는 데이터를 수집해서 구글 클라우드의 데이터 웨어하우스 서비스인 빅쿼리까지 데이터를 적재하고 간단한 쿼리를 실행하기까지의 하나의 데이터 파이프라인(워크플로우) 구성하는 과정에 대해 설명하겠다.

 

샘플 데이터 준비하기

먼저, 예제에서 사용할 데이터는 빅쿼리에서 공개 데이터셋으로 제공하는 이용하겠다.

빅쿼리에서 제공하는 테이블 데이터 내보내기 기능을 통해 구글 클라우드 스토리지(GCS) 테이블 데이터를 내보내고 CSV데이터 파일을 Cloud SQL My SQL 인스턴스에 Import 하여 활용하였다. 또한, 예제를 통해 데이터 워크 플로우를 구성하는 방법에 대한 소개가 주요 목적이기 때문에 비교적 작은 크기의 테이블 데이터를 사용했다.

샘플 테이블 정보

 샘플로 사용한 빅쿼리 공개 데이터셋의 테이블(311_service_requests) 2008 7 월부터 현재까지의 모든 샌프란시스코 311전화번호 서비스에 대한 요청 정보가 포함되어 있다. 테이블의 크기는 767MB, 2,659,261개의 Row 가지고 있다.

 

빅쿼리 테이블 데이터 export

아래와 같이, 빅쿼리에 공개데이셋 테이블(311_service_requests) export해서 저장시킬 GCS 버켓을 먼저 생성한다.

  • Default storage class : Regional
  • Regional location : us-central1

 

GCS버켓을 생성하였다면, 빅쿼리 콘솔로 접속해서 311_service_requests 테이블을 csv형식으로 export한다.

  • Google cloud Storage URI : gs://bigquery-public-dataset/311_service_requests.csv
  • 나머지 : Default

Cloud SQL인스턴스 생성 데이터 Import

앞에서 설명한 것처럼, 예제에서 사용할 빅쿼리 공개 데이터셋의 테이블(311_service_requests) GCS export하였다. 이제 GCS 저장된 csv형식의 데이터를 CloudSQL MySQL(2세대)인스턴스를 생성하여 Import하여 예제를 진행하기 위한 데이터 준비를 마무리 하겠다.

 

먼저, 클라우드 콘솔에 접속해서 CloudSQL메뉴로 진입하고 MySQL(2세대) 인스턴스 생성을 진행한다.

 

임의로 인스턴스의 이름과 패스워드를 지정하고 하단의 생성 메뉴를 확장하여, 4 Authorize networks 에서 DB접속을 허용할 네트워크 IP범위를 0.0.0.0/0으로 입력하고 MySQL 인스턴스를 생성한다.

 

MySQL 테이블 생성

앞에서 생성한 MySQL인스턴스에 테이블을 생성하기 위해서 CloudShell 접속한다. CloudShell 접속하면 다음의 명령어를 실행하고 앞에서 MySQL인스턴스 생성 시에 지정했던 패스워드를 입력하여 MySQL Shell 접속한다..

$ gcloud sql connect airflow-test-db --user=root

(CF. $ gcloud sql connect "인스턴스명" --user=root)

MySQL Shell 접속하였다면, 아래 쿼리를 실행해서 데이터 베이스와 테이블을 생성한다

CREATE DATABASE sanfrancisco;
 
USE sanfrancisco;
 
CREATE TABLE `311_service_requests`
(`unique_key` INT(11) NULL DEFAULT NULL,
`created_date` TIMESTAMP NULL DEFAULT NULL,
`closed_date` TIMESTAMP  NULL DEFAULT NULL,
`resolution_action_updated_date` TIMESTAMP  NULL DEFAULT NULL,
`status` VARCHAR(255) NULL DEFAULT NULL,
`status_notes` VARCHAR(255) NULL DEFAULT NULL,
`agency_name` VARCHAR(255) NULL DEFAULT NULL,
`category` VARCHAR(255) NULL DEFAULT NULL,
`complaint_type` VARCHAR(255) NULL DEFAULT NULL,
`descriptor` VARCHAR(255) NULL DEFAULT NULL,
`incident_address` VARCHAR(255) NULL DEFAULT NULL,
`supervisor_district` INT(11) NULL DEFAULT NULL,
`neighborhood` VARCHAR(255) NULL DEFAULT NULL,
`location` VARCHAR(255) NULL DEFAULT NULL,
`source` VARCHAR(255) NULL DEFAULT NULL,
`media_url` VARCHAR(255) NULL DEFAULT NULL,
`latitude` FLOAT NULL DEFAULT NULL,
`longitude` FLOAT NULL DEFAULT NULL,
`police_district` VARCHAR(255) NULL DEFAULT NULL
)
COLLATE='utf8_general_ci'
ENGINE=InnoDB
;

 

CSV데이터 Import

이제 MySQL인스턴스에 테이블 생성까지 끝났다면, 빅쿼리에서 Export하여 GCS 저장한 CSV형식의 테이블 데이터 파일을 CloudSQL인스턴스(airflow-test-db) Import하기위해 Instance details페이지로 접속하여 상단의 IMPORT버튼을 클릭하고 아래와 같이 설정하여 CSV데이터 Import 진행한다.

 

  • Cloud Storage file : bigquery-public-dataset/311_service_requests.csv
  • Format of import : CSV
  • Database : sanfrancisco
  • Table : 311_service_requests 

데이터의 크기가 크지 않기 때문에 약간의 시간이 지나면 CloudSQL 생성한 테이블에 CSV파일 Import 완료된다.

ETL 처리 서버 구성하기

지금까지 예제에서 사용 데이터의 준비가 완료되었다. 이제 Compute Engine(GCE)에서 인스턴스(VM) 하나 생성하고 VM Embulk 설치하고 CloudSQL에서 빅쿼리까지의 데이터 전송과정에서의 ETL처리를 위한 서버 환경을 구성한다.

VM 생성하기

준비된 데이터의 ETL처리를 담당할 Compute Engine VM 아래와 같이 생성한다.

  • Name: etl-server
  • Zone: us-central1-a
  • Boot Disk: Ubuntu 14.04 LTS / 10 GB(Standard persistent disk)
  • Identity and API access : Allow full access to all Cloud APIs

* 생성한 VM에서 CloudSQL, GCS, BigQuery 대한 액세스 권한을 가지고 있어야한다. 편의를 위해 모든 Cloud API 대한 전체 액세스 허용하는 옵션으로 선택한다.

기타 : Embulk 배치 데이터 전송도구 이다. JVM위에서 동작하고 배치 처리가 주요동작이기 때문에 실행 많은 리소스가 필요할 있다. 글에서는 예제를 위해서 서버의 (CPU, Memory, Disk)크기를 작게 설정하였기 때문에 실제 동작이 느릴 있다.

 

Embulk 설치하기

VM생성이 완료 되었다면 SSH터미널로 접속하여 데이터 전송 ETL 작업을 수행할 Embulk 사용할 패키지들을 설치한다.

 

자바 설치

Embulk 설치 실행을 , JVM(Java Virtual Machine) 대한 의존성을 가지고 있다. 먼저, 다음의 명령어를 실행해서 우분투 apt패키지 설치 정보를 업데이트하고 JVM 설치한다.

$ sudo apt-get update

$ sudo apt-get install default-jre

Embulk 설치

JVM설치가 완료되었다면, 아래 명령어를 실행해서 Embulk 설치를 진행한다.

$ curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"

$ chmod +x ~/.embulk/bin/embulk

$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc

$ source ~/.bashrc

 

Embulk MySQL Input, BigQuery Output, parser-none 플러그인 설치

Embulk 설치가 완료되었다면, 예제에서 사용하게 MySQL Input 플러그인과 BigQuery Output 플러그인, 그리고 Embulk 실행 속도를 빠르게 하기 위해서 parser-none플러그인을

아래 명령어를 실행해서 설치한다.

$ embulk gem install embulk-input-mysql 

$ embulk gem install embulk-output-bigquery 

$ embulk gem install embulk-parser-none

 

Embulk MySQL 데이터 가져오기

Embulk 이용해서 앞에서 준비한 CloudSQL(MySQL) 테이블 데이터를 가져오기 위해 311_service_requests_input.yml이라는 이름으로 Embulk 설정파일을 아래와 같이 생성한다.

in:
  type: mysql
  auth_method: compute_engine
  project: “프로젝트 ID”
  host: “CloudSQL 인스턴스 IP”
  port: 3306
  user: root
  password: “CloudSQL 인스턴스의 root 계정 패스워드
  database: sanfrancisco
  column_options:
    created_date: {type: string, timestamp_format: '%Y-%m-%d %H:%M:%S'}
  query: |
    SELECT created_date, status,source
    FROM 311_service_requests
    WHERE status="Closed"
      AND MONTH(created_date) = MONTH(now())
      AND DAYOFMONTH(created_date) = DAYOFMONTH(now())
    GROUP BY created_date, status, source
    ORDER BY created_date DESC
out:
  type: file
  path_prefix: /데이터를 가져올 디렉토리/311_service_requests_
  file_ext: CSV
  formatter:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    header_line: false
    columns:
    - {name: unique_key, type: long}
    - {name: created_date, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: closed_date, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: resolution_action_updated_date, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: status, type: string}
    - {name: status_notes, type: string}
    - {name: agency_name, type: string}
    - {name: category, type: string}
    - {name: complaint_type, type: string}
    - {name: descriptor, type: string}
    - {name: incident_address, type: string}
    - {name: supervisor_district, type: long}
    - {name: neighborhood, type: string}
    - {name: location, type: string}
    - {name: source, type: string}
    - {name: media_url, type: string}
    - {name: latitude, type: double}
    - {name: longitude, type: double}
    - {name: police_district, type: string}
 
  •      .yml형식의 설정파일을 보면 크게 in(input) out(output) 두가지 부분으로 나뉜다.
  •       in부분을 보면 mysql플러그인을 실행하기위해 type mysql 인증 처리를 위한 옵션으로 auth_method compute_engine이라는 인증 방식을 사용하였다.( 방식으로 인증하게 되면 VM생성 시에 다른 구글 클라우드 API 대한 액세스 권한을 가지게 하였기 때문에 별도의 인증 절차를 생략할 있다.) 다음으로 MySQL 테이블에 접근하기 위한 정보를 해당 옵션들(host, user, port, ) 사용해서 설정하고, column_options부분에 테이블에서 날짜 형식의 컬럼 데이터를 가져올 소수점 처리에서 에러가 있어 이를 방지하기 위해 string형식으로 가져오게 한다. 마지막으로 query부분에 MySQL에서 테이블에 쿼리를 실행하고 원하는 특정 결과 데이터만 가져올 있게 있도록 쿼리를 입력하였다. 이를 이용하면 사용자가 원하는 데이터만 가져올 있게 되어 별도의 프로그래밍 적인 ETL처리작업이 필요 없어져서 매우 유용하게 사용할 있는 옵션이다.
  •       MySQL 저장된 테이블 데이터를 가져올 실행되는 쿼리는 311콜센터에 접수되어 있는 데이터 중에서 현재 상태가 “Closed” 데이터들에 대해 언제 어떠한 매체(Web, Phone, ETC) 통해 접수 되었는 지에 대한 정보를 현재 날짜와 같은 날짜의 데이터(예제를 위한 데이터로 이전의 데이터이기 때문에 년도는 무시한다.) 가져오게 하였다.
  •       out부분을 보면 앞의 in부분 설정을 통해서 가져온 MySQL데이터를type file 해서 path_prefix 지정한 위치에 file_ext옵션으로 CSV파일로 생성하게 지정하고 formatter옵션을 이용해서 생성될 CSV 파일에 대한 상세 설정(컬럼, 캐릭터셋, ) 하였다.

 

MySQL 가져오기 위한 설정파일을 생성하였다면, 아래의 Embulk명령어를 실행해서 실제로 MySQL인스턴스에서 데이터를 정상적으로 가져오는지 확인한다.

$ embulk run 311_service_requests_input.yml

 

Embulk 빅쿼리에 데이터 적재하기

VM 구성한 Embulk 통해서 CloudSQL(MySQL) 데이터를 정상적으로 가져오는 것을 확인하였다면, 가져온 CSV데이터 파일을 빅쿼리에 적재하는 과정을Embulk 이용해서 진행한다.

Embulk 이용해서 가져온 테이블 데이터 파일(CSV) 빅쿼리에 적재하기 위해 311_service_requests_output.yml이라는 이름으로 Embulk설정파일을 아래와 같이 생성한다.

in:
  type: file
  path_prefix: /가져온 데이터 파일을 저장한 디렉토리/311_service_requests_
  parser:
    type: none
out:
  type: bigquery
  mode: append
  auth_method: compute_engine
  project: lab-test-01
  dataset: embulk
  table: 311_service_requests_%Y%m%d_%H
schema_file: /스키마파일을 저장한 디렉토리/311_service_requests.json
auto_create_table: true
  •       in부분을 보면 빅쿼리에 적재할 데이터에 대해 설정하고 none-parser 플러그인을 사용해서 파싱을 하지 않게 해서 적재 성능을 높였다.
  •      out부분을 보면 type bigquery 설정하고 mode append 해서 기존 테이블이 있다면 추가데이터는 덧붙이게 된다. auth_method 인증방식은 MySQL에서 데이터를 가져올 생성한 방식과 동일하게 compute_engine으로 적용하고 빅쿼리에 저장할 데이터셋, 테이블 형식(Wild카드 형식으로 쿼리 사용이 가능하게 테이블을 생성) 파싱작업 생략을 위해 schema_file json형식의 스키마 파일을 생성해둔다. 테이블의 경우 없으면 자동으로 생성되도록 auto_create_table옵션을 true 주었다.
  •      스키마 파일의 내용은 아래와 같이 구성되어 있다.
[
   {
    "name": "created_date",
    "type": "TIMESTAMP",
    "mode": "NULLABLE"
   },  
   {
    "name": "status",
    "type": "STRING",
    "mode": "NULLABLE"
   },  
   {
    "name": "source",
    "type": "STRING",
    "mode": "NULLABLE"
   }
]

빅쿼리로 데이터를 적재하기위한 설정파일을 생성하였다면, 아래의 Embulk명령어를 실행해서 실제로 데이터를 정상적으로 빅쿼리에 적재되는지 확인한다.

$ embulk run 311_service_requests_output.yml

Airflow 서버 구성하기

지금까지 CloudSQL에서 빅쿼리까지 데이터를 적재하는 과정을 각각의 구간으로 나누어 실행해보았다. 이제 VM 추가로 생성하여 앞에서 각각 구간 별로 나누어 실행하였던 데이터 파이프라인 구간을 하나의 구간의 데이터 워크플로우로 통합하고 자동화하여 실행할 있게 해주는 Airflow 설치하고 환경을 구성해보도록 한다.

VM 생성하기

앞에서 ETL처리를 위한 VM생성할 때와 적용되는 옵션은 다르지 않다. 별도 설명은 생략한다.

방화벽 포트 오픈

Airflow 웹서버를 실행하여 액세스 하기 위해서 다음과 같이 현재 사용중인 프로젝트의 네트워크에 대한 방화벽 8080포트를 허용하는 새로운 규칙을 추가한다.

  • Targets : All instances in the network
  • Source filter : IP ranges
  • Source IP ranges : 0.0.0.0/0
  • Protocols and ports : Specified protocols and ports -> tcp:8080
  • 나머지 : Default

 

Airflow 설치하기

Compute Engine VM생성 방화벽 8080포트에 대한 허용 규칙을 추가하였다면, 새롭게 생성한 VM SSH 터미널에 접속하여 다음의 명령어를 실행해서 Airflow 설치한다.

환경 변수 설정
$ export AIRFLOW_HOME=~/airflow
 
Airflow 설치
$ pip install airflow
 
DB 초기화
$ airflow initdb
 
서버 실행
$ airflow webserver -p 8080

위의 명령어 순서대로 Airflow 실행하고 서버를 실행한 상태에서 생성한 VM External IP 8080(http://VM External IP:8080) 접근하여 정상적으로 동작되는 확인한다.

Airflow 설정 파일 수정하기

Airflow 설치가 완료되었다면, Airflow 정상적인 동작을 위해 설정파일($AIRFLOW_HOME/airflow.cfg) 수정하여야 한다. VI등의 편집기를 이용해서 설정 파일의 내용을 아래와 같이 수정한다.

...
# Are DAGs paused by default at creation // True -> False 변경
dags_are_paused_at_creation = False
...
# Whether to load the examples that ship with Airflow. It's good to
# get started, but you probably want to set this to False in a production
# environment  // True -> False 변경
load_examples = False
...

 

ETL 서버 SSH연결 설정

데이터 워크플로우의 동작을 위해서는 Airflow 구성된 서버에서 ETL서버로의 SSH 원격 명령 실행이 필요하다. ETL서버에 대한 원격SSH연결 설정을 위해 아래와 같이 Airflow 서버를 실행시킨 상태에서 콘솔의 상단의 Admin -> Connections메뉴로 이동한다.

*Airflow에서는 콘솔의 Connetions 메뉴를 통해 DB, file system, HDFS, 등의 데이터의 소스에 대한 커넥션 설정 아니라, SSH연결 설정 또한 쉽게 있는 기능을 제공하여 편리하다.

 

ETL서버에 대한 원격SSH연결 설정을 위해, [Create] 버튼을 클릭하고 다음과 같이 내용을 입력하여 새로운 커넥션 설정을 구성한다.

  • Conn Id : etl-server-ssh(임의로 설정)
  • Conn Type : SSH
  • Host : etl-server (앞에서 생성한 ETL서버 VM 이름)
  • Port: 22
  • 나머지 : 공란

Airflow DAG파일 작성

지금까지 Airflow 실행하기 위한 환경 설정을 하였다. 이제 DAG 구성(파이썬 코드로 작성)하여 앞에서 구성한 전체 구간에 대한 데이터 파이프라인을 각각의 태스크 단위로 작성하고 매시간 30분마다 자동으로 실행되도록 스케줄링을 적용하여 자동화 처리를 하도록 하겠다.

기본적으로 DAG(파이썬 실행 파일) $AIRFLOW_HOME/dags/ 저장하고 Airlfow 명령을 실행하여 동작하게 한다. 만약, 해당 폴더가 없으면 생성하고 DAG 파일을 작성하면 된다.

 

$AIRFLOW_HOME/dags/ 폴더에 data_workflow_test.py라는 이름으로 아래와 같은 내용으로 작성하여 저장한다.

from airflow import DAG
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.contrib.hooks import SSHHook
from datetime import datetime,timedelta
 
sshHook = SSHHook(conn_id='etl-server-ssh')
 
dag = DAG('airflow-test-sample',description='Data Load to GCP With airflow and Embulk',
                        schedule_interval = '30 * * * *',
                        start_date=datetime(2017, 7, 1),catchup=False)
 
embulk_run = SSHExecuteOperator(
        task_id='mysql_down',
        bash_command='/home/yooseok_choi/.embulk/bin/embulk run /home/yooseok_choi/311_service_requests_input.yml',
        ssh_hook=sshHook,
        dag=dag)
 
bq_run = SSHExecuteOperator(
        task_id='bq_load',
        bash_command='/home/yooseok_choi/.embulk/bin/embulk run /ETL서버에서 데이터를 가져올 디렉토리 /311_service_requests_output.yml',
        ssh_hook=sshHook,
        dag=dag)
 
bq_run.set_upstream(embulk_run)
 
rm_run = SSHExecuteOperator(
        task_id='local_file_delete',
        bash_command='/bin/rm / 가져온 데이터 파일을 저장한 디렉토리/311_service_requests_*',
        ssh_hook=sshHook,
        dag=dag)
 
rm_run.set_upstream(bq_run)
  •       DAG 정의 부분을 보면. 먼저, DAG 객체는 DAG 대한 전체 컨택스트를 저장 유지 관리한다.
  •       DAG('airflow-test-sample',description='Data Load to GCP With airflow and Embulk', 에서 DAG 이름을 ‘'airflow-test-sample’ 정의하고 description 설명을 작성한다. 다음으로 DAG 실행되는 주기를 정해야 하는데, schedule_interval = '30 * * * *', 부분에 cron 명령과 같은 노테이션으로 정의한다. 설정은 시간 30분마다 등록한 DAG 실행되도록 한다. 마지막으로, start_date=datetime(2017,07,01), ,DAG 언제부터 시작 것인지 지정한다. DAG 반드시 전역 변수로 지정한다. DAG안에서 다른 DAG 부르는 sub DAG 경우에는 지역 변수로 지정이 가능하다.
  •       다음 task 사용할 operator 정의하게 되는데, 콘솔에서 정의한 ETL서버에 대한 SSH Connection설정 정보를 사용하기 위해 SSHHook SSHHook으로 정의한 원격 서버에  SSH 명령( 커맨드) 실행할 SSHExecuteOperator 대한 내용을 정의한다. 먼저, 'mysql_down'이라는 이름의 task MySQL에서 데이터를 가져오는 부분을 다음으로 'bq_load'라는 이름의 task 가져온 데이터를 빅쿼리에 적재하는 부분을 정의하고 마지막으로 'local_file_delete'이라는 이름으로 빅쿼리에 데이터 적재가 끝나고 ETL서버에 저장된 데이터를 삭제하는 task 정의하였다.

  

DAG파일 작성 다음의 명령어로 작성한 DAG파일이 정상적으로 등록이 되었는지 확인한다. 정상적으로 등록되었다면, “airflow-test-sample” 등록되어 있는 DAG 리스트로 확인할 있다.

$ airflow list_dags

Airflow 재실행

이제 DAG 파일 구성 까지 완료되었다면, Airflow 웹서버를 중지하고 DB 초기화하고 재시작을 해야 정상적으로 구성한 DAG파일이 동작하게 된다. 따라서 웹서버가 동작하고 있는 터미널에서는 Ctrl+C 실행을 중지한 상태에서 터미널에서 다음의 명령을 실행한다.

DB초기화

$ airflow initdb

 

Airflow 서버 재실행

$ airflow webserver -p 8080

 

*Airflow 서버가 중지되지 않는다면 아래의 명령을 이용하면 해당 프로세스를 kill 있다.

$ cat $AIRFLOW_HOME/airflow-webserver.pid | xargs kill -9

 

Airflow DAG실행

이제 모든 구성이 완료하였다. 다른 SSH 터미널을 열고 아래의 명령어를 실행해서 등록한 DAG 정상적으로 동작되는 확인해보자.

$ airflow scheduler

 

* 위의 명령어는 Airflow DAG 등록된 스케줄에 따라서 태스크들을 지속적으로 실행하게 한다.

 

Airflow 웹서버를 이용한 모니터링

스케줄러에 등록한 DAG 정상적으로 동작되는지 Airflow 서버 콘솔로 접속해서 확인해보자.

먼저, 서버 콘솔로 접속하면 아래와 같이 앞에서 작성해서 등록한 DAG 실행되고 있는 것을 확인할 있다. DAG 실행되면서 해당 DAG에서 작성된 task 순서대로 실행되는 것을 확인할 있을 것이다.

Dag 실행이 완료되고 빅쿼리 콘솔에 CloudSQL에서 Embulk 구성된 VM에서 가져온 데이터가 앞에서 구성한대로 아래와 같이 테이블로 저장되어있는 것을 확인할 있다.

이제 Airflow 스케줄러를 정지 시키지 않는 다면 매시간 30분에 한번씩 앞에서 구성한 데이터 워크플로우에 따라 빅쿼리에 테이블이 자동으로 생성될 것이다.

 

Airflow 서버 콘솔의 주요기능

지금까지 Airflow Embulk 이용해서 CloudSQL에서부터 빅쿼리까지 데이터를 적재하기 까지의 데이터 파이프라인을 구성하고 Airflow 이용해서 하나의 자동화된 데이터워크플로우를 구성해보았다. 마지막으로 Airflow 서버에서 제공하는 주요한 기능들에 대해 알아보고 마무리하겠다.

 

Graph View

Graph View DAG 구조를 그래프 형태로 보여주는 뷰이다

 

복잡한 워크플로우의 경우 구조를 파악하는데 유용하다. 위의 그림을 보면 'airflow-test-sample'이라는 이름으로 정의한 DAG 내부 task들이 mysql_down -> bq_load -> local_file_delete 순서대로 호출되어 실행되는 것을 있다.

 

Tree View

트리뷰를 보면, DAG 구조를 트리 형태로 보여주고, DAG 태스크가 각각 성공했는지 실패 했는지를 우측 그래프처럼 표현해준다. 태스크 실행 로그를 보려면 태스크 실행 결과 그래프를 클리하면 아래와 같이 세부 메뉴가 나온다.

 

여기서 View Log 누르면 Task 별로 실행 당시의 로그를 있다. 아래는 mysql_down태스크를 실행한 로그이다.

위의 그림처럼 task 에러가 발생하여도 출력된 실행로그를 확인하여 디버깅이 가능하다..

 

Task Duration

Task duration DAG에서 수행된 태스크의 수행 시간을 아래와 같이 그래프 형태로 나타내준다.

 

Task Tries

Task Tries 에서는 수행 별로 각각의 태스크를 수행한 횟수를 그래프로 보여준다. 재시도 (RETRY)횟수를 모니터링할 있다.

 

Gantt

Gantt 차트는 수행 태스크들 대해서 수행 순서에 따라서 소모된 시간과 함께 간트 차트로 표시해준다. 태스크의 수행 순서와 태스크당 시간을 한꺼번에 보여주기 때문에 병목 구간 파악이 쉬워진다.

 

마무리

지금까지 Apache Airflow를 이용하여 데이터 워크플로우를 구성 하는 방법에 대해서 알아보았다. 이미 링크드인의 Azkaban이나, 스포티파이의 Luigi, 하둡의 Oozie 여러가지 워크 플로우 관리 시스템이 있지만, 아직 인큐베이터 단계인 airflow 주목하는 이유는 분산 환경 지원이 가능하고, 태스크에 대한 스크립트를 파이썬을 사용할 있기 때문에, 각종 빅데이타 분석 시스템이나 라이브러리 그리고 머신러닝 시스템과 연동이 쉽고, 파이썬 언어만 알면 정교한 데이터 워크 플로우 개발이 가능하기 때문에, ( XML등의 설정을 하지 않고도) 활용 가능성이 높다.

 

참고링크

https://airflow.apache.org/

https://cloud.google.com/blog/big-data/2017/07/how-to-aggregate-data-for-bigquery-using-apache-airflow

http://airbnb.io/projects/airflow/

http://www.embulk.org/docs/

https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-mysql

https://github.com/embulk/embulk-output-bigquery

https://github.com/sonots/embulk-parser-none

 

 

구글 클라우드 NAT Gateway 구성하기

 
최유석

이 글에서는 구글 클라우드의 구글 컴퓨트 엔진의 VPC 네트워크 서비스와 기능 및 인스턴스, 인스턴스 템플릿과 그룹을 이용하여 NAT Gateway를 구성하는 방법에 대해서 알아보겠다. 작년에 작성했던 글(http://whitechoi.tistory.com/24)의 내용에서 주의해야 할 부분이 추가되어 새로 글을 작성하게 되었다. 이미 많은 글들을 통해 무료평가판 등록, 프로젝트 생성, 과금설정하는 부분에 대해서 언급하였다. 따라서 이러한 부분은 생략하고 NAT Gateway를 구성하는 방법에 대해서만 간략하게 설명하겠다.

이 글을 통해 구성될 NAT Gateway의 구성도이다.

 


 

VPC 네트워크 생성

다음과 같이 VPC네트워크를 맞춤설정 네트워크로 서브넷을 추가하여 새로 생성한다.

  • VPC 네트워크 이름: nat-nw
  • 서브넷(맞춤설정)          

            - 이름 : sub-asia

- 지역: asia-east1

- IP 주소 범위: 192.168.0.0/24

- 비공개 Google 액세스: 사용 설정(이 구성과 연관성은 없지만 사용하는게 보안적으로 좋음)

  • 나머지 기본값

 

방화벽 규칙 생성

다음과 같이 SSH접속을 위한 tcp 22포트, 네트워크 내부 통신을 위한 서브넷 범위의 tcp, udp, icmp 허용 규칙을 생성한다.

SSH접속을 위한 TCP 22포트 허용

  • 방화벽 규칙 이름: asia-allow-ssh
  • 네트워크: nat-nw
  • 대상: 네트워크의 모든 인스턴스
  • 소스 필터: IP 범위
  • 소스 IP 범위: 0.0.0.0/0
  • 프로토콜 및 포트(지정된 프로토콜 및 포트): tcp:22
  • 나머지 기본값

 

서브넷(sub-asia)범위의 내부통신을 위한 TCP, UDP, ICMP 허용

  • 방화벽 규칙 이름: asia-allow-internal
  • 네트워크: nat-nw
  • 대상: 네트워크의 모든 인스턴스
  • 소스 필터: 하위 네트워크
  • 하위 네트워크: sub-asia 192.168.0.0/24
  • 프로토콜 및 포트(지정된 프로토콜 및 포트): tcp;udp;icmp
  • 나머지 기본값

 

인스턴스 생성

 

다음과 같이 NAT Gateway로 사용될 인스턴스를 생성한다.

  • 인스턴스 이름: nat
  • 영역: asia-east1-a
  • 부팅 디스크: Ubuntu 14.04 LTS
  • ID 및 API 액세스: 모든 Cloud API에 대한 전체 액세스 허용
  • 네트워크 태그: nat
  • 네트워크 인터페이스

- 네트워크: nat-nw

- 하위 네트워크: sub-asia(192.168.0.0/24)

- 외부IP: nat-ip(고정 IP로 생성, 임시로 해도 무방) 

- IP 전달: 사용

  • 나머지 기본값

인스턴스 템플릿 생성

다음과 같이 인스턴스 그룹에 사용될 인스턴스 템플릿을 생성한다. 여기에서는 NAT Gateway구성만 할 예정이지만, 인스턴스 템플릿과 그룹을 이용하여 로드밸런서, 오토스케일링에 사용할 수 있는 백엔드로 사용할 수 있다.

  • 인스턴스 템플릿 이름: instance-template

 

  • 머신 유형(맞춤설정): vCPU 1개, 1GB 메모리

 

  • 부팅 디스크: Ubuntu 14.04 LTS
  • ID 및 API 액세스: 모든 Cloud API에 대한 전체 액세스 허용
  • 네트워크 태그: no-ip
  • 네트워크 인터페이스

- 네트워크: nat-nw

- 하위 네트워크: sub-asia(192.168.0.0/24)

- 외부IP: 없음

  • 나머지 기본값

인스턴스 그룹 생성

다음과 같이 인스턴스 템플릿을 사용할 인스턴스 그룹을 생성한다.

  • 인스턴스 그룹 이름: instance-group
  • 영역: asia-east1-a
  • 인스턴스 템플릿: instance-template
  • 나머지 기본값

네트워크 경로 생성

다음과 같이 NAT Gateway 구성에 사용될 네트워크 경로를 생성한다.

  • 경로 이름: nat-route
  • 네트워크: nat-nw
  • 대상 IP 범위: 0.0.0.0/0
  • 우선순위: 800
  • 인스턴스 태그: no-ip (인스턴스 템플릿에 설정한 네트워크 태그와 동일해야하며, NAT Gateway로 사용될 인스턴스와 다른 네트워크 태그이어야 한다.) 
  • 다음 홉: 인스턴스 지정
  • 다음 홉 인스턴스: nat

iptables을 이용한 NAT 설정

iptables를 이용해서 NAT 설정을 위해 VM 인스턴스 페이지의 nat(NAT Gateway 인스턴스)의 우측에 있는 SSH버튼을 클릭하여 SSH 터미널로 접속한다.

 

nat 인스턴스의 SSH에 접속하였다면, NAT 설정을 위해 다음의 명령어를 실행 한다.

$ sudo sysctl -w net.ipv4.ip_forward=1

$ sudo iptables -t nat -A POSTROUTING -o eth0 -j MASQUERADE

 

NAT Gateway 구성 확인하기

NAT Gateway 구성이 정상적으로 되었는지 확인하기 위해 다음의 명령어를 이용해서 인스턴스 그룹을 통해 생성된 인스턴스에 SSH로 접속한다.

$ ssh instance-group-nwb2

참고

$ ssh "인스턴스 그룹으로 자동 생성된 인스턴스 명"

NAT Gateway 구성 확인을 위한 traceroute 설치

인스턴스 그룹으로 자동 생성된 인스턴스의 SSH 터미널에서 NAT Gateway가 정상적으로 구성되고 동작되는지 확인을 위해 다음의 명령어로 traceroute를 설치한다.

$ sudo apt-get install -y traceroute

traceroute명령을 이용해서 NAT Gateway 동작 확인하기

traceroute로 외부로 나가는 네트워크 이동경로 확인을 위해 다음의 명령어를 실행한다.

$ traceroute www.google.com

* traceroute 명령을 통해 www.google.com까지 위에서 구성한 nat(NAT Gateway)인스턴스의 내부 IP를 통해서 접근하는 것을 확인할 수 있다.

* 네트워크 이동 경로를 보면 내부 IP를 통해서 이동한 것으로 확인되는데 실제로는 인스턴스의 내부IP -> 외부IP로 외부 네트워크(CF. 인터넷)와 통신하게 된다. 내부IP는 외부 네트워크와의 통신이 되지 않는다.

* NAT 구성에서 주의할 점은 백엔드로 사용 될 인스턴스(또는 인스턴스 템플릿)의 네트워크 태그와 경로에 적용할 태그가 같아야하며, NAT Gateway로 사용될 인스턴스의 태그와는 달라야 한다. 예전에는 같은 태그를 적용해도 문제 없었으나, 현재 직접 테스트를 통해 확인해보니 정상적으로 구성되지 않는다. 

참고링크

https://cloud.google.com/compute/docs/vpc/special-configurations#natgateway

 

 

 

Google Cloud Dataproc 사용하기

 최유석

글에서는 Google Cloud Dataproc 대해서 알아보겠다.

 

Google Cloud DataProc?

구글 클라우드 플랫폼에서 제공하는 매니지드 HADOOP, Spark(+ Hive, Pig) 클러스터 서비스로 사용자가 손쉽게 HADOOP또는 Spark 클러스터를 구성할 있다.

빠른 클러스터 구성

규모에 상관없이 HADOOP, Spark 클러스터 구성에 걸리는 시간이 90초이내로 빠른 속도로 클러스터를 구성할 있고 이를 활용해서 사용자는 데이터 처리 또는 분석을 위한 작업에서 빠른 연산이 가능하다.

비용 절감

매우 빠른 속도로 클러스터를 구성할 있고, 분당과금 정책을 적용하여 사용자가 원하는 시간만큼 클러스터를 구성해서 연산을 실행하고 클러스터를 내려도 추가적인 가격 부담이 없다.

클러스터에 다양한 VM 타입 지원

클러스터 구성에 Predefined 형태의 VM 머신 뿐만 아니라, 사용자가 CPU, Memory크기를 자유롭게 설정하여 사용할 있는 Custom Machine Type 지원하여 각각의 Job 최적화한 크기의 클러스터 구성을 있고, 사용에 약간의 제약은 있지만, 80%낮은 가격으로 인스턴스를 사용할 있는 Preemptible VM타입을 지원하여 각각의 Job 더욱 비용 효율적인 클러스터의 구성을 있게 지원한다. 추가로 클러스터의 노드에 GPU Attach 있기 때문에 GPU 연산이 필요한 경우에도 쉽게 사용 있다.

*GPU Preemtible VM에는 지원되지 않는다.

구글 클라우드 서비스들과의 통합

단순하게 Dataproc 클러스터만 구성해서 사용할 있는게 아니라 BigQuery, Cloud Storage, Cloud Bigtable, Stackdriver Logging, Monitoring, 등과의 함께 통합된 서비스로 제공되기 때문에 더욱 유연한 사용을 가능하게 한다. 

Dataproc 클러스터 구성 테스트하기

앞서 Google Cloud Dataproc 대한 개념적인 부분에 대해서 간단하게 알아보았다. 이제 Dataproc 클러스터를 구성하고 구성한 클러스터에서 sparkSQL 이용해서 쿼리를 실행해보자. 쿼리를 실행할 처음에는 Worker Node 2 구성해서 쿼리를 실행해보고 이후 100개로 노드 수를 늘려서 쿼리를 실행하고 용량의 데이터에도 쿼리를 실행하여 Node 따라서 성능이 어떻게 달라지는지 확인해본다. 또한, Dataproc에서 사용한 동일 데이터와 쿼리를 이용해서 빅쿼리에서도 실행해보고 결과를 비교해보려고 한다. 

데이터 준비하기

예제에서 사용할 데이터는 빅쿼리에서 공개 데이터셋으로 제공하는 데이터를 이용한다.

Dataproc 클러스터에서 Cloud Storage Connector를 Built-in 형태로 제공하기 때문에 Cloud Storage 있는 데이터에 다이렉트로 액세스가 가능하다. 이를 이용해서 빅쿼리 공개 데이터셋에 있는 특정 테이블을 Cloud Storage Export하고 이렇게 Export한 데이터에 바로 접근하여 사용할 예정이다.

Node 수에 따라서 성능이 어떻게 변화하는 지 확인하기 위해 서로 크기가 다른 개의 테이블을 사용할 예정이다.

첫번째 테이블은 음악 앨범에 대한 플레이리스트 정보들이 기록되어 있으, 테이블의 크기는 1.11GB, 12,304,975개의 Row 가지고 있다

두번째로 사용할 테이블은 위키미디아에서 관리하는 각종 위키관련사이트의 페이지 뷰수에 대한 정보가 기록된 테이블로 2012 7월에 대한 데이터를 가지고 있다. 테이블의 크기는 119GB, 1,858,577,426개의 Row 가지고 있다.

위의 빅쿼리 공개 데이터셋을 GCS에 Export하여 사용하기 위해 먼저, GCS에 Regional 클래스로 US central1리전을 지정하여 버켓을 생성하고

위의 테이블들을 Export할 폴더를 각각 생성한다.

빅쿼리 공개데이터 셋에서 예제에서 사용할 테이블 데이터를 위에서 생성한 GCS 버켓의 폴더에 각각 CSV형식으로 Export한다.

*빅쿼리에서 테이블 데이터 export는 단일파일 기준으로 1GB이하의 파일에 대해서만 export를 할 수 있다. 만약, 1GB이상의 크기의 테이블 데이터에 대해서는 와일드카드 *문자를 이용한 export 사용해야 한다. 와일드 카드 문자를 이용하여 export하면 데이터가 지정한 GCS 위치(버켓 또는 버켓 내부 폴더) 자동으로 분할되어 저장된다.

 Dataproc 클러스터 생성하기

프로젝트 생성, 과금 설정, API활성화 등에 대한 내용은 앞의 다른 글에서 많이 언급하였으니 생략한다. 해당 내용은 https://cloud.google.com/dataproc/docs/quickstarts/quickstart-console

참고하기 바란다.

Google Cloud Console 좌측 상단 서비스탭에서 Dataproc페이지로 접속하고 페이지 중앙의 [Create cluster] 버튼을 클릭한다.

생성할 클러스터의 이름을 임의로 입력하고(여기서는 sparkcluster로 사용했다.)  나머지 부분은 기본값으로 생성한다.

Dataproc Cluster는 기본값으로 생성시 Master Node 1, Worker Node 2대로 생성된다

 

Master Node SSH 접속

앞서, 생성한 클러스터의 디테일 페이지에서 VM Instances탭으로 이동하고 클러스터 Master Node 우측 SSH버튼을 클릭하여 SSH 접속한다.

 

Spark SQL CLI실행

Master Node SSH에서 다음의 명령어를 실행하여 Spark SQL CLI 실행한다.

$ spark-sql

Spark SQL CLI 실행하였다.

 

Playlist테이블 생성하기

Spark SQL CLI에서 다음의 쿼리 명령어를 실행하여 앞에서 GCS로 export한 테이블 데이터 중 playlist 테이블을 생성한다.

CREATE TABLE playlist
USING com.databricks.spark.csv
OPTIONS (path "gs://버켓/playlists/*", header "true", inferSchema "true");

Playlist 테이블이 생성되었다.

 Playlist 테이블에 쿼리 실행하기

앞에서 생성한 playlist테이블을 대상으로 아래와 같은 쿼리를 실행해보자. 

SELECT * FROM playlist limit 10;

생성한 playlist 테이블의 전체 레코드 중에서 10개의 레코드를 가져오는 쿼리이다.

 

쿼리를 실행하였다.

쿼리 실행에 21.918초가 소요되었으며 쿼리 실행의 결과가 프롬프트에 나타나는 것을 확인할 있다.

 

빅쿼리에서 playlist 테이블에 쿼리 실행하기

앞에 생성한 Dataproc 클러스터에서 spark-sql 이용해 실행한 쿼리를 빅쿼리에서 실행해보자.

쿼리 실행에 2초가 소요된 것을 확인할 있다동일한 쿼리지만 별도 Sorting 하지 않았기 때문에 값은 다르게 나오니 참고 바란다.

100개의 Worker Node에서 쿼리 실행하기

이제 앞에서 생성했던 Dataproc 클러스터의 Worker Node 100개로 늘려서 쿼리를 실행하고 결과를 확인해보자. 쿼리는 앞에서 playlist테이블에 실행한 쿼리 외에도 용량이 큰 Wikimedia 테이블 데이터(119GB)를 기준으로 테이블을 생성하고 쿼리를 실행해본다. 또한, 동일한 쿼리를 빅쿼리에서도 실행해서 결과를 확인해보자.

앞에서 생성한 클러스터 디테일 페이지에서 Configuration 탭으로 이동하고 Edit 클릭하여 Worker nodes

부분의 숫자를 2 -> 100으로 변경하고 Save버튼을 클릭하여 변경한 내용을 저장한다. 

 

Node가 전부 생성될 때까지 약간의 시간이 지나면, 다음과 같이 100개의 Worker Node 생성된 것을 확인할 있다.

 Worker Node의 번호가 0에서 99까지 100개의 노드가 생성되었다.

100개의 Worker node 생성이 완료되면 다시 클러스터의 Master Node SSH 접속하여 “spark-sql” 명령으로 Spark SQL CLI 실행한다. Spark SQL CLI 실행하였으면 먼저 앞에서 생성했던 playlist테이블에 실행했던 쿼리를 실행하고 결과를 확인해보자.

100개의 Worker Node에서 Playlist테이블에 쿼리 실행하기

아래는 Dataproc 클러스터의 Worker Node 수를 100개로 늘리고 위에서 playlist 테이블에 실행했던 쿼리를 실행한 결과이다.

쿼리가 실행된 결과를 보면 18.402초가 소요된 것을 확인할 있다. 대상이 되는 playlist 데이터의 크기가 1.11GB 작기 때문에 속도 차이는 나지 않는 것으로 보여진다

 대용량의 데이터에 쿼리 실행하기

이제 데이터의 크기가 큰 Wikimedia 테이블 데이터를 대상으로 쿼리를 실행해서 쿼리 성능을 확인해보자.

테이블 생성

아래의 테이블 생성 쿼리를 실행해서 Wikimedia테이블 데이터를 이용해서 wikitest라는 이름으로 테이블을 생성한다.

CREATE TABLE wikitest (year integer, month integer, day integer, wikimedia_project string, language string, title string, views integer)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION 'gs://버켓명/wikitest/*';

테이블이 생성되었다.

Wikimedia(Wikitest) 테이블에 쿼리 실행하기

아래와 같은 쿼리를 실행하고 결과를 확인해보자. 실행 쿼리의 내용은 앞에서 생성한 wikitest테이블에서 language 값이 “en”이고 titile “sydney”라는 값이 들어있는 모든 레코드를 조회하고 이를 그룹핑하며, 조회된 데이터에 대해서 각 페이지 별 뷰수를 합산하고 합산된 뷰수를 기준으로 정렬하여 10개의 결과를 보여주는 쿼리이다.

SELECT language,title,sum(views) AS total_views
FROM wikitest
WHERE language ='en' AND title like '% sydney %'
GROUP BY language,title
ORDER BY total_views DESC
LIMIT 10;

위의 쿼리를 실행한 결과이다.

쿼리 실행에 30.387초가 소요되었으며 정상적으로 쿼리가 실행된 것을 확인할 있다.

 

추가로 wikimedia 데이터에 대해서 기본 Dataproc 클러스터(Master Node 1, Worker Node 2) 구성으 실행한 결과는 다음과 같으니 참고하기 바란다. 

기본 클러스터 구성으로 쿼리 실행 635.981초가 소요된 확인할 있다. 노드 수가 많아짐에 따라서 쿼리 성능에 큰 차이가 발생하는 것을 확인할 수 있다.

빅쿼리에서 wikimedia 테이블에 동일한 쿼리 실행하기

100개로 worker node를 가진 Dataproc 클러스터에서 실행한 쿼리를 빅쿼리에서 실행해보자.

100개 노드에서 실행에 30초 가량 소요된 쿼리가 빅쿼리에서는 쿼리 실행에 4.2초가 소요된 것을 확인할 있다

 

Dataproc 빅쿼리 쿼리 시간 비교

앞에서 Dataproc클러스터와 빅쿼리에서 실행했던 쿼리의 실행시간을 표의 형태로 정리하였다.
데이터 / 실행모듈
Dataproc(M1, W2)
Dataproc(M1, W100)
BigQuery
playlist(1.11GB)
21.918
18.402
2
Wikimedia(119GB)
635.981
30.387
4.2

*M = Master Node, W = Worker Node

동일한 데이터에 대한 동일한 쿼리 실행 속도에서 빅쿼리가 월등하게 빠른 것을 확인할 수 있다.

 

 

마무리

처음 글을 작성하려고 했던 시점에는 Google Cloud Dataproc 대한 내용만 기재하려고 하였으나, 예제에 적용할 만한 데이터를 찾다가 보니 빅쿼리의 공개 데이터셋을 이용하게 되었고 동일한 쿼리를 Dataproc 빅쿼리에서 실행하면 어떤 결과가 나올지 궁금하여 실행해보았다. 각각 쿼리를 실행하고 결과를 보니 속도에서 매우 차이를 보이는 것을 확인 있었다. 그로 인해 빅쿼리에서 쿼리를 실행한 부분에 대한 내용을 추가하게 되었다. 단순하게 위의 예제에서의 쿼리를 실행한 결과만 놓고 보면 엄청난 차이라고 볼 수 있지만, 각각의 서비스들이 가지는 특징과 기능, 사용목적, 과금 정책, 등이 모두 다르다. 따라서 각자의 사용환경에 따라서 필요한 서비스를 적용하는 것을 고려해야 것이다.

 

추가로 클라우드에서 최적화된 하둡 배포 아키텍쳐에 대한 부분에 대해서 더 알아보고 싶다면 http://bcho.tistory.com/1169 글을 참고하기 바란다.

 

참고링크

https://cloud.google.com/dataproc/docs/

https://cloud.google.com/dataproc/docs/concepts/custom-machine-types

https://cloud.google.com/dataproc/docs/concepts/preemptible-vms

https://cloud.google.com/dataproc/docs/connectors/cloud-storage

 

+ Recent posts