빅쿼리에 Streaming Insert로 데이터를 적재하게되면 Streaming buffer에 먼저 기록되고 실제 테이블에 값이 저장되기까지 최대 90분까지 소요될 수 있다. 따라서 테이블의 preview로는 바로 확인 안 될 수 있으니 간단한 조회 쿼리를 실행해서 확인한다.
좌측 상단의 [COMPOSE QUERY] 버튼으로 쿼리 입력 창을 활성화하고 다음 쿼리를 입력하고 실행한다.
아래그림과같이 VM을생성할때, “Identity and API access” 부분에 “Allow full access to all Cloud APIs”를선택한다. 이를선택해서이 VM이모든구글클라우드 API에대한접근권한 (BigQuery 포함)을가지도록한다.
그리고 “Boot disk”부분에서 Change버튼을클릭하여 OS를 “Ubuntu 14.04 LTS”로변경한다.
또한, 이후 NGINX로웹서버를구성하고웹브라우저를통해 HTTP로접근되는액세스로그를Fluentd를이용해서수집하고해당데이터를시각화할예정이다. 따라서 “Firewall”항목에서 “Allow HTTP traffic” 항목을체크하여HTTP 트래픽에대한방화벽을허용한상태로 VM을생성한다.
Fluentd td-agent 설치하기
앞에서생성한 VM에 Fluentd의로그수집에이전트인 td-agent를설치한다.
td-agent는 OS나, 또는같은 OS라도 OS 버전별로설치방법이다르기때문에,만약다른 OS를설치할것이라면,각각의 OS의버전별설치방법은 http://www.fluentd.org를참고하기바란다.
여기서는 Ubuntu 14.x를기준으로진행한다.
다음명령어를실행하면 td-agent가설치된다.
% curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent2.sh | sh
설치가끝난후에이전트를실행해서확인해보자. 다음명령으로 agent를실행한후에,
% sudo /etc/init.d/td-agent restart
실행이끝난후에다음명령으로설치를확인한다.
% sudo /etc/init.d/td-agent status
참고 (td-agent 관련명령어) td-agent 기동 - $sudo /etc/init.d/td-agent start td-agent 정지 - $sudo /etc/init.d/td-agent stop td-agent 재시작 - $sudo /etc/init.d/td-agent restart td-agent 상태확인 - $sudo /etc/init.d/td-agent status
빅쿼리에데이터를저장하기위한구성하기
td-agent설치가끝났으면VM에서 td-agent로빅쿼리에데이터를저장하기위한플러그인을설치하고해당데이터들이저장될빅쿼리의데이터셋과테이블을생성한다. 그리고 nginx설치후발생한브라우저트래픽(HTTP)에대한액세스로그를수집하고빅쿼리에저장하기위한 td-agent설정을해보자.
*로컬컴퓨터에아파치벤치마킹툴이설치되어있지않다면, 다른컴퓨트엔진인스턴스(VM)으로명령을실행해도되며, 테스트용도이기때문에앞에서구성을완료한 VM 즉, 부하를받을 VM에서위의명령어를실행해도무방하다.
이제스프레드시트파일에서 [Dashboard → Run All BQ Queries]로쿼리를실행하면access_log 시트에서좀더많아진쿼리결과를확인할수있을것이다. 또한, BigQuery Results Visualization시트를열면차트의그래프가많아진쿼리결과를확인할수있을것이다.
%%sql SELECT STRFTIME_UTC_USEC(time * 1000000, "%Y-%m-%d %H:%M:%S") as tstamp, count(*) as rps FROM bq_test.access_log GROUP BY tstamp ORDER BY tstamp DESC;
%%sql --module=log_data SELECT STRFTIME_UTC_USEC(time * 1000000, "%Y-%m-%d %H:%M:%S") as tstamp, count(*) as rps FROM bq_test.access_log GROUP BY tstamp ORDER BY tstamp DESC;
일반적으로 대부분의 데이터베이스의 SQL (MS-SQL, MySQL, Oracle, 등)에서 제공하는 Query Explain Plan(쿼리 실행 계획)기능은 사용자가 쿼리를 실행하기전에 쿼리를 분석할 수 있도록 쿼리가 실행되는 각각의 과정에 대해서 예상정보를 제공한다. 이 정보를 통해 해당 되는 쿼리에 대해서 성능에 대한 향상, 실행 오류 방지 등 쿼리를 실행하기 이전에 최적화 할 수 있게 도와준다. 빅쿼리에서도 이와 유사한 기능으로 Query Plan(쿼리 계획)을 제공한다. 이글에서는 빅쿼리의 Query Plan을 통해 쿼리 실행을 분석하는 방법에 대해서 알아보고자 한다.
빅쿼리와다른 SQL엔진의차이점
앞에서 언급한 것처럼 다른 SQL엔진은 쿼리를 실행하기 전에 해당되는 쿼리에 대한 예상 실행정보를 각각의 SQL에서 제공하는 Explain(각각 제공하는 이름은 다르지만 일반적인 의미)문을 적용해서 쿼리를 실행하고 분석해서 쿼리 튜닝을 하게 된다. 하지만 빅쿼리는 다른 SQL엔진과 다른 방식으로 실행이 완료된 쿼리에 대해서 쿼리 실행의 각 단계에 대한 정보를 메타데이터 형태로 제공한다. 또한 일반적인 Explain문이 빅쿼리에 따로 없기 때문에 직접 실행할 수 없으며, 실행이 완료된 쿼리에 대해서 Query Plan정보를 자동적으로 제공해준다.
빅쿼리의 부담 없는 쿼리 실행
빅쿼리는 구글 클라우드 플랫폼의 관리형 서비스로 구글에서 전체적인 시스템에 대한 관리를 하기 때문에 사용자 입장에서는 빅쿼리에 데이터를 저장하고 분석하는 등, 빅쿼리가 제공하는 서비스를 사용하기만 하면 된다. 따라서 일반적인 데이터베이스처럼 잘못된 쿼리로 인한 오류로 인해 데이터베이스에 문제를 일으키는 경우에 대해서 걱정 할 필요가 없다. 단지 쿼리가 실패할 뿐이며, 바로 다시 쿼리를 수정해서 실행할 수 있다. 또한 쿼리를 실행하기전에 쿼리에 대한 Validation (유효성 검사)기능을 제공하기 때문에 잘못된 쿼리의 경우 대부분 여기에서 걸러진다. 추가적으로Validation 을 통해 쿼리오류를 확인하지 못했다고 하더라도 쿼리가 잘못되거나 기타 다른 문제로 쿼리가 실패해서 오류가 발생되는 경우, 실패한 쿼리에 대해서는 비용이 부과되지 않기 때문에 부담이 적다.
빅쿼리에서 쿼리 실행 분석에 대한 접근
빅쿼리의 아키텍처를 보면 일반적인 데이터베이스에서 제공하는 Key, Index가 없으며, 기본적으로 쿼리요청의 대상이 되는 각각의 열에 대해서 Full Scan이다. 따라서 일반적으로 고려하는 Key, Index 기반의 쿼리 튜닝으로 방향이 아닌, 빅쿼리의 Query Plan에서 제공하는 메타데이터를 토대로 쿼리 실행단계에서 소요된 작업대기, 읽기, 쓰기, CPU연산 등에 대한 상대적인 시간 비율을 기준으로 실행 된 쿼리에 대한 분석 및 최적화 방향을 고려해야 한다.
*사실 빅쿼리의 쿼리 최적화는 쿼리실행이 열에 대한 풀스캔이고, 거기에 대한 비용이 발생하는 구조기 때문에 쿼리 비용자체를 감소시키기는 어렵다. 주요한 방향으로 생각해야 할 부분은 쿼리 실행 성능의 향상이다.
빅쿼리의 Query Plan 인터페이스
빅쿼리웹콘솔(Web UI)
빅쿼리 웹 콘솔(Web UI)에서는 쿼리를 실행하고 나서 다음과 같이 Explanation 버튼을 클릭하면 쿼리 실행에 대한 각각의 스테이지로 정보가 간략하게 나타나고 각각의 스테이지에 있는 세모모양의 아이콘을 클릭하면 각각의 스테이지 별로 쿼리 실행의 세부정보가 나타난다.
API
빅쿼리에서 API형태로 제공하는 Query Plan 정보는 실행이 완료된 쿼리 결과에 대해서 자동으로 해당 쿼리 Job의 리소스로 포함되어 7일간 보관된다. 이 기간 안에 Job.get() 메소드를 호출하게 되면 응답 결과에 포함 된 JSON 배열 형식의 queryPlan 속성에서 상세한 쿼리 실행 정보를 얻을 수 있다.
빅쿼리 Query plan의메타데이터
STAGE 메타데이터
API속성
Web UI
설명
Id
Stage x
스테이지의고유한정수 ID (해당 Query Plan 범위한정)
recordsRead
Input
스테이지에서읽은행의수
recordsWritten
Output
스테이지가작성한행의수
상대적인 시간비율에따른메타데이터
API속성
Web UI
설명
waitRatioAvg
작업대기에소요된평균시간
waitRatioMax
작업대기에소요된최대시간
readRatioAvg
입력데이터의읽기에소요된평균시간
readRatioMax
입력데이터의읽기에소요된최대시간
computeRatioAvg
CPU연산에소요된평균시간
computeRatioMax
CPU연산에소요된최대시간
writeRatioAvg
출력데이터의쓰기에소요된평균시간
writeRatioMax
출력데이터의쓰기에소요된최대시간
*빅쿼리웹콘솔에서는 AVG, MAX는따로표시되지않는다. 또한, 정확한수치를확인하기위해서는 API를이용한접근이필요하다.
간단한쿼리를이용한 기본적인메타데이터정보확인하기
먼저 다음의 쿼리를 빅쿼리 웹 콘솔에서 실행하고, Query Plan에 대한 메타데이터 정보를 확인해보자.
SELECT COUNT(*) FROM [publicdata:samples.shakespeare] WHERE corpus = 'hamlet'
쿼리를 실행하고 Explanation 탭을 클릭하면 다음과 같이 Query Plan에 대한 정보를 확인 할 수 있다.
먼저 좌측의 Stage 뒤의 1, 2가 실행 된 해당 쿼리에 대한 고유한 ID이며,우측 Input, Output 아래의 수치가 해당 쿼리에 대한 각각의 스테이지에서 읽거나 기록한 행의 수이다. 중간의 Stage timing 아래에 나타나는 부분이 해당 쿼리가 실행되는 각각의 스테이지에서 [스케줄에 대한 작업대기, 읽기, CPU연산, 쓰기]에 의해서 소요된 시간 비율들을 나타내며, 위의 쿼리의 경우 실행 데이터의 크기가 작기 때문에 각각의 상대적인 시간비율들의 평균값과 최대값이 같게 나타난다.
이번에는 다음과 같이 시간비율의 편차를 보기 위해 쿼리의 대상이 되는 데이터의 크기가 크고 쿼리 연산이 복잡한 경우를 살펴보자.
SELECT language, SUM(views) as views
FROM [bigquery-samples:wikipedia_benchmark.Wiki1B]
WHERE regexp_match(title,"G.*o.*o.*g")
GROUP by language
ORDER by views DESC
위의 결과에서 보듯이 스테이지1에서 데이터의 읽기와 CPU연산에 소요된 시간 비율의 평균과 최대치의 차이가 발생하는 것을 확인할 수 있다.
*사용자가 실행하는 쿼리와 대상이 되는 데이터의 크기, 분포 등에 따라서 위와 같이 쿼리 실행 단계의 각각의 상대적인 시간비율들의 평균, 최대 값의 차이가 발생하게 된다.
STEP(스테이지의세부단계)의메타데이터정보
Query Plan에서 제공하는 각각의 스테이지 내부에서의 확인 가능한 세부적인 메타데이터 정보는 다음과 같다.
STEP(세부 속성)
설명
READ
입력 테이블 또는 중간 결과에서 하나 이상의 열을 읽는다.
WRITE
출력 테이블 또는 중간 결과에 대해 하나 이상의 열을 기록한다.
COMPUTE
대부분의 식에 대한 모든 계산 및 내장된 함수의 호출한다
FILTER
WHERE, OMIT IF, HAVING절을 구현하는 연산자이다.
SORT
정렬 연산을 나타내며 열의 키와 정렬방향을 포함한다.
AGGREGATE
집계연산(예:GROUP BY)을 나타내며, 여러 단계로 분할된다.
LIMIT
LIMIT절을 구현하는 연산자이다.
JOIN
JOIN연산을 나타내며, JOIN의 종류와 JOIN에 사용되는 열의 정보를 포함한다.
ANALYTIC_FUNCTION
분석 기능(CF. window functions)을 호출한다.
USER_DEFINED_FUNCTION
사용자 정의 함수를 호출한다
간단한쿼리를이용한 상세한메타데이터정보확인하기
앞서 메타데이터 정보를 확인하기 위해 처음 실행 했던 쿼리를 이용해서 Query Plan의 각각의 세부적인 단계(STEP)에 대한 정보를 빅쿼리 웹 콘솔(Web UI)과 API를 통해서 각각 확인해보자.
빅쿼리웹콘솔(Web UI) 에서STEP정보 확인하기
앞서 간단하게 메타데이터 정보를 확인하기 위해 처음 실행 했던 쿼리의 Explanation을 통해서 쿼리 실행의 각각의 단계(STEP)에 대한 정보를 확인해보자. 실행 된 쿼리의 Explanation에서 각각의 스테이지 좌측에 나타나는 삼각형모양의 아이콘을 클릭하면 각각의 스테이지에 대한 세부적인 메타데이터 정보를 확인 할 수 있다.
먼저 스테이지 1, 2에서 READ, AGGREGATE, WRITE스탭을 포함하고 있는 것을 볼 수 있다.
스테이지1에서는 READ스탭에서 [publicdata:samples.shakespeare] 테이블의 대상이 되는 "corpus" 열을 읽어서 값이 "hamlet"인 행을 찾는다. 그 다음 AGGREGATE스탭에서 앞서 읽은 행을 카운트하고 WRITE에서 __stage1_output 이라는 식별자에 기록을 한다.
스테이지2에서는 스테이지1에서 __stage1_output에 기록한 정보를 받아서 READ스탭에서 스테이지1의 작업의 카운트한 정보를 읽고 AGGREGATE스탭에서 카운트 된 정보에 대한 합산을 하고 WRITE스탭에서 합산한 결과를 출력한다.
API를이용해서STEP정보확인하기
구글에서 제공하는 APIs Explorer을 사용해서 빅쿼리의 Query Plan정보를 간단하게 API를 호출하고 결과를 확인해보도록 하자.
Job ID 확인
앞서 언급한 것처럼, 빅쿼리 Query Plan정보는 실행된 쿼리에 대한 결과만 확인 할 수 있으며, Query Plan정보를 얻기 위해서는 실행된 쿼리에 대한 정보 중 하나인 Job ID가 필요하다. 빅쿼리 웹 콘솔(Web UI)에서 좌측상단 Query History클릭하고 앞서 실행한 쿼리정보에 대한Job ID의 값을 확인한다.(복사하거나 창을 열어둔다.)
*빅쿼리 웹 콘솔의 Query History에서 Job ID 는 Project ID:Job ID 형식으로 되어있으니, job.get() 메소드를 호출할 때는 각각 나눠서 입력해야 한다.
GoogleAPIs Explorer 페이지 접속
다음으로 APIs Explorer에서 빅쿼리 Job.get()메소드 테스트 페이지인 다음의 주소로 접속한다.
빅쿼리의 Job.get() 메소드를 사용하기 위해서는 빅쿼리에 대한 읽기 또는 전체 관리권한이 필요하다. 따라서 해당되는 권한을 임시로 부여하기 위해 우측의 off로 되어있는 버튼을 클릭하고 해당되는 권한범위를 체크하고(필요한 범위는 기본값으로 체크되어 있다.) Authorize를 클릭해서 실행 권한을 부여한다.
Job.get()메소드를 사용하기 위한 권한 범위URI는 위의 화면에서 체크된 항목과 같다.
*한 가지만 허용해도 Job.get() 메소드를 사용할 수 있다.
Job.get() 메소드실행하기
API 사용에 대한 권한범위를 허용하고 버튼이 ON되었으면 해당 프로젝트 ID와 위에서 확인한 Job ID를 입력하고 Execute버튼을 클릭해서 실행한다.
Job.get() 메소드실행결과확인
아래에 실행된 결과를 보면 Job.get()메소드에 대해서 GET으로 성공적으로 요청이 이루어졌으며, 응답된 결과로 해당 Job에 대한 모든 정보를 나타내며(해당 속성만 필드를 지정해서 실행할 수 있다.), 쿼리 실행 정보를 얻기 위한 queryPlan속성은 statistics속성의 하위 속성인 query안에 포함되어 있다.
*속성 관계: statistics > query > queryPlan
"queryPlan": [
{
"name": "Stage 1",
"id": "1",
"waitRatioAvg": 1,
"waitRatioMax": 1,
"readRatioAvg": 0.8650635652173914,
"readRatioMax": 0.8650635652173914,
"computeRatioAvg": 0.47558847826086953,
"computeRatioMax": 0.47558847826086953,
"writeRatioAvg": 0.19849230434782608,
"writeRatioMax": 0.19849230434782608,
"recordsRead": "164656",
"recordsWritten": "1",
"steps": [
{
"kind": "READ",
"substeps": [
"corpus",
"FROM publicdata:samples.shakespeare",
"WHERE EQUAL(corpus, 'hamlet')"
]
},
{
"kind": "AGGREGATE",
"substeps": [
"COUNT_STAR() AS f0_"
]
},
{
"kind": "WRITE",
"substeps": [
"f0_",
"TO __stage1_output"
]
}
]
},
{
"name": "Stage 2",
"id": "2",
"waitRatioAvg": 1,
"waitRatioMax": 1,
"readRatioAvg": 0,
"readRatioMax": 0,
"computeRatioAvg": 0.05080473913043479,
"computeRatioMax": 0.05080473913043479,
"writeRatioAvg": 0.12621304347826087,
"writeRatioMax": 0.12621304347826087,
"recordsRead": "1",
"recordsWritten": "1",
"steps": [
{
"kind": "READ",
"substeps": [
"f0_",
"FROM __stage1_output AS publicdata:samples.shakespeare"
]
},
{
"kind": "AGGREGATE",
"substeps": [
"SUM_OF_COUNTS(f0_) AS f0_"
]
},
{
"kind": "WRITE",
"substeps": [
"f0_",
"TO __output"
]
}
]
}
],
앞서 언급한대로 쿼리 실행의 상대적인 시간 비율들의 정확한 수치를 포함해서, 빅쿼리 웹 콘솔(Web UI)에서 확인한 정보와 동일한 쿼리 실행 정보를 확인 할 수 있다.
*Job.get()메소드의 Query Plan정보를 확인할 수 있는 queryPlan속성은 기록된 정보의 출력만 가능하다.
Query Plan결과해석및해결방안
Query Plan의 결과에 대해서 앞서 언급한대로 작업대기, 읽기, CPU 연산, 쓰기의 상대적인 시간 비율을 토대로 해석이 이루어져야 한다. 따라서 Query Plan 정보를 통해 쿼리의 문제를 분석하고 해결하는 방법을 알아보도록 하자.
평균및최대시간사이에큰차이가발생하는경우
평균, 최대시간의 차이가 발생하는 이유는 데이터가 고르지 못하게 비대칭 분포된 경우이다. 이러한 비대칭 분포를 가지는 데이터들은 불균형하게 분포 되어 있기 때문에 쿼리 실행 시 오버헤드를 발생시켜 쿼리가 실행되는 속도를 느리게 만들 수 있다.
평균, 최대 시간의 차이가 발생하는 일반적인 원인으로 쿼리에서 JOIN, GROUP BY, PARTITION 절에서 사용할 데이터가 NULL, empty, default 값을 포함하고 있는 경우이다. 따라서 이미 데이터가 고르게 분포되어 있지 않기 때문에 쿼리를 실행 하는 과정에서 편차가 발생한다.
트러블 슈팅방법으로 TOP COUNT문에 중간결과를 넣어서, 해당 쿼리 데이터에서 가장 일반적인 값을 분산된 데이터의 Key로써 확인한다.
* 빅쿼리는 Key가 없다
데이터의 비대칭을 완화하기 위해서는 필터링을 적용해서 NULL, empty, default값과 같은 쿼리에서 불 필요한 데이터를 감소시켜야 한다. 필터링을 하기위한 방법으로는 고르지 못한 값들에 대해서 쿼리를 실행하고, 나머지 값에 대해서 쿼리를 실행한다.
추가적인 방법으로 고르지 못한 데이터를 작은 크기로 세분화하는 방법을 고려한다. 이 경우 세분화 한 데이터 그룹들을 재결합할 때 집계 연산자 사용의 주의가 필요하다.
예) 비대칭분포를 가지는 불균형 데이터세분화
세분화 전
SELECT ... GROUP BY a
세분화 후
SELECT ... FROM (SELECT ... (GROUP by a, b) GROUP BY a)
중간스테이지에서읽기또는쓰기에대부분의시간이소요된경우
이전 스테이지에서 데이터가 예상보다 많이 생성되는 경우로, 이전 스테이지에서 필터링과 JOIN연산을 사용해서 생성되는 데이터의 크기를 줄인다.
입력테이블의읽기에대부분의시간이소요된경우
입력테이블 읽기가 비용이 큰 경우이다. 따라서 명확한 필터를 사용하여 불 필요한 데이터의 크기를 감소시키는 방법과 테이블에 대한 파티셔닝, 즉 작은 테이블들로 분할을 해서 대상이 되는 테이블 크기를 줄이는 방법을 통해 쿼리의 성능 향상을 기대해 볼 수 있다.
스케줄의대기에대부분의시간이소요된경우
쿼리의 스케줄에 많은 작업이 포함되어 있는 경우이다. 만약 작업 시간 자체가 중요한 경우가 아니라면 기다린다. 즉시 작업이 완료 되어야하는 등의 작업 시간이 중요한 경우는 빅쿼리의 slots을 추가해서 사용하는 것을 고려해본다. 여기서 말하는 slots는 CPU연산이 이루어지는 서버계층을 의미한다.
출력결과를쓰는데대부분의시간이소요된경우
처음 입력된 데이터를 읽은 결과 보다 쿼리가 실행되면서 더 많은 데이터를 내보내는 경우로 볼 수 있으며, 출력하기 전에 일부의 데이터의 필터링으로 불 필요한 데이터의 양을 줄여본다.
CPU 연산에대부분의시간이소요된경우
CPU의 연산에 시간이 많이 소요되는 쿼리의 경우, I/O보다 쿼리 실행과정에서 데이터의 변환 및 프로세싱에 더 많은 시간이 소요되는 경우로 볼 수 있다. 보통 사용자 정의 함수(User defined functions) 또는 JSON데이터, 정규표현식, 등을 포함하는 복잡한 쿼리에서 많이 발생하기 때문에, 필터링을 통해 복잡한 쿼리에서 사용되는 데이터의 크기를 줄여야 한다. 또한 자주 사용되는 쿼리라면, 일부 식에 대한 사전 연산등을 고려하는 것도 하나의 방법이다.
결론
앞서 빅쿼리의 Query Plan(쿼리 계획)을 통해 쿼리 실행을 분석하는 방법에 대해서 알아보았다. 빅쿼리가 일반적인 데이터베이스 시스템들과 다른 구조를 가지고 있고 실행된 쿼리에 대해서만 Query Plan정보를 제공하기 때문에 제한적인 부분은 있다. 실행 한 쿼리에 대해서 Query Plan의 상대적인 시간비율을 지표로 세부적인 스탭을 고려해서 데이터의 비대칭 분포를 완화시키거나, 불 필요한 데이터의 크기를 줄여서 성능을 향상 시키는 방법이 주요한 쿼리의 최적화 방향으로 보여진다.
Producer (producer_id) { void SendRow(row, consumer_id) : Called to send a row to a given consumer on behalf of this producer. } Consumer (consumer_id) { string ReceiveRow() : Called to receive one row for this consumer. } Controller { StartShuffle() : Called before any producers or consumers start sending or receiving rows. EndShuffle() : Called after all producers and consumers have successfully sent and received all rows. }
구글 빅쿼리에서는 비동기 처리방식인 Job을 통해서 데이터를 로딩하는 방법 외에도 실시간으로 데이터를 로딩 할 수 있는 방법으로 스트리밍(Streaming) API를 제공한다. 이 글에서는 구글 빅쿼리에서 제공하는 스트리밍 API를 이용해서 실시간으로 데이터를 로딩하는 방법을 알아보도록 하자.
스트리밍(Streaming) API
비동기 연산인 Job을 통한 데이터 로딩방식이 아닌 API형태로 tabledata().insertAll() 메소드를 제공하여 한번에 하나씩레코드를 삽입하는 방법을 제공한다. 하나의 레코드에는 하나 또는 다수의 행을 포함할 수 있다.
*레코드는 빅쿼리가 지원하는 데이터 타입으로 하나 이상, 다수의 데이터 필드에 대한 일종의 집합이다.
*스트리밍 API를 이용한 데이터 로딩은 partition table에 적용되지 않는다.
스트리밍 로딩의 할당량 정책(quota policy)
스트리밍 API를 사용한 데이터로딩은 실시간으로 처리되는 부분으로 Job의 데이터로딩(batch) 방법에 비해서 처리 가능한 데이터의 크기와 행의 수가 제한적이다.
최대 행 사이즈 : 1MB
HTTP 요청 사이즈 제한 : 10MB
초당 최대 입력 가능 행 수 : 테이블 당 100,000 행, 프로젝트 당 1,000,000 행
요청당 최대 입력 가능 행 수 : 제한은 없으나, 최대 500 행을 권장
초당 최대 바이트 크기 : 100MB
*스트리밍 데이터 로딩 시에 위의 정책을 초과하는 경우에 에러가 발생한다. 따라서 그러한 경우는 Job을 통한 데이터 로딩을 권장한다.
데이터 일관성 보장
데이터에 대한 일관성을 보장할 수 있는 옵션으로 각각의 입력 행에 대해서 insertId를 적용할 수 있다. insertId는 행 입력 시 최소1분 정도 빅쿼리가 기억하고 있게 된다.
*이미 입력된 행에 대해서 insertId를 통해 중복된 데이터는 추가로 입력되지 않으며, 만약 변경된 데이터(행)를 입력하면 해당 insertId를 가진 데이터(행)는 변경된 데이터(행)로 입력된다.
템플릿 테이블
templateSuffix 속성을 통해서 지정한 대상 테이블을 기본 템플릿으로 하여 같은 스키마를 가진 새로운 테이블을 자동으로 생성할 수 있다.
templateSuffix의 값은 사용자가 임의로 설정할 수 있다.
<대상 테이블> + <templateSuffix> 형태로 새로운 테이블이 생성된다.
Tabledata: insertAll정의
Tabledata는 API형태로 제공되는 Resource type으로 스트리밍 데이터 로딩을 지원하는 insertAll 메소드를 포함하고 있다.
HTTP 요청(request) 기본형식
POST https://www.googleapis.com/bigquery/v2/projects/projectId/datasets/datasetId/tables/tableId/insertAll
POST요청으로 각각 해당되는 project, dataset, table ID를 설정하면 된다.
*단순스트리밍 로딩을 위해서는 따로 HTTP(S)트래픽을 허용하지 않아도 위의 다른 GCP서비스에 대한 접근 범위 설정을 통해 Streaming API에 사용에 대한 권한을 가지게 된다. 때문에 무리없이 실행은 가능하다. 하지만 실제 어플리케이션 구성에서는 구성환경에 맞춰서 HTTP, HTTP(S)에 대한 트래픽을 허용해기 위해 방화벽 규칙을 설정해야 될 것이다.
VM인스턴스 접근 범위 확인하기
Access scopes에 대한 적합한 설정이 되어있어야 한다는 것만 명심하자.
또한 Access scopes는 VM을 생성 할 때만 적용 가능하다.
VM 신규 생성 시 주의사항
Identity and API access에서 적합한 Access scopes를 설정하고 생성해야 한다.
BigQuery API 활성화
스트리밍 데이터 로딩을 실행하기 위해서는 BigQuery API를 활성화 해야 한다. 다음의 BigQuery API페이지로 이동한다.
구글 빅쿼리는 npm으로 node.js Library를 제공한다. 다음의 명령어를 실행해서 Bigquery API Client Library를 설치한다.
% npm install googleapis --save
스트리밍 데이터 로딩하기
이제 스트리밍 데이터 로딩을 위한 준비는 완료되었다. Node.js와 스트리밍 API를 사용해서 데이터 로딩을 해보도록 한다.
Streaming API 기본 예제 - Node.js
vi등의 편집기를 이용해서 app.js파일을 열고 다음과 같은 내용을 추가한다.
var google = require('googleapis'); var bigquery = google.bigquery('v2');
google.auth.getApplicationDefault(function(err, authClient) { if (err) { console.log('Authentication failed because of ', err); return; } if (authClient.createScopedRequired && authClient.createScopedRequired()) { var scopes = ['https://www.googleapis.com/auth/cloud-platform']; authClient = authClient.createScoped(scopes); }
var request = { // TODO: Change placeholders below to appropriate parameter values for
the 'insertAll' method:
// Project ID of the destination table. projectId: "", // Dataset ID of the destination table. datasetId: "", // Table ID of the destination table. tableId: "", resource: {}, // Auth client auth: authClient };
resource: {} 에서 중괄호 안에 위에서 설명한 기본 Request body의 형식에 맞게 실제 입력할 데이터의 내용을 입력한다.
bigquery.tabledata.insertAll() {}부분에서 데이터 로딩이 실행되는 부분으로, 만약 에러가 있다면 에러에 대한 내용을 반환한다.
스트리밍 로딩 테스트
이제 예제 코드에 대한 대략적인 구조를 파악하였다면 스트리밍 API를 사용해서 실제 테이블에 데이터를 입력해보자.
테이블 정보
테스트에 사용할 테이블은 앞서 구글 스토리지를 활용한 데이터 로딩에서 생성한 csv_test 테이블을 사용하도록 한다. 테이블 csv_test는 다음과 같은 스키마를 가지고 있다.
Node.js 코드 작성하기
projectId: “사용자의 프로젝트ID”
datastId: “데이터를 입력할 테이블을 가지고 있는 데이터셋ID”
tableId: “실제로 데이터를 로딩 할 테이블Id”
resource: { 입력 할 데이터 }
위에서 설명한 request body의 형식으로 대상 테이블의 스키마에 대응되는 적합한 값을 입력해야 한다. 입력을 완료하고 파일을 저장한다.
데이터 로딩 실행
이제 앞서 작성한 내용의 데이터를 로딩하기 위해 다음 명령어를 실행해서 Node.js서버를 구동한다.
%npm start
해당 스트리밍 로딩 요청에 대한 tableDataInsertAllResponse로 응답이 완료됨을 확인할 수 있다. 응답이 완료된 것을 확인하고 ctrl+c를 입력하여 실행을 중지한다.
데이터 로딩 결과 확인하기
이제 빅쿼리 웹 콘솔(Web UI)로 이동해서 간단한 쿼리 실행을 통해 데이터가 정상적으로 입력되었는지 확인해보자.
쿼리 실행하기
좌측 상단의 COMPOSE QUERY 버튼을 클릭한다.
다음과 같이 쿼리를 입력하고 RUN QUERY 버튼을 클릭하여 쿼리를 실행한다.
SELECT word, word_count, corpus, corpus_date
FROM load_test.csv_test
WHERE word_count=1111
쿼리가 실행되었다. 앞서 입력한 데이터를 확인 할 수 있다.
템플릿 테이블을 이용한 테이블 자동생성
이제 앞서 설명한 templateSuffix속성을 활용해 csv_test 테이블을 템플릿으로 하여 같은 스키마를 가진 새로운 테이블을 생성해보자.
Node.js 코드 작성하기
vi편집기등으로 app.js파일을 열고 다음의 내용을 추가하고 파일을 저장한다.
templateSuffix: "String형의 임의의 값"
자동 테이블 생성 및 데이터 로딩 실행
템플릿 테이블을 활용해서 자동으로 새로운 테이블 생성 및 데이터로딩을 위해 node.js 서버를 실행한다.
%npm start
응답이 완료되었다. ctrl+c를 입력하여 실행을 중지한다.
자동 테이블 생성 및 데이터 로딩 결과 확인하기
이제 다시 빅쿼리 웹 콘솔로 이동해서 테이블이 자동으로 생성되고 입력한 데이터가 제대로 로딩되었는지 확인한다. (새로고침이 필요할 수 있다.)
먼저 자동 생성된 테이블 부터 확인한다. 앞서 설명한 것처럼 <대상 테이블> + <templateSuffix>의 형태로 csv_test1234라는 새로운 테이블이 생성되었다. 템플릿의 대상이 되는 테이블인 csv_test와 동일한 스키마를 가진다.
데이터 로딩 결과 확인하기 - 자동생성 테이블
이제 자동으로 생성된 테이블을 확인하였다면 해당 테이블에 스트리밍으로 입력한 데이터를 확인해보도록한다.
쿼리 실행하기
COMPOSE QURERY버튼을 클릭해서 다음과 같은 쿼리를 입력하고 RUN QUERY 버튼을 클릭하여 쿼리를 실행한다.
SELECT word, word_count, corpus, corpus_date
FROM load_test.csv_test1234
WHERE word_count=1111
쿼리가 실행되었다. 앞서 입력한 데이터를 확인 할 수 있다.
*날짜 단위 또는 각각의 사용자 단위 등으로 테이블을 분할하여 생성할 필요가 있을 때,
templateSuffix의 값에 따라서 자동으로 테이블을 생성하여 데이터를 입력할 수 있기 때문에 매우 유용한 기능이다.
자동 테이블 생성 참고사항
templateSuffix속성을 활용해서 자동으로 테이블 생성하고 데이터를 입력한 경우에 빅쿼리
웹콘솔에서 바로 Preview를 클릭하면 스트리밍 버퍼에서 데이터 로딩을 통해 입력한 데이터를
가지고 있어서 입력한 값이 바로 나타나지 않을 수 있다.
*스트리밍 데이터 입력에 수초가 소요될 수 있으며, 입력된 데이터를 복사하고,
내보내기에 사용 될 수 있기까지 90분까지 소요 될 수 있다
데이터 일관성 확인
앞서 insertId를 통해 데이터의 일관성을 보장해주는 부분에 설명하였다. 이제 실제로 insertId를 사용해 데이터가 어떻게 변화하는지 확인해본다.
Node.js 코드 작성하기
vi등의 편집기를 이용해서 app.js파일을 열고 다음의 내용을 추가하고, 입력할 데이터의 내용을 수정한다.
insertId: "String형의 임의의 값"
입력 데이터의 경우 앞서 테스트한 결과와 구분을 위해 다른 값으로 변경한다.
데이터 로딩 실행 - insertId
insertId가 적용된 데이터로딩을 하기 위해서 node.js서버를 실행한다.
%npm start
응답이 완료되었으면 빅쿼리 웹 콘솔로 이동해서 입력된 데이터를 확인한다.
데이터 로딩 결과 확인하기 - insertId
빅쿼리 웹 콘솔에서 COMPOSE QUERY를 클릭하고 다음의 쿼리를 입력한다.
SELECT word, word_count, corpus, corpus_date
FROM load_test.csv_test1234
WHERE word_count=2222
RUN QUERY를 클릭하여 쿼리를 실행하고 입력된 데이터를 확인한다.
쿼리 수행결과를 통해, 정상적으로 입력된 데이터를 확인 할 수 있다.
데이터 중복 입력 확인하기
다시 SSH터미널에서 ctrl+c를 입력하여 서버 실행을 중지하고 app.js의 내용을 변경하지 않고, 다시 한번 node.js서버를 실행한다.
데이터 중복 확인 - 쿼리 테스트
빅쿼리 웹 콘솔로 이동해서 위와 동일한 쿼리를 실행해서 결과를 확인한다.
쿼리를 실행하면 앞서 입력한 데이터(행)만 나타난다. 같은 insertId를 가지고 있기 때문에 중복된 데이터가 추가로 입력되지 않는다.
*insertId가 없는 경우, 스트리밍 로딩을 할 때 데이터의 중복에 관계없이 새로운 데이터(행)가 추가된다. 각각 테스트해보길 권장한다.
데이터 변경하기 - Node.js 코드 수정하기
SSH 터미널에서 node.js서버를 중지하고 vi등의 편집기로 app.js파일을 열고 데이터의 내용을 변경하고 저장한다.
데이터 변경 후 로딩 실행 - insertId
앞서 같은 insertId를 가진 상태에서 변경한 데이터를 로딩하기 위해 다음 명령어를 실행해서 node.js서버를 구동한다.
%npm start
응답을 확인하고 웹 콘솔로 이동해서 입력된 데이터를 확인해보자.
데이터 변경 입력 후, 로딩 결과 확인하기
기존에 데이터가 어떻게 변화하였는 지 확인하기 위해 빅쿼리 웹 콘솔에서 앞서 실행한 쿼리를 입력하고 결과를 확인한다.
SELECT word, word_count, corpus, corpus_date
FROM load_test.csv_test1234
WHERE word_count=2222
앞서 위의 쿼리 실행을 통해 데이터가 로딩된 결과를 확인했었다. 하지만 해당 쿼리가
실행되면 조회되는 데이터가 없음을 확인 할 수 있다
이번에는변경한데이터의 word_count값인 3333으로 WHERE 조건을변경하고쿼리를실행해보자
SELECT word, word_count, corpus, corpus_date
FROM load_test.csv_test1234
WHERE word_count=3333
변경해서 입력한 데이터에 대한 조회결과가 나타난다. 이와 같이 같은 행에 대한 insertId가 적용된 상태에서 데이터가 변경되어 다시 입력되는 경우, 기존의 입력된 데이터는 변경된다.
*insertId의 경우 데이터의 일관성을 확인하고 보장하기 위해 좋은 수단이 될 수 있으나, 앞서 언급한 것처럼 1분정도만 보장되는 시간적인 제약이 있으니 참고하기 바란다.
결론
빅쿼리의 스트리밍 데이터 로딩의 경우, 몇가지 제약적인 부분은 있지만, 실시간으로 대량의 이벤트 로그 분석, 실시간으로 데이터와 연동하는 dashboard등에 활용하기 좋다. 또한 빅쿼리의 대용량의 데이터 분석 뿐만 아니라 실시간의 데이터 로딩, 분석을 융합적으로 활용하면, 급변하는 시장에서 빠르게 대응 할 수 있는 또 하나의 대안이나 수단으로 이용할 수 있을 것이다.
구글의 대용량 데이터 분석 서비스인 빅쿼리에 구글 클라우드 스토리지를 활용하여 CSV, JSON형식의 데이터를 로드하고 테이블을 생성하는 방법에 대해서 알아보도록 한다.
또한,빅쿼리에서는 데이터 파일의 단일 업로드뿐만 아니라 병렬 업로드(다중파일 동시 업로드)도 제공한다.따라서, 이번 글의 뒤쪽에서는 단일CSV파일로 데이터로딩을 해보고, 그 CSV파일을 여러 개의 파일로 분할하여 병렬로 데이터를 로드해서 각각의 처리 속도를 확인해본다.
그리고 CSV 병렬 업로드를 위해 분할한 데이터를 이용하여 JSON, Avro포맷으로 변환하여 각각의 형식에 따라 동일데이터라도 데이터 로딩시간이 어떻게 달라지는지 확인해보도록 하자
구글 클라우드 스토리지(Google Cloud Storage : GCS)
구글 클라우드 스토리지는 구글 클라우드 플랫폼에서 지원하는 BLOB(Binary large object) Store로 구글 클라우드 플랫폼의 모든 서비스들과 연계하여 사용이 가능하다.구글 클라우드 플랫폼에서 제공하는 파일,미디어 등의 저장에 특화된 서비스이다.
GCS 스토리지 클래스
Standard:
높은 data availability(99.9%)와low latency을 가지고 있어서 데이터에 대한 빠른 응답속도(ms)를 보인다.따라서 데이터에 대한 빠른 접근,빈번한 접근이 필요한 경우에 사용하기 적합하다.활용하기 적합한 예로는 웹사이트 컨텐츠,모바일 또는 게임 어플리케이션, 등이 있다.
Durable Reduced Availability (DRA) :
Standard에 비해 약간 낮은 가격과 data availability(99%)를 가지고 있다.
데이터 백업,배치(batch) 작업에 사용하기 적합한 클래스이다.
Nearline : data archiving 이나 online backup, 재해복구(disaster recovery)용도로 사용하기 적합한 클래스로 가장 저렴한 비용으로 이용할 수 있다. 데이터 접근에 시간단위가 소요되는AWS의 glacier서비스등에 비해서 매우 빠른 속도로 초 단위(대략3초)로 다른 클라우드 업체들의 유사서비스들에 비해서 매우 높은 성능을 가진다.
*구글 클라우드 스토리지 클래스 모두 동일한 API를 사용하고 AWS의 S3와 API호환되서 API사용에 대한 부담이 적다.
GCS 기본 구성요소
프로젝트(Project) :
최상위 컨테이너로 구글 클라우드 스토리지의 모든 자원(resources)은 프로젝트 위에서 생성되고 관리된다. 또한 스토리지 자원들에 대하여 권한제어가 가능하다.
버켓(Bucket) :
데이터 또는 파일을 저장하고 관리하기 위한 컨테이너이다.구글 클라우드 스토리지에서의 최소단위인 Object(각각의 데이터 또는 파일단위)가 저장되는 공간이며, 버켓 생성 시에 앞서 설명한 클래스와 데이터가 저장될 위치를 지정하여야 하고 생성 후에는 변경이 불가능하다.새로 생성하거나 다른 버켓으로 이동시키는 등의 방법을 사용하여야 한다.
오브젝트(Object) :
실제로 구글 클라우드 스토리지에 저장되는 각각의 데이터 또는 파일을 의미한다. 하나의 오브젝트는 최대 5TB까지 저장할 수 있다.
빅쿼리 기본구조
*빅쿼리에 대한 개념, 아키텍쳐 등의 기본적인 이해가 필요하다면 아래 주소의 정보를 참고하기 바란다.
JSON포맷의 데이터를 로드하고 테이블을 생성하기 위해 내용을 입력하고 필요한 옵션을 설정하여 테이블을 생성한다. .
Source Data
Location : gs://load-bigquery/shakespeare.json
JSON 파일도 마찬가지로 구글 클라우드 스토리지의 JSON파일 URI를 입력한다
File format : JSON(Newline Delimited)
Destination Table
Table name : json_test(임의의 값)
Schema
Edit as Text에 다음과 같이 텍스트로 JSON형태의 스키마를 입력한다.
[
{
"name": "word",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "word_count",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "corpus",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "corpus_date",
"type": "INTEGER",
"mode": "REQUIRED"
}
]
*Add Field로 각각 생성해도 상관없다.각각의 환경에 따라서 사용하기 편한 방식으로 이용하도록 하자.
다음과 같이 스키마 정보가 정상적으로 입력된 것을 확인 할 수 있다.
이제 나머지 값은 기본값으로 하여 Create Table 버튼을 클릭하여 테이블을 생성해보자.
JSON포맷의 데이터로 생성한 테이블이 생성되었다.
테이블 테스트 – JSON
COMPOSE QUERY를 클릭하여 CSV로 입력한 데이터와 같은 데이터를 가지고 있기 때문에, 앞에서 CSV를 테스트한 쿼리를 이용하여 테스트해보자.
SELECT word, word_count, corpus, corpus_date
FROM load_test.json_test
ORDER BY word_count DESC LIMIT 10
RUN QUERY버튼을 클릭하여 쿼리를 실행해보자
테스트로 입력한 쿼리가 정상적으로 실행되었다.
*구글 클라우드 스토리지를 이용하여 데이터 교환의 표준 포맷이라고 할 수 있는 CSV, JSON형식의 데이터를 이용해 테이블을 생성하고 데이터를 로드하였다. 위의 예제에서는 데이터를 로딩하는 방법에 대한 설명하기 위해 작은 용량의 데이터를 가지고 진행하였다.하지만 빅쿼리는 기본적으로 대용량의 데이터를 실시간에 가까운 속도로 처리하고 분석하기 위해 설계되었고,GB,TB급의 데이터라도 매우 빠른 속도로 로딩이 가능하다. 따라서 위의 예제의 데이터보다는 비교적 큰 데이터를 이용해 앞서 언급한 단일, 병렬, 포맷에 따른 데이터 로딩시간을 확인해보도록 한다
업로드 속도 비교하기
시작하기전에 먼저 각각의 방식에서 속도(성능)를 확인하는 것이 주요 목적으로
각각의 업로드 방법에 대한상세한 설명은 생략한다.
기본 준비사항
원본 데이터
아래는 원본 테이블의 정보이다. 2014년의 항공편에 대한 데이터를 가지고 있으며 용량은 1GB가 약간 넘으며, 6,303,310개의 행(Rows)으로 이루어져 있다.
bq 명령어 도구
앞서 언급한 Command-line interface로 제공되는 도구이다. 파이썬 기반의 도구로 Google Cloud SDK에 포함되어 있으며, 기타 SSH를 사용하는 경우 따로 설치도 가능하다. 이후 진행 할 단일/병렬 업로드에 bq도구를 활용하도록 하겠다.
VM인스턴스 준비
테스트를 위해 Google Compute Engine의 VM인스턴스를 하나 생성하거나 기존의 VM인스턴스를 사용한다.
테스트에 사용한 VM은
machine type : n1-standard-1 (1 vCPU, 3.75 GB memory)
마지막으로 JSON과 동일한 방법으로 CSV병렬 업로드에 사용한 6개의 파일을 Avro포맷으로 변환하여 병렬 업로드를 하여 속도를 확인해보자. 변환과정은 생략한다.
데이터 준비
CSV -> Avro으로 변환한 파일을 구글 클라우드 스토리지에 저장한다.
같은 데이터를 가지고 있지만 CSV, JSON포맷과 비교했을 때데이터 파일의 크기가 작다.
다음 명령어를 통해 Avro포맷의 병렬 업로드를 실행한다.
bq load --source_format=AVRO \
bigquery-1369:load_test.flight_avro_Multi \
gs://load-bigquery/avro/flight*.avro
--source_format=AVRO : Avro포맷으로 지정한다
Avro의 경우 데이터파일에서 스키마의 정보를 포함하고 있기에 스키마를 따로 지정하지 않아도 된다.
테이블이 생성 되었다
웹 콘솔의 job History를 통해 데이터 로딩에 소요된 시간을 확인해보자.
Avro포맷의병렬 데이터 로딩에 40초가 소요된 걸 확인할 수 있다.
(여러 번 테스트 해본 결과 35 ~ 45초 정도 소요됨)
*여러 번 테스트 한 결과 CSV 병렬 업로드 보다 빠르고 JSON보다는 느리다.
데이터 로딩 결과(시간)
CSV(단일)
CSV(다중)
JSON(다중)
Avro(다중)
업로드 시간
40초 ~ 60초
36초 ~ 50초
20초 ~ 30초
35 ~ 45초
결론
빅쿼리가 지원하는 데이터 포맷(CSV,JSON, Avro)을 이용해동일한 데이터에 대해서단일 파일 데이터 로딩을 포함하여, 단일 데이터파일을 분할하여 여러 개로 나눠서 병렬(다중파일 동시 업로드)데이터 로딩을 테스트하였다.다음과 같은 결과를 확인할 수 있었다.
로딩 속도는 CSV > Avro > JSON
데이터의 크기(비용적인 측면)는 Avro < CSV < JSON
물론 데이터의 크기가 더 커진다면 결과가 달라 질 수도 있겠지만, 위에서 테스트한 정보를 토대로 이야기하면 1GB의 용량에 6백만 행을 가진 데이터가 작지 않은 데이터라고 볼 수 있으나, 현재와 같이 빅데이터가 일반화 되어가는 시기에 수십,수백GB에서 TB까지(또는 그 이상)의데이터의 로딩의 여러가지 측면(비용,속도)을 생각하면, Avro포맷을 고려해 보는 것도 좋은 선택이 될 수 있을 것이다.