もがき系プログラマの日常

もがき系エンジニアの勉強したこと、日常のこと、気になっている技術、備忘録などを紹介するブログです。

digdag + embulk で bigQueryを差分更新

はじめに

こんばんは。

前回の記事から embulkで遊んでるのですが、 bigQueryにデータを同期させる際に、 更新されたデータのみ 同期させたい場合はどうすればいいのだろう?と色々探していると、参考サイトをみつけました。

さっそくやってみます。

参考サイト

本題

前提

データは前回のデータを使います。

> select * from users;
+----+-------+---------------------+---------------------+
| id | name  | created             | modified            |
+----+-------+---------------------+---------------------+
|  1 | test  | 2020-12-01 00:00:00 | 2020-12-01 00:00:00 |
|  2 | test2 | 2020-12-01 00:00:00 | 2020-12-01 00:00:00 |
|  3 | test3 | 2020-12-01 00:00:00 | 2020-12-01 00:00:00 |
|  4 | test4 | 2020-12-01 00:00:00 | 2020-12-01 00:00:00 |
|  5 | test5 | 2020-12-01 00:00:00 | 2020-12-01 00:00:00 |
+----+-------+---------------------+---------------------+
5 rows in set (0.00 sec)

bigQuery側もこんな感じで同期しています。日付だけUTCでずれちゃってますが...

f:id:kojirooooocks:20201212040649p:plain

1. digdagのインストール

インストール

$ brew install digdag
$ digdag --version
0.9.42

プロジェクト作成

$ digdag init example_bigquery
$ digdag init example_bigquery
2020-12-11 01:09:00 +0900: Digdag v0.9.42
  Creating example_bigquery/example_bigquery.dig
  Creating example_bigquery/.gitignore
Done. Type `cd example_bigquery` and then `digdag run example_bigquery.dig` to run the workflow. Enjoy!


# ディレクトリが掘られちゃうので、一旦上に持っていきます。
$ mv example_bigquery/example_bigquery.dig ./
$ mv example_bigquery/.gitignore ./
$ rm -rf example_bigquery

digファイル作成

+uploads:
  sh>: embulk run users.yml.liquid
 
+merge_task:
  sh>: bq query --batch --replace=true --use_legacy_sql=false --project_id=XXXXXXX --location=asia-northeast1 --destination_table='dataset_example1.users' "$(cat ./merge.sql)"

2. Google Cloud SDK のインストール

こちらからDownload後、インストールする

3. embulkファイルの更新

{% include 'env' %}
in:
  type: mysql
  host: {{ in_host }}
  user: {{ in_user }}
  password: {{ in_password }}
  database: {{ in_database }}
  table: users
  where: "modified > DATE_SUB(CURDATE(), INTERVAL 1 DAY)"  ← ここを追加
out:
  type: bigquery
  mode: replace
  table: tmp_users  ← ここをusersからtmp_usersに変更
  auth_method: json_key
  json_keyfile:  {{ out_json }}
  project: {{ out_project }}
  dataset: {{ out_dataset }}
  location: asia-northeast1
  compression: GZIP
  auto_create_dataset: true
  auto_create_table: true

4. merge用SQLファイル作成

参考サイトをコピーさせてもらいました。

SELECT * EXCEPT(rn) FROM (
    SELECT *, row_number() over (PARTITION BY id ORDER BY modified DESC) AS rn
    FROM (SELECT * FROM dataset_example1.tmp_users UNION ALL SELECT * FROM dataset_example1.users)
) WHERE rn = 1

5. 差分データを作成する

embulkの inwhere に引っかかるように、 modifiedが昨日よりあとの日にち になるようにデータを更新します。

mysql> UPDATE users SET name = 'AAAAAAAAAAAA', modified = '2020-12-12 00:00:00' WHERE id = 3;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql> select * from users;
+----+--------------+---------------------+---------------------+
| id | name         | created             | modified            |
+----+--------------+---------------------+---------------------+
|  1 | test         | 2020-12-01 00:00:00 | 2020-12-01 00:00:00 |
|  2 | test2        | 2020-12-01 00:00:00 | 2020-12-01 00:00:00 |
|  3 | AAAAAAAAAAAA | 2020-12-01 00:00:00 | 2020-12-12 00:00:00 |
|  4 | test4        | 2020-12-01 00:00:00 | 2020-12-01 00:00:00 |
|  5 | test5        | 2020-12-01 00:00:00 | 2020-12-01 00:00:00 |
+----+--------------+---------------------+---------------------+
5 rows in set (0.00 sec)

id: 3の nameを変更しました。

6. 実行する

$ digdag run example_bigquery.dig
...
+----+--------------+---------------------+---------------------+
| id |     name     |       created       |      modified       |
+----+--------------+---------------------+---------------------+
|  1 | test         | 2020-11-30 15:00:00 | 2020-11-30 15:00:00 |
|  5 | test5        | 2020-11-30 15:00:00 | 2020-11-30 15:00:00 |
|  4 | test4        | 2020-11-30 15:00:00 | 2020-11-30 15:00:00 |
|  2 | test2        | 2020-11-30 15:00:00 | 2020-11-30 15:00:00 |
|  3 | AAAAAAAAAAAA | 2020-11-30 15:00:00 | 2020-12-11 15:00:00 |
+----+--------------+---------------------+---------------------+
...
Success. Task state.

いったみたいです。

bigQueryを確認します。

f:id:kojirooooocks:20201212040724p:plain

id: 3の内容が更新されていました!

全体の流れは、参考にさせていただいたメルカリさんのブログで詳しく解説されています。

(1) BigQuery上に対象テーブルのitems と 一時テーブルの tmp_items があります

(2) UNION ALL で全行の和集合を構成します。

(3) row_number() over (PARTITION BY id ORDER BY updated DESC) によって itemsのid毎に更新日の降順でグループ分けします。(右下が拡大図です)

(4) row_number = rd が 1 のものだけをSELECTします。その際に作業用カラムrnをEXCEPT(rn)によって除去します

終わりに

参考サイトのおかげで、結構簡単に差分更新を実現することが出来ました。

一点悩んだこととしては、 今回 bigQueryにクエリを発行する際 shellscriptで実行したのですが、 digdagの bq> でも本来は実行できるようでした。

ただ、僕の場合、 bq>だと何回やっても not foundエラーが出てしまっていたので、仕方なく shellscriptで実行しました。

たぶん、locationの指定が出来てないので not foundとかになってるのかな?とか思ってるのですが、 bq> を使う場合の location設定がわからず断念しました...

どなたか知っていたら教えて下さい。

現場からは以上です。