GCP からPDHへのデータ返信

1.概要

R-MSMからPDH(Physical Data Hub)を経由して、GCP(Google Cloud Platform)上のVM(Virtual Machine)へセンサーデータを送る流れをPDHからGCPへのデータ送信というとします。GCP 上のVMで何らかのセンサデータの処理を行い、その結果をGCPのVMからPDHへ送る流れを返信というとします。
データ送信と同じようにデータ返信もNode-RED間で行います。MQTTを使ったデータの送信と返信を説明します。具体的には、次の図に示すように、GCP のVM のNode-REDで移動平均を取った結果を逐次、PDH へ返す処理を実験として行います。

なお、ここではPDHとGCPとして記載を進めますが、PDH が複数台あるシステムなどの場合は、GCPとPDH の間にエッジサーバが入る場合があります。その場合には、ここでのPDHをエッジサーバと読み替えていただければ大丈夫です。

2. MQTT を使ったデータ送信/返信実験

2-1. MQTTを使うための準備

1) MQTTブローカーの配置

GCPのVM上のNode-REDに、MQTTのブローカーノードを配置する必要があります。
MQTTによるデータのパブリッシング/サブスクライブおよび、ブローカーの配置に関しては、こちら(Node-REDへのAedes Brokerの導入)をご覧ください。

2)ファイアウォールの設定

下り(GCPからPDHへ)のファイアウォールの設定が必要です。こちら(ファイアウォールルールの新規作成と既存修正)を参考にMQTTの下りのファイアウォールを設定してください。ソースフィルタ、ポート番号は、実施される環境に合わせてください。

2-2. GCP ⇔ PDHの送信/返信実験結果

R-MSMからPDH経由でGCPのVMに送られた環境センサBME688の気温情報の10個の移動平均を取った結果をGCPからPDHに返す実験を行いました。次に、図1にPDH上のNode-REDのエディタ画面を,図2にGCPのVM上のNおでーREDのエディタ画面を示します。

1) PDHからMQTT outノードでトピック名”TestSensorData”でパブリッシングされたデータが、GCP上のVMのMQTT inノードで同じくトピック名”TestSensorData”としてサブスクライブされます。
2) 受信されたデータがGCPのVM上で10個の移動平均処理を施され、MQTT outノードからトピック名”movingAverage”としてパブリッシングされます。
3) PDHのMQTT inノードで”movingAverage”のトピック名でサブスクライブされ、右側のでバッグ画面に表示されています。

このようにMQTTを使うことで、容易に送信、返信を実現することができます。

 

図1. PDH上のNode-RED画面

図2. GCPのVM上のNode-RED画面

3. PDH上のNode-REDの編集内容(1):センサデータのパブリッシング

ここから、上記送信/返信実験に用いたNode-REDのエディタ画面の実現方法に関して詳しく説明します。初めに、PDH上のNode-REDの編集内容のうち、センサデータのパブリッシングを説明します。次の第4項でGCP上のNode-REDの編集内容を説明します。最後に第5項で、再びPDH上のNode-REDの編集内容のうち、移動平均のサブスクライブに関して説明します。

注意)R-MSMは、PDHとBluetoothで接続し、環境センサBME688のデータが出てくることを確認しておいてください。

3-1.  Node-REDエディタのオープン

① PDHのNode-REDの編集画面を開きます。
② 右上の[+]を押し、新しいタブを開きます。

3-2. 送信データの取出し(Link inノード)

① link inノードを配置して、ダブルクリックして、プロパティ画面を開きます。
② link outの一覧の中から”sensor-module_data”の中の”Sensor Data OUTPUT”を選びます。
③ 名前を入力します。
④ 最後に、「完了」を押します。

3-3. センサデータの送信(MQTT outノード)

① jsonノードとMQTT outノードを配置して、配線で接続します。MQTT outノードをダブルクリックして、プロパティ画面を開きます。
② トピック欄にトピック名を入力します。ここでは、“TestSensorData”とします。
③ サーバを設定しるために、サーバ欄の横の鉛筆マークをクリックします。

④ サーバ欄に、GPCのVMの外部IPアドレスを入力します。
⑤ ポート番号を変更する場合は、変更します。ここでは“1883”のままとします。
⑥ その後「追加」を押します。

⑦ 最後に「完了」を押します。

3-4. デプロイ

① デプロイします。
② デプロイ後しばらくして、MQTT out ノードに下に緑の■とともに接続済とでればGPCのブローカーへの接続はOKです。

4. GCPのVM上のNode-REDの編集内容

ここでは、GCP のVM上のNode-REDの編集内容を詳細に説明します。

4-1. PDHからのデータ受信(MQTT-in ノード)

① Aedes MQTT ブローカーのノードを配置したNode-REDを開きます。Aedes  MQTTブローカーは、エディタ画面のいずれかのタブに一つあればOKです。

② MQTT in ノードを配置します。ノードをダブルクリックして、プロパティ画面を開きます。
③ トピック名を入力します。ここでは、”TestSensorData”とします。
④ サーバを設定するために、鉛筆マークを押します。

⑤ サーバに,”localhost“と入れます。ポート番号は、必要に応じて変えてください。
⑥ 名前を入力します。
⑦ 入力が終わったら、追加を押します。
⑧ 完了を押します。

4-2. 環境センサBME688の選択(Switchノード)

① switchノードを配置して、配線で接続します。
② プロパティに“sensor”を追加して、”msg.payload.sensor”とします。
③ 値ルール”==“をドロップダウンメニューから選びます。
④ ドロップダウンメニューから文字列を選びます。”BME688”と入力します。
  msg.payload.sensor == BME688 となったら、1番の端子から、msg.payloadが次段に送られます。
⑤ 名前を入力します。
⑥ 最後に、「完了」を押します。

4-3. 温度データ抽出(Changeノード)

① changeノードを配置して、配線で接続します。
② ドロップダウンメニューから”msg”を選びます。
③ 残りの部分に”payload.temp.val”と入力します。結果として”msg.payload.temp.val”を”msg.payload”に代入します。
④ 名前を入力します。
⑤ 最後に、「完了」を押します。

4-4. 動作の確認(Debug ノード)

① debugノードを配置して、配線を接続します。
② デプロイをします。
③ msg.payloadとして温度データが表示されたらOKです。

4-5. 配列にデータを格納(Function ノード)

① functionノードを配置して、配線を接続します。その後、ダブルクリックをして、プロパティを開きます。
② コードの欄に、10個の配列の作成とデータシフト(shift())とデータの追加(push(msg.payload))の記述を記載します。
③ 名前を記載します。
④ 最後に「完了」を押します。

// global.arrayが存在しなければ10個の配列として新たに作成
if ( !context.global.array ) {
    context.global.array = new Array (10);
}
// arrayデータを上位に一つシフトする
var shifted = context.global.array.shift();

// array[0]に値を代入する
context.global.array.push(msg.payload);

msg.array = context.global.array;
msg.length = context.global.array.length;
return msg;

参考HP:Watson IoT Platform – Node-RED – 直近10件の平均や標準偏差を求める

4-6. 移動平均算出(Function ノード)

① functionノードを配置して、配線を接続します。その後、ダブルクリックをして、プロパティを開きます。
② コードの欄に、合計値の計算と平均値の記述を記載します。
③ 名前を記載します。
④ 最後に「完了」を押します。

// 変数宣言
var i, sum = 0, average = 0;

// 合計値の計算
for (i=0; i<msg.length; i++) {
    sum = sum + msg.array[i];
}
msg.sum = sum;

// 平均値の計算
average = sum / msg.length;
msg.average = average;
return msg;

4-7. 出力フォーマットの整形(Formatノード)

① functionノードを配置して、配線を接続します。その後、ダブルクリックをして、プロパティを開きます。② コードの欄に、出力の小数点以下を3桁に固定する処理とをObject形式への変換を記載します。
③ 名前を記載します。
④ 最後に「完了」を押します。

msg.payload = { "movingAverage": parseFloat(msg.average.toFixed(3))}
return msg;

4-8. 移動平均結果のパブリッシング(MQTT outノード)

① jsonノードとMQTT outノードを配置して、配線を接続します。その後、MQTT outノードをダブルクリックをして、プロパティを開きます。
② トピックの欄にトピックの名前を記載します。ここでは、“movingAverage”とします。
③ サーバ名が、localhostと指定している“Aedes MQTT broker”であることを確認します。
④ 最後に「完了」を押します。

4-9. デプロイ

① デプロイします。
② デプロイ後しばらくして、MQTT out ノードに下に緑の■とともに接続済とでればGPCのブローカーへの接続はOKです。

5. PDH上のNode-REDの編集内容(2):移動平均結果のサブスクライブ

5-1. 移動平均結果のサブスクライブ(MQTT inノード)

① MQTT inノードを配置します。その後、MQTT inノードをダブルクリックをして、プロパティを開きます。
② トピックの欄にトピックの名前を記載します。ここでは、“movingAverage”とします。
③ サーバ名が、GCPを指定している“GPC MQTT”であることを確認します。
④ 最後に「完了」を押します。

5-2.データの返信結果の確認

① jsonノードとdebugノードを配置します。その後、デプロイします。
② 移動平均結果が出力されればOKです。