Apache Airflow 이용한 데이터 워크플로우 자동화

 최유석

글에서는 데이터 워크플로우 관리도구인 Apache Airflow 병렬 배치 데이터 전송 도구인 TreasureData Embulk 이용해서 Cloud SQL에서 빅쿼리까지의 데이터 워크플로우를 구성하고 자동화하는 방법에 대해서 알아보겠다. Embulk 대한 자세한 내용은 생략한다.  조대협님의 http://bcho.tistory.com/1126 Embulk 대한 내용이 자세히 소개되어 있으니 참고하기 바란다..

 

글에서 구현할 아키텍처의 구성은 다음과 같으며, Data Pipeline 구축하고 자동화 처리하는 주요목적이어서 Reporting Services부분에 대한 별도 설명은 생략한다.

 

일반적인 빅데이터 분석 과정

빅데이터를 분석하기 위해서는 먼저 어플리케이션에서 로그 사용자 정보, 등의 데이터를 수집하는 시스템이 있고 이렇게 수집된 원천데이터에서 데이터를 추출하고 처리되어야하는 데이터에 따라 BATCH/Streaming 형태로 변환 가공 처리해서 하둡과 같은 오픈소스 기반의 시스템 또는 클라우드 기반의 데이터 웨어하우스 서비스(CF. 구글 빅쿼리) 적재해서 이렇게 적재된 데이터를 가지고 쿼리를 실행해서 원하는 형태로 결과 데이터를 만들고 시각화 하기까지 일련의 과정을 거쳐서 빅데이터 분석을 하게 된다.

 

데이터 워크플로우 관리도구의 등장

앞에서 보았던 것처럼, 빅데이터 분석을 하기까지의 일련의 과정에서 데이터를 수집하고, ETL처리하고 쿼리하는 등의 여러 개의 구간으로 나뉘어서 각각의 태스크를 연결해서 실행되게 되는데 이렇게 실행되는 여러 개의 태스크들을 하나의 파이프라인으로 실행할 있게 해주고 스케줄링 기능을 통해서 자동화까지 가능하게 해주는 데이터 워크플로우 관리 도구가 등장하게 되었다. 사실 이러한 기능은 CRON 스크립트를 이용하면 순차적인 태스크의 실행, 스케줄링, 등에 대한 구현은 가능했지만, 에러에 대한 디버깅, 재처리 작업등이 어려웠고 앞의 태스크의 실행 결과에 대한 분기, 분산 환경에 대한 지원, 등이 필요하게 되면서 구조화 기능을 충족하는 데이터 워크 플로우 관리 도구의 필요성이 높아지게 되었다.

데이터 워크플로우 관리도구

앞에서 설명한 것처럼, 데이터를 분석 과정에서의 늘어나는 요구 사항을 충족시키기 위해서 오픈소스로 여러가지 데이터 워크플로우를 관리하기 위한 도구가 개발되었는데, 대표적인 도구로는 하둡 에코시스템(HADOOP ECOSYSTEM) 우지(oozie) 등이 있다. 하둡의 여러가지 에코 시스템의 도구들을 유기적으로 조합하기 위해 개발된 도구로 다양한 하둡 에코시스템의 도구들을 연동하기 위한 아답터를 가지고 있다. 글에서 소개하고자 하는 데이터 워크플로우 관리 도구인 아파치 에어플로우(Apache Airflow) 처음 에어비앤비(Airbnb) 엔지니어링팀에서 개발된 도구로 현재는 아파치 재단에서 인큐베이션 단계에 있는 소프트웨어이다. 이글에서 Airflow 소개하려는 이유는 첫번째로 개발자, 데이터 분석 전문가(데이터 과학자, 엔지니어)에게 익숙한 파이썬을 기반으로 태스크들에 대한 코드를 작성할 있어 접근이 쉽다. 두번째로 HDFS 같이 여러대의 노드가 동작해야 하는 환경에 대해서도 지원한다. 세번째로 UI기반의 강력한 모니터링 기능을 제공하여 여러 개의 태스크가 합쳐져서 동작하는 데이터 파이프라인에서도 강력한 디버깅 기능을 사용할 있다. 마지막으로 GCP Operator 제공해서 구글 클라우드의 여러가지 데이터 분석 서비스와 연계해서 사용이 쉽다 라는 장점을 가지고 있다.

 

Airflow 구조

DAG (Directed Acyclic Graph)

Airflow에서는 DAG 앞에서 설명한 것처럼, 여러 개의 테스트가 합쳐져서 동작하는 하나의 데이터 워크 플로우를 말한다. DAG 파이썬 코드를 기반으로 사용자가 정의하고 실행된다. 예를 들면 위의 그림에서처럼 CloudSQL에서부터 데이터를 가져와서 빅쿼리까지 적재하는 모든 개별 구간(태스크) 합쳐 하나로 만든 데이터 파이프라인이 DAG 된다.

Operator and Task

Operator DAG안에서 정의되는 작업 함수(함수임을 주의하자) 이며, 데이터가 처리되고 이동하는 각각의 개별 태스크를 정의하는 함수이다.

Operator DAG 실제로 데이터 워크플로우에 정의되고 실행되면서 호출 것이  Task 이다. 조금 이해를 돕기 위해 부연설명을 덧붙이자면, 객체지향 언어에서의 Operator class 라면, Task object 라고 보면 된다.

 

Airflow 데이터 워크플로우 만들기

처음에 소개한 것처럼. 에어플로우(Airflow) 엠벌크(Embulk) 이용해서 GCE VM인스턴스를 이용해서 ETL서버를 구축하고 구글 클라우드의 RDBMS 서비스인 CloudSQL 있는 데이터를 수집해서 구글 클라우드의 데이터 웨어하우스 서비스인 빅쿼리까지 데이터를 적재하고 간단한 쿼리를 실행하기까지의 하나의 데이터 파이프라인(워크플로우) 구성하는 과정에 대해 설명하겠다.

 

샘플 데이터 준비하기

먼저, 예제에서 사용할 데이터는 빅쿼리에서 공개 데이터셋으로 제공하는 이용하겠다.

빅쿼리에서 제공하는 테이블 데이터 내보내기 기능을 통해 구글 클라우드 스토리지(GCS) 테이블 데이터를 내보내고 CSV데이터 파일을 Cloud SQL My SQL 인스턴스에 Import 하여 활용하였다. 또한, 예제를 통해 데이터 워크 플로우를 구성하는 방법에 대한 소개가 주요 목적이기 때문에 비교적 작은 크기의 테이블 데이터를 사용했다.

샘플 테이블 정보

 샘플로 사용한 빅쿼리 공개 데이터셋의 테이블(311_service_requests) 2008 7 월부터 현재까지의 모든 샌프란시스코 311전화번호 서비스에 대한 요청 정보가 포함되어 있다. 테이블의 크기는 767MB, 2,659,261개의 Row 가지고 있다.

 

빅쿼리 테이블 데이터 export

아래와 같이, 빅쿼리에 공개데이셋 테이블(311_service_requests) export해서 저장시킬 GCS 버켓을 먼저 생성한다.

  • Default storage class : Regional
  • Regional location : us-central1

 

GCS버켓을 생성하였다면, 빅쿼리 콘솔로 접속해서 311_service_requests 테이블을 csv형식으로 export한다.

  • Google cloud Storage URI : gs://bigquery-public-dataset/311_service_requests.csv
  • 나머지 : Default

Cloud SQL인스턴스 생성 데이터 Import

앞에서 설명한 것처럼, 예제에서 사용할 빅쿼리 공개 데이터셋의 테이블(311_service_requests) GCS export하였다. 이제 GCS 저장된 csv형식의 데이터를 CloudSQL MySQL(2세대)인스턴스를 생성하여 Import하여 예제를 진행하기 위한 데이터 준비를 마무리 하겠다.

 

먼저, 클라우드 콘솔에 접속해서 CloudSQL메뉴로 진입하고 MySQL(2세대) 인스턴스 생성을 진행한다.

 

임의로 인스턴스의 이름과 패스워드를 지정하고 하단의 생성 메뉴를 확장하여, 4 Authorize networks 에서 DB접속을 허용할 네트워크 IP범위를 0.0.0.0/0으로 입력하고 MySQL 인스턴스를 생성한다.

 

MySQL 테이블 생성

앞에서 생성한 MySQL인스턴스에 테이블을 생성하기 위해서 CloudShell 접속한다. CloudShell 접속하면 다음의 명령어를 실행하고 앞에서 MySQL인스턴스 생성 시에 지정했던 패스워드를 입력하여 MySQL Shell 접속한다..

$ gcloud sql connect airflow-test-db --user=root

(CF. $ gcloud sql connect "인스턴스명" --user=root)

MySQL Shell 접속하였다면, 아래 쿼리를 실행해서 데이터 베이스와 테이블을 생성한다

CREATE DATABASE sanfrancisco;
 
USE sanfrancisco;
 
CREATE TABLE `311_service_requests`
(`unique_key` INT(11) NULL DEFAULT NULL,
`created_date` TIMESTAMP NULL DEFAULT NULL,
`closed_date` TIMESTAMP  NULL DEFAULT NULL,
`resolution_action_updated_date` TIMESTAMP  NULL DEFAULT NULL,
`status` VARCHAR(255) NULL DEFAULT NULL,
`status_notes` VARCHAR(255) NULL DEFAULT NULL,
`agency_name` VARCHAR(255) NULL DEFAULT NULL,
`category` VARCHAR(255) NULL DEFAULT NULL,
`complaint_type` VARCHAR(255) NULL DEFAULT NULL,
`descriptor` VARCHAR(255) NULL DEFAULT NULL,
`incident_address` VARCHAR(255) NULL DEFAULT NULL,
`supervisor_district` INT(11) NULL DEFAULT NULL,
`neighborhood` VARCHAR(255) NULL DEFAULT NULL,
`location` VARCHAR(255) NULL DEFAULT NULL,
`source` VARCHAR(255) NULL DEFAULT NULL,
`media_url` VARCHAR(255) NULL DEFAULT NULL,
`latitude` FLOAT NULL DEFAULT NULL,
`longitude` FLOAT NULL DEFAULT NULL,
`police_district` VARCHAR(255) NULL DEFAULT NULL
)
COLLATE='utf8_general_ci'
ENGINE=InnoDB
;

 

CSV데이터 Import

이제 MySQL인스턴스에 테이블 생성까지 끝났다면, 빅쿼리에서 Export하여 GCS 저장한 CSV형식의 테이블 데이터 파일을 CloudSQL인스턴스(airflow-test-db) Import하기위해 Instance details페이지로 접속하여 상단의 IMPORT버튼을 클릭하고 아래와 같이 설정하여 CSV데이터 Import 진행한다.

 

  • Cloud Storage file : bigquery-public-dataset/311_service_requests.csv
  • Format of import : CSV
  • Database : sanfrancisco
  • Table : 311_service_requests 

데이터의 크기가 크지 않기 때문에 약간의 시간이 지나면 CloudSQL 생성한 테이블에 CSV파일 Import 완료된다.

ETL 처리 서버 구성하기

지금까지 예제에서 사용 데이터의 준비가 완료되었다. 이제 Compute Engine(GCE)에서 인스턴스(VM) 하나 생성하고 VM Embulk 설치하고 CloudSQL에서 빅쿼리까지의 데이터 전송과정에서의 ETL처리를 위한 서버 환경을 구성한다.

VM 생성하기

준비된 데이터의 ETL처리를 담당할 Compute Engine VM 아래와 같이 생성한다.

  • Name: etl-server
  • Zone: us-central1-a
  • Boot Disk: Ubuntu 14.04 LTS / 10 GB(Standard persistent disk)
  • Identity and API access : Allow full access to all Cloud APIs

* 생성한 VM에서 CloudSQL, GCS, BigQuery 대한 액세스 권한을 가지고 있어야한다. 편의를 위해 모든 Cloud API 대한 전체 액세스 허용하는 옵션으로 선택한다.

기타 : Embulk 배치 데이터 전송도구 이다. JVM위에서 동작하고 배치 처리가 주요동작이기 때문에 실행 많은 리소스가 필요할 있다. 글에서는 예제를 위해서 서버의 (CPU, Memory, Disk)크기를 작게 설정하였기 때문에 실제 동작이 느릴 있다.

 

Embulk 설치하기

VM생성이 완료 되었다면 SSH터미널로 접속하여 데이터 전송 ETL 작업을 수행할 Embulk 사용할 패키지들을 설치한다.

 

자바 설치

Embulk 설치 실행을 , JVM(Java Virtual Machine) 대한 의존성을 가지고 있다. 먼저, 다음의 명령어를 실행해서 우분투 apt패키지 설치 정보를 업데이트하고 JVM 설치한다.

$ sudo apt-get update

$ sudo apt-get install default-jre

Embulk 설치

JVM설치가 완료되었다면, 아래 명령어를 실행해서 Embulk 설치를 진행한다.

$ curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"

$ chmod +x ~/.embulk/bin/embulk

$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc

$ source ~/.bashrc

 

Embulk MySQL Input, BigQuery Output, parser-none 플러그인 설치

Embulk 설치가 완료되었다면, 예제에서 사용하게 MySQL Input 플러그인과 BigQuery Output 플러그인, 그리고 Embulk 실행 속도를 빠르게 하기 위해서 parser-none플러그인을

아래 명령어를 실행해서 설치한다.

$ embulk gem install embulk-input-mysql 

$ embulk gem install embulk-output-bigquery 

$ embulk gem install embulk-parser-none

 

Embulk MySQL 데이터 가져오기

Embulk 이용해서 앞에서 준비한 CloudSQL(MySQL) 테이블 데이터를 가져오기 위해 311_service_requests_input.yml이라는 이름으로 Embulk 설정파일을 아래와 같이 생성한다.

in:
  type: mysql
  auth_method: compute_engine
  project: “프로젝트 ID”
  host: “CloudSQL 인스턴스 IP”
  port: 3306
  user: root
  password: “CloudSQL 인스턴스의 root 계정 패스워드
  database: sanfrancisco
  column_options:
    created_date: {type: string, timestamp_format: '%Y-%m-%d %H:%M:%S'}
  query: |
    SELECT created_date, status,source
    FROM 311_service_requests
    WHERE status="Closed"
      AND MONTH(created_date) = MONTH(now())
      AND DAYOFMONTH(created_date) = DAYOFMONTH(now())
    GROUP BY created_date, status, source
    ORDER BY created_date DESC
out:
  type: file
  path_prefix: /데이터를 가져올 디렉토리/311_service_requests_
  file_ext: CSV
  formatter:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    header_line: false
    columns:
    - {name: unique_key, type: long}
    - {name: created_date, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: closed_date, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: resolution_action_updated_date, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: status, type: string}
    - {name: status_notes, type: string}
    - {name: agency_name, type: string}
    - {name: category, type: string}
    - {name: complaint_type, type: string}
    - {name: descriptor, type: string}
    - {name: incident_address, type: string}
    - {name: supervisor_district, type: long}
    - {name: neighborhood, type: string}
    - {name: location, type: string}
    - {name: source, type: string}
    - {name: media_url, type: string}
    - {name: latitude, type: double}
    - {name: longitude, type: double}
    - {name: police_district, type: string}
 
  •      .yml형식의 설정파일을 보면 크게 in(input) out(output) 두가지 부분으로 나뉜다.
  •       in부분을 보면 mysql플러그인을 실행하기위해 type mysql 인증 처리를 위한 옵션으로 auth_method compute_engine이라는 인증 방식을 사용하였다.( 방식으로 인증하게 되면 VM생성 시에 다른 구글 클라우드 API 대한 액세스 권한을 가지게 하였기 때문에 별도의 인증 절차를 생략할 있다.) 다음으로 MySQL 테이블에 접근하기 위한 정보를 해당 옵션들(host, user, port, ) 사용해서 설정하고, column_options부분에 테이블에서 날짜 형식의 컬럼 데이터를 가져올 소수점 처리에서 에러가 있어 이를 방지하기 위해 string형식으로 가져오게 한다. 마지막으로 query부분에 MySQL에서 테이블에 쿼리를 실행하고 원하는 특정 결과 데이터만 가져올 있게 있도록 쿼리를 입력하였다. 이를 이용하면 사용자가 원하는 데이터만 가져올 있게 되어 별도의 프로그래밍 적인 ETL처리작업이 필요 없어져서 매우 유용하게 사용할 있는 옵션이다.
  •       MySQL 저장된 테이블 데이터를 가져올 실행되는 쿼리는 311콜센터에 접수되어 있는 데이터 중에서 현재 상태가 “Closed” 데이터들에 대해 언제 어떠한 매체(Web, Phone, ETC) 통해 접수 되었는 지에 대한 정보를 현재 날짜와 같은 날짜의 데이터(예제를 위한 데이터로 이전의 데이터이기 때문에 년도는 무시한다.) 가져오게 하였다.
  •       out부분을 보면 앞의 in부분 설정을 통해서 가져온 MySQL데이터를type file 해서 path_prefix 지정한 위치에 file_ext옵션으로 CSV파일로 생성하게 지정하고 formatter옵션을 이용해서 생성될 CSV 파일에 대한 상세 설정(컬럼, 캐릭터셋, ) 하였다.

 

MySQL 가져오기 위한 설정파일을 생성하였다면, 아래의 Embulk명령어를 실행해서 실제로 MySQL인스턴스에서 데이터를 정상적으로 가져오는지 확인한다.

$ embulk run 311_service_requests_input.yml

 

Embulk 빅쿼리에 데이터 적재하기

VM 구성한 Embulk 통해서 CloudSQL(MySQL) 데이터를 정상적으로 가져오는 것을 확인하였다면, 가져온 CSV데이터 파일을 빅쿼리에 적재하는 과정을Embulk 이용해서 진행한다.

Embulk 이용해서 가져온 테이블 데이터 파일(CSV) 빅쿼리에 적재하기 위해 311_service_requests_output.yml이라는 이름으로 Embulk설정파일을 아래와 같이 생성한다.

in:
  type: file
  path_prefix: /가져온 데이터 파일을 저장한 디렉토리/311_service_requests_
  parser:
    type: none
out:
  type: bigquery
  mode: append
  auth_method: compute_engine
  project: lab-test-01
  dataset: embulk
  table: 311_service_requests_%Y%m%d_%H
schema_file: /스키마파일을 저장한 디렉토리/311_service_requests.json
auto_create_table: true
  •       in부분을 보면 빅쿼리에 적재할 데이터에 대해 설정하고 none-parser 플러그인을 사용해서 파싱을 하지 않게 해서 적재 성능을 높였다.
  •      out부분을 보면 type bigquery 설정하고 mode append 해서 기존 테이블이 있다면 추가데이터는 덧붙이게 된다. auth_method 인증방식은 MySQL에서 데이터를 가져올 생성한 방식과 동일하게 compute_engine으로 적용하고 빅쿼리에 저장할 데이터셋, 테이블 형식(Wild카드 형식으로 쿼리 사용이 가능하게 테이블을 생성) 파싱작업 생략을 위해 schema_file json형식의 스키마 파일을 생성해둔다. 테이블의 경우 없으면 자동으로 생성되도록 auto_create_table옵션을 true 주었다.
  •      스키마 파일의 내용은 아래와 같이 구성되어 있다.
[
   {
    "name": "created_date",
    "type": "TIMESTAMP",
    "mode": "NULLABLE"
   },  
   {
    "name": "status",
    "type": "STRING",
    "mode": "NULLABLE"
   },  
   {
    "name": "source",
    "type": "STRING",
    "mode": "NULLABLE"
   }
]

빅쿼리로 데이터를 적재하기위한 설정파일을 생성하였다면, 아래의 Embulk명령어를 실행해서 실제로 데이터를 정상적으로 빅쿼리에 적재되는지 확인한다.

$ embulk run 311_service_requests_output.yml

Airflow 서버 구성하기

지금까지 CloudSQL에서 빅쿼리까지 데이터를 적재하는 과정을 각각의 구간으로 나누어 실행해보았다. 이제 VM 추가로 생성하여 앞에서 각각 구간 별로 나누어 실행하였던 데이터 파이프라인 구간을 하나의 구간의 데이터 워크플로우로 통합하고 자동화하여 실행할 있게 해주는 Airflow 설치하고 환경을 구성해보도록 한다.

VM 생성하기

앞에서 ETL처리를 위한 VM생성할 때와 적용되는 옵션은 다르지 않다. 별도 설명은 생략한다.

방화벽 포트 오픈

Airflow 웹서버를 실행하여 액세스 하기 위해서 다음과 같이 현재 사용중인 프로젝트의 네트워크에 대한 방화벽 8080포트를 허용하는 새로운 규칙을 추가한다.

  • Targets : All instances in the network
  • Source filter : IP ranges
  • Source IP ranges : 0.0.0.0/0
  • Protocols and ports : Specified protocols and ports -> tcp:8080
  • 나머지 : Default

 

Airflow 설치하기

Compute Engine VM생성 방화벽 8080포트에 대한 허용 규칙을 추가하였다면, 새롭게 생성한 VM SSH 터미널에 접속하여 다음의 명령어를 실행해서 Airflow 설치한다.

환경 변수 설정
$ export AIRFLOW_HOME=~/airflow
 
Airflow 설치
$ pip install airflow
 
DB 초기화
$ airflow initdb
 
서버 실행
$ airflow webserver -p 8080

위의 명령어 순서대로 Airflow 실행하고 서버를 실행한 상태에서 생성한 VM External IP 8080(http://VM External IP:8080) 접근하여 정상적으로 동작되는 확인한다.

Airflow 설정 파일 수정하기

Airflow 설치가 완료되었다면, Airflow 정상적인 동작을 위해 설정파일($AIRFLOW_HOME/airflow.cfg) 수정하여야 한다. VI등의 편집기를 이용해서 설정 파일의 내용을 아래와 같이 수정한다.

...
# Are DAGs paused by default at creation // True -> False 변경
dags_are_paused_at_creation = False
...
# Whether to load the examples that ship with Airflow. It's good to
# get started, but you probably want to set this to False in a production
# environment  // True -> False 변경
load_examples = False
...

 

ETL 서버 SSH연결 설정

데이터 워크플로우의 동작을 위해서는 Airflow 구성된 서버에서 ETL서버로의 SSH 원격 명령 실행이 필요하다. ETL서버에 대한 원격SSH연결 설정을 위해 아래와 같이 Airflow 서버를 실행시킨 상태에서 콘솔의 상단의 Admin -> Connections메뉴로 이동한다.

*Airflow에서는 콘솔의 Connetions 메뉴를 통해 DB, file system, HDFS, 등의 데이터의 소스에 대한 커넥션 설정 아니라, SSH연결 설정 또한 쉽게 있는 기능을 제공하여 편리하다.

 

ETL서버에 대한 원격SSH연결 설정을 위해, [Create] 버튼을 클릭하고 다음과 같이 내용을 입력하여 새로운 커넥션 설정을 구성한다.

  • Conn Id : etl-server-ssh(임의로 설정)
  • Conn Type : SSH
  • Host : etl-server (앞에서 생성한 ETL서버 VM 이름)
  • Port: 22
  • 나머지 : 공란

Airflow DAG파일 작성

지금까지 Airflow 실행하기 위한 환경 설정을 하였다. 이제 DAG 구성(파이썬 코드로 작성)하여 앞에서 구성한 전체 구간에 대한 데이터 파이프라인을 각각의 태스크 단위로 작성하고 매시간 30분마다 자동으로 실행되도록 스케줄링을 적용하여 자동화 처리를 하도록 하겠다.

기본적으로 DAG(파이썬 실행 파일) $AIRFLOW_HOME/dags/ 저장하고 Airlfow 명령을 실행하여 동작하게 한다. 만약, 해당 폴더가 없으면 생성하고 DAG 파일을 작성하면 된다.

 

$AIRFLOW_HOME/dags/ 폴더에 data_workflow_test.py라는 이름으로 아래와 같은 내용으로 작성하여 저장한다.

from airflow import DAG
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.contrib.hooks import SSHHook
from datetime import datetime,timedelta
 
sshHook = SSHHook(conn_id='etl-server-ssh')
 
dag = DAG('airflow-test-sample',description='Data Load to GCP With airflow and Embulk',
                        schedule_interval = '30 * * * *',
                        start_date=datetime(2017, 7, 1),catchup=False)
 
embulk_run = SSHExecuteOperator(
        task_id='mysql_down',
        bash_command='/home/yooseok_choi/.embulk/bin/embulk run /home/yooseok_choi/311_service_requests_input.yml',
        ssh_hook=sshHook,
        dag=dag)
 
bq_run = SSHExecuteOperator(
        task_id='bq_load',
        bash_command='/home/yooseok_choi/.embulk/bin/embulk run /ETL서버에서 데이터를 가져올 디렉토리 /311_service_requests_output.yml',
        ssh_hook=sshHook,
        dag=dag)
 
bq_run.set_upstream(embulk_run)
 
rm_run = SSHExecuteOperator(
        task_id='local_file_delete',
        bash_command='/bin/rm / 가져온 데이터 파일을 저장한 디렉토리/311_service_requests_*',
        ssh_hook=sshHook,
        dag=dag)
 
rm_run.set_upstream(bq_run)
  •       DAG 정의 부분을 보면. 먼저, DAG 객체는 DAG 대한 전체 컨택스트를 저장 유지 관리한다.
  •       DAG('airflow-test-sample',description='Data Load to GCP With airflow and Embulk', 에서 DAG 이름을 ‘'airflow-test-sample’ 정의하고 description 설명을 작성한다. 다음으로 DAG 실행되는 주기를 정해야 하는데, schedule_interval = '30 * * * *', 부분에 cron 명령과 같은 노테이션으로 정의한다. 설정은 시간 30분마다 등록한 DAG 실행되도록 한다. 마지막으로, start_date=datetime(2017,07,01), ,DAG 언제부터 시작 것인지 지정한다. DAG 반드시 전역 변수로 지정한다. DAG안에서 다른 DAG 부르는 sub DAG 경우에는 지역 변수로 지정이 가능하다.
  •       다음 task 사용할 operator 정의하게 되는데, 콘솔에서 정의한 ETL서버에 대한 SSH Connection설정 정보를 사용하기 위해 SSHHook SSHHook으로 정의한 원격 서버에  SSH 명령( 커맨드) 실행할 SSHExecuteOperator 대한 내용을 정의한다. 먼저, 'mysql_down'이라는 이름의 task MySQL에서 데이터를 가져오는 부분을 다음으로 'bq_load'라는 이름의 task 가져온 데이터를 빅쿼리에 적재하는 부분을 정의하고 마지막으로 'local_file_delete'이라는 이름으로 빅쿼리에 데이터 적재가 끝나고 ETL서버에 저장된 데이터를 삭제하는 task 정의하였다.

  

DAG파일 작성 다음의 명령어로 작성한 DAG파일이 정상적으로 등록이 되었는지 확인한다. 정상적으로 등록되었다면, “airflow-test-sample” 등록되어 있는 DAG 리스트로 확인할 있다.

$ airflow list_dags

Airflow 재실행

이제 DAG 파일 구성 까지 완료되었다면, Airflow 웹서버를 중지하고 DB 초기화하고 재시작을 해야 정상적으로 구성한 DAG파일이 동작하게 된다. 따라서 웹서버가 동작하고 있는 터미널에서는 Ctrl+C 실행을 중지한 상태에서 터미널에서 다음의 명령을 실행한다.

DB초기화

$ airflow initdb

 

Airflow 서버 재실행

$ airflow webserver -p 8080

 

*Airflow 서버가 중지되지 않는다면 아래의 명령을 이용하면 해당 프로세스를 kill 있다.

$ cat $AIRFLOW_HOME/airflow-webserver.pid | xargs kill -9

 

Airflow DAG실행

이제 모든 구성이 완료하였다. 다른 SSH 터미널을 열고 아래의 명령어를 실행해서 등록한 DAG 정상적으로 동작되는 확인해보자.

$ airflow scheduler

 

* 위의 명령어는 Airflow DAG 등록된 스케줄에 따라서 태스크들을 지속적으로 실행하게 한다.

 

Airflow 웹서버를 이용한 모니터링

스케줄러에 등록한 DAG 정상적으로 동작되는지 Airflow 서버 콘솔로 접속해서 확인해보자.

먼저, 서버 콘솔로 접속하면 아래와 같이 앞에서 작성해서 등록한 DAG 실행되고 있는 것을 확인할 있다. DAG 실행되면서 해당 DAG에서 작성된 task 순서대로 실행되는 것을 확인할 있을 것이다.

Dag 실행이 완료되고 빅쿼리 콘솔에 CloudSQL에서 Embulk 구성된 VM에서 가져온 데이터가 앞에서 구성한대로 아래와 같이 테이블로 저장되어있는 것을 확인할 있다.

이제 Airflow 스케줄러를 정지 시키지 않는 다면 매시간 30분에 한번씩 앞에서 구성한 데이터 워크플로우에 따라 빅쿼리에 테이블이 자동으로 생성될 것이다.

 

Airflow 서버 콘솔의 주요기능

지금까지 Airflow Embulk 이용해서 CloudSQL에서부터 빅쿼리까지 데이터를 적재하기 까지의 데이터 파이프라인을 구성하고 Airflow 이용해서 하나의 자동화된 데이터워크플로우를 구성해보았다. 마지막으로 Airflow 서버에서 제공하는 주요한 기능들에 대해 알아보고 마무리하겠다.

 

Graph View

Graph View DAG 구조를 그래프 형태로 보여주는 뷰이다

 

복잡한 워크플로우의 경우 구조를 파악하는데 유용하다. 위의 그림을 보면 'airflow-test-sample'이라는 이름으로 정의한 DAG 내부 task들이 mysql_down -> bq_load -> local_file_delete 순서대로 호출되어 실행되는 것을 있다.

 

Tree View

트리뷰를 보면, DAG 구조를 트리 형태로 보여주고, DAG 태스크가 각각 성공했는지 실패 했는지를 우측 그래프처럼 표현해준다. 태스크 실행 로그를 보려면 태스크 실행 결과 그래프를 클리하면 아래와 같이 세부 메뉴가 나온다.

 

여기서 View Log 누르면 Task 별로 실행 당시의 로그를 있다. 아래는 mysql_down태스크를 실행한 로그이다.

위의 그림처럼 task 에러가 발생하여도 출력된 실행로그를 확인하여 디버깅이 가능하다..

 

Task Duration

Task duration DAG에서 수행된 태스크의 수행 시간을 아래와 같이 그래프 형태로 나타내준다.

 

Task Tries

Task Tries 에서는 수행 별로 각각의 태스크를 수행한 횟수를 그래프로 보여준다. 재시도 (RETRY)횟수를 모니터링할 있다.

 

Gantt

Gantt 차트는 수행 태스크들 대해서 수행 순서에 따라서 소모된 시간과 함께 간트 차트로 표시해준다. 태스크의 수행 순서와 태스크당 시간을 한꺼번에 보여주기 때문에 병목 구간 파악이 쉬워진다.

 

마무리

지금까지 Apache Airflow를 이용하여 데이터 워크플로우를 구성 하는 방법에 대해서 알아보았다. 이미 링크드인의 Azkaban이나, 스포티파이의 Luigi, 하둡의 Oozie 여러가지 워크 플로우 관리 시스템이 있지만, 아직 인큐베이터 단계인 airflow 주목하는 이유는 분산 환경 지원이 가능하고, 태스크에 대한 스크립트를 파이썬을 사용할 있기 때문에, 각종 빅데이타 분석 시스템이나 라이브러리 그리고 머신러닝 시스템과 연동이 쉽고, 파이썬 언어만 알면 정교한 데이터 워크 플로우 개발이 가능하기 때문에, ( XML등의 설정을 하지 않고도) 활용 가능성이 높다.

 

참고링크

https://airflow.apache.org/

https://cloud.google.com/blog/big-data/2017/07/how-to-aggregate-data-for-bigquery-using-apache-airflow

http://airbnb.io/projects/airflow/

http://www.embulk.org/docs/

https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-mysql

https://github.com/embulk/embulk-output-bigquery

https://github.com/sonots/embulk-parser-none

 

 

구글 클라우드 NAT Gateway 구성하기

 
최유석

이 글에서는 구글 클라우드의 구글 컴퓨트 엔진의 VPC 네트워크 서비스와 기능 및 인스턴스, 인스턴스 템플릿과 그룹을 이용하여 NAT Gateway를 구성하는 방법에 대해서 알아보겠다. 작년에 작성했던 글(http://whitechoi.tistory.com/24)의 내용에서 주의해야 할 부분이 추가되어 새로 글을 작성하게 되었다. 이미 많은 글들을 통해 무료평가판 등록, 프로젝트 생성, 과금설정하는 부분에 대해서 언급하였다. 따라서 이러한 부분은 생략하고 NAT Gateway를 구성하는 방법에 대해서만 간략하게 설명하겠다.

이 글을 통해 구성될 NAT Gateway의 구성도이다.

 


 

VPC 네트워크 생성

다음과 같이 VPC네트워크를 맞춤설정 네트워크로 서브넷을 추가하여 새로 생성한다.

  • VPC 네트워크 이름: nat-nw
  • 서브넷(맞춤설정)          

            - 이름 : sub-asia

- 지역: asia-east1

- IP 주소 범위: 192.168.0.0/24

- 비공개 Google 액세스: 사용 설정(이 구성과 연관성은 없지만 사용하는게 보안적으로 좋음)

  • 나머지 기본값

 

방화벽 규칙 생성

다음과 같이 SSH접속을 위한 tcp 22포트, 네트워크 내부 통신을 위한 서브넷 범위의 tcp, udp, icmp 허용 규칙을 생성한다.

SSH접속을 위한 TCP 22포트 허용

  • 방화벽 규칙 이름: asia-allow-ssh
  • 네트워크: nat-nw
  • 대상: 네트워크의 모든 인스턴스
  • 소스 필터: IP 범위
  • 소스 IP 범위: 0.0.0.0/0
  • 프로토콜 및 포트(지정된 프로토콜 및 포트): tcp:22
  • 나머지 기본값

 

서브넷(sub-asia)범위의 내부통신을 위한 TCP, UDP, ICMP 허용

  • 방화벽 규칙 이름: asia-allow-internal
  • 네트워크: nat-nw
  • 대상: 네트워크의 모든 인스턴스
  • 소스 필터: 하위 네트워크
  • 하위 네트워크: sub-asia 192.168.0.0/24
  • 프로토콜 및 포트(지정된 프로토콜 및 포트): tcp;udp;icmp
  • 나머지 기본값

 

인스턴스 생성

 

다음과 같이 NAT Gateway로 사용될 인스턴스를 생성한다.

  • 인스턴스 이름: nat
  • 영역: asia-east1-a
  • 부팅 디스크: Ubuntu 14.04 LTS
  • ID 및 API 액세스: 모든 Cloud API에 대한 전체 액세스 허용
  • 네트워크 태그: nat
  • 네트워크 인터페이스

- 네트워크: nat-nw

- 하위 네트워크: sub-asia(192.168.0.0/24)

- 외부IP: nat-ip(고정 IP로 생성, 임시로 해도 무방) 

- IP 전달: 사용

  • 나머지 기본값

인스턴스 템플릿 생성

다음과 같이 인스턴스 그룹에 사용될 인스턴스 템플릿을 생성한다. 여기에서는 NAT Gateway구성만 할 예정이지만, 인스턴스 템플릿과 그룹을 이용하여 로드밸런서, 오토스케일링에 사용할 수 있는 백엔드로 사용할 수 있다.

  • 인스턴스 템플릿 이름: instance-template

 

  • 머신 유형(맞춤설정): vCPU 1개, 1GB 메모리

 

  • 부팅 디스크: Ubuntu 14.04 LTS
  • ID 및 API 액세스: 모든 Cloud API에 대한 전체 액세스 허용
  • 네트워크 태그: no-ip
  • 네트워크 인터페이스

- 네트워크: nat-nw

- 하위 네트워크: sub-asia(192.168.0.0/24)

- 외부IP: 없음

  • 나머지 기본값

인스턴스 그룹 생성

다음과 같이 인스턴스 템플릿을 사용할 인스턴스 그룹을 생성한다.

  • 인스턴스 그룹 이름: instance-group
  • 영역: asia-east1-a
  • 인스턴스 템플릿: instance-template
  • 나머지 기본값

네트워크 경로 생성

다음과 같이 NAT Gateway 구성에 사용될 네트워크 경로를 생성한다.

  • 경로 이름: nat-route
  • 네트워크: nat-nw
  • 대상 IP 범위: 0.0.0.0/0
  • 우선순위: 800
  • 인스턴스 태그: no-ip (인스턴스 템플릿에 설정한 네트워크 태그와 동일해야하며, NAT Gateway로 사용될 인스턴스와 다른 네트워크 태그이어야 한다.) 
  • 다음 홉: 인스턴스 지정
  • 다음 홉 인스턴스: nat

iptables을 이용한 NAT 설정

iptables를 이용해서 NAT 설정을 위해 VM 인스턴스 페이지의 nat(NAT Gateway 인스턴스)의 우측에 있는 SSH버튼을 클릭하여 SSH 터미널로 접속한다.

 

nat 인스턴스의 SSH에 접속하였다면, NAT 설정을 위해 다음의 명령어를 실행 한다.

$ sudo sysctl -w net.ipv4.ip_forward=1

$ sudo iptables -t nat -A POSTROUTING -o eth0 -j MASQUERADE

 

NAT Gateway 구성 확인하기

NAT Gateway 구성이 정상적으로 되었는지 확인하기 위해 다음의 명령어를 이용해서 인스턴스 그룹을 통해 생성된 인스턴스에 SSH로 접속한다.

$ ssh instance-group-nwb2

참고

$ ssh "인스턴스 그룹으로 자동 생성된 인스턴스 명"

NAT Gateway 구성 확인을 위한 traceroute 설치

인스턴스 그룹으로 자동 생성된 인스턴스의 SSH 터미널에서 NAT Gateway가 정상적으로 구성되고 동작되는지 확인을 위해 다음의 명령어로 traceroute를 설치한다.

$ sudo apt-get install -y traceroute

traceroute명령을 이용해서 NAT Gateway 동작 확인하기

traceroute로 외부로 나가는 네트워크 이동경로 확인을 위해 다음의 명령어를 실행한다.

$ traceroute www.google.com

* traceroute 명령을 통해 www.google.com까지 위에서 구성한 nat(NAT Gateway)인스턴스의 내부 IP를 통해서 접근하는 것을 확인할 수 있다.

* 네트워크 이동 경로를 보면 내부 IP를 통해서 이동한 것으로 확인되는데 실제로는 인스턴스의 내부IP -> 외부IP로 외부 네트워크(CF. 인터넷)와 통신하게 된다. 내부IP는 외부 네트워크와의 통신이 되지 않는다.

* NAT 구성에서 주의할 점은 백엔드로 사용 될 인스턴스(또는 인스턴스 템플릿)의 네트워크 태그와 경로에 적용할 태그가 같아야하며, NAT Gateway로 사용될 인스턴스의 태그와는 달라야 한다. 예전에는 같은 태그를 적용해도 문제 없었으나, 현재 직접 테스트를 통해 확인해보니 정상적으로 구성되지 않는다. 

참고링크

https://cloud.google.com/compute/docs/vpc/special-configurations#natgateway

 

 

 

Google Cloud Dataproc 사용하기

 최유석

글에서는 Google Cloud Dataproc 대해서 알아보겠다.

 

Google Cloud DataProc?

구글 클라우드 플랫폼에서 제공하는 매니지드 HADOOP, Spark(+ Hive, Pig) 클러스터 서비스로 사용자가 손쉽게 HADOOP또는 Spark 클러스터를 구성할 있다.

빠른 클러스터 구성

규모에 상관없이 HADOOP, Spark 클러스터 구성에 걸리는 시간이 90초이내로 빠른 속도로 클러스터를 구성할 있고 이를 활용해서 사용자는 데이터 처리 또는 분석을 위한 작업에서 빠른 연산이 가능하다.

비용 절감

매우 빠른 속도로 클러스터를 구성할 있고, 분당과금 정책을 적용하여 사용자가 원하는 시간만큼 클러스터를 구성해서 연산을 실행하고 클러스터를 내려도 추가적인 가격 부담이 없다.

클러스터에 다양한 VM 타입 지원

클러스터 구성에 Predefined 형태의 VM 머신 뿐만 아니라, 사용자가 CPU, Memory크기를 자유롭게 설정하여 사용할 있는 Custom Machine Type 지원하여 각각의 Job 최적화한 크기의 클러스터 구성을 있고, 사용에 약간의 제약은 있지만, 80%낮은 가격으로 인스턴스를 사용할 있는 Preemptible VM타입을 지원하여 각각의 Job 더욱 비용 효율적인 클러스터의 구성을 있게 지원한다. 추가로 클러스터의 노드에 GPU Attach 있기 때문에 GPU 연산이 필요한 경우에도 쉽게 사용 있다.

*GPU Preemtible VM에는 지원되지 않는다.

구글 클라우드 서비스들과의 통합

단순하게 Dataproc 클러스터만 구성해서 사용할 있는게 아니라 BigQuery, Cloud Storage, Cloud Bigtable, Stackdriver Logging, Monitoring, 등과의 함께 통합된 서비스로 제공되기 때문에 더욱 유연한 사용을 가능하게 한다. 

Dataproc 클러스터 구성 테스트하기

앞서 Google Cloud Dataproc 대한 개념적인 부분에 대해서 간단하게 알아보았다. 이제 Dataproc 클러스터를 구성하고 구성한 클러스터에서 sparkSQL 이용해서 쿼리를 실행해보자. 쿼리를 실행할 처음에는 Worker Node 2 구성해서 쿼리를 실행해보고 이후 100개로 노드 수를 늘려서 쿼리를 실행하고 용량의 데이터에도 쿼리를 실행하여 Node 따라서 성능이 어떻게 달라지는지 확인해본다. 또한, Dataproc에서 사용한 동일 데이터와 쿼리를 이용해서 빅쿼리에서도 실행해보고 결과를 비교해보려고 한다. 

데이터 준비하기

예제에서 사용할 데이터는 빅쿼리에서 공개 데이터셋으로 제공하는 데이터를 이용한다.

Dataproc 클러스터에서 Cloud Storage Connector를 Built-in 형태로 제공하기 때문에 Cloud Storage 있는 데이터에 다이렉트로 액세스가 가능하다. 이를 이용해서 빅쿼리 공개 데이터셋에 있는 특정 테이블을 Cloud Storage Export하고 이렇게 Export한 데이터에 바로 접근하여 사용할 예정이다.

Node 수에 따라서 성능이 어떻게 변화하는 지 확인하기 위해 서로 크기가 다른 개의 테이블을 사용할 예정이다.

첫번째 테이블은 음악 앨범에 대한 플레이리스트 정보들이 기록되어 있으, 테이블의 크기는 1.11GB, 12,304,975개의 Row 가지고 있다

두번째로 사용할 테이블은 위키미디아에서 관리하는 각종 위키관련사이트의 페이지 뷰수에 대한 정보가 기록된 테이블로 2012 7월에 대한 데이터를 가지고 있다. 테이블의 크기는 119GB, 1,858,577,426개의 Row 가지고 있다.

위의 빅쿼리 공개 데이터셋을 GCS에 Export하여 사용하기 위해 먼저, GCS에 Regional 클래스로 US central1리전을 지정하여 버켓을 생성하고

위의 테이블들을 Export할 폴더를 각각 생성한다.

빅쿼리 공개데이터 셋에서 예제에서 사용할 테이블 데이터를 위에서 생성한 GCS 버켓의 폴더에 각각 CSV형식으로 Export한다.

*빅쿼리에서 테이블 데이터 export는 단일파일 기준으로 1GB이하의 파일에 대해서만 export를 할 수 있다. 만약, 1GB이상의 크기의 테이블 데이터에 대해서는 와일드카드 *문자를 이용한 export 사용해야 한다. 와일드 카드 문자를 이용하여 export하면 데이터가 지정한 GCS 위치(버켓 또는 버켓 내부 폴더) 자동으로 분할되어 저장된다.

 Dataproc 클러스터 생성하기

프로젝트 생성, 과금 설정, API활성화 등에 대한 내용은 앞의 다른 글에서 많이 언급하였으니 생략한다. 해당 내용은 https://cloud.google.com/dataproc/docs/quickstarts/quickstart-console

참고하기 바란다.

Google Cloud Console 좌측 상단 서비스탭에서 Dataproc페이지로 접속하고 페이지 중앙의 [Create cluster] 버튼을 클릭한다.

생성할 클러스터의 이름을 임의로 입력하고(여기서는 sparkcluster로 사용했다.)  나머지 부분은 기본값으로 생성한다.

Dataproc Cluster는 기본값으로 생성시 Master Node 1, Worker Node 2대로 생성된다

 

Master Node SSH 접속

앞서, 생성한 클러스터의 디테일 페이지에서 VM Instances탭으로 이동하고 클러스터 Master Node 우측 SSH버튼을 클릭하여 SSH 접속한다.

 

Spark SQL CLI실행

Master Node SSH에서 다음의 명령어를 실행하여 Spark SQL CLI 실행한다.

$ spark-sql

Spark SQL CLI 실행하였다.

 

Playlist테이블 생성하기

Spark SQL CLI에서 다음의 쿼리 명령어를 실행하여 앞에서 GCS로 export한 테이블 데이터 중 playlist 테이블을 생성한다.

CREATE TABLE playlist
USING com.databricks.spark.csv
OPTIONS (path "gs://버켓/playlists/*", header "true", inferSchema "true");

Playlist 테이블이 생성되었다.

 Playlist 테이블에 쿼리 실행하기

앞에서 생성한 playlist테이블을 대상으로 아래와 같은 쿼리를 실행해보자. 

SELECT * FROM playlist limit 10;

생성한 playlist 테이블의 전체 레코드 중에서 10개의 레코드를 가져오는 쿼리이다.

 

쿼리를 실행하였다.

쿼리 실행에 21.918초가 소요되었으며 쿼리 실행의 결과가 프롬프트에 나타나는 것을 확인할 있다.

 

빅쿼리에서 playlist 테이블에 쿼리 실행하기

앞에 생성한 Dataproc 클러스터에서 spark-sql 이용해 실행한 쿼리를 빅쿼리에서 실행해보자.

쿼리 실행에 2초가 소요된 것을 확인할 있다동일한 쿼리지만 별도 Sorting 하지 않았기 때문에 값은 다르게 나오니 참고 바란다.

100개의 Worker Node에서 쿼리 실행하기

이제 앞에서 생성했던 Dataproc 클러스터의 Worker Node 100개로 늘려서 쿼리를 실행하고 결과를 확인해보자. 쿼리는 앞에서 playlist테이블에 실행한 쿼리 외에도 용량이 큰 Wikimedia 테이블 데이터(119GB)를 기준으로 테이블을 생성하고 쿼리를 실행해본다. 또한, 동일한 쿼리를 빅쿼리에서도 실행해서 결과를 확인해보자.

앞에서 생성한 클러스터 디테일 페이지에서 Configuration 탭으로 이동하고 Edit 클릭하여 Worker nodes

부분의 숫자를 2 -> 100으로 변경하고 Save버튼을 클릭하여 변경한 내용을 저장한다. 

 

Node가 전부 생성될 때까지 약간의 시간이 지나면, 다음과 같이 100개의 Worker Node 생성된 것을 확인할 있다.

 Worker Node의 번호가 0에서 99까지 100개의 노드가 생성되었다.

100개의 Worker node 생성이 완료되면 다시 클러스터의 Master Node SSH 접속하여 “spark-sql” 명령으로 Spark SQL CLI 실행한다. Spark SQL CLI 실행하였으면 먼저 앞에서 생성했던 playlist테이블에 실행했던 쿼리를 실행하고 결과를 확인해보자.

100개의 Worker Node에서 Playlist테이블에 쿼리 실행하기

아래는 Dataproc 클러스터의 Worker Node 수를 100개로 늘리고 위에서 playlist 테이블에 실행했던 쿼리를 실행한 결과이다.

쿼리가 실행된 결과를 보면 18.402초가 소요된 것을 확인할 있다. 대상이 되는 playlist 데이터의 크기가 1.11GB 작기 때문에 속도 차이는 나지 않는 것으로 보여진다

 대용량의 데이터에 쿼리 실행하기

이제 데이터의 크기가 큰 Wikimedia 테이블 데이터를 대상으로 쿼리를 실행해서 쿼리 성능을 확인해보자.

테이블 생성

아래의 테이블 생성 쿼리를 실행해서 Wikimedia테이블 데이터를 이용해서 wikitest라는 이름으로 테이블을 생성한다.

CREATE TABLE wikitest (year integer, month integer, day integer, wikimedia_project string, language string, title string, views integer)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION 'gs://버켓명/wikitest/*';

테이블이 생성되었다.

Wikimedia(Wikitest) 테이블에 쿼리 실행하기

아래와 같은 쿼리를 실행하고 결과를 확인해보자. 실행 쿼리의 내용은 앞에서 생성한 wikitest테이블에서 language 값이 “en”이고 titile “sydney”라는 값이 들어있는 모든 레코드를 조회하고 이를 그룹핑하며, 조회된 데이터에 대해서 각 페이지 별 뷰수를 합산하고 합산된 뷰수를 기준으로 정렬하여 10개의 결과를 보여주는 쿼리이다.

SELECT language,title,sum(views) AS total_views
FROM wikitest
WHERE language ='en' AND title like '% sydney %'
GROUP BY language,title
ORDER BY total_views DESC
LIMIT 10;

위의 쿼리를 실행한 결과이다.

쿼리 실행에 30.387초가 소요되었으며 정상적으로 쿼리가 실행된 것을 확인할 있다.

 

추가로 wikimedia 데이터에 대해서 기본 Dataproc 클러스터(Master Node 1, Worker Node 2) 구성으 실행한 결과는 다음과 같으니 참고하기 바란다. 

기본 클러스터 구성으로 쿼리 실행 635.981초가 소요된 확인할 있다. 노드 수가 많아짐에 따라서 쿼리 성능에 큰 차이가 발생하는 것을 확인할 수 있다.

빅쿼리에서 wikimedia 테이블에 동일한 쿼리 실행하기

100개로 worker node를 가진 Dataproc 클러스터에서 실행한 쿼리를 빅쿼리에서 실행해보자.

100개 노드에서 실행에 30초 가량 소요된 쿼리가 빅쿼리에서는 쿼리 실행에 4.2초가 소요된 것을 확인할 있다

 

Dataproc 빅쿼리 쿼리 시간 비교

앞에서 Dataproc클러스터와 빅쿼리에서 실행했던 쿼리의 실행시간을 표의 형태로 정리하였다.
데이터 / 실행모듈
Dataproc(M1, W2)
Dataproc(M1, W100)
BigQuery
playlist(1.11GB)
21.918
18.402
2
Wikimedia(119GB)
635.981
30.387
4.2

*M = Master Node, W = Worker Node

동일한 데이터에 대한 동일한 쿼리 실행 속도에서 빅쿼리가 월등하게 빠른 것을 확인할 수 있다.

 

 

마무리

처음 글을 작성하려고 했던 시점에는 Google Cloud Dataproc 대한 내용만 기재하려고 하였으나, 예제에 적용할 만한 데이터를 찾다가 보니 빅쿼리의 공개 데이터셋을 이용하게 되었고 동일한 쿼리를 Dataproc 빅쿼리에서 실행하면 어떤 결과가 나올지 궁금하여 실행해보았다. 각각 쿼리를 실행하고 결과를 보니 속도에서 매우 차이를 보이는 것을 확인 있었다. 그로 인해 빅쿼리에서 쿼리를 실행한 부분에 대한 내용을 추가하게 되었다. 단순하게 위의 예제에서의 쿼리를 실행한 결과만 놓고 보면 엄청난 차이라고 볼 수 있지만, 각각의 서비스들이 가지는 특징과 기능, 사용목적, 과금 정책, 등이 모두 다르다. 따라서 각자의 사용환경에 따라서 필요한 서비스를 적용하는 것을 고려해야 것이다.

 

추가로 클라우드에서 최적화된 하둡 배포 아키텍쳐에 대한 부분에 대해서 더 알아보고 싶다면 http://bcho.tistory.com/1169 글을 참고하기 바란다.

 

참고링크

https://cloud.google.com/dataproc/docs/

https://cloud.google.com/dataproc/docs/concepts/custom-machine-types

https://cloud.google.com/dataproc/docs/concepts/preemptible-vms

https://cloud.google.com/dataproc/docs/connectors/cloud-storage

 

빅쿼리 Streaming Insert - go lang 샘플

최유석

기본준비사항

프로젝트 과금 설정(https://support.google.com/cloud/answer/6293499#enable-billing), 

빅쿼리 API활성화(https://console.cloud.google.com/flows/enableapi?apiid=bigquery&_ga=1.216323729.601314171.1492654432),

Cloud SDK 설치(https://cloud.google.com/sdk/downloads),

Client Libraries 설치 및 OAuth2.0 인증(로컬환경 실행 시) (https://cloud.google.com/bigquery/docs/reference/libraries#client-libraries-install-go),

Service account(또는 default service account사용)Key 생성이 되어있으며 Key 파일이 OS환경변수로 GOOGLE_APPLICATION_CREDENTIALS 지정 되어있다고 가정한다.( https://cloud.google.com/docs/authentication)

 

또한 go lang(https://golang.org/)설치 및 기본 설정은 되어있다고 가정한다.

여기서는 go 1.8버전을 기준으로 테스트하였다.

 

테이블 생성

빅쿼리에 "gotest"라는 데이터 셋을 생성하고 해당 데이터셋에 "gotable"이라는 이름으로 테이블을 생성한다. 생성할 테이블의 스키마정보는 다음과 같다.

Name:STRING,Count:INTEGER

생성된 테이블 정보

 

빅쿼리 Streaming Insert 샘플 코드 - Go lang

package main
import (
           "fmt"
           "log"
 
           "cloud.google.com/go/bigquery"
           "golang.org/x/net/context"
)
 
type Item struct {
           Name  string
           Count int
}
 
// Save implements the ValueSaver interface.
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
           return map[string]bigquery.Value{
                      "Name":  i.Name,
                      "Count": i.Count,
           }, "", nil
}
 
func main() {
           ctx := context.Background()
 
           // Sets your Google Cloud Platform project ID.
           projectID := "cystest-1" //프로젝트ID
           datasetID := "gotest" //데이터셋ID
           tableID := "gotable" //테이블ID
 
           // Creates a client.
           client, err := bigquery.NewClient(ctx, projectID)
           if err != nil {
                      log.Fatalf("Failed to create client: %v", err)
           }
           insertRows(client, datasetID, tableID)    
}
 
func insertRows(client *bigquery.Client, datasetID, tableID string) error {
           ctx := context.Background()
                     
           // [START bigquery_insert_stream]
           u := client.Dataset(datasetID).Table(tableID).Uploader()
           items := []*Item{
                      // Item implements the ValueSaver interface.
                      // 아래에 테이블에 삽입할 레코드를 입력한다.
                      {Name: "n1", Count: 7},
                      {Name: "n2", Count: 2},
                      {Name: "n3", Count: 1},
           }
           if err := u.Put(ctx, items); err != nil {
                      return err
           }
           // [END bigquery_insert_stream]
           fmt.Printf("insert complete\n")
           return nil
}

 

코드 실행하기( .go파일이 위치한 폴더에서)

go run gotest.go

*빌드는 생략한다.

 

코드실행 결과 확인하기

빅쿼리에 Streaming Insert로 데이터를 적재하게되면 Streaming buffer에 먼저 기록되고 실제 테이블에 값이 저장되기까지 최대 90분까지 소요될 수 있다. 따라서 테이블의 preview로는 바로 확인 안 될 수 있으니 간단한 조회 쿼리를 실행해서 확인한다.

좌측 상단의 [COMPOSE QUERY] 버튼으로 쿼리 입력 창을 활성화하고 다음 쿼리를 입력하고 실행한다.

SELECT * FROM gotest.gotable

 

쿼리 실행결과

쿼리를 실행하면 정상적으로 테이블에 데이터가 추가된 것을 확인할 수 있다.

 

참고자료

https://cloud.google.com/bigquery/streaming-data-into-bigquery

https://godoc.org/cloud.google.com/go/bigquery

https://github.com/GoogleCloudPlatform/google-cloud-go/blob/master/bigquery/uploader.go

https://github.com/GoogleCloudPlatform/golang-samples/blob/master/bigquery/snippets/snippet.go

 

 

 

 

 

Data Loss Prevention API 사용하기

최유석

DLP

Data Loss Prevention 또는 Data Leakage Prevention 약자로 데이터(또는 정보) 검사 또는 검열하여 지정된 보안 정책에 의해 데이터가 유출되는 것을 방지하는 기술이다이는 기업이나 공공기관등에서 솔루션의 형태로 효과적인 내부 정보 또는 개인 정보 유출에 대한 통제 기술로 활용되어 왔다.

구글 클라우드에서도 DLP기술(또는 기능)  “Data Loss Prevention API”(이하 DLP API)라는 이름으로 제공하고 있으며 텍스트 또는 이미지 에서 Sensitive(민감) 정보(전화번호, 주민등록번호, 여권, 신용카드, 이메일, ) 있는 부분을 자동으로 검사하고 수정하는 기능을 API형태로 제공한다. 기존의 DLP솔루션에 비해서 기능적인 부분은 제한될 있지만, Pay As You Go모델로 사용한 만큼에 대해서만 지불하는 가격정책으로 합리적인 비용으로 사용할 있고 REST API 제공되어 GCP 뿐만 아니라 , 모바일, IoT기기에서도 쉽게 적용할 있다.

 

 

*한국 기준으로 지원되는 데이터는 여권과 주민등록번호 2가지이다. 그러나 신용카드 정보, IP주소, MAC 주소, 전화번호 등은 국제적인 형식으로 글로벌 형태로 지원되기 때문에 적용 가능한 데이터는 더욱 많다고 있을 것이다.

 

Sensitive Data 분류 수정

DLP API는 40 이상의 사전 정의된 감지기(Detectors) 사용하여 패턴, 형식, 체크섬을 식별하여 자동으로 개인정보가 포함된 Sensitive(민감한) 데이터를 텍스트와 이미지에서 자동으로 분류하고 수정하는 기능을 제공한다. 수정된 텍스트 또는 이미지를 생성하는 기능을 제공하고 사용자가 원하는 수치의 가능도(Likelihood Scores) 임계값을 지정하여 분류된 개인정보 데이터로 발생할 있는 문제를 감소할 있다.

 

데이터 이동 최소화

데이터가 저장된 위치를 지정하여 DLP API 실행할 있기 때문에데이터에 대한 수집, 노출, 복사의 작업이 별도로 필요하지 않기 때문에 데이터의 이동을 최소화할 있고 데이터 기록, 로그 생성, 데이터 분석 작업, 등을 하기 전에 텍스트의 스트림에서 자동으로 민감한 개인정보 데이터를 분류하고 수정할 있다.

또한, DLP API 이용하면 데이터의 이동을 최소화 있기 때문에, 기업에서도 추가적인 위험없이 클라우드 데이터에 대한 관리, 분석을 있다.

 

GCPIntegration

Google Cloud Storage 텍스트/이미지,  Datastore 데이터 셋에 대해서 스캔을 지원하여 민감한 개인정보 데이터의 구성을 손쉽게 파악하여 관리를 용이하게 하고 접근 정책을 설정하는데 도움을 있다. 또한 GCP 내부에서 외부로 데이터를 전송하지 않아도 되기 때문에 속도, 보안, 비용 적인 측면에서 이점을 얻을 있다. 

 

DLP API 사용하기

지금까지 DLP 대한 대략적인 개념과 구글 클라우드에서 제공하는 DLP API 사용하면 얻을 있는 이점에 대해서 알아보았다. 이제 텍스트 (스트림 또는 파일) 이미지 파일에 DLP API 사용하는 방법에 대해서 간단한 테스트를 통해서 알아보도록 하자.

 

사전준비

프로젝트가 준비되고 해당 프로젝트의 과금 설정이 되어 있다고 가정한다. 또한, DLP API Node.js 샘플코드를 사용할 예정이기 때문에 Node.js NPM 설치되어 있는 상태라고 가정한다. 글에서는 구글 클라우드의 리소스 관리 도구 Node.js NPM 포함해서 각종 런타임과 툴이 사전 제공되는 브라우저 CLI도구인 Google Cloud Shell(https://cloud.google.com/shell/) 사용하여 테스트를 진행하겠다.

 

 

DLP API활성화

다음의 링크로 이동해서 DLP API 사용할 프로젝트를 선택하고 활성화를 진행한다.

https://console.cloud.google.com/flows/enableapi?apiid=dlp.googleapis.com 

 

Service account Key 생성 – Cloud Shell 제외한 다른 환경에서 진행

DLP API 사용하기 위해서 Service account 해당 Service account 대한JSON형식의 암호화 Key 파일을 생성하기 위해 다음 링크로 이동하여 DLP API 활성화한 프로젝트를 선택한다.

https://console.cloud.google.com/iam-admin/serviceaccounts/serviceaccounts-zero 

 

상단의 [CREATE SERVICE ACCOUNT]버튼을 클릭하여 생성할 Service account name 입력하고 JSON 형식의 Key파일생성을 위해서 Furnish a new private key] 부분에 체크하고 확장된 메뉴에서 Key type JSON으로 체크한다. 또한, 테스트 진행의 편의를 위해서 생성 Service account Role(권한) Project -> Owner(프로젝트 최상위 권한) 지정하여 Service account 생성한다. 생성한 key 파일을 GOOGLE_APPLICATION_CREDENTIALS라는 OS 환경 변수로 지정한다.

 

Node.js 샘플 다운로드 및 NPM설치

Cloud Shell 에서 다음의 명령어를 실행하여 GCP node.js샘플을 다운로드 한다.

git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

 

다음의 명령어를 실행해서 node.js DLP API 샘플이 위치한 폴더로 이동한다.

cd nodejs-docs-samples/dlp/

 

NPM 모듈 설치

다음의 명령어를 실행해서 node.js DLP API 샘플을 실행하기 위한 패키지를 설치한다.

npm install

 

텍스트(스트림 또는 파일)에서 개인정보 검사하기

DLP API 테스트하기 위한 준비가 완료되었다. 이제 DLP API 샘플이 구성된 node.js 모듈을 실행해서 텍스트와 텍스트 파일에서 개인정보를 검사해보자.

 

텍스트 스트링에서 검사하기

다음의 명령어를 실행해서 스트링으로 이루어진 텍스트 데이터에서 개인정보를 검사해보자

node inspect string "검사할 텍스트 스트링"

 

이글의 작성자의 전화번호와 주민등록번호를 텍스트에 입력하고 실행한 결과 

DLP API 통해서 실제 검출된 개인정보 데이터는 정확하게 나온다. 실제 정보를 입력하였기에 자체 모자이크 처리하였다.

 

텍스트 파일에서 검사하기

간단하게 테스트를 하기 위해서 .txt 형식으로 개인정보를 입력하여 텍스트파일을 생성한다.

DLP API 실행할 샘플 파일 내용

 

다음의 명령어를 실행해서 텍스트 파일에서 개인정보 데이터를 검사해보자.

node inspect file “텍스트 파일

이글의 작성자의 개인정보(전화번호, 이메일, 주민등록번호, 신용카드번호) 텍스트에 입력하고 실행한 결과 

 

마무리

보통 솔루션의 형태로 제공되는 DLP 기능을 간단하게 API를 이용해서 사용할 수 있다는 부분만으로도 정보에 대한 관심과 중요도가 높아지고 있는 요즘,  솔루션의 형태로 제한된 범위로만 사용할 수 있는게 아니라 민감한 개인정보를 다루는 모든 경우에서 광범위하게 활용성을 높여줄 것으로 예상된다.

*

테스트 자체나 샘플이 한정되고 샘플이 문제일 수 도 있지만, 

아직은 베타단계라서 그런지 

나머지 부분에 대해서도 테스트를 진행했지만만족할만한 결과가 나오지 않아서 따로 기재하지는 않는다. 

 

참고자료

https://cloud.google.com/dlp/

https://cloud.google.com/sensitive-data-classification/demo/#/

https://en.wikipedia.org/wiki/Personally_identifiable_information

https://en.wikipedia.org/wiki/Protected_health_information

 

 


Private Google Access 적용하기

 최유석

Private Google Access?

Priavte Google Access가 적용된 서브 네트워크의 Compute Engine Instances(VM)에서 Google APIs Services에 외부(External) IP가 아닌 내부(Internal) IP로 통신해서 접근하게 된다. 이 경우 적용하지 않았을 때와 전송속도에서는 차이가 없지만, Private Google access가 지원되는 (BigQuery, GCS, Pub/Sub, )에 내부 IP를 통해 접근하게 되니, 보안상의 이점을 얻을 수 있을 것으로 예상된다. 또한, App Engine Flexible Environment  Instances(VM)에도 적용된다.

 

Private Google Access 적용 방법

Google Cloud Console에서의 Private Google Access적용 방법 

1. Networks 상세 페이지 접속


2. Subnetworks상세페이지 접속

*각각의 Subnetworks단위로 적용할 수 있다.

 

3. EDIT 

4. Private Google access (Disabled -> Enabled) 변경 후 Save

 

5. Private Google access 적용여부 확인


Google Cloud SDK에서 Private Google Access 적용 방법

gcloud beta component의 설치가 필요하다. 만약 설치되어 있지 않다면 아래 명령어로 설치한다.


gcloud components install beta

 

Private Google access 적용 명령어


gcloud beta compute networks subnets update [
서브넷명] --enable-private-ip-google-access --region [리전명]

ex) gcloud beta compute networks subnets update default --enable-private-ip-google-access --region us-central1

*해제 시에는 --enable-private-ip-google-access 옵션을 --no-enable-private-ip-google-access로 변경해서 실행하면 된다.


참고) GCP 리전 리스트

- us-central1

- europe-west1

- us-west1

- asia-east1

- us-east1

- asia-northeast1

 

Google Cloud SDK에서 Private Google Access 적용여부 확인

gcloud beta compute networks subnets describe [서브넷명] --region [리전명]

ex) gcloud beta compute networks subnets describe default --region us-central1

*privateIpGoogleAccess항목에 true가 적용, false가 해제 상태 이다. 

 

 

자세한 내용은 아래 주소를 참고

Private Google Access Overview

https://cloud.google.com/compute/docs/private-google-access

Private Google Access 구성방법

https://cloud.google.com/compute/docs/configure-private-google-access

 

웹로그 데이터를 빅쿼리를 이용하여 리포팅 하기

 

최유석

Fluentd 이용해서 스트림 데이터를 BigQuery 로딩해보자.

Fluentd 대한 자세한 내용은 http://bcho.tistory.com/1115 참고하기 바란다.

 에서는 구글 컴퓨트 엔진의 VM NGINX  서버를 구성하고, HTTP NGINX 접근(access)하는 로그를 Fluentd 수집하고 BigQuery 저장한다그리고 BigQuery 저장한 데이터를 구글 스프레드 시트의 차트와 구글 클라우드 데이터랩을 이용해서 시각화하는 방법에 대해서 설명하겠다.

 

Fluentd 설치

예제는 Google Cloud에서 Ubuntu Linux 14.x VM에서 Fluentd 이용하여 NGINX  서버로 구성된 VM에서 발생하는 트래픽 데이터(access log) 수집하고 BigQuery 데이터를 로딩한다.

 

VM 생성하기

먼저Fluentd 설치할 VM 생성해보자.

VM 생성하기 위해 컴퓨트 엔진 인스턴스 리스트(VM instances페이지에서 “+CREATE INSTANCE” 버튼을 클릭한다.

아래 그림과 같이 VM 생성할 , “Identity and API access” 부분에  “Allow full access to all Cloud APIs” 선택한다이를 선택해서  VM 모든 구글 클라우드 API 대한 접근 권한 (BigQuery 포함) 가지도록 한다.

그리고 “Boot disk”부분에서 Change버튼을 클릭하여 OS “Ubuntu 14.04 LTS” 변경한다.

또한이후 NGINX  서버를 구성하고  브라우저를 통해 HTTP 접근되는 액세스 로그를 Fluentd 이용해서 수집하고 해당 데이터를 시각화할 예정이다따라서 “Firewall”항목에서 “Allow HTTP traffic” 항목을 체크하여 HTTP 트래픽에 대한 방화벽을 허용한 상태로 VM 생성한다.

 

Fluentd td-agent 설치하기

앞에서 생성한 VM Fluentd 로그 수집 에이전트인 td-agent 설치한다.

td-agent OS또는 같은 OS라도 OS 버전 별로 설치 방법이 다르기 때문에,만약 다른 OS 설치할 것이라면, 각각의 OS 버전  설치 방법은 http://www.fluentd.org 참고하기 바란다.

여기서는 Ubuntu 14.x 기준으로 진행 한다.

다음 명령어를 실행하면 td-agent 설치된다.

% curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent2.sh | sh

설치가 끝난  에이전트를 실행해서 확인해보자다음 명령으로 agent 실행한 후에,

% sudo /etc/init.d/td-agent restart

실행이 끝난 후에 다음 명령으로 설치를 확인한다.

% sudo /etc/init.d/td-agent status

참고 (td-agent 관련 명령어)
td-agent 기동 - $sudo /etc/init.d/td-agent start
td-agent 
정지 - $sudo /etc/init.d/td-agent stop
td-agent 
재시작 - $sudo /etc/init.d/td-agent restart
td-agent 
상태확인 - $sudo /etc/init.d/td-agent status

 

빅쿼리에 데이터를 저장하기 위한 구성하기

td-agent설치가 끝났으면 VM에서 td-agent 빅쿼리에 데이터를 저장하기 위한 플러그인을 설치하고 해당 데이터들이 저장  빅쿼리의 데이터셋과 테이블을 생성한다그리고 nginx설치  발생한 브라우저 트래픽(HTTP) 대한 액세스 로그를 수집하고 빅쿼리에 저장하기 위한 td-agent설정을 해보자.

빅쿼리 플러그인 설치

Fluentd에서 빅쿼리로 데이터를 저장하기 위한 빅쿼리 플러그인을 설치한다.

% sudo td-agent-gem install fluent-plugin-bigquery

빅쿼리 테이블 생성하기

td-agent 통해서 수집된 데이터를 저장할 빅쿼리의 데이터셋과 테이블을 생성해보자.

*앞서 생성한 컴퓨트 엔진 VM 동일한 프로젝트에서 진행한다.

먼저빅쿼리  콘솔(https://bigquery.cloud.google.com/) 접속한다.

데이셋은 아래 그림과 같이 빅쿼리  콘솔 화면에서 해당 프로젝트 우측의 드롭 다운메뉴에서 Create new dataset메뉴를 이용하여, “bq_test라는 이름으로 생성한다.

 

테이블을 생성하기 위해 앞에서 생성한 데이터셋(“bq_test”) 우측의 +모양의 아이콘을 클릭한다먼저 테이블 명은 “access_log” 으로 생성한다.

그리고데이터가 없이 스키마만 정의된  테이블로 생성하기 위해 Source Data부분에서 “Create empty table” 체크하고

생성할 테이블의 스키마는 https://raw.githubusercontent.com/GoogleCloudPlatform/bigquery-fluentd-docker-sample/master/schema.json 있다.  스키마 파일의 내용을 복사해서 빅쿼리  콘솔에서 아래 그림과 같이 Schema 부분에서 Edit as Text 클릭하여 입력 형식을 변경한 상태에서 붙여넣고 테이블 생성한다.

 

td-agent설정하기

td-agent에서 설정파일을 수정하여 nginx 수집될 로그 데이터가 빅쿼리에 저장될  있도록 설정해보자설정 파일은 /etc/td-agent/td-agent.conf  있으며 파일에 vi등의 편집기를 이용해서 기존의 내용을 삭제하고 다음의 내용으로 변경한다.

# collecting nginx access log
<source>
  type tail
  format apache
  path /var/log/nginx/access.log
  tag nginx.access
  pos_file /var/log/td-agent/nginx.pos
</source> 
# forwarding to bigquery plugin
<match nginx.access>
  type bigquery
  auth_method compute_engine 
  project "<Project ID>"
  dataset bq_test
  table access_log 
  time_format %s
  time_field time
  fetch_schema true
#deprecated  field_integer time 
</match>

 설정 파일(td-agent.conf ) 주요 부분을 살펴보면

    크게 <source>…</source>부분은 앞으로 구성할 nginx  서버로 발생하는 액세스 로그를 수집하는 부분에 대한 설정이고, <match nginx.access>…</match>부분은 앞에서 수집된 데이터를 빅쿼리에 저장하는 부분에 대한 설정이다.

 

    auth_method, project, dataset, table : 데이터를 저장될 bigquery project, dataset, table 명을 정의한다그리고 auth_method 통해서 인증 방법을 설정하는데일반적으로는 service account 대한 json 파일을 사용하는데여기서는 구글 클라우드 내에 VM 생성하였고앞에서 VM 생성시에 Bigquery 대한 접근 권한을 이미 주었기 때문에인증 방식을 compute_engine으로 설정하면 된다.

 

    fetch_schema : true 설정하면 저장하고자 하는 bigquery 테이블의 스키마 정보를 자동으 가져온다fetch_schema대신에 schema_path라는 옵션을 사용해서 JSON으로 정의된 스키마 파일 경로를 직접 지정할   있다.

 

**td-agent 설정에서 field_*(Integer, string, float, 등) 속성이 fluentd 버전이 올라가면서 deprecated되어 작동되지 않는다. td-agent 설정 마지막 줄 옵션  field_integer time을 삭제해야 동작된다. 

 

 

 

 

NGINX  서버 구성하기

지금까지 데이터를 수집하고 저장하기 위한 Fluentd 빅쿼리의 구성을 하였다이제 NGINX  서버를 구성하고 트래픽을 발생시켜서 액세스 로그를 생성하고 빅쿼리에서 저장되는 데이터를 확인해보자.

 

NGINX설치하기

앞에서 Fluentd 설치한 VM에서 우분투(데비안 계열) 패키지 관리 도구인 apt 다음 명령어를 이용해서 패키지 인덱스 정보를 업데이트하고 NGINX 설치한다.

%sudo apt-get update && sudo apt-get install -y nginx

nginx 설치가 끝났다면해당VM external IP 클릭하고,

 브라우저에서 다음과 같이 nginx 정상적으로 구동 되는지 확인한다.

 

NGINX 설정하기

Nginx 정상적으로 동작하는 것을 확인하였다면 앞서 td-agent설정에서 수집될 데이터의 포맷을 “apache” 설정하였기 때문에 nginx에서 수집될 로그의 포맷을 combined형식으로 변경해야 한다. nginx 설정 파일은 /etc/nginx/nginx.conf 있으며, vi등의 편집기를 이용해서 해당 파일 중간의 Logging Setting부분에서 다음과 같이 변경한다.

… //  부분 생략
##
# Logging Settings
##
 
access_log /var/log/nginx/access.log combined;

Nginx 로그 파일 권한 변경

nginx 설치  기본적으로 /var/log/nginx/access.log 파일에 대한 읽기 권한이 root사용자에게만 있다이후 td-agent에서 nginx로그 파일을 읽어서 빅쿼리에 저장할  있게  파일(/var/log/nginx/access.log) 대한 권한 변경이 필요하다.

root 사용자를 다음의 명령어로 변경하고

%sudo su

다음의 명령어를 실행해서 nginx 액세스 로그 파일의 권한을 변경한다.

%chmod 644 /var/log/nginx/access.log

권한 변경이 끝나면 다음의 명령어로 다시 기존 사용자로 돌아온다.

%exit

 

빅쿼리 데이터 저장 테스트

지금까지 Fluentd  서버(nginx) 로그데이터를 수집하고 빅쿼리로 저장하기 위한 모든 설정이 끝났다. 이제  브라우저로 접근해서 트래픽을 발생시키고  데이터가 빅쿼리에 실제로 저장되는  확인해보자.

먼저앞에서 설정한 내용들을 적용하기위해다음 명령어를 이용해서 td-agent nginx  시작한다.

%sudo /etc/init.d/td-agent restart && sudo /etc/init.d/nginx restart

다음으로 브라우저에서 해당 VM external IP 접속해서 nginx 액세스 로그를 생성되게 한다컴퓨트 엔진 인스턴스 리스트(VM instances페이지에서 앞에서 구성한 VM 우측의 External IP 클릭하여 nginx  서버로 접근한다.

* HTTP 대한 데이터만 수집되게 설정하였기 때문에 브라우저에서 http://External IP/ 형태로 접근한 데이터에 대해서만 수집된다.

td-agent 통해서 nginx에서 수집된 로그데이터가 빅쿼리에 저장된다이제 빅쿼리  콘솔에서 로그 데이터가 빅쿼리에 정상적으로 저장되었는지 확인하기 위해 다음의 쿼리를 실행한다.

SELECT * FROM [bq_test.access_log] LIMIT 1000

앞에서  브라우저로 접근하여 생성된 nginx액세스 로그 데이터를 확인할  있어야한다.

*참고 : Fluentd 통해 초기 로그값을 저장하는 데까지   분이 소요될  있다.

 

구글 스프레드 시트로 빅쿼리 쿼리 실행하기

지금까지 구글 클라우드에서 컴퓨트 엔진의 VM 빅쿼리  오픈소스로 제공되는Fluentd td-agent, nginx( 서버) 구성해서  서버에서 발생하는 로그 데이터를 수집하고 빅쿼리에 저장하고 쿼리를 실행해서 저장된 데이터를 확인하기 까지 테스트를 하였다.

지금까지 구성한 것을 토대로 빅쿼리와 구글 스프레드 시트를 연동해서 데이터를 시각화(Visualization) 하는 방법에 대해서 설명하겠다.

 

빅쿼리와 구글 스프레드 시트

빅쿼리는 구글 스프레드 시트와 쉬운 연동이 가능하며구글 스프레드 시트는 빅쿼리의 쿼리 실행을 위한 앱스 스크립트(Apps Script) 가지고 있다또한 실행한 빅쿼리의 쿼리 결과를 시트에 저장하고 구글 스프레드 시트에서 제공하는 차트를 이용해서 데이터에 대한 시각화를 손쉽게   있으며사용자가 지정한 시간에 주기적으로 쿼리를 실행해서 실행된 쿼리 정보와 차트를 자동으로 업데이트   있는 기능을 제공한다이를 이용하면전문적인 데이터 분석가 또는 과학자가 아니더라도 데이터 분석을 쉽게 가능하게 한다. 

구글 스프레드 시트와 빅쿼리를 연동해서 사용자가 사용하기 쉽도록 빅쿼리와 구글 스프레드 시트 통합 예제(Example of BigQuery and Google Spreadsheet Integration) 제공하며 글에서는  예제를 활용한다.

 

구글 스프레드 시트에서 빅쿼리 API 활성화하기

앞에서 설명한 것처럼구글 스프레드 시트에서 빅쿼리로 쿼리를 실행하기 위해서 빅쿼리 API 활성화해야 한다빅쿼리 API 활성화하기 위해 다음과 같이 진행한다.

1. Example of BigQuery and Google Spreadsheet Integration 열고

상단의 메뉴에서 [파일 → 사본 만들기 실행해서 해당 파일을 자기 계정의 구글 드라이브로 복사한다.

*이후 진행되는 모든 부분은 구글 드라이브에 복사한 스프레드 시트 파일을 사용한다.

2. 스프레드 시트의 상단 메뉴에서 [도구 → 스크립트 편집기] 실행한다.

*스크립트 편집기를 실행하면 bq_query.gs라는 파일이 열려 있다 파일에 작성된 내용은 앱스 스크립트로 작성되어 있으며 스크립트가 동작해서 구글 스프레드 시트에서 빅쿼리로 쿼리를 실행하고쿼리 결과를 시트에 저장하고쿼리 결과에 대한 차트를 만들어 준다.

3. bq_query.gs 중간의 <<PUT YOUR SPREADSHEET URL HERE>>부분을 복사한 스프레드 시트의 URL(스크립트 편집기 URL 아님) 변경하고,

<<PUT YOUR PROJECT ID HERE>>부분을 앞에서 구성한 컴퓨트 엔진의VM  빅쿼리를 포함하고 있는 프로젝트 ID 변경하고 저장한다.

↓↓↓↓↓ (각각의 사용자의 환경에 맞게 변경한다.)

4. 스크립트 편집기 상단의 메뉴에서 [리소스  고급 Google 서비스] 실행하고 빅쿼리 API 활성화(녹색되어있는지 확인하고만약 활성화 되어 있지 않다면 활성화 한다.

그리고 [고급 Google 서비스] 설정 메뉴 하단의 “Google 개발자 콘솔” 링크를 클릭하여 구글 개발자 콘솔(Google Developer Console) API Manager페이지로 이동해서 Google Cloud APIs부분의 BigQuery API 클릭하고

빅쿼리 API 페이지로 이동해서 “ENABLE” 버튼을 클릭하여 API 활성화 한다. “DISABLE” 바뀌면 활성화  상태이다.

이제 빅쿼리 API 활성화 하였으면창을 닫고 다시 스크립트 편집기로 이동해서 [고급 Google 서비스설정메뉴에서 확인을 클릭한다.

*만약, “Google 개발자 콘솔” 링크로 이동할 해당 스프레드 시트와 연결된 새로운 프로젝트 ID 생성할 지를 묻는 창이 나오면 확인을 선택한다.

 

빅쿼리 공개 데이터 셋에 쿼리 실행하기

이제 구글 스프레드 시트에서 빅쿼리의 쿼리 실행쿼리 결과 저장쿼리 결과에 대한 차트 생성에 대한 구성을 마쳤다복사한 스프레드 시트 파일로 이동해서 구글 빅쿼리에서 제공하는 공개 데이터셋에 쿼리를 실행해보자.

 

쿼리 실행 정보

스프레드 시트에서 BQ Queries 시트로 이동하면 이미 작성되어 있는 “gsod_temperature_LINE” 라는 이름의 쿼리를 확인   있다 쿼리는 구글 빅쿼리에서 제공하는 공개 데이터 셋인 GSOD climate database에서 년도  온도 데이터를 평균최저최대 값으로 집계한다.

 

쿼리 실행하기

스프레드 시트 상단의 메뉴[Dashboard  Run All BQ Queries] 실행하면쿼리가 실행된다.

*최초 쿼리 실행  권한 부여에 대한 부분이 나오면 각각 [계속] , [허용버튼을 클릭하여 진행한다. 

쿼리 실행이 완료되면, gsod_temperature라는 이름의 새로운 시트가 생성되며다음과 같이 쿼리 실행의 결과가 저장되어 있는 것을 확인할  있다..

그리고 BigQuery Results Visualization시트로 이동하면다음과 같이 gsod_temperature라는 이름으로 앞에서 실행한 쿼리 결과에 대해 차트가 새로 생성되어 있는 것을 확인할  있다.

 

Fluentd 수집한 데이터에 쿼리하기

앞서 공개 데이터 셋에 대한 쿼리 테스트를 하였다이제 앞에서 구성한 컴퓨트 엔진의 VMFluentd  NGINX 수집하여 빅쿼리에 저장한 로그데이터에 대해서 쿼리를 실행해보도록 하자. 

다시 BQ Queries 시트로 이동해서 앞에서 실행한 공개 데이터 셋의 쿼리 다음 줄에 아래의 내용을 입력한다.

query name “access_log_LINE” interval (min) 1 입력하고 아래의 쿼리를 query항목에 입력한다.

SELECT
  STRFTIME_UTC_USEC(time * 1000000, "%Y-%m-%d %H:%M:%S") as tstamp,
  count(*) as rps
FROM bq_test.access_log
GROUP BY tstamp ORDER BY tstamp DESC;

*쿼리는 반드시 시트의  하나에 포함되도록 넣어야 한다다음과 같이 해당 셀을 더블 클릭해서 복사하거나 해당 셀을 지정한 상태에서 위의 fx부분에 쿼리를 복사한다.

 

앞에서와 같이 [Dashboard  Run All BQ Queries]으로 쿼리를 실행한다실행이 완료되면, access_log라는 시트가 새로 생성되며 해당 쿼리의 결과가 저장되어 있음을 확인할  있으며,

다시 BigQuery Results Visualization시트를 열면 access_log라는 이름으로 실행한 쿼리 결과에 대한 차트가 새롭게 생성되어 있는 것을 확인할  있다.

*만약 차트가 생성되지 않는다면, BQ Queries시트에서 추가한 query name 끝에 “_LINE” 포함하고 있는  확인하길 바란다.

 

로드 시뮬레이션  자동화 구성하기

앞에서의 쿼리 실행 결과를 보면  서버로 구성한 VM(Compute Engine Instance) 아직 많은 접근이 이루어 지지 않았기 때문에 차트에 나타나는 데이터가 작다이번에는 아파치 벤치마킹 (Apach  Bench) 이용해서 부하를 줘서 브라우저 트래픽을 증가시키고 쿼리 실행을  결과를 시뮬레이션 해보고 구글 스프레드 시트의 스크립터 편집기의 트리거라는 기능을 이용해서 자동화 쿼리를 구성해보도록 한다.

 

로드 시뮬레이션

아파치 벤치마킹 툴을 이용한 다음의 명령어로 100개의 클라이언트로 1,000,000개의 부하를 주자. <YOUR_EXTERNAL_IP> 해당 VM external IP 입력한다.

%ab -c 100 -n 1000000 http://<YOUR_EXTERNAL_IP>/

*로컬 컴퓨터에 아파치 벤치마킹 툴이 설치되어 있지 않다면다른 컴퓨트 엔진 인스턴스(VM)으로 명령을 실행해도 되며테스트 용도이기 때문에 앞에서 구성을 완료한 VM 부하를 받을 VM에서 위의 명령어를 실행해도 무방하다.

이제 스프레드 시트파일에서 [Dashboard  Run All BQ Queries] 쿼리를 실행하면 access_log 시트에서   많아진 쿼리 결과를 확인할  있을 것이다또한, BigQuery Results Visualization시트를 열면 차트의 그래프가 많아진 쿼리 결과를 확인할  있을 것이다.

 

자동화 쿼리 구성하기

앞에서는 사용자가 직접 쿼리를 수행해야만 해당 쿼리 결과  차트의 변화를 확인할  있었다.

이제 구글 스프레드 시트의 스크립트 편집기에서 제공하는 트리거라는 기능을 이용해서 사용자가 지정한 특정 시간 간격으로 자동으로 쿼리를 실행하고 지속적으로 쿼리 결과  차트를 업데이트하는 방법에 대해서 알아보고자 한다.

1. 스프레드 시트에서 다시 상단의 메뉴 [도구 → 스크립트 편집기] 실행한다스크립트 편집기에서 상단의 메뉴 [리소스 → 현재 프로젝트의 트리거] 실행한다아래와 같이 설정된 트리거가 없을 것이다. [트리거가 설정되어 있지 않습니다여기를 클릭하여 트리거를 추가하세요.] 부분을 클릭하여 트리거를 구성해보자

2. 트리거 설정 메뉴에서 실행 부분에는 “runQueries” 설정하고 이벤트 부분에서는 “시간 기반”, “ 타이머”, “매분” 으로 설정한다이렇게 하면 bq_query.gs 1 단위로 실행되고 자동으로 쿼리 실행의 결과  차트가 자동으로 업데이트 된다설정이 끝났으면해당 트리거의 내용을 저장한다.

3. 다시 스프레드 시트의 BigQuery Results Visualization 시트로 이동하면, “access_log” 그래프가  분마다 새로고침 되는 것을 확인할  있다또한 서버로 구성한 VM 트래픽이 거의 없기 때문에 변화를 확인하기는 어렵다따라서 다시 아파치 벤치마킹 툴을 이용해서 부하를 주면 access_log 대한 차트가 다음과 같이 거의 실시간(1분단위)으로 변화하는 모습을 확인할  있을 것이다.

* 예제에서는 스크립트 트리거(script trigger) 쿼리 간격(query interval) 모두 1분으로 설정하였다이것은 스크립트가  1분마다 동작하고 쿼리들 역시  1분마다 불러오게 됨을 의미한다만약 스크립트에서 시간당 한번씩 트리거하도록 설정하고 쿼리 간격을 1분으로 유지한다면쿼리는 한시간에 한번만 불러오게 된다일반적으로 쿼리는 가져오길 원하는 주기만큼 트리거되도록 스크립트를 설정해야 한다쿼리 호출에 대한 간격을 없애려고 한다면 쿼리 간격을 0(min)으로 설정하면 된다또한모든 쿼리 실행을 비활성화하려면스크립트 트리거를 삭제하면 된다.

*트리거를 설정해둔 경우 계속 스크립트가 실행되며 쿼리 실행이 이루어지기 때문에 아래의 순서로 트리거를 삭제하길 권장한다.

bq_query.gs 스크립트의 자동 실행을 중지하기 위해서는 스크립트 편집기의 상단 메뉴 [리소스 → 현재 프로젝트의 트리거]에서 앞에서 설정한 "runQueries" 트리거를 삭제하고 저장한다.

 

데이터랩(Datalab)으로 데이터 시각화하기

구글 클라우드 데이터랩(Google Cloud Datalab) 오픈소스로 공개된 주피터 노트북(Jupyter) 구글 클라우드 플랫폼에 맞게 기능을 추가하여 제공되는 서비스이다기본이 되는 주피터가 오픈소스이기에 데이터랩 역시 오픈소스로 공개되어있다.

데이터랩은 파이썬(python) 기본언어로 지원하며구글 클라우드의 빅쿼리와 연동등에 사용할  있게 SQL 자바스크립트를 지원한다또한 머신러닝의 딥러닝 프레임웍인 텐서플로우도 지원하고 있다. 

데이터랩에서 연동할  있는 데이터는 구글 클라우드에서 제공하는 컴퓨트엔진의 VM, 빅쿼리, Cloud Storage등이 있다. 

데이터랩은 오픈소스로 공개되어 있으며데이터랩 사용에 따른 별도의 추가요금이 발생하지 않는다따라서, VM 설치해서 실행하거나로컬 환경에 구성해서 사용할  있다또한도커로 패키징되어 있어서 도커 환경만 있다면 손쉽게 설치  실행을   있다.

 

Google Cloud SDK 설치하기

데이터랩은 사용자 인증  설치에 Google Cloud SDK(이하 gcloud SDK) 사용하고 이후 진행과정에서 gcloud SDK에서 제공하는 주요한 명령어 도구의 사용이 필요하기 때문에 먼저, gcloud SDK 각각의 OS환경에 맞게

https://cloud.google.com/sdk/downloads 에서 다운로드 받아서 설치한다.

설치가 끝났으면 다음의 명령어로 초기화를 실행한다.

%gcloud init

먼저, “Create a new configuration” 선택하여 새롭게 구성한다.

다음으로해당 configuration 저장할 이름을 입력한다임의로 입력해도 되며여기서는 편의상 “fluentd-test” 지정하였다.

 

다음으로 로그인 계정을 선택하는 부분으로 사용자가 앞에서 구글 클라우드 콘솔에 로그인한 계정을 선택한다.

앞에서 로그인 계정을 선택하면구글 클라우드에서 사용 중인 프로젝트의 리스트가 나온다 중에서 앞에서 컴퓨트엔진과 빅쿼리를 사용중인 프로젝트로 선택한다.

뒤의 컴퓨트 엔진 구성부분은 여기에서는 필요하지 않으니 n 입력하고 넘어간다.

gcloud SDK설정 작업이 마무리 되었다. 

*지금까지 gcloud SDK 설정한 정보는 다음의 명령어로 확인 가능하다만약 프로젝트ID 또는 로그인 계정이 잘못 설정된 경우 다시 초기화를 진행하기 바란다.

 

데이터랩 설치

이글에서는 구글 클라우드 컴퓨트 엔진의 VM 데이터랩을 설치하고 데이터랩을 사용해서 데이터 분석하는 방법에 대해 알아보겠다.

앞에서 설치한 gcloud SDK 실행하고 다음의 명령어로 데이터랩 구성 정보가 도커로 사전 정의되어 있는 YAML파일을 다운로드한다.

%gsutil cp gs://cloud-datalab/server.yaml ./datalab-server.yaml 

YAML파일을 다운로드 하였다면데이터랩을 구성하고 실행할 VM 생성하기 위해 gcloud SDK에서 다음의 명령어를 실행한다.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

%gcloud compute instances create "instance-name" \
   --project "project-id" \
   --zone "zone" \
   --image-family "container-vm" \
   --image-project "google-containers" \
   --metadata-from-file "google-container-manifest=datalab-server.yaml" \
   --machine-type "n1-highmem-2" \
   --scopes "cloud-platform"

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Copy & Paste용

%gcloud compute instances create "instance-name" --project "project-id" --zone "zone" --image-family container-vm" --image-project "google-containers" --metadata-from-file "google-container-manifest=datalab-server.yaml" --machine-type "n1-highmem-2" --scopes "cloud-platform"

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

gcloud SDK compute 명령어로 VM(instances) 생성하기 위한 명령어이며사용자에 맞게 변경이 필요한 부분은 “instance-name”  생성하기를 원하는 VM이름으로 설정하고, "project-id" 현재 사용하고 있는 프로젝트의 ID 입력한다. “zone” 생성할 VM 위치할 zone 지정한다여기서는 앞에서 이용한 VM 같은 “us-central1-a”으로 진행한다.

 

SSH 터널링

데이터랩을 구성하는 VM 앞에서 생성하였다면데이터랩은 해당 VM에서 자동으로 실행되고 있다이제 SSH 터널링으로 로컬환경에서 데이터랩VM 접속해서 사용하기 위해 데이터랩이 설치된 VM에서 SSH설정 변경이 필요하다.

따라서컴퓨트 엔진의 VM리스트 페이지(VM instances) 이동해서 해당 VM SSH 접속한다.

데이터랩VM SSH 접속하였다면, vi등의 편집기를 이용해서 ssh 설정파일(/etc/ssh/sshd_config) 내용을 수정하고 저장한다. (root권한 필요)


PasswordAuthentication no → PasswordAuthentication yes

SSH 설정 파일의 내용을 수정하고 해당내용을 적용하기 위해서 SSH 서비스를 다음의 명령어로 다시 시작한다.

%sudo service ssh restart

설정  SSH서비스 재시작이 끝났다면 해당 SSH연결은 닫는다. 

 

다시 gcloud SDK명령줄 도구로 이동해서 다음의 명령어를 실행해서 SSH 터널링을 구성한다.

%gcloud compute ssh --quiet \
  --project "project-id" \
  --zone "zone" \
  --ssh-flag="-N" \
  --ssh-flag="-L" \
  --ssh-flag="localhost:8081:localhost:8080" \
  "${USER}@instance-name"

Copy & Paste용

%gcloud compute ssh --quiet --project "project-id" --zone "zone" --ssh-flag="-N" --ssh-flag="-L" --ssh-flag="localhost:8081:localhost:8080" "${USER}@instance-name"
 

SSH터널링으로 VM에서 8080포트에 실행되고 있는 데이터랩을 로컬에서 8081포트로 접속할  있게 정의한다이외에 변경이 필요한 내용으로 "project-id" 현재 사용중인 프로젝트ID 입력하고, "zone"부분은 데이터랩VM 생성한 zone위치를 입력한다그리고 "${USER}@instance-name" 부분에서 “${USER}” 해당 VM 접속할 사용자명(gcloud SDK 초기화 설정에서 선택한 로그인 계정)으로 변경하고 @뒤의 “instance-name” 앞에서 생성한 데이터랩VM 이름으로 변경한다.

*주의 위의 명령어를 실행한 상태가 유지되어야 데이터랩에 로컬에서 접속할  있으며, gcloud SDK창을 닫거나 해당 명령어 실행을 취소하면 데이터랩에 접속할  없다.

 

데이터랩에서 빅쿼리 쿼리 실행하기

데이터랩 접속하기

데이터랩VM 생성하고 SSH 터널링을 실행한 상태라면 브라우저를 실행하고 http://localhost:8081  접속한다접속을 하면 앞에서 생성한 VM에서 실행중인 Google Cloud Datalab화면을   있으며이제 데이터랩의 노트북을 이용해서 분석을 진행할  있다.

 

 

빅쿼리 쿼리 실행하기

데이터랩에 접속하였으면앞에서 구글 스프레드 시트를 이용해서 실행했던 빅쿼리의 쿼리를 실행해보자. 

데이터랩은 노트북이라는 단위로 사용자가 실행한 코드  입력한 텍스트 등의 정보를 관리한다. 

먼저 데이터랩에서 “+Notebook”버튼을 클릭하고 새로운 노트북을 생성하고생성한 노트북에서 “+Add Code”버튼을 이용해서 생성한 코드블럭에 SQL, 파이썬스크립트자바스크립트등의 코드를 입력하여 실행할  있고, “+Add Markdown” 버튼으로 생성한 텍스트 편집 상자에는 간단한 텍스트 또는 이미지를 입력할  있다. 

먼저, “+Add Markdown” 버튼을 클릭해서 텍스트 상자를 만들고 다음의 내용을 입력한다.

빅쿼리 쿼리 실행하기

다음으로 “+Add Code”버튼을 클릭하여 코드블럭을 생성하고 아래의 내용을 입력한다.

%%sql
SELECT
  STRFTIME_UTC_USEC(time * 1000000, "%Y-%m-%d %H:%M:%S") as tstamp,
  count(*) as rps
FROM bq_test.access_log
GROUP BY tstamp ORDER BY tstamp DESC;

%%sql 빅쿼리의 sql 실행하겠다고 명시하는 부분이며 아래의 쿼리 내용은 구글 스프레드 시트로 실행한 쿼리와 같다.

*Google Cloud SDK 프로젝트  사용자에 대한 인증이 이루어지기 때문에 따로 프로젝트ID 명시하지 않아도 된다.

 

상단의 메뉴에서 “RUN” 버튼을 클릭하면 위의 쿼리가 실행되고 결과를 확인할  있다.

 

Google Cloud Datalab 이용한 데이터 시각화

데이터랩의 노트북에서는 빅쿼리에 저장된 데이터를 구글 차트API 사용하여 SQL 조회한 데이터를 간단한 명령어로 차트를 생성하고 시각화   있으며다양한 유형의 차트를 지원한다또한파이썬 코드를 이용해서 데이터를 자유롭게 가공하고 파이썬의 matplotlib 이용해서 데이터를 그래프로 나타낼 수도 있다이글에서는 구글 차트API 이용한 명령으로 차트를 생성해서 시각화하는 방법에 대해서 알아본다.

 

모듈에 데이터 저장하기

데이터를 구글 차트 또는 파이썬 matplotlib등으로 시각화하기 위해서는 SQL 실행한 결과를 모듈로 저장하고데이터가 저장된 해당 모듈을 이용해야한다. SQL 실행한 결과를 모듈로 저장하기위해 다음과 같이 진행한다. 

먼저 “+Add Markdown”버튼을 클릭하여 텍스트 상자를 만들고 아래와 같이 입력한다.

빅쿼리 쿼리 실행 결과 모듈에 저장하기

다음으로 “+Add Code” 버튼을 클릭하여 생성한 코드블럭에 다음의 코드를 입력한다.

%%sql --module=log_data
SELECT
  STRFTIME_UTC_USEC(time * 1000000, "%Y-%m-%d %H:%M:%S") as tstamp,
  count(*) as rps
FROM bq_test.access_log
GROUP BY tstamp ORDER BY tstamp DESC;

%%sql뒤의 –module=log_data 부분은 아래의 쿼리실행의 결과를 지정한 log_data라는 모듈에 저장하겠다는 의미이다..

*데이터를 모듈에 저장하는 부분으로 “RUN”버튼을 클릭하여 실행하면  다른 에러가 없는 경우변화가 없으며 지정한 모듈에 저장이  것이다.

 

구글 차트로 시각화하기

이제 앞에서 저장한 모듈의 데이터를 이용해서 구글 차트를 생성하는 명령어를 이용해서 차트를 생성하여 시각화를 해보자구글 차트를 생성하기 위해 다음과 같이 진행한다. 

빅쿼리에 SQL 쿼리를 실행한 결과를 모듈에 저장하였다면, “+Add Markdown”으로 다음의 내용을 입력한다.

area 차트로 시각화하기

다음으로 “+Add Code” 코드블럭을 생성하고 다음의 내용을 입력하고 “RUN”버튼으로 코드를 실행한다.

%%chart area --fields tstamp,rps --data=log_data

다음과 같이 차트가 생성되는 것을 확인할  있다.

 

구글 차트로 area 차트를 그리는 명령어로, --fields 부분은 앞서 모듈에 저장된 데이터 중에서 tstamp, rps 필드의 데이터를 읽는 부분으로,

여기에 정의한 필드를 읽어서 차트로 그려준다.

 

다음은 구글 차트로 제공되는 여러가지 유형의 차트로 시각화한 결과이니 참고하기 바란다.

1. pie 차트

2. columns 차트

3. line 차트

4. scatter 차트

 

마무리

지금까지 구글 컴퓨트 엔진의 VM nginx 서버를 구성하고 Fluentd 실시간으로  서버에서 발생한 액세스로그를 수집하고  데이터를 빅쿼리의 테이블에 저장하고 구글 스프레드 시트  구글 클라우드 데이터랩을 이용해서 쿼리를 수행하고 시각화하는 방법에 대해서 알아보았다지금까지 설명한 방법 이외에도 구글 클라우드 에서는 다양한 형태로 데이터를 수집하고 처리하고 시각화하는 데이터 분석을 위한 여러가지 서비스들을 제공하니

https://cloud.google.com/products/

https://cloud.google.com/solutions/

 참고하길 바란다.

 

실습자원 정리하기

지금까지 실습에 사용한 VM, 빅쿼리 테이블 데이터 등의 리소스를 그대로  경우 지속적으로 비용이 과금되고 Free Trial기간이 종료되면 유료 과금   있다따라서 실습에 사용한 자원  데이터를 정리하자.

1. 구글 클라우드 플랫폼의 자원(VM, 빅쿼리 테이블) 정리하기 위해서 구글 클라우드 콘솔(Google Cloud Console)페이지로 접속해서 앞에서 사용한 프로젝트가 선택된 상태에서 아래와 같이 “Manage project setting” 버튼을 클릭한다.

2. 상단 메뉴에서 “DELETE PROJECT” 버튼을 클릭하고 지금까지 사용한 프로젝트를 삭제한다.

3. 구글 스프레드 시트 파일의 삭제는 위해 본인 계정의 구글 드라이브 접속하여 해당 파일을 선택하고 마우스 오른쪽 버튼을 클릭하여 하단의 삭제 버튼을 클릭하여 삭제한다.

 

참고 자료

https://cloud.google.com/solutions/real-time/fluentd-bigquery

 

https://cloud.google.com/datalab/docs/quickstarts/quickstart-gcp-frontend

 

 

구글 클라우드 빅데이터 분석 서비스를 활용한

 

 

아이폰7 SNS 메시지 반응 분

최유석

이 글은 구글 클라우드 플랫폼의 빅데이터 분석 플랫폼의 학습 및 정보 공유 목적으로 작성하였으며, 조대협님의 구글 빅쿼리와 데이타 플로우를 이용한 노트7 소셜 반응 분석 (http://bcho.tistory.com/1136, 7, 8) 관련 글 들을 참고하여 원본 글의 작성자(조대협님)의 허가를 받고 작성하였으며, 이 글의 내용에 대한 악의적인 해석이나 활용되지 않기를 바랍니다. 이 글에 대한 라이센스는 원본 글 작성자인 조대협님과 이글의 작성자인 본인이 소유하고 있으며 원본 글과 동일하게 출처를 밝히는 인용을 포함하여 모든 인용 및 내용에 대한 활용을 금합니다.

 

시작

구글 클라우드 플랫폼의 빅데이터 분석 서비스들인 빅쿼리, 데이터플로우에 대한 학습 및 실습을 하면서 익혀가고 있던 도중, 우연한 기회에 조대협님에게 아이폰7에 대한 분석에 대한 제의를 받고 원본 글을 읽고 개인적인 호기심과 흥미가 있었던 상황이라서 시작하게 되었다. 이 글은 설명하기 위한 목적보다는 원본 글에서 분석의 대상이 되었던 노트7과 비슷한 시기에 출시된 아이폰7에 대한 분석 결과를 공유하고자 작성하게 되었다.

 

*데이터의 수집부터 분석하기까지 전체과정에 대한 전반적인 설명은 http://bcho.tistory.com/1136 , http://bcho.tistory.com/1137 , http://bcho.tistory.com/1138 의 글에 자세히 되어있기 때문에 생략한다. 만약 자세한 내용이 궁금하다면 위의 글들을 참고하기 바란다. 또한 원본 글들의 과정을 토대로 거의 동일하게 진행하였으며, 단지 분석의 대상만 변경하였다.

 

데이터 수집 기간

트위터와 같은SNS의 특성상 특정한 이슈가 있을 때 발생하는 메시지가 증가하고 통상적인 메시지와 내용이나 양이 달라진다. 따라서 이후 분석할 데이터의 기준이 되는 데이터를 수집한 기간은 9월26일(월) 오전부터 9월30일(금) 오전까지 이다.

 

데이터 스튜디오를 이용한 데이터의 시각화 분석

원래는 데이터에 대한 수집 및 저장은 원본 글의 구성을 따르고 시각화 및 분석은 제플린(zeppelin.apache.org)을 이용한 데이터의 시각화 및 분석을 하려고 하였으나, 최근 원본 글에서도 소개된 리포트 도구인 구글의 데이터 스튜디오(https://datastudio.google.com/)가 한국에도 베타로 릴리즈되고 사용할 수 있게 되어서 원본 글과 비슷하게 데이터 스튜디오, 제플린 두 가지를 같이 사용하여 데이터의 시각화 및 분석을 한 내용을 공유하려고 한다.

 

명사 전체 통계 분석 (9/26 ~ 9/30)

먼저 해당기간 동안에 수집된 명사에 대한 통계 이다.

  *전체 수집된 데이터에서 iphone7이 가장 많았으나, 원본처럼 iphone7을 포함해서 불 필요한 명사들은 필터링하였다.

 

명사 데이터 분석하기

명사에 대한 표와 그래프를 이용해서 수집된 데이터의 통계정보를 살펴보자.

첫번째로 공짜, 증정품 등을 의미하는giveaway가 처음부터 끝까지 전체 데이터에서 1위를 꾸준히 유지하고 있으며, 28일에 피크 상태까지 올랐다가 이후 하강세를 보인다. 이는 미국 이동통신회사인 버라이즌, AT&T, T모바일에서 각각 조금씩 다르지만 이전모델인 아이폰5, 6을 반납하고 2년약정을 하면 아이폰7을 일종의 "공짜폰" 마케팅을 진행했기 때문으로 보여진다. 

http://www.edaily.co.kr/news/NewsRead.edy?SCD=JE31&newsid=01954886612784712&DCD=A00503&OutLnkChk=Y

http://fortune.com/2016/09/30/verizon-ending-free-iphone-7-trade-in/ 

그리고 giveaway가 28일에 피크를 치고 이후 하강세로 나타나고30일에는 매우 큰 폭으로 떨어지는 이유는 위의 무료행사가 종료되기 때문인 것으로 보인다.

 

두번째는 아이폰이 전작과 마찬가지로 기본모델(4.7인치)과 플러스모델(5.5인치)로 출시하고 전작과 다르게 크기만 다른 것이 아니라 가장 큰 부분은 카메라가 아이폰7은 싱글 카메라이고 플러스는 듀얼카메라이고 그 외 다른 스펙에서도 약간 높다. 이러한 스펙의 차이로 사람들이 많은 관심을 보이는 듯하다. 물론 가격은 더 비싸다. 

http://www.kookje.co.kr/news2011/asp/newsbody.asp?code=0200&key=20160908.99002073718

 

세번째는 &amp로 나타나지만 이는 특수문자인 &을 HTML에서 변환하기 위한 코드이기 때문에 생략하고, 결국 세번째로 많이 나타난 단어는 case로 생각 할 수 있는데 아무래도 새로운 제품으로 출시되었기 때문에 많은 해당모델에 대한 케이스에 대한 관심을 많이 가지는 것으로도 볼 수 있으나, 이번 아이폰7은 많이 이슈가 된 것처럼, 헤드폰 단자가 없기 때문에 부가적인 악세서리에 대한 관심이 높아 진 것으로 보여진다. 관련 된 내용을 찾다 보니 여러가지 형태로 이어폰 잭을 사용할 수 있는 부가 악세서리 정보가 많았다. 

*HTML에서 특수문자를 표현하기 위한 코드인 &amp(3위)와 &gt(7위)가 구글 자연어 분석API(CLOUD NATURAL LANGUAGE API )에서 왜 명사로 구분되었을 까? 이 글에서는 크게 중요한 부분은 아니기 때문에 넘어가도록 한다.

 

http://www.itworld.co.kr/news/101479

 

네번째(eztech231)와 다섯 번째(nbt88yt)는 각종 IT제품들의 리뷰를 진행하고 SNS, 유튜브 등을 통해서 홍보하고 공개하는 사람들의 트위터 ID로 확인된다.

 

데이터 스튜디오를 이용한 형용사 전체 통계 분석 (9/26 ~ 9/30)

다음은 해당기간 동안에 수집된 형용사에 대한 통계이다.

 

형용사 데이터 분석하기

형용사에 대한 표와 그래프를 이용해서 수집된 데이터의 통계정보를 살펴보자

첫번쨰는 새로운 출시된 제품이기 때문인지 new가 가장 많았다.,

두번째는 free로 명사처럼 무료 마케팅 때문인 것으로 보여진다.

세번째는 이번에 아이폰7이 블랙계열 색상으로 나온 모델이 블랙(무광)과, 제트블랙(유광)이 두가지가 출시되어서 black이라는 단어가 세번째로 많은 것으로 보여진다.

네번째(good)와 다섯 번째(best)는 아이폰7에 대한 긍정적인 반응을 의미하는 것으로 보여진다.

 

제플린을 이용한 데이터의 시각화 상세분석

지금까지 수집한 전체에 데이터를 기준으로 구글의 데이터 스튜디오를 이용해서 분석을 해보았다. 이제 제플린을 이용해서 날짜 및 시간단위로 수집된 데이터에 대해서 상세한 분석을 해보도록 한다.

 

시간대 명사의 전체 카운트 그래프 (상위 5개)

먼저 시간대 별로 명사의 카운트 그래프를 보면 명사의 카운트 수가 최대 4,833, 최저 149이고 대략적으로 통상500정도이다. 해당 결과는 카운트의 수가 상위5개의 명사의 수량만 카운트한 결과의 그래프이며 분석 대상이 되는 iphone7등 불 필요한 단어는 제외한 상태이다.

위의 그래프를 통해서 확인해보니 완만한 곡선을 그리는 부분을 제외하고, 사이사이에 트윗이 높게 발생하는 시점이 대략 1000 정도이다. 따라서 명사 카운트가 1000을 넘는 시점(9/26 11:00, 9/27 4:00, 9/27 5:00, 9/27 23:00, 등) 에 특정한 이슈가 있었던 것으로 간주하고 명사 카운트 1000이상이 되는 시간을 기준으로 분석 해보도록 한다.

 

시간대 별 카운트 된 각각의 명사에 대한 그래프

다음과 같이 각각의 명사의 카운트 수를 그래프로 확인하고 분석해보려고 하였으나, 그래프에 표현 될 각각의 명사의 개수가 늘어나니 그래프로 확인하기는 어렵다.

전체 기간 동안의 시간대 별 단어 그래프(상위5건)

 

1000건 이상 카운트 된 시간대의 단어들의 그래프(상위 5건)

명사 카운트가 1000건 이상 발생한 시간대 별 상위 5건의 단어에 대해서 그래프로 표현해보았으나, 전체에 대한 그래프보다는 보기가 편하지만, 각각의 시간대 별 단어들에 대해서 직관적으로 파악하고 분석하기 어렵다. 따라서 표를 이용해서 분석을 진행하기로 한다. 

 

표를 이용한 데이터 분석

앞에서 언급한 것처럼 명사의 카운트가 1000건 이상 발생한 주요한 시점을 기준으로 상위 5개씩 검출해서 명사 단어들을 표를 이용해서 확인해보도록 하자.

 

926 11 (1497)

9월26일 11시에 features, water, camara, phone, wireless라는 단어가 많이 검출되었다.

특정한 이슈가 발생 한 것 보다는 아이폰7의 기능, 특징, 방수 등에 대한 내용과 위에서 언급했던 이어폰 잭 제거로 인한 무선 이어폰, 헤드폰 관련 내용이 트윗 메시지로 발생한 것으로 보여진다.

 

927 4(1282), 5(1264)

트위터 같은 SNS서비스의 특성 상 특정한 이슈나 메시지가 발생하면 지속적으로 비슷한 성향의 메시지가 나타나는 경향이 있으니 1000건이상 카운트 된 시간이 연속되어 있는 경우 해당 시간대들을 연결해서 보도록 하자.

 

9월 27일 4시와 5시에는 앞에서 전체 통계 분석에서 확인했던 것처럼 아이폰7의 케이스에 대해서 트윗에서 많이 언급 된 듯하다. 기case를 포함해서 neroccocases, etsy, learhercase라는 단어가 많이 보이는 데 찾아보니, 트위터에서 etsy라는 핸드메이드 쇼핑몰에서 판매하는 nerocco라는 일종의 브랜드에서 판매하는 가죽 케이스(learher-case) 많이 언급된 것으로 보여지고 가죽케이스에서 지갑(wallet) 역할도 하기 때문에 wallet이라는 단어도 언급된 것으로 보인다.

 

트위터에서 해당 검색어를 넣고 트윗된 메시지를 검색해서 나온 페이지를 들어가보니 다음과 같은 결과를 얻을 수 있었다. 이 페이지를 보니 왜 위와 같은 단어들이 검색되었는 지 파악할 수 있었다. 물론 해당 메시지에도 위에서 본 단어들이 메시지 자체의 내용이나 태그에 있어서 검출된 것으로 보인다.

https://www.etsy.com/listing/464824974/iphone-7-leather-case-iphone-7-wallet

 

또한, 앞서 명사에 대한 전체 통계분석에서 전체적으로 가장 많이 나온 단어인 giveaway도 보인다.

 

927 23 (4565), 928 0 (4833) 증가 및 정점

928 1(2423),2(1459),3 (1041) 하락

데이터를 수집한 총 기간 중에 9월 27일 23시에 4천건이상으로 급증하기 시작해서 9월28일 0시에 4833건으로 정점을 찍고 카운트는 하락하지만 9월28일3시까지 1000건이상 발생한 시점이다. 해당 시간대의 전체적인 단어들을 살펴보면 처음 트윗이 증가하기 시작해서 줄어들기까지 지속적으로 나온 단어들이 여러가지 나오고 카운트수가 시간대 별 동일한 것으로 봐서는 같은 메시지에 위에서 나온 여러 단어들이 조합되어 있었을 가능성을 고려해본다. 그래서 단어 하나 하나 살펴보던 중 sqeezereport라는 단어가 이렇게 트윗이 증가하게한 중요 키워드인 것을 발견했다. 그래서 sqeezereport라는 단어를 토대로 검색을 해보았다. 

위와 같은 결과를 얻게 된 것은 이 트윗 메시지가 원인인듯 하다. http://www.squeezereport.com/ 라는 곳에서 주식관련 리포트를 내놓은 듯한데 관련 내용에 대해서 문외한이니 자세히 알 수는 없었다. 그러나 메시지와 태그에 있는 단어들이 위의 표에서 확인할 수 있는 단어들과 같다. 단어들의 카운트가 동일한것으로 봐서는 해당 시간대에 최초의 메시지가 지속적으로 리트윗된 것으로 보인다.

 

9월28일 2시와 3시에는 지속적으로 나오는 단어인 giveaway 가 상위권에 있는 것도 보인다.

 

928 15 (1131)

9월28일 15시에는 iphone7plus, sleeve, iphone7plussleeve, giveaway, iphone7pluspouch

라는 단어가 많이 발생했다. 지속적으로 나온 단어인 giveaway 제외하고 나머지 4단어인 iphone7plus, sleeve, iphone7plussleeve, iphone7pluspouch 무언가 단어들끼리 연관성이 있어 보이고 케이스와 관련된 같다. 그래서 해당 단어들을 토대로 확인해보니 다음과 비슷한 트윗이 많이 나온 것을 발견했다.

앞서 sqeezereport에서 처럼 하나의 트윗이 지속적으로 리트윗 되었다기보다는 아이폰7 플러스의 케이스(sleeve, pouch 포함하는) 대한 소개 또는 홍보 트윗이 보인다.

 

9월30일 3 (1172)

1000건이상 단어가 카운트 된 마지막 시간대인 9월30일 3시에는 flipkart라는 인도의 최대 온라인 쇼핑몰에서 아이폰7 preorderyour7( 주문) 관련 메시지가 많이 발생한 것으로 보인다.

해당시기에 위와 비슷한 트윗을 많이 발견할 있었다.

 

http://economictimes.indiatimes.com/magazines/panache/wait-over-indian-apple-fans-finally-grab-iphone-7/articleshow/54741023.cms

또한, 인도에서 9 30 아직 아이폰7 출시되지 않아서 107 출시되었기 때문에 wait라는 단어가 많이 나온 것으로 보인다.

 

http://www.business-standard.com/article/companies/flipkart-cashes-in-on-strong-iphone-7-launch-binny-delivers-first-device-116100701137_1.html

위에서 언급 한 것처럼 10월 7일 Flipkart에서 아이폰7 런칭 했다는 기사를 확인 있었다.

 

지속적으로 나온 단어인 giveaway wireless(무선) 관련 카운트도 높은 것으로 보이나 위에서 언급하였으니 생략한다.

 

마무

아이폰7은 데이터를 수집한 시기에 노트7과 같은 특수한 이슈는 없었지만 데이터를 모으고 분석하기 위해 확인하면서 개인적으로는 많이 흥미로운 결과를 볼 수 있었다. 사실 뒤에 V20출시나 아이폰도 유머인지 정확히 확인할 수는 없지만 폭발이라는 이슈가 있었기 때문에 조금 더 데이터를 수집했다면, 더욱 더 흥미로운 결과를 볼 수 있었을 것 같아서 아쉬운 마음도 남는다. 짧은 기간이기는 하지만 비교적 큰 비용을 들이지 않고도 SNS서비스를 통해 데이터를 수집하고 구글 클라우드 플랫폼의 빅데이터 분석서비스들을 이용해서 특정한 제품에 대한 트렌드를 손쉽게 파악할 수 있었고, 이러한 특정한 사례만이 아니라 데이터를 분석하는 거의 모든 경우에서 활용할 수 있을 듯 하다. 

처음 노트7분석글을 보았을 때만 해도 과연 내가 이러한 분석을 따라서 라도 할 수 있을 까? 라는 의문이 많이 들었다. 하지만 원본 글을 읽고 따라서 진행하다 보니 처음 구성을 마치기 까지 대략3, 4시간정도 밖에 걸리지 않았다. 물론 약간의 우여곡절이 있어서 바로 데이터를 수집할 수 는 없었지만 말이다. 아직 이러한 시스템을 구축해보고 운영해본 경험이 거의 없는 것과 데이터 플로우는 개념적인 부분만 알고 있었고 실제 사용하는 부분은 살펴보기 시작한 즈음이었는데도 따라서 하기는 했지만, 이러한 시스템을 만들고, 데이터를 수집해서 분석하고 하는게 혼자서 몇시간이면 가능하다는 것이 놀라울 따름이다.

 

 

 

빅쿼리 Query Plan을 이용한 쿼리 실행 분석

최유석

개요

일반적으로 대부분의 데이터베이스의 SQL (MS-SQL, MySQL, Oracle, 등)에서 제공하는 Query Explain Plan(쿼리 실행 계획)기능은 사용자가 쿼리를 실행하기전에 쿼리를 분석할 수 있도록 쿼리가 실행되는 각각의 과정에 대해서 예상정보를 제공한다. 이 정보를 통해 해당 되는 쿼리에 대해서 성능에 대한 향상, 실행 오류 방지 등 쿼리를 실행하기 이전에 최적화 할 수 있게 도와준다. 빅쿼리에서도 이와 유사한 기능으로 Query Plan(쿼리 계획)을 제공한다. 이글에서는 빅쿼리의 Query Plan을 통해 쿼리 실행을 분석하는 방법에 대해서 알아보고자 한다.

 

빅쿼리와 다른 SQL엔진의 차이점

앞에서 언급한 것처럼 다른 SQL엔진은 쿼리를 실행하기 전에 해당되는 쿼리에 대한 예상 실행정보를 각각의 SQL에서 제공하는 Explain(각각 제공하는 이름은 다르지만 일반적인 의미)문을 적용해서 쿼리를 실행하고 분석해서 쿼리 튜닝을 하게 된다. 하지만 빅쿼리는 다른 SQL엔진과 다른 방식으로 실행이 완료된 쿼리에 대해서 쿼리 실행의 각 단계에 대한 정보를 메타데이터 형태로 제공한다. 또한 일반적인 Explain문이 빅쿼리에 따로 없기 때문에 직접 실행할 수 없으며, 실행이 완료된 쿼리에 대해서 Query Plan정보를 자동적으로 제공해준다.

 

빅쿼리의 부담 없는 쿼리 실행

빅쿼리는 구글 클라우드 플랫폼의 관리형 서비스로 구글에서 전체적인 시스템에 대한 관리를 하기 때문에 사용자 입장에서는 빅쿼리에 데이터를 저장하고 분석하는 등, 빅쿼리가 제공하는 서비스를 사용하기만 하면 된다. 따라서 일반적인 데이터베이스처럼 잘못된 쿼리로 인한 오류로 인해 데이터베이스에 문제를 일으키는 경우에 대해서 걱정 할 필요가 없다. 단지 쿼리가 실패할 뿐이며, 바로 다시 쿼리를 수정해서 실행할 수 있다. 또한 쿼리를 실행하기전에 쿼리에 대한 Validation (유효성 검사)기능을 제공하기 때문에 잘못된 쿼리의 경우 대부분 여기에서 걸러진다. 추가적으로Validation 을 통해 쿼리오류를 확인하지 못했다고 하더라도 쿼리가 잘못되거나 기타 다른 문제로 쿼리가 실패해서 오류가 발생되는 경우, 실패한 쿼리에 대해서는 비용이 부과되지 않기 때문에 부담이 적다.

 

빅쿼리에서 쿼리 실행 분석에 대한 접근

빅쿼리의 아키텍처를 보면 일반적인 데이터베이스에서 제공하는 Key, Index가 없으며, 기본적으로 쿼리요청의 대상이 되는 각각의 열에 대해서 Full Scan이다. 따라서 일반적으로 고려하는 Key, Index 기반의 쿼리 튜닝으로 방향이 아닌, 빅쿼리의 Query Plan에서 제공하는 메타데이터를 토대로 쿼리 실행단계에서 소요된 작업대기, 읽기, 쓰기, CPU연산 등에 대한 상대적인 시간 비율을 기준으로 실행 된 쿼리에 대한 분석 및 최적화 방향을 고려해야 한다.

 

*사실 빅쿼리의 쿼리 최적화는 쿼리실행이 열에 대한 풀스캔이고, 거기에 대한 비용이 발생하는 구조기 때문에 쿼리 비용자체를 감소시키기는 어렵다. 주요한 방향으로 생각해야 할 부분은 쿼리 실행 성능의 향상이다.

 

빅쿼리의 Query Plan 인터페이스

빅쿼리 콘솔(Web UI)

빅쿼리 웹 콘솔(Web UI)에서는 쿼리를 실행하고 나서 다음과 같이 Explanation 버튼을 클릭하면 쿼리 실행에 대한 각각의 스테이지로 정보가 간략하게 나타나고 각각의 스테이지에 있는 세모모양의 아이콘을 클릭하면 각각의 스테이지 별로 쿼리 실행의 세부정보가 나타난다.

 

 

API

빅쿼리에서 API형태로 제공하는 Query Plan 정보는 실행이 완료된 쿼리 결과에 대해서 자동으로 해당 쿼리 Job의 리소스로 포함되어 7일간 보관된다. 이 기간 안에 Job.get() 메소드를 호출하게 되면 응답 결과에 포함 된 JSON 배열 형식의 queryPlan 속성에서 상세한 쿼리 실행 정보를 얻을 수 있다.

 

빅쿼리 Query plan 메타데이터

STAGE 메타데이터

API속성 Web UI 설명
Id Stage x 스테이지의 고유한 정수 ID
(해당 Query Plan 범위 한정)
recordsRead Input 스테이지에서 읽은 행의
recordsWritten Output 스테이지가 작성한 행의

 

상대적인 시간비율에 따른 메타데이터

API속성 Web UI 설명
waitRatioAvg 작업 대기에 소요된 평균시간
waitRatioMax 작업 대기에 소요된 최대시간
readRatioAvg 입력데이터의 읽기에 소요된 평균시간
readRatioMax 입력데이터의 읽기에 소요된 최대시간
computeRatioAvg CPU연산에 소요된 평균시간
computeRatioMax CPU연산에 소요된 최대시간
writeRatioAvg 출력데이터의 쓰기에 소요된 평균시간
writeRatioMax 출력데이터의 쓰기에 소요된최대시

*빅쿼리 콘솔에서는 AVG, MAX 따로 표시되지 않는다. 또한, 정확한 수치를 확인하기 위해서는 API 이용한 접근이 필요하다.

 

간단한 쿼리를 이용한 기본적인 메타데이터 정보 확인하기

먼저 다음의 쿼리를 빅쿼리 웹 콘솔에서 실행하고, Query Plan에 대한 메타데이터 정보를 확인해보자.

SELECT COUNT(*)
FROM [publicdata:samples.shakespeare]
WHERE corpus = 'hamlet'

쿼리를 실행하고 Explanation 탭을 클릭하면 다음과 같이 Query Plan에 대한 정보를 확인 할 수 있다.

먼저 좌측의 Stage 뒤의 1, 2가 실행 된 해당 쿼리에 대한 고유한 ID이며,우측 Input, Output 아래의 수치가 해당 쿼리에 대한 각각의 스테이지에서 읽거나 기록한 행의 수이다. 중간의 Stage timing 아래에 나타나는 부분이 해당 쿼리가 실행되는 각각의 스테이지에서 [스케줄에 대한 작업대기, 읽기, CPU연산, 쓰기]에 의해서 소요된 시간 비율들을 나타내며, 위의 쿼리의 경우 실행 데이터의 크기가 작기 때문에 각각의 상대적인 시간비율들의 평균값과 최대값이 같게 나타난다.

 

이번에는 다음과 같이 시간비율의 편차를 보기 위해 쿼리의 대상이 되는 데이터의 크기가 크고 쿼리 연산이 복잡한 경우를 살펴보자.

SELECT language, SUM(views) as views

FROM [bigquery-samples:wikipedia_benchmark.Wiki1B]

WHERE regexp_match(title,"G.*o.*o.*g")

GROUP by language

ORDER by views DESC

위의 결과에서 보듯이 스테이지1에서 데이터의 읽기와 CPU연산에 소요된 시간 비율의 평균과 최대치의 차이가 발생하는 것을 확인할 수 있다.

 

*사용자가 실행하는 쿼리와 대상이 되는 데이터의 크기, 분포 등에 따라서 위와 같이 쿼리 실행 단계의 각각의 상대적인 시간비율들의 평균, 최대 값의 차이가 발생하게 된다.

 

STEP(스테이지의 세부 단계) 메타데이터 정보

Query Plan에서 제공하는 각각의 스테이지 내부에서의 확인 가능한 세부적인 메타데이터 정보는 다음과 같다.

STEP(세부 속성) 설명
READ 입력 테이블 또는 중간 결과에서 하나 이상의 열을 읽는다.
WRITE 출력 테이블 또는 중간 결과에 대해 하나 이상의 열을 기록한다.
COMPUTE 대부분의 식에 대한 모든 계산 및 내장된 함수의 호출한다
FILTER WHERE, OMIT IF, HAVING절을 구현하는 연산자이다.
SORT 정렬 연산을 나타내며 열의 키와 정렬방향을 포함한다.
AGGREGATE 집계연산(예:GROUP BY)을 나타내며, 여러 단계로 분할된다.
LIMIT LIMIT절을 구현하는 연산자이다.
JOIN JOIN연산을 나타내며, JOIN의 종류와 JOIN에 사용되는 열의 정보를 포함한다.
ANALYTIC_FUNCTION 분석 기능(CF. window functions)을 호출한다.
USER_DEFINED_FUNCTION 사용자 정의 함수를 호출한다

 

간단한 쿼리를 이용한 상세한 메타데이터 정보 확인하기

앞서 메타데이터 정보를 확인하기 위해 처음 실행 했던 쿼리를 이용해서 Query Plan의 각각의 세부적인 단계(STEP)에 대한 정보를 빅쿼리 웹 콘솔(Web UI)과 API를 통해서 각각 확인해보자.

 

빅쿼리 콘솔(Web UI) 에서 STEP정보 확인하기

앞서 간단하게 메타데이터 정보를 확인하기 위해 처음 실행 했던 쿼리의 Explanation을 통해서 쿼리 실행의 각각의 단계(STEP)에 대한 정보를 확인해보자. 실행 된 쿼리의 Explanation에서 각각의 스테이지 좌측에 나타나는 삼각형모양의 아이콘을 클릭하면 각각의 스테이지에 대한 세부적인 메타데이터 정보를 확인 할 수 있다.

먼저 스테이지 1, 2에서 READ, AGGREGATE, WRITE스탭을 포함하고 있는 것을 볼 수 있다.

 

스테이지1에서는 READ스탭에서 [publicdata:samples.shakespeare] 테이블의 대상이 되는 "corpus" 열을 읽어서 값이 "hamlet"인 행을 찾는다. 그 다음 AGGREGATE스탭에서 앞서 읽은 행을 카운트하고 WRITE에서 __stage1_output 이라는 식별자에 기록을 한다.

 

스테이지2에서는 스테이지1에서 __stage1_output에 기록한 정보를 받아서 READ스탭에서 스테이지1의 작업의 카운트한 정보를 읽고 AGGREGATE스탭에서 카운트 된 정보에 대한 합산을 하고 WRITE스탭에서 합산한 결과를 출력한다.

 

API 이용해서 STEP정보 확인하기

구글에서 제공하는 APIs Explorer을 사용해서 빅쿼리의 Query Plan정보를 간단하게 API를 호출하고 결과를 확인해보도록 하자.

 

Job ID 확인

앞서 언급한 것처럼, 빅쿼리 Query Plan정보는 실행된 쿼리에 대한 결과만 확인 할 수 있으며, Query Plan정보를 얻기 위해서는 실행된 쿼리에 대한 정보 중 하나인 Job ID가 필요하다. 빅쿼리 웹 콘솔(Web UI)에서 좌측상단 Query History클릭하고 앞서 실행한 쿼리정보에 대한Job ID의 값을 확인한다.(복사하거나 창을 열어둔다.)

*빅쿼리 웹 콘솔의 Query History에서 Job ID 는 Project ID:Job ID 형식으로 되어있으니, job.get() 메소드를 호출할 때는 각각 나눠서 입력해야 한다. 

Google APIs Explorer 페이지 접속

다음으로 APIs Explorer에서 빅쿼리 Job.get()메소드 테스트 페이지인 다음의 주소로 접속한다.

https://developers.google.com/apis-explorer/#p/bigquery/v2/bigquery.jobs.get

 

권한 부여

빅쿼리의 Job.get() 메소드를 사용하기 위해서는 빅쿼리에 대한 읽기 또는 전체 관리권한이 필요하다. 따라서 해당되는 권한을 임시로 부여하기 위해 우측의 off로 되어있는 버튼을 클릭하고 해당되는 권한범위를 체크하고(필요한 범위는 기본값으로 체크되어 있다.) Authorize를 클릭해서 실행 권한을 부여한다.

Job.get()메소드를 사용하기 위한 권한 범위URI는 위의 화면에서 체크된 항목과 같다.

*한 가지만 허용해도 Job.get() 메소드를 사용할 수 있다.

 

Job.get() 메소드 실행하기

API 사용에 대한 권한범위를 허용하고 버튼이 ON되었으면 해당 프로젝트 ID와 위에서 확인한 Job ID를 입력하고 Execute버튼을 클릭해서 실행한다.

 

Job.get() 메소드 실행결과 확인

아래에 실행된 결과를 보면 Job.get()메소드에 대해서 GET으로 성공적으로 요청이 이루어졌으며, 응답된 결과로 해당 Job에 대한 모든 정보를 나타내며(해당 속성만 필드를 지정해서 실행할 수 있다.), 쿼리 실행 정보를 얻기 위한 queryPlan속성은 statistics속성의 하위 속성인 query안에 포함되어 있다.

*속성 관계: statistics > query > queryPlan 

"queryPlan": [

  {

    "name": "Stage 1",

    "id": "1",

    "waitRatioAvg": 1,

    "waitRatioMax": 1,

    "readRatioAvg": 0.8650635652173914,

    "readRatioMax": 0.8650635652173914,

    "computeRatioAvg": 0.47558847826086953,

    "computeRatioMax": 0.47558847826086953,

    "writeRatioAvg": 0.19849230434782608,

    "writeRatioMax": 0.19849230434782608,

    "recordsRead": "164656",

    "recordsWritten": "1",

    "steps": [

      {

        "kind": "READ",

        "substeps": [

          "corpus",

          "FROM publicdata:samples.shakespeare",

          "WHERE EQUAL(corpus, 'hamlet')"

            ]

        },

        {

        "kind": "AGGREGATE",

        "substeps": [

          "COUNT_STAR() AS f0_"

            ]

        },

        {

        "kind": "WRITE",

        "substeps": [

          "f0_",

          "TO __stage1_output"

                 ]

            }

        ]

    },

    {

    "name": "Stage 2",

    "id": "2",

    "waitRatioAvg": 1,

    "waitRatioMax": 1,

    "readRatioAvg": 0,

    "readRatioMax": 0,

    "computeRatioAvg": 0.05080473913043479,

    "computeRatioMax": 0.05080473913043479,

    "writeRatioAvg": 0.12621304347826087,

    "writeRatioMax": 0.12621304347826087,

    "recordsRead": "1",

    "recordsWritten": "1",

    "steps": [

      {

        "kind": "READ",

        "substeps": [

          "f0_",

          "FROM __stage1_output AS publicdata:samples.shakespeare"

            ]

        },

        {

        "kind": "AGGREGATE",

        "substeps": [

          "SUM_OF_COUNTS(f0_) AS f0_"

            ]

        },

        {

        "kind": "WRITE",

        "substeps": [

          "f0_",

          "TO __output"

                ]

            }

        ]

    }

],

앞서 언급한대로 쿼리 실행의 상대적인 시간 비율들의 정확한 수치를 포함해서, 빅쿼리 웹 콘솔(Web UI)에서 확인한 정보와 동일한 쿼리 실행 정보를 확인 할 수 있다.

 

*Job.get()메소드의 Query Plan정보를 확인할 수 있는 queryPlan속성은 기록된 정보의 출력만 가능하다.

 

Query Plan결과 해석 해결 방안

Query Plan의 결과에 대해서 앞서 언급한대로 작업대기, 읽기, CPU 연산, 쓰기의 상대적인 시간 비율을 토대로 해석이 이루어져야 한다. 따라서 Query Plan 정보를 통해 쿼리의 문제를 분석하고 해결하는 방법을 알아보도록 하자.

 

평균 최대 시간 사이에 차이가 발생하는 경우

평균, 최대시간의 차이가 발생하는 이유는 데이터가 고르지 못하게 비대칭 분포된 경우이다. 이러한 비대칭 분포를 가지는 데이터들은 불균형하게 분포 되어 있기 때문에 쿼리 실행 시 오버헤드를 발생시켜 쿼리가 실행되는 속도를 느리게 만들 수 있다.

 

평균, 최대 시간의 차이가 발생하는 일반적인 원인으로 쿼리에서 JOIN, GROUP BY, PARTITION 절에서 사용할 데이터가 NULL, empty, default 값을 포함하고 있는 경우이다. 따라서 이미 데이터가 고르게 분포되어 있지 않기 때문에 쿼리를 실행 하는 과정에서 편차가 발생한다.

 

트러블 슈팅방법으로 TOP COUNT문에 중간결과를 넣어서, 해당 쿼리 데이터에서 가장 일반적인 값을 분산된 데이터의 Key로써 확인한다.

* 빅쿼리는 Key가 없다

 

데이터의 비대칭을 완화하기 위해서는 필터링을 적용해서 NULL, empty, default값과 같은 쿼리에서 불 필요한 데이터를 감소시켜야 한다. 필터링을 하기위한 방법으로는 고르지 못한 값들에 대해서 쿼리를 실행하고, 나머지 값에 대해서 쿼리를 실행한다.

 

추가적인 방법으로 고르지 못한 데이터를 작은 크기로 세분화하는 방법을 고려한다. 이 경우 세분화 한 데이터 그룹들을 재결합할 때 집계 연산자 사용의 주의가 필요하다.

) 비대칭분포를 가지는 불균형 데이터 세분화

세분화 전

SELECT ... GROUP BY a

세분화 후

SELECT ... FROM (SELECT ... (GROUP by a, b) GROUP BY a)

 

중간 스테이지에서 읽기 또는 쓰기에 대부분의 시간이 소요된 경우

이전 스테이지에서 데이터가 예상보다 많이 생성되는 경우로, 이전 스테이지에서 필터링과 JOIN연산을 사용해서 생성되는 데이터의 크기를 줄인다.

 

입력테이블의 읽기에 대부분의 시간이 소요된 경우

입력테이블 읽기가 비용이 큰 경우이다. 따라서 명확한 필터를 사용하여 불 필요한 데이터의 크기를 감소시키는 방법과 테이블에 대한 파티셔닝, 즉 작은 테이블들로 분할을 해서 대상이 되는 테이블 크기를 줄이는 방법을 통해 쿼리의 성능 향상을 기대해 볼 수 있다.

 

스케줄의 대기에 대부분의 시간이 소요된 경우

쿼리의 스케줄에 많은 작업이 포함되어 있는 경우이다. 만약 작업 시간 자체가 중요한 경우가 아니라면 기다린다. 즉시 작업이 완료 되어야하는 등의 작업 시간이 중요한 경우는 빅쿼리의 slots을 추가해서 사용하는 것을 고려해본다. 여기서 말하는 slots는 CPU연산이 이루어지는 서버계층을 의미한다.

 

출력결과를 쓰는데 대부분의 시간이 소요된 경우

처음 입력된 데이터를 읽은 결과 보다 쿼리가 실행되면서 더 많은 데이터를 내보내는 경우로 볼 수 있으며, 출력하기 전에 일부의 데이터의 필터링으로 불 필요한 데이터의 양을 줄여본다.

 

CPU 연산에 대부분의 시간이 소요된 경우

CPU의 연산에 시간이 많이 소요되는 쿼리의 경우, I/O보다 쿼리 실행과정에서 데이터의 변환 및 프로세싱에 더 많은 시간이 소요되는 경우로 볼 수 있다. 보통 사용자 정의 함수(User defined functions) 또는 JSON데이터, 정규표현식, 등을 포함하는 복잡한 쿼리에서 많이 발생하기 때문에, 필터링을 통해 복잡한 쿼리에서 사용되는 데이터의 크기를 줄여야 한다. 또한 자주 사용되는 쿼리라면, 일부 식에 대한 사전 연산등을 고려하는 것도 하나의 방법이다.

 

결론

앞서 빅쿼리의 Query Plan(쿼리 계획)을 통해 쿼리 실행을 분석하는 방법에 대해서 알아보았다. 빅쿼리가 일반적인 데이터베이스 시스템들과 다른 구조를 가지고 있고 실행된 쿼리에 대해서만 Query Plan정보를 제공하기 때문에 제한적인 부분은 있다. 실행 한 쿼리에 대해서 Query Plan의 상대적인 시간비율을 지표로 세부적인 스탭을 고려해서 데이터의 비대칭 분포를 완화시키거나, 불 필요한 데이터의 크기를 줄여서 성능을 향상 시키는 방법이 주요한 쿼리의 최적화 방향으로 보여진다.

 

참고자료

https://cloud.google.com/bigquery/query-plan-explanation

 

https://cloud.google.com/bigquery/docs/reference/v2/jobs/get

 

https://cloud.google.com/bigquery/docs/reference/v2/jobs

 

In-memory query execution in Google BigQuery

 

빅쿼리의 In-memory query 실행

 

최유석

원글 주소 : https://cloud.google.com/blog/big-data/2016/08/in-memory-query-execution-in-google-bigquery

원글 작성자 : Hossein Ahmadi, BigQuery Technical Lead

 

개요

빅쿼리는 대규모의 데이터에 대해서 실시간에 가까운 쿼리실행 속도를 제공한다. 빅쿼리는 높은 성능을 제공하기 위해서 모든 연산이 메모리에서 이루어진다. 이러한 쿼리 실행 속도의 배경에는 빅쿼리의 심플한 아키텍쳐 구조와, 빠른 쿼리 실행이 가능하게 하는 메모리 기반의 연산, 그리고 페타바이트 급의 분석이 가능하게 하는 확장성 있는 데이터의 재분할(repartitioning), 셔플(shuffle) 기능이 있다. 이 글에서는 빅쿼리의 셔플(shuffle)에 대해서 자세히 알아보고, 구글의 페타비트급(petabit-scale) 네트워킹 기술(Jupiter)을 활용해 어떻게 높은 성능으로 메모리상에서 쿼리 실행을 가능하게 하는지 알아보고자 한다.

 

구글의 빅데이터 분석

시작하기에 앞서 구글의 경우, 구글의 수많은 서비스들(YouTube, Gmail. Search,등) 각각의 이용자들만 해도 수억 명에 이른다. 따라서 자연스럽게 구글 내부적으로 빅데이터를 분석하기 위한 기술들이 개발되고 사용되어 왔다. 구글은 오픈소스 플랫폼을 지향하는 모습으로 빅데이터를 포함한 여러 기술들의 논문을 공개해왔다. 몇가지 사례를 보면, Apache Hadoop의 근간이 되는 GFS와 MapReduce, Apache HBase의 근간이 되는 Big Table등이 있으며, BigQuery는 2010년 문서로 공개된 Dremel이라는 기술을 근간으로 한다.

빅쿼리의 성능과 인프라

빅쿼리의 쿼리 실행의 대한 내용을 알아보기에 앞서, 먼저 예제를 통해 빅쿼리의 성능과 인프라에 관련한 부분을 알아보도록 하자.

예제로 보는 빅쿼리의 성능

빅쿼리의 쿼리 실행속도, 즉 성능을 보기 위한 사례로 다음의 예제가 많이 사용된다.

(출처: https://cloud.google.com/blog/big-data/2016/01/anatomy-of-a-bigquery-query )

 

빅쿼리에서 제공하는 공개 데이터 셋에 저장 되어있는 위키피디아 페이지의 제목, 뷰수, 등의 정보가 저장된 테이블에서 1000억(100 billion record)개의 레코드를 스캔하고 제목(title)컬럼에서 정규표현식(regular expression)과 3개의 wildcard를 사용해서 해당하는 문자열("G.*o.*o.*g")을 검색한 결과를 가지고 해당 제목(title)을 가진 페이지의 뷰(views) 수를 카운트하고 각각의 언어별로 그룹으로 묶고 내림차순으로 정렬해서 결과를 보여주는 예제이다.

 

위의 예제에서 쿼리가 실행된 시간은 24.7초가 소요되었으나, 항상 동일한 속도로 실행되지는 않는다. 하지만 직접 실행해서 확인하더라도 동일한 쿼리에 대해서 약 30초 이내에 실행이 되는 것을 확인 할 수 있다.

 

예제로 보는 빅쿼리의 인프라

이제 앞의 예제가 실행되는 동안, 빅쿼리 내부적으로는 사용된 인프라에 대해서 알아보도록 하자. 먼저 위의 쿼리가 실행되는 약 30초 동안 약 4TB의 데이터를 처리하게 된다. 이때 사용되는 빅쿼리 인프라에 대한 대략적인 수치는

이 사용되고 위의 예제의 쿼리를 실행하는데 약 $20 정도의 비용이 발생한다. 위와 같은 쿼리실행 및 인프라 사용을 가능하게 하는 것은 빅쿼리가 방대한 규모의 인프라를 공유하는 멀티 테넌트(multitenancy)서비스이기 때문이다.

빅쿼리의 심플한 사용

일반적인 관계형 데이터베이스(RDBMS) 또는 여러 빅데이터 솔루션을 사용하더라도 위의 예제 분량의 쿼리실행을 하기 위해서는 해당 솔루션에 대한 이해와 설치, 분석에 필요한 코드실행이 필요하고, 또한 제대로 실행하기 위해서 끊임없는 튜닝과 모니터링을 포함한 유지보수가 필요하다. 하지만 빅쿼리의 경우 표준 ANSI SQL과 유사한 SQL(표준 ANSI SQL은 현재 베타로 지원)을 사용하여 쿼리를 입력하고 쿼리 실행버튼인 RUN QUERY만 클릭하면 자동적으로 빅쿼리 내부적으로 위와 비슷하거나, 더 많은 규모의 인프라를 사용하여 실행된 쿼리의 결과를 볼 수 있다.

 

빅쿼리 실행엔진(기본 아키텍쳐)

구글 클라우드 플랫폼(Google Cloud Platform)에서 서비스되고 있는 빅쿼리는 앞서 언급한 Dramel뿐만아니라 구글의 자체적인 기술들인 Borg, Colossus, Jupiter가 융합되어 놀라운 성능을 만들어낸다. 그렇다면 이와 같은 기술들이 어떤 구조로 동작하는지 알아보도록 하자.

 

빅쿼리의 기본 구조

(출처 : https://cloud.google.com/blog/big-data/2016/01/bigquery-under-the-hood )

Dremel

빅쿼리에서 요청이 들어오면 분산 스토리지인 Colossus에서 데이터를 읽고

그 해당 데이터를 페타비트(Petabit)급 네트워크인 Jupiter를 통해 컴퓨팅 처리 계층인 Dremel로 전달한다. 이 때, 데이터를 전달 받은 Dremel에서는 디스크 없이 컴퓨팅 처리가 이루어 진다. Dremel에서는 Leaf Nodes(slots), Mixers, Root server계층으로 나눠지고, Leaf Nodes (slots)에서 필요한 모든 계산을 수행하고, 앞의 예제 기준으로는 Colossus에서 읽은 1000억개의 각각의 레코드에 대한 정규표현식 체크를 하는 것이 이 계층(slots)이다. 그 다음 Leaf Nodes (slots)에서 처리된 데이터에 대해서 Mixer에서 aggregation(집계) 작업을 수행한다. 마지막으로 Mixer에서 집계된 데이터에 대한 정렬 등의 작업을 처리 하고 결과를 리턴 한다.

이렇게 데이터를 처리하는 단계에서 셔플(shuffle)작업이 이루어지며, Jupiter를 통해 빠른 속도로 데이터의 이동이 가능하게 한다. 또한 Dremel, Colossus, Jupiter의 리소스는 Borg를 통해 클러스터로 관리되고 동작한다.

 

 

Colossus

GFS(Google File System)의 후속 버전인 분산 파일 시스템으로 구글 데이터 센터에서 Colossus 클러스터를 가지고 있으며, 한번에 모든 빅쿼리 사용자의 요청에 대해 쿼리 실행 시 수천개의 디스크를 제공할 수 있도록 되어있다. 또한 콜로서스에 저장되는 모든 빅쿼리의 데이터는 자동으로 여러 데이터 센터에 분산 및 복제(replication)되어 저장되기 때문에 높은 안정성을 제공한다. Colossus를 통해 빅쿼리는 In-memory 데이터베이스와 유사한 성능을 보이지만, 저렴한 가격, 높은 병렬성, 확장성, 내구성을 가진 인프라이기 때문에 활용가치가 높다.

빅쿼리는 콜로서스에 구조화된 데이터에 대한 컬럼(Column) 기반 저장형식과 압축 알고리즘을 활용해서 데이터를 저장한다. 이를 통해 빠른 쿼리 실행을 가능하게 하며, 높은 비용을 들이지 않고도 데이터에 대한 저장을 할 수 있다.

Borg

구글의 대규모의 클러스터 관리시스템으로 구글의 서비스들에 사용되는 자원(CPU, RAM, disk, network)을 관리하고 할당하는 역할을 하며, 빅쿼리에서도 쿼리실행,등의 작업요청이 들어오면 미리 예약한 자원을 기반으로 필요한 자원을 할당해서 해당 작업의 실행을 가능하게 한다. 위의 예제처럼 쿼리가 실행 될 때, Dremel클러스터에 수천개의 CPU를 제공하는 것도 Borg의 역할이다. 또한 서버를 포함해서 전원, 네트워크에 이르기까지 수많은 오류에 대한 보호를 하고 관리를 하기 때문에 사용자 입장에서는 발생하는 문제를 개의치 않고 이용 할 수 있다.

 

Jupiter

대규모의 데이터를 원활하게 처리하려면 그에 걸맞는 네트워크도 필수로 필요하다. 구글이 클라우드 플랫폼을 통해 구축하여 제공하는 네트워크(Jupiter)는 양방향 1 Petabit/sec의 대역폭을 제공한다. 10만VM이 각각 10Gb로 통신할 수 있는 속도이다. 이렇듯 상상이상의 대역폭을 자랑하는 Jupiter를 통해 모든 쿼리에 대해서 데이터가 저장된 스토리지(Colossus)에 직접 접근해서 수초만에 테라바이트(terabytes)의 데이터를 읽을 수 있다.

빅쿼리 셔플(Shuffle)의 변화

하둡(Hadoop), 스파크(Spark), 구글 데이터 플로우(Google Cloud Dataflow)에 이르기까지 모든 분산 데이터 처리 시스템에서 셔플은 핵심요소로 작용한다. 데이터 처리 중간의 셔플단계는 크고 복잡한 조인, 집계 및 분석 작업의 실행을 위해 필요하다. 빅쿼리의 셔플은 쿼리 특성과 요구사항의 증가로 인한 셔플 처리 단계의 개선 요구로 인해, 2014년 메모리 기반(디스크 스풀링 지원) 및 구글의 데이터 센터에서 네트워크 기술(Jupiter)로 특별하게 설계되었고 새롭게 개발된 인프라로 이전되었다. 게다가, 특정한 분산 작업을 넘어선 활용사례(예를 들면, 해시조인)와 유연한 데이터 전송 시스템으로 설계되었다. 이 프로젝트는 데이터 전송 기술에 다년간의 연구 및 개발 노력의 산물이다.

빅쿼리 셔플의 차이점

빅쿼리의 셔플은 전용된 호스팅 원격 메모리의 노드 집합에 쿼리 처리의 다양한 단계에서 생산되는 중간 데이터를 셔플링해서 저장한다. 스파크(Spark), 피콜로(Piccolo) 등의 많은 시스템에서 일반적으로 데이터를 처리 할 때 중간 데이터 결과를 지속하여 저장한다. 그러나 빅쿼리는 셔플 작업에서 긴밀한 통합으로 메모리에서 처리된 중간 결과에 대해 다른 방향을 보인다.

 

빅쿼리 셔플의 구성요소

빅쿼리의 셔플 구성은 3가지의 컴포넌트로 구성된다.

빅쿼리 셔플의 구성요소

 

셔플에서 Producer, Consumer, Controller는 다음과 같이 구현되어 있다.

Producer (producer_id) {
void SendRow(row, consumer_id) : Called to send a row to a given consumer
on behalf of this producer.
}
Consumer (consumer_id) {
string ReceiveRow() : Called to receive one row for this consumer.
}
Controller {
StartShuffle() : Called before any producers or consumers start sending or
receiving rows.
EndShuffle() : Called after all producers and consumers have successfully
sent and received all rows.
}

위의 API들은 공유 메모리의 개념을 제공하도록 설계되었기 때문에, 데이타 프로세싱 파이프라인상에서, 데이타를 파티셔닝하는데 범용적으로 사용될 수 있다.

 

빅쿼리 셔플(shuffle)의 개념

빅쿼리 셔플의 기본 동작은 다음의 그림과 같이 설명될 수 있다.

(출처 : https://cloud.google.com/blog/big-data/2016/08/in-memory-query-execution-in-google-bigquery )

 

빅쿼리의 셔플은 많은 Producer들이 효율적으로 원격지의 머신의 메모리에 데이타를 저장할 수 있도록 하며, Consumer역시 높은 처리량으로 동시 읽기가 가능하다. 특히 Producer는 인접한 메모리 블록에 생성된 행(rows)에 로그(Log)하고 색인(index)을 남긴다.

이 색인(index)은 Consumer가 해당 행을 효율적으로 검색하고 읽을 수 있도록 해준다

빅쿼리 셔플의 복합적인 동작

(출처 : https://cloud.google.com/blog/big-data/2016/08/in-memory-query-execution-in-google-bigquery )

 

재분할한 데이타를 메모리에 저장하는 특징이외에도, 빅쿼리 셔플은 또 다른 측면에서 MapReduce스타일의 셔플과 다르다. MapReduce 스타일의 셔플은 모든 행이 재정렬 된 다음에 데이타를 접근할 수 있는데 반하여 빅쿼리에서는 producers에 의해 셔플된 각각 행(rows)은 바로 workers에 의해 접근이 가능하다. 그래서 파이프 라인에서 분산된 작업을 수행 하게 할 수 있게 한다.

빅쿼리의 데이터 분할(partitioning)

데이터의 파티셔닝(partitioning)은 BigQuery의 쿼리의 성능에 상당한 영향을 미친다.

제대로 된 결과를 얻기 위해서는 Consumer와, Provider를 적절한 숫자로 맞추는 것이 중요하다. (빅쿼리가 자동으로 수행)

최적화 되지 않은 데이터의 분할로 쿼리가 매우 느리게 실행되거나, 심지어는 자원의 제약으로 실패할 수 도 있다.

빅쿼리의 파티셔닝은 데이터 크기, 백그라운드의 부하, 기타 요인에 기초하여 쿼리에 사용 된 연산자의 종류에 따라 파티셔닝(분할)을 지능적으로 선택하는 동적 분할 메카니즘을 사용한다. 이로인하여 빅쿼리는 데이타의 특정 분포나, 키에 따른 정렬에 따른 오버헤드 없이 임의의 데이터 셋에 대한 효율적인 쿼리 실행을 할 수 있게 한다.

 

빅쿼리에서 콜로서스를 활용한 이점

빅쿼리는 페타바이트의 데이터에 쿼리를 사용할 수 있다. 대부분의 케이스에서 빅쿼리는 인메모리 셔플링을 통하여 데이타를 재분할 하는데, 메모리만 사용할 경우, 연산 비용이 매우 크기 때문에, 이를 해결하기 위해서 콜로서스 파일 시스템에 경우에 따라 데이타를 메모리에서 부터 이동하여 저장한다.

디스크의 경우 메모리에 비해서 많이 느리기 때문에, 빅쿼리에서는 디스크 억세스를 최소화하는 방법으로 성능 문제를 최소화한다.

 

빅쿼리는 multi-tenant service 이다.

빅쿼리는 사용자가 인프라를 공유해서 사용하는 멀티 테넌트 (multi-tenant) 서비스로 모든 고객이 쿼리를 실행하기 위해 VM의 클러스터의 크기를 조정하고 배포하거나, 리소스(자원)에 대한 프로비저닝이 필요 없다. 그렇게 하기 위해 빅쿼리의 셔플은 메모리에서 대부분의 쿼리를 실행 할 수 있는 지능적인 메모리 자원 관리 시스템을 사용한다. 빅쿼리는 고객으로부터 발생하는 부하의 변동에 따라 즉각적으로 적응한다.

 

결론

모든 빅쿼리의 쿼리는 하나 또는 다수의 셔플 동작을 포함하고, 단일 행 데이터 또는 페타바이트의 쿼리 데이터를 전송하는데 동일한 인프라를 사용한다. 구글 네트워크 기술과 함께 긴밀한 통합(tight intergration)으로 만들어내는 빅쿼리 셔플의 극한의 유연성은 빅쿼리의 사용자가 모든 규모에서 빠른 데이터 분석을 할 수 있게 한다.

 

 

참고자료

https://cloud.google.com/blog/big-data/2016/08/in-memory-query-execution-in-google-bigquery

 

https://cloud.google.com/blog/big-data/2016/01/anatomy-of-a-bigquery-query

 

https://cloud.google.com/blog/big-data/2016/01/bigquery-under-the-hood

 

+ Recent posts