mongoのchange stream apiを使って差分をbigqueryに取り込むスクリプトです。 常に起動せずに、時間ごとに起動して差分を取り込むためのものです。
- bigqueryでリアルタイムに差分を更新するような方法は高くつく
- エラーが出た時の処理が面倒
などが理由です。
動作の流れは下記です。
- bigqueryにある最新のtimestamp(time,increment)を取得する
- mongoのchange streamをtimeとincrementの地点から取得する
- 内容はfulldocumentに記載されているもの
- timestampとしてbsonに入っているtimeとincrementも含める
- _idは$oidを抜く
- insertはinsert用の一時ファイルに
- update or upsert
- input modeがmergeならupdate用の一時ファイル
- input modeがappendならinsert用の一時ファイル
- deleteならdelete用のidのみ一時ファイルに
- もしDELETEだけが実行された上に、最新のtime,incrementに対しての削除行われたなら、timeとincrementだけ最新のものを入れる。latest viewにはこれは表示されない。
- もし、DELETE後に同じ_idがINSERTされていたら、削除対象から外してinput modeに応じて insert一時ファイル or update一時ファイルに書き込む
- 内容はfulldocumentに記載されているもの
- timeとincrementの最新断面が見れるlatest viewがなければ作成する
- bigqueryに対してINSERT or MERGE or DELETEする
pip3 install git+ssh://git@github.com/wacul/inhouse-data/mongo-changestream-to-bigquery/
# or
pip3 install git+https://github.com/wacul/inhouse-data/mongo-changestream-to-bigquery/
下記を実行するだけです。
mongo-changestream-to-bigquery --config config.yml
認証はgcpのSDKに任せるので、キーを使う場合は環境変数 GOOGLE_APPLICATION_CREDENTIALS=
などを使ってください。
replicaを使ってsnapshot時点のtimestampを使って初期の時間合わせをします。
- replicationを止めたりsnapshotなどからmongoを起動する
- oplogの最新のtimestampを下記コマンドで取得しておく
use local
db.oplog.rs.find().sort({$natural: -1}).limit(1)
- mongoのデータをexport
mongoexport -d database -c collection -o collection.json
- bigqueryにアップロード
2で取得したtimeとincrementを入れてexportしたデータをinsertします。 テーブルはここで管理することを想定してないので、作成されていることを前提にしてます。
mongo-changestream-to-bigquery --config config.yml mongoexport-insert -e collection.json -t timestamp -i increment
liquidが使えるので {{ env.MONGO_URI }}
のような指定ができます。パスワードなどsecretな情報はこれで隠してください。
項目のアップデートであっても必ずinsertをします。 最新の断面はtimeとincrementが最新であるviewを作って参照することになります。 過去の変更履歴を見ることができます。
項目のアップデートがあった場合にはbigqueryのMERGEクエリを使って項目をアップデートします。 過去の履歴は消えます
最新断面を得るためと、どこまでmongoからデータを取得したかを保持するためのフィールドです。 mongoのフィールドに存在しないフィールド名を指定してください。 既存のフィールド名を入れてしまったら、そのフィールドはtime_fieldまたはincrement_fieldとして扱われてデータは上書きされます。
mongodbへの接続のuriです
change streamを取得するmongodbのdb, collection 名を指定してください。
bigqueryのテーブルの場所を指定してください。
bigqueryのschemaのjsonファイルをそのまま指定してください。 指定されているフィールドのみアップロード対象にします。