Azure Stream Analytics GZIP 圧縮された入力のデシリアライズサンプル
GZIPで圧縮されたバイナリーデータを入力として受け取り、解凍して、JSONとして出力するサンプル。
IoT シナリオで、例えば usec オーダーで振動加速度等をセンサーで収集した生データをクラウドに送信し蓄積する場合、デバイスからJSONで送信しようとすると膨大なバイト数のデータを送信することになり、通信回線への負担、IoT Hub での受信可能メッセージ数の浪費などの問題が発生する。そんな問題を回避したい場合に参考となるサンプルを紹介する。
デバイスで、3軸の加速度センサーで振動を一定間隔で計測する。
- timestamp - 計測した時間
- accelx - X軸加速度
- accely - Y軸加速度
- accelz - Z軸加速度
これらのデータ項目を、
データ項目 | バイナリ化 |
---|---|
timestamp | .NET Framework の DateTime の Ticks 形式(8byte) |
accelx | float(4byte) |
accely | float(4byte) |
accelz | float(4byte) |
で 1 計測データあたり、20 byte で時系列で並べていき、一定数、または、一定サイズを超えたら、そのバイナリデータを GZIP で圧縮し、IoT Hub に圧縮したデータを D2C メッセージとして送信する。
Stream Analytics で、GZIP解凍、カスタムデシリアライザーを使ったバイナリデータ⇒JSON形式への変換、クエリーによるデータ処理(本サンプルではそのままスルー)、後段への出力を行う。
カスタムデシリアライザーは、「.NET カスタム逆シリアライザーを使用して任意の形式の入力を読み取る」を参照の事。
事前準備
入力用の IoT Hub と出力用の Event Hub はあらかじめ作成しておくこと。
Stream Analytics は作成しなくてよい。
Custom Deserializer の作成
※ 本記述は、2021/3/15時点の状況を元に記載している。今読まれている時点が非常に時が経っていたら、もしかすると色々と更新されているかもしれないのでDocs等自分で確認していただきたい。
現状、Custom Desierializer は、Azure Portal では設定できず、配置も含め、VS Code + Extension、もしくは、Visual Studio 2019+拡張機能でしか作成できない。ここでは、Visual Studio 2019 での開発手順を紹介する。
Visual Studio 2019 で Azure Stream Analytics Application を開発するには、 Azure Data Lake and Stream Analytics Tools を拡張機能でインストールする。これをインストールすると、Azure Stream Analytics Custom Deserializer Project(.NET) というプロジェクトテンプレートが追加されるので、それを使いたくなるのだが、どうもうまく配置できなかったので、EAzure Stream Analytics Applicationプロジェクトテンプレートを使う方法を紹介する。
プロジェクトを作成すると以下の様な状態で作成される。
Custom Deserializer の実装
Script.asaql のコードビハインドを開いて、ここに Custom Desierializer のコードを記述する。
先頭の using のパートに以下を追加する。
using Microsoft.Azure.StreamAnalytics;
using Microsoft.Azure.StreamAnalytics.Serialization;
JSON で出力するためのデータモデルを追加する。
public class TelemetryDataStructure
{
public DateTime timestamp { get; set; }
public double accelx { get; set; }
public double accely { get; set; }
public double accelz { get; set; }
}
Stream Analytics がデータを受信したときにそのデータを受け取って変換するためのクラスを追加する。
public class CustomTelemetryDeserializer : StreamDeserializer<TelemetryDataStructure>
{
// streamingDiagnostics is used to write error to diagnostic logs
private StreamingDiagnostics streamingDiagnostics;
// Initializes the operator and provides context that is required for publishing diagnostics
public override void Initialize(StreamingContext streamingContext)
{
this.streamingDiagnostics = streamingContext.Diagnostics;
}
// Deserializes a stream into objects of your type
public override IEnumerable<TelemetryDataStructure> Deserialize(Stream stream)
{
var buf = new byte[stream.Length];
var readBytes = stream.Read(buf, 0, (int)stream.Length);
var processedBytes = 0;
var unitSize = sizeof(float);
var timestampSize = sizeof(long);
while (processedBytes + unitSize * 3 + timestampSize < readBytes)
{
var value = new TelemetryDataStructure();
long ticks = BitConverter.ToInt64(buf, processedBytes);
processedBytes += timestampSize;
var timestamp = new DateTime(ticks);
var accelx = BitConverter.ToSingle(buf, processedBytes);
processedBytes += unitSize;
var accely = BitConverter.ToSingle(buf, processedBytes);
processedBytes += unitSize;
var accelz = BitConverter.ToSingle(buf, processedBytes);
processedBytes += unitSize;
yield return new TelemetryDataStructure()
{
timestamp = timestamp,
accelx = accelx,
accely = accely,
accelz = accelz
};
}
if (processedBytes == 0 || processedBytes != stream.Length)
{
streamingDiagnostics.WriteError("data format error", processedBytes == 0 ? $"processed data size is 0 for {stream.Length}" : $"data size mismatched {processedBytes}<->{stream.Length}");
}
}
}
Stream Analytics が受信したデータを、Stream Analytics が標準で持っているGZIP解凍機能を使って解凍されたバイナリーデータは受信の度に、Deserializeメソッドの引数の stream から取得できる。
受信するデータは、前述の通り、最初の8バイトが timestamp の Tick、次に、4バイトずつ、accelx、accely、accelz と続く、20バイトを順に読み込んで、JSON 化、それを受け取ったデータ分繰り返す。
ちなみに、上に挙げたコードの中の、
yield return new TelemetryDataStructure()
{
timestamp = timestamp,
accelx = accelx,
accely = accely,
accelz = accelz
};
がそれにあたり、yield をつけることによって、20バイトの塊ごとに Deserialize メソッドのコール側に逐次返しつつ(返されたデータに、Stream Analytics のクエリーで定義されたデータ処理が適用される)、このメソッドの実行が継続される。
次に、Script.asaql を開き、
SELECT * INTO unziped FROM iothub
と入力する。"ビルド"→"ソリューションのビルド"で、プロジェクトをビルドする。
※ このファイルを空にしておくとビルドでエラーが出てしまうので入力定義の前にやっておく。
入力の定義
Inputs/Input.json をクリックしてダイアログを表示し、設定を行う。
項目 | 設定方法 |
---|---|
入力のエイリアス | クエリーに合わせ、iothubと入力 |
ソース | IoT Hubを選択 |
リソース | データソース設定を手動で提供しますを選択 |
サブスクリプション | 各自のサブスクリプションを選択 |
IoT Hub | 入力としてバインドする IoT Hub を選択 |
共有アクセスポリシー | service を選択 |
イベントのシリアル化の形式 | Other (Protobuf, XML, proprietary...) を選択 |
リソース | ASA プロジェクト参照または分離コードから読み込みますを選択 |
CSharp アセンブリ名 | ASAApplication_CodeBehindを選択 |
クラス名 | Script.asaql.cs で追加した CustomTelemetryDeserializerクラス |
イベントの圧縮タイプ | GZip |
出力の定義
Outputs/Output.json をクリックしてダイアログを表示し、unziped というエイリアスで出力用の Event Hubをバインドする。
Azure へのデプロイと実行
ソリューションエクスプローラ―で、Stream Analytics Application プロジェクトを右クリックし、Azureに発行するを選択し、デプロイする。
最初の一回目は、新規作成、二回目以降は、更新でよい。
デプロイが完了したら、Stream Analytics を実行する。
※ 実行中は課金が発生するので、お試しが終わったら停止しておくことを推奨する。
デバイスシミュレータ
[テスト用のシミュレータ](simulator/WpfAppDeviceSimulator]を用意しているので、お試しはそちらで。
Visual Studio 2019 で WpfAppDeviceSimulator.csprojを開き、42行目付近の
private string iothubconnectionstring = "<- connection string for device of IoT Hub ->";
の文字列を、IoT Hub に一つ適当に IoT Device を追加して、その接続文字列で置き換え、実行する。
Connect & Start をクリックすると、送信を開始する。
留意点
現状、カスタムデシリアライザーが利用可能な Stream Analytics は、データセンターが限られている。
利用可能なデータセンターは、リージョンのサポートを確認してください。