빅쿼리 스트리밍 데이터 로딩하기

 

최유석

 

개요

구글 빅쿼리에서는 비동기 처리방식인 Job을 통해서 데이터를 로딩하는 방법 외에도 실시간으로 데이터를 로딩 할 수 있는 방법으로 스트리밍(Streaming) API를 제공한다. 이 글에서는 구글 빅쿼리에서 제공하는 스트리밍 API를 이용해서 실시간으로 데이터를 로딩하는 방법을 알아보도록 하자.

 

스트리밍(Streaming) API

비동기 연산인 Job을 통한 데이터 로딩방식이 아닌 API형태로 tabledata().insertAll() 메소드를 제공하여 한번에 하나씩 레코드를 삽입하는 방법을 제공한다. 하나의 레코드에는 하나 또는 다수의 행을 포함할 수 있다. 

*레코드는 빅쿼리가 지원하는 데이터 타입으로 하나 이상, 다수의 데이터 필드에 대한 일종의 집합이다.

*스트리밍 API를 이용한 데이터 로딩은 partition table에 적용되지 않는다.

 

스트리밍 로딩의 할당량 정책(quota policy)

스트리밍 API를 사용한 데이터로딩은 실시간으로 처리되는 부분으로 Job의 데이터로딩(batch) 방법에 비해서 처리 가능한 데이터의 크기와 행의 수가 제한적이다.

  • 최대 행 사이즈 : 1MB
  • HTTP 요청 사이즈 제한 : 10MB
  • 초당 최대 입력 가능 행 수 : 테이블 당 100,000 행, 프로젝트 당 1,000,000 행
  • 요청당 최대 입력 가능 행 수 : 제한은 없으나, 최대 500 행을 권장
  • 초당 최대 바이트 크기 : 100MB

 

*스트리밍 데이터 로딩 시에 위의 정책을 초과하는 경우에 에러가 발생한다. 따라서 그러한 경우는 Job을 통한 데이터 로딩을 권장한다.


데이터 일관성 보장

데이터에 대한 일관성을 보장할 수 있는 옵션으로 각각의 입력 행에 대해서 insertId를 적용할 수 있다. insertId는 행 입력 시 최소1분 정도 빅쿼리가 기억하고 있게 된다.

*이미 입력된 행에 대해서 insertId를 통해 중복된 데이터는 추가로 입력되지 않으며, 만약 변경된 데이터(행)를 입력하면 해당 insertId를 가진 데이터(행)는 변경된 데이터(행)로 입력된다.

 

템플릿 테이블

templateSuffix 속성을 통해서 지정한 대상 테이블을 기본 템플릿으로 하여 같은 스키마를 가진 새로운 테이블을 자동으로 생성할 수 있다. 

templateSuffix의 값은 사용자가 임의로 설정할 수 있다.

<대상 테이블> + <templateSuffix> 형태로 새로운 테이블이 생성된다.

 

Tabledata: insertAll정의

Tabledata는 API형태로 제공되는 Resource type으로 스트리밍 데이터 로딩을 지원하는 insertAll 메소드를 포함하고 있다.

 

HTTP 요청(request) 기본형식

POST https://www.googleapis.com/bigquery/v2/projects/projectId/datasets/datasetId/tables/tableId/insertAll

POST요청으로 각각 해당되는 project, dataset, table ID를 설정하면 된다.

 

insertAll 권한 범위

스트리밍 요청을 할 때 VM인스턴스가 위의 각각의 권한 범위를 나타내는 URI중에서 

적어도 하나의 범위의 권한을 가지고 있어야 한다.

Google API 접근 범위에 대한 참고 https://developers.google.com/identity/protocols/googlescopes

 


Request body 구조

{
  "kind": "bigquery#tableDataInsertAllRequest",
  "skipInvalidRows": boolean,
  "ignoreUnknownValues": boolean,
  "templateSuffix": string,
  "rows": [
    {
      "insertId": string,
      "json": {
        (key): (value)
      }
    }
  ]
}

필수(Required) 속성

  • kind : 응답의 자원 유형 - 사용하는 프로그래밍 언어에 따라서 직접적으로 사용이 필요한 경우가 있다.
  • rows[] : 삽입할 데이터(행)들을 포함한다.
  • rows[].json :
  • json형태로 데이터를 입력 가능하다. key(스키마): value(입력 값)의 구조를 가지며, 대상 테이블의 스키마 구조에 대응되는 형태로 스키마와 값을 입력하여야 한다. 하나이상의 행에 대한 입력이 가능하다.

 

선택적(Optional) 속성
  • rows[].insertId : 앞서 설명한 데이터의 일관성을 보장하기 위한 옵션이다.
  • templateSuffix : 앞서 설명한 것처럼, String 타입의 값으로 사용자가 임의의 값을 설정하여 데이터 로딩을 실행하면 <대상 테이블> + <templateSuffix>형태로 새로운 테이블이 자동생성되고 해당 요청에 대한 데이터가 로딩된다.
  • skipInvalidRows : 적합하지 않은 행이 존재하더라도 모든 유효한 행의 데이터를 삽입한다. boolean형으로 기본값은 false이다.
  • ignoreUnknownValues: 테이블의 스키마와 일치하지 않는 유형의 값은 무시된다. boolean형으로 기본값은 false이다.

 

Response body구조

  • kind : 응답 자원 유형을 나타낸다. - 사용하는 프로그래밍 언어에 따라서 직접적으로 사용이 필요한 경우가 있다.
  • insertErrors[] : 실패한 요청의 경우 해당 오류에 대한 내용을 json배열형태로 반환한다.


※앞서 스트리밍 데이터 로딩을 위한 기초적인 내용을 설명하였으며, Node.js를 활용하여 실제로 스트리밍으로 데이터를 로딩하여 보도록 하자.


VM환경 준비하기

프로젝트 선택

기존의 프로젝트를 선택하거나,

 

또는 새로 생성하고 진행한다.

  

VM준비(VM 생성 or 기존의 VM사용)

스트리밍 API를 사용해서 데이터로딩을 하기위해서 리눅스 기반의 VM인스턴스를 준비한다. 

주의할 점으로 해당 VM은 앞서 설명한, tabledata().insertAll() 메소드를 접근해서 사용 할 수 있는 범위의 권한을 가지고 있어야 한다.

 

이글에서는 기존의 VM을 사용했으며, 테스트에 사용한 VM의 정보는 다음과 같다. 

  • Name : test
  • Boot disk : Ubuntu 14.04 LTS / SSD 10GB

*리눅스 기반의 각자가 사용하기 편리한 다른 OS이미지를 포함하여, disk의 용량이나 타입도 각자의 환경에 맞춰서 적용해도 상관없다.

  • Access scopes : Allow full access to all Cloud APIs

편의를 위해 구글 클라우드의 모든 서비스에 Access 가능한 범위를 지정하였다.

*앞서 언급한 tabledata().insertAll() 메소드를 사용할 수 있는 권한 범위를 적용하여야 한다.

  • 나머지는 기본값으로 설정


선택사항

  • Firewalls : Allow HTTP traffic, Allow HTTPS traffic 체크

이 글에서는 HTTP, HTTP(S)의 방화벽 규칙을 설정하고 진행한다.


*단순 스트리밍 로딩을 위해서는 따로 HTTP(S)트래픽을 허용하지 않아도 위의 다른 GCP서비스에 대한 접근 범위 설정을 통해 Streaming API에 사용에 대한 권한을 가지게 된다. 때문에 무리없이 실행은 가능하다. 하지만 실제 어플리케이션 구성에서는 구성환경에 맞춰서 HTTP, HTTP(S)에 대한 트래픽을 허용해기 위해 방화벽 규칙을 설정해야 될 것이다.

   

VM인스턴스 접근 범위 확인하기

Access scopes에 대한 적합한 설정이 되어있어야 한다는 것만 명심하자.

또한 Access scopes는 VM을 생성 할 때만 적용 가능하다.

  

VM 신규 생성 시 주의사항

Identity and API access에서 적합한 Access scopes를 설정하고 생성해야 한다.


BigQuery API 활성화

스트리밍 데이터 로딩을 실행하기 위해서는 BigQuery API를 활성화 해야 한다. 다음의 BigQuery API페이지로 이동한다.

https://console.developers.google.com/apis/api/bigquery

 

상단의 ENABLE를 클릭하여 BigQuery API를 활성화 한다.

만약, 이미 BigQuery API가 활성화 되어 있다면 다음 단계로 바로 이동한다.

 

Google Cloud SDK설치

기본적으로 구글 클라우드 SDK(command line interface) 가 설치되어 있다는 가정하에 진행한다. 

만약 설치가 안되어 있다면 다음의 주소를 참고하여 설치하도록 한다.

https://cloud.google.com/sdk

 

Auth 인증

다음으로 Default Credentials정보를 인증하기 위해 SSH터미널에 접속한다.

auth인증을 위해 다음 명령어를 실행한다.

%gcloud auth login

만약 위의 명령어로 진행이 되지 않는다면 아래 명령어를 입력하여 진행한다.

% gcloud auth application-default login 

위의 명령어를 입력하고 y를 입력하고 아래에 나타난 링크를 복사하여 브라우저에서 해당 링크로 접속한다.

해당 계정을 선택하면 다음과 같은 화면이 나타나고 허용을 클릭한다.

 

다음과 같이 인증코드가 생성된다.

 

 

인증코드를 복사하여 앞서 코드를 검증하는 부분에 붙여 넣고 엔터를 입력하면

Default Credentials 인증이 성공한 것을 확인할 수 있다.

 

Node.js 환경 준비하기

이번 예제에서는 기본적으로 node.js와 npm이 설치되어 있다고 가정하고 진행한다. 

이 글은 node.js 4.4.7버전과 npm 2.15.8버전을 기준으로 작성하였다.

또한 root계정으로 진행한다. 편의를 위해 express프로젝트로 진행한다.

 

Express generator 설치

Express generator설치를 통해서 기본적인 Express 프로젝트의 구조로 생성하기 위해 다음의 명령어를 실행한다.

% npm install express-generator -g

 

Express 프로젝트 생성

Express 프로젝트를 생성하기 위해 다음의 명령어를 실행한다.

% express --session --ejs --css stylus test

다음 명령어를 실행하여 프로젝트 생성을 마무리한다.

% cd test && npm install

 

Express 프로젝트 및 프레임워크에 대해서는 http://bcho.tistory.com/887

http://bcho.tistory.com/888 글을 참고하기 바란다.

 

Bigquery API Client Library 설치

구글 빅쿼리는 npm으로 node.js Library를 제공한다. 다음의 명령어를 실행해서 Bigquery API Client Library를 설치한다.

% npm install googleapis  --save

 

 

스트리밍 데이터 로딩하기

이제 스트리밍 데이터 로딩을 위한 준비는 완료되었다. Node.js와 스트리밍 API를 사용해서 데이터 로딩을 해보도록 한다.

 

Streaming API 기본 예제 - Node.js

vi등의 편집기를 이용해서 app.js파일을 열고 다음과 같은 내용을 추가한다.

var google = require('googleapis');
var bigquery = google.bigquery('v2');

google.auth.getApplicationDefault(function(err, authClient) {
  if (err) {
    console.log('Authentication failed because of ', err);
    return;
  }
  if (authClient.createScopedRequired && authClient.createScopedRequired()) {
    var scopes = ['https://www.googleapis.com/auth/cloud-platform'];
    authClient = authClient.createScoped(scopes);
  }

  var request = {
    // TODO: Change placeholders below to appropriate parameter values for

the 'insertAll' method:

    // Project ID of the destination table.
    projectId: "",
    // Dataset ID of the destination table.
    datasetId: "",
    // Table ID of the destination table.
    tableId: "",
    resource: {},
    // Auth client
    auth: authClient
  };

  bigquery.tabledata.insertAll(request, function(err, result) {
    if (err) {
      console.log(err);
    } else {
      console.log(result);
    }
  });
});


Node 예제 구성요소 안내

google.auth.getApplicationDefault() {}

앞서 Auth 인증한 Default Credentials 정보를 확인한다. 

var scopes = ['https://www.googleapis.com/auth/cloud-platform'];
위에
설명한 것처럼, 스트리밍 로딩을 위한 VM생성 시에 가지고 있는 접근 범위 에 따라서 적합한 범위의 URI가 입력되어야 한다.

https://www.googleapis.com/auth/cloud-platform 은 구글 클라우드 플랫폼의 모든 서비스에 대한 Full Access 범위를 나타난다.

var request 내부 내용을 살펴보면 기본적으로 확인 할 내용은 다음과 같다.

projectId: "",

""안에 해당하는 프로젝트 ID,

datasetId: "",

""안에 해당하는 데이터셋의 ID,

tableId: "",

""안에 해당하는 대상 테이블의 ID 

resource: {} 에서 중괄호 안에 위에서 설명한 기본 Request body의 형식에 맞게 실제 입력할 데이터의 내용을 입력한다.

bigquery.tabledata.insertAll() {}부분에서 데이터 로딩이 실행되는 부분으로, 만약 에러가 있다면 에러에 대한 내용을 반환한다.

 

스트리밍 로딩 테스트

이제 예제 코드에 대한 대략적인 구조를 파악하였다면 스트리밍 API를 사용해서 실제 테이블에 데이터를 입력해보자.

 

테이블 정보

테스트에 사용할 테이블은 앞서 구글 스토리지를 활용한 데이터 로딩에서 생성한 csv_test 테이블을 사용하도록 한다. 테이블 csv_test는 다음과 같은 스키마를 가지고 있다.

 

Node.js 코드 작성하기

  • projectId: “사용자의 프로젝트ID”
  • datastId: “데이터를 입력할 테이블을 가지고 있는 데이터셋ID”
  • tableId: “실제로 데이터를 로딩 할 테이블Id”
  • resource: { 입력 할 데이터 }

위에서 설명한 request body의 형식으로 대상 테이블의 스키마에 대응되는 적합한 값을 입력해야 한다. 입력을 완료하고 파일을 저장한다.

 

데이터 로딩 실행 

이제 앞서 작성한 내용의 데이터를 로딩하기 위해 다음 명령어를 실행해서 Node.js서버를 구동한다.

%npm start

해당 스트리밍 로딩 요청에 대한 tableDataInsertAllResponse로 응답이 완료됨을 확인할 수 있다. 응답이 완료된 것을 확인하고 ctrl+c를 입력하여 실행을 중지한다.

 

데이터 로딩 결과 확인하기

이제 빅쿼리 웹 콘솔(Web UI)로 이동해서 간단한 쿼리 실행을 통해 데이터가 정상적으로 입력되었는지 확인해보자.

  

쿼리 실행하기

좌측 상단의 COMPOSE QUERY 버튼을 클릭한다.

 

다음과 같이 쿼리를 입력하고 RUN QUERY 버튼을 클릭하여 쿼리를 실행한다.

SELECT word, word_count, corpus, corpus_date

FROM load_test.csv_test

WHERE word_count=1111

 

쿼리가 실행되었다. 앞서 입력한 데이터를 확인 할 수 있다.

 

템플릿 테이블을 이용한 테이블 자동생성

이제 앞서 설명한 templateSuffix속성을 활용해 csv_test 테이블을 템플릿으로 하여 같은 스키마를 가진 새로운 테이블을 생성해보자.

 

Node.js 코드 작성하기

vi편집기등으로 app.js파일을 열고 다음의 내용을 추가하고 파일을 저장한다.

  • templateSuffix: "String형의 임의의 값"

 

자동 테이블 생성 및 데이터 로딩 실행

템플릿 테이블을 활용해서 자동으로 새로운 테이블 생성 및 데이터로딩을 위해 node.js 서버를 실행한다.

%npm start

응답이 완료되었다. ctrl+c를 입력하여 실행을 중지한다.

 

자동 테이블 생성 및 데이터 로딩 결과 확인하기

이제 다시 빅쿼리 웹 콘솔로 이동해서 테이블이 자동으로 생성되고 입력한 데이터가 제대로 로딩되었는지 확인한다. (새로고침이 필요할 수 있다.)

 

먼저 자동 생성된 테이블 부터 확인한다. 앞서 설명한 것처럼 <대상 테이블> + <templateSuffix>의 형태로 csv_test1234라는 새로운 테이블이 생성되었다. 템플릿의 대상이 되는 테이블인 csv_test와 동일한 스키마를 가진다.

 

데이터 로딩 결과 확인하기 - 자동생성 테이블

이제 자동으로 생성된 테이블을 확인하였다면 해당 테이블에 스트리밍으로 입력한 데이터를 확인해보도록한다.

 

쿼리 실행하기

COMPOSE QURERY버튼을 클릭해서 다음과 같은 쿼리를 입력하고 RUN QUERY 버튼을 클릭하여 쿼리를 실행한다.

SELECT word, word_count, corpus, corpus_date

FROM load_test.csv_test1234

WHERE word_count=1111


쿼리가 실행되었다. 앞서 입력한 데이터를 확인 할 수 있다.

*날짜 단위 또는 각각의 사용자 단위 등으로 테이블을 분할하여 생성할 필요가 있을 때, 

templateSuffix의 값에 따라서 자동으로 테이블을 생성하여 데이터를 입력할 수 있기 때문에 매우 유용한 기능이다.

 

자동 테이블 생성 참고사항

templateSuffix속성을 활용해서 자동으로 테이블 생성하고 데이터를 입력한 경우에 빅쿼리 

웹콘솔에서 바로 Preview를 클릭하면 스트리밍 버퍼에서 데이터 로딩을 통해 입력한 데이터를 

가지고 있어서 입력한 값이 바로 나타나지 않을 수 있다.

*스트리밍 데이터 입력에 수초가 소요될 수 있으며, 입력된 데이터를 복사하고, 

내보내기에 사용 될 수 있기까지 90분까지 소요 될 수 있다

 

 

데이터 일관성 확인

앞서 insertId를 통해 데이터의 일관성을 보장해주는 부분에 설명하였다. 이제 실제로 insertId를 사용해 데이터가 어떻게 변화하는지 확인해본다.

 

Node.js 코드 작성하기

vi등의 편집기를 이용해서 app.js파일을 열고 다음의 내용을 추가하고, 입력할 데이터의 내용을 수정한다.

  • insertId: "String형의 임의의 값"

입력 데이터의 경우 앞서 테스트한 결과와 구분을 위해 다른 값으로 변경한다.

   

데이터 로딩 실행 - insertId

insertId가 적용된 데이터로딩을 하기 위해서 node.js서버를 실행한다.

%npm start

응답이 완료되었으면 빅쿼리 웹 콘솔로 이동해서 입력된 데이터를 확인한다.

 

데이터 로딩 결과 확인하기 - insertId

빅쿼리 웹 콘솔에서 COMPOSE QUERY를 클릭하고 다음의 쿼리를 입력한다.

SELECT word, word_count, corpus, corpus_date

FROM load_test.csv_test1234

WHERE word_count=2222

RUN QUERY를 클릭하여 쿼리를 실행하고 입력된 데이터를 확인한다.

쿼리 수행결과를 통해, 정상적으로 입력된 데이터를 확인 할 수 있다.

 

데이터 중복 입력 확인하기

다시 SSH터미널에서 ctrl+c를 입력하여 서버 실행을 중지하고 app.js의 내용을 변경하지 않고, 다시 한번 node.js서버를 실행한다.

 

데이터 중복 확인 - 쿼리 테스트

빅쿼리 웹 콘솔로 이동해서 위와 동일한 쿼리를 실행해서 결과를 확인한다.

 

쿼리를 실행하면 앞서 입력한 데이터(행)만 나타난다. 같은 insertId를 가지고 있기 때문에 중복된 데이터가 추가로 입력되지 않는다.

 

*insertId가 없는 경우, 스트리밍 로딩을 할 때 데이터의 중복에 관계없이 새로운 데이터(행)가 추가된다. 각각 테스트해보길 권장한다.

   

데이터 변경하기 - Node.js 코드 수정하기

SSH 터미널에서 node.js서버를 중지하고 vi등의 편집기로 app.js파일을 열고 데이터의 내용을 변경하고 저장한다.

 

데이터 변경 후 로딩 실행 - insertId

앞서 같은 insertId를 가진 상태에서 변경한 데이터를 로딩하기 위해 다음 명령어를 실행해서 node.js서버를 구동한다.

%npm start

응답을 확인하고 웹 콘솔로 이동해서 입력된 데이터를 확인해보자.

 

데이터 변경 입력 후, 로딩 결과 확인하기

기존에 데이터가 어떻게 변화하였는 지 확인하기 위해 빅쿼리 웹 콘솔에서 앞서 실행한 쿼리를 입력하고 결과를 확인한다.

SELECT word, word_count, corpus, corpus_date

FROM load_test.csv_test1234

WHERE word_count=2222 

앞서 위의 쿼리 실행을 통해 데이터가 로딩된 결과를 확인했었다. 하지만 해당 쿼리가 

실행되면 조회되는 데이터가 없음을 확인 할 수 있다

 

이번에는 변경한 데이터의 word_count값인 3333으로 WHERE 조건을 변경하고 쿼리를 실행해보자

SELECT word, word_count, corpus, corpus_date

FROM load_test.csv_test1234

WHERE word_count=3333

변경해서 입력한 데이터에 대한 조회결과가 나타난다. 이와 같이 같은 행에 대한 insertId가 적용된 상태에서 데이터가 변경되어 다시 입력되는 경우, 기존의 입력된 데이터는 변경된다.

 

*insertId의 경우 데이터의 일관성을 확인하고 보장하기 위해 좋은 수단이 될 수 있으나, 앞서 언급한 것처럼 1분정도만 보장되는 시간적인 제약이 있으니 참고하기 바란다.

 

결론

빅쿼리의 스트리밍 데이터 로딩의 경우, 몇가지 제약적인 부분은 있지만, 실시간으로 대량의 이벤트 로그 분석, 실시간으로 데이터와 연동하는 dashboard등에 활용하기 좋다. 또한 빅쿼리의 대용량의 데이터 분석 뿐만 아니라 실시간의 데이터 로딩, 분석을 융합적으로 활용하면, 급변하는 시장에서 빠르게 대응 할 수 있는 또 하나의 대안이나 수단으로 이용할 수 있을 것이다.

 

참고자료

https://cloud.google.com/bigquery/streaming-data-into-bigquery

https://cloud.google.com/bigquery/docs/reference/v2/tabledata/insertAll

https://cloud.google.com/bigquery/quota-policy#streaminginserts

+ Recent posts