はじめに
こんばんは。
前回の記事から embulkで遊んでるのですが、 bigQueryにデータを同期させる際に、 更新されたデータのみ
同期させたい場合はどうすればいいのだろう?と色々探していると、参考サイトをみつけました。
さっそくやってみます。
参考サイト
- https://engineering.mercari.com/blog/entry/2018-06-28-100000/
- https://blog.adachin.me/archives/20577#google_vignette
本題
前提
データは前回のデータを使います。
> select * from users; +----+-------+---------------------+---------------------+ | id | name | created | modified | +----+-------+---------------------+---------------------+ | 1 | test | 2020-12-01 00:00:00 | 2020-12-01 00:00:00 | | 2 | test2 | 2020-12-01 00:00:00 | 2020-12-01 00:00:00 | | 3 | test3 | 2020-12-01 00:00:00 | 2020-12-01 00:00:00 | | 4 | test4 | 2020-12-01 00:00:00 | 2020-12-01 00:00:00 | | 5 | test5 | 2020-12-01 00:00:00 | 2020-12-01 00:00:00 | +----+-------+---------------------+---------------------+ 5 rows in set (0.00 sec)
bigQuery側もこんな感じで同期しています。日付だけUTCでずれちゃってますが...
1. digdagのインストール
インストール
$ brew install digdag $ digdag --version 0.9.42
プロジェクト作成
$ digdag init example_bigquery $ digdag init example_bigquery 2020-12-11 01:09:00 +0900: Digdag v0.9.42 Creating example_bigquery/example_bigquery.dig Creating example_bigquery/.gitignore Done. Type `cd example_bigquery` and then `digdag run example_bigquery.dig` to run the workflow. Enjoy! # ディレクトリが掘られちゃうので、一旦上に持っていきます。 $ mv example_bigquery/example_bigquery.dig ./ $ mv example_bigquery/.gitignore ./ $ rm -rf example_bigquery
digファイル作成
+uploads: sh>: embulk run users.yml.liquid +merge_task: sh>: bq query --batch --replace=true --use_legacy_sql=false --project_id=XXXXXXX --location=asia-northeast1 --destination_table='dataset_example1.users' "$(cat ./merge.sql)"
2. Google Cloud SDK のインストール
こちらからDownload後、インストールする
3. embulkファイルの更新
{% include 'env' %} in: type: mysql host: {{ in_host }} user: {{ in_user }} password: {{ in_password }} database: {{ in_database }} table: users where: "modified > DATE_SUB(CURDATE(), INTERVAL 1 DAY)" ← ここを追加 out: type: bigquery mode: replace table: tmp_users ← ここをusersからtmp_usersに変更 auth_method: json_key json_keyfile: {{ out_json }} project: {{ out_project }} dataset: {{ out_dataset }} location: asia-northeast1 compression: GZIP auto_create_dataset: true auto_create_table: true
4. merge用SQLファイル作成
参考サイトをコピーさせてもらいました。
SELECT * EXCEPT(rn) FROM ( SELECT *, row_number() over (PARTITION BY id ORDER BY modified DESC) AS rn FROM (SELECT * FROM dataset_example1.tmp_users UNION ALL SELECT * FROM dataset_example1.users) ) WHERE rn = 1
5. 差分データを作成する
embulkの in
の where
に引っかかるように、 modifiedが昨日よりあとの日にち
になるようにデータを更新します。
mysql> UPDATE users SET name = 'AAAAAAAAAAAA', modified = '2020-12-12 00:00:00' WHERE id = 3; Query OK, 1 row affected (0.00 sec) Rows matched: 1 Changed: 1 Warnings: 0 mysql> select * from users; +----+--------------+---------------------+---------------------+ | id | name | created | modified | +----+--------------+---------------------+---------------------+ | 1 | test | 2020-12-01 00:00:00 | 2020-12-01 00:00:00 | | 2 | test2 | 2020-12-01 00:00:00 | 2020-12-01 00:00:00 | | 3 | AAAAAAAAAAAA | 2020-12-01 00:00:00 | 2020-12-12 00:00:00 | | 4 | test4 | 2020-12-01 00:00:00 | 2020-12-01 00:00:00 | | 5 | test5 | 2020-12-01 00:00:00 | 2020-12-01 00:00:00 | +----+--------------+---------------------+---------------------+ 5 rows in set (0.00 sec)
id: 3の nameを変更しました。
6. 実行する
$ digdag run example_bigquery.dig ... +----+--------------+---------------------+---------------------+ | id | name | created | modified | +----+--------------+---------------------+---------------------+ | 1 | test | 2020-11-30 15:00:00 | 2020-11-30 15:00:00 | | 5 | test5 | 2020-11-30 15:00:00 | 2020-11-30 15:00:00 | | 4 | test4 | 2020-11-30 15:00:00 | 2020-11-30 15:00:00 | | 2 | test2 | 2020-11-30 15:00:00 | 2020-11-30 15:00:00 | | 3 | AAAAAAAAAAAA | 2020-11-30 15:00:00 | 2020-12-11 15:00:00 | +----+--------------+---------------------+---------------------+ ... Success. Task state.
いったみたいです。
bigQueryを確認します。
id: 3の内容が更新されていました!
全体の流れは、参考にさせていただいたメルカリさんのブログで詳しく解説されています。
(1) BigQuery上に対象テーブルのitems と 一時テーブルの tmp_items があります
(2) UNION ALL で全行の和集合を構成します。
(3) row_number() over (PARTITION BY id ORDER BY updated DESC) によって itemsのid毎に更新日の降順でグループ分けします。(右下が拡大図です)
(4) row_number = rd が 1 のものだけをSELECTします。その際に作業用カラムrnをEXCEPT(rn)によって除去します
終わりに
参考サイトのおかげで、結構簡単に差分更新を実現することが出来ました。
一点悩んだこととしては、 今回 bigQueryにクエリを発行する際 shellscriptで実行したのですが、 digdagの bq>
でも本来は実行できるようでした。
ただ、僕の場合、 bq>
だと何回やっても not foundエラーが出てしまっていたので、仕方なく shellscriptで実行しました。
たぶん、locationの指定が出来てないので not foundとかになってるのかな?とか思ってるのですが、 bq>
を使う場合の location設定がわからず断念しました...
どなたか知っていたら教えて下さい。
現場からは以上です。