Cloud Functionsを使った簡単な解析(時間軸線型補間処理)

1.概要

GCP(Google Cloud Platform)上の機能の一つである‘Cloud Functions’を使って、R-MSMで取得したデータの線型補間を行います。
Cloud Functionsは、簡単に言うとクラウド上(サーバレス)でプログラムを実行してくれる環境のことです。クラウド上で作成したプログラムを実行してくれます。
ローカルで作ったプログラムをクラウドに置いて、実行させるだけということも可能です。あとは、プログラムを実行するタイミングを指示する必要があります。
タイミングの指示の方法は2通りあります。HTTPリクエストで知らせる方法とCloud Schedulerなどのイベントドリブンで知らせる方法です。
また、サポートされているプログラムは、Node.js、Python、Go、Java、.NET、Ruby、PHPなど多様です。
今回処理をさせるR-MSMで取得したデータの線型補間ですが、データ処理の前処理として、正確に12時00分、12時01分、12時02分というような時間でのデータ値に直線補間で変換する処理です。

ここでは、PDH(Physical Data Hub)のNode-REDで生成したHTTPリクエストでファイルに保管したR-MSMのデータを送信し、Cloud Functions上のPythonプログラムで線型補間処理を行い、結果を受け取ります。

以下、線型補間前後のデータの説明を行った後、Cloud Functionsを使った処理の説明を行いますが、まず、Cloud FunctionsのサンプルプログラムをNode-REDからのHTTPリクエストで動作させる実験で肩慣らしをした後に、線型補間処理の実験に進むというステップで説明します。すでに、Cloud Functionsで関数を作成された経験のある方は、次の「2.線型補間前後のデータ」の次に、「4.Cloud Functionsでの線型補間プログラムの作成」まで飛んでもらって構いません。

2.線型補間前後のデータ

Cloud Functionsで線型補完する前のデータと線型補完したデータを次に示します。線型補間処理がどのようなものかを簡単に掴んでいただけたらと思います。

2-1.線型補間前のデータ

PDH側で、二酸化炭素ガスの流量と圧力のデータをR-MSMで取得したデータをcsvに変換して保存したデータを準備しました。R-MSMで1分毎にサンプリングしていますが、PDHとの時計の精度により微妙にずれていきます。18:54の表示データがありません。そこで、補完処理で1分毎のデータに変換します。

/home/pi/source/python_pro/interpolate/sample.csv

datetime,flow,press
2022/10/10 18:50:54,2.592,3.16
2022/10/10 18:51:54,2.458,2.71
2022/10/10 18:52:57,3.206,2.94
2022/10/10 18:53:57,3.015,2.71
2022/10/10 18:55:00,2.957,2.94
2022/10/10 18:56:00,3.149,2.48
2022/10/10 18:57:00,2.09,2.94
2022/10/10 18:58:03,1.876,2.48
2022/10/10 18:59:06,2.881,2.48
2022/10/10 19:00:06,3.015,2.94
2022/10/10 19:01:09,1.993,2.71
2022/10/10 19:02:09,4.405,3.16
2022/10/10 19:03:12,3.778,2.94
2022/10/10 19:04:12,4.234,3.16
2022/10/10 19:05:15,4.044,2.48
2022/10/10 19:06:18,4.044,2.71
2022/10/10 19:07:21,2.129,2.48
2022/10/10 19:08:24,4.196,2.71
2022/10/10 19:09:24,3.854,3.16
2022/10/10 19:10:24,4.576,3.39

2-2.線型補間後のデータ

線型補間後のデータを示します。きれいに00秒のデータに保管されています。補間前にはなかった18:54台のデータも存在します。
補間前後のデータをグラフにしてい見ると、変化の激しいところで、前後のデータで変化の大きい(鈍ってしまっている)ところがあります。

3.Cloud Functionsでのプログラミング(サンプルプログラムの実行)

ここからCloud Functionsでのプログラムの作成方法に関して説明します。Cloud Functionsのサンプルプログラムを動作させます。Node-REDからHTTP リクエストでJSONデータを送って、そのJSONが送り返されて、Node-REDのデバッグ画面に表示されるまでを説明します。まず、Google Cloud Platformのダッシュボードの画面に入ってください。アカウントが無い方は、まずこちら(Googleのアカウントを作る)を参照してアカウントの作成をお願いします。

3-1. Cloud Functionsの初期設定

初めに、プロジェクトを選択する必要があります。ここでは“My First Project”を使用します。新たに作成したプロジェクトを使用することも可能です。
1) ダッシュボードの画面の左下のCloud Functionsをクリックして下さい。

2) 初めてCloud Functionsで関数を作りますので、下記のような初期画面が出てきます。「関数を作成」をクリックして下さい。

3) 必要なAPIの有効化のポップアップが出てきます。ここでは、「Cloud Functionsを使用するためには、これら4つのAPIが必要ですが、今は1つしか有効になっていません。有効にしますか?」というメッセージです。Cloud Functionsを使用しますので、「有効にする」をクリックしてください。

3-2. 関数の初期設定

ここから関数作成の初期設定の画面です。
1) 「環境」は第1世代のままでOKです。第2世代は“Cloud Run”を使用した高性能なCloud Functionsのようです。
2) 関数名を入れます。ここでは「test-function」とします。
3) リージョンは、asia-northeast1(東京) or asia-northeast2(大阪)を選択します。
4) トリガーは今回はHTTP リクエストを使用しますので、「HTTP」です。その下のURLは、HTTPリクエストを送付する先になります。
   HTTP以外にもプルダウンメニューで選べます。イベントドリブンの場合は、Cloud Pub/Subなどを選びます。
5) 「未認証の呼び出しを許可」を選んで誰でもアクセスできるように設定します。HTTPSが必須を選びます。
6) ここまで記入したら、「保存」を押します。続けて「次へ」を押します。

7) エディタの画面が出てきます。“Hello World!”を表示する初期プログラムも表示されています。
8) まず、プログラミング言語(ランタイム)を選びます。初期は、Node.jsになっています。ここでは、ドロップダウンメニューからPython3.10を選びます。するとサンプルプログラムもPython3に変わります
9) ソースコードは、ここではこの画面で編集しますので、インラインエディタのままでOKです。他にもZIPファイルのアップデートやCloud Storageに置いたZIPファイルや、ここで設定することも可能です。

3-3. 関数のデプロイ

ここでは、サンプルプログラムをそのまま、Cloud Functionsにデプロイします。
1) 「デプロイ」ボタンを押します。

2) 画面が変わってデプロイが始まります。しばらく時間がかかります。関数名の左横のくるくる回る動画が緑のチェックマイクに変わったらデプロイ完了です。
  エラーがある場合は、黄色いビックリマークになります。
3) ソースプログラムを見る場合は、「ソース」のボタンを押します。「トリガー」は、HTTPリクエストでアクセスするアドレスが記載されています。「ログ」はログ表示の画面です。
  HTTPリクエストでNode-REDでアクセスするために、「ソース」を押してソースプログラムを表示し、簡単に理解します。

 

4)エントリポイントは初めに実行される関数名を示します。編集画面で変更できます。Cloud Functionsは、PythonのWebフレームワークとして、Flaskを採用しています。
 ソース10行目のrequest.get_json()は、リクエストデータのJSONフォーマットを参照しています。
 13行目で、request_jsonが値を持ち、その中に’message’が含まれるなら、’messge’の中身を送り返すとなっています。これを使って、送信したJSONを送り返させます。
 16行目は、それ以外なら“Hello World!”を表示せよとなっています。
5) ここでは、{“message”:{“data”:“my first cloud functions”}}というJSONを送信します。
  {“data”:“my first cloud functions”}が返信されてくる予定です。

3-4. Node-REDのフロー作成

① injectノードとhttp requestノードとdebugノードをシリアルに配置し結線します。
② injectノードをダブルクリックし、プロパティ画面を開きます。 msg.payloadにJSON形式で{“message”:{“data”:“my first cloud functions”}}を入力します。
③ http requestノードをダブルクリックし、メソッドを“POST”にします。URLは、cloud functionsの“「トリガー」ボタンを押したURLをコピーして入力します。出力形式をJSONオブジェクトにします。
それぞれ完了ボタンを押し、最後にデプロイします。

3-5. 動作実験

① injectノードのボタンを押します。
② debug画面に {“data”:“my first cloud functions”}が表示されます。

4.Cloud Functionsでの線型補間プログラムの作成

ここからCloud Functionsでの線型補間プログラムの作成方法に関して説明します。線型補間プログラムの具体的な中身に関しては、ソースコードを参照ください。
まず、Google Cloud Platformのダッシュボードの画面に入ってください。アカウントが無い方は、まずこちら(Googleのアカウントを作る)を参照してアカウントの作成をお願いします。

4-1.関数の作成

1つでも関数を作成するとCloud Functionsの初期画面は、関数のリストになります。
1) 上段の「関数の作成」をクリックして新しい関数を作成します。

2) first-functionという名前の関数をHTTPトリガーで未認証呼び出しを許可して作成します。

3) ランタイムを“Python 3.10”に設定します。
4) プログラムを入力します。
5) エントリポイントを”main“に変更します。

・main.py

import pandas as pd
from pandas import json_normalize

def interpolate_index(df):
    start = df["datetime"].min().ceil("min")
    end = df["datetime"].max().floor("min")
    num_point = int((end - start).total_seconds() / 60 + 1)
    index = pd.date_range(start, end, freq="min")
    return index

def add_interpolate_index(df, index):
    df_i = pd.DataFrame({"datetime": index})
    df_i["0_second"] = True
    df["0_second"] = df["datetime"].apply(lambda x: True if x.second==0 else False)
    dfm = pd.merge(df_i, df, on="datetime", how="outer")
    dfm = dfm.sort_values("datetime").reset_index(drop=True)
    dfm = dfm.set_index("datetime")
    return dfm

def calc_interpolate(dfm, index, cols=["temperature", "humidity", "gas", "CO2"]):
    for col in cols:
        dfm[col] = dfm[col].interpolate(method="time")
    dfi = dfm.loc[index, cols]
    return dfi

def main(request):
    """Responds to any HTTP request.
    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text or any set of values that can be turned into a
        Response object using
        `make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    num = 0
    sum = 0.0
    request_json = request.get_json()

    # convert json data to pandas
    df = json_normalize(request_json['message'])
    df['datetime'] = pd.to_datetime(df['datetime'])

    # generate index data 
    index = interpolate_index(df)

    # interpolate operation
    dfm = add_interpolate_index(df, index)
    cols = list(['flow','press'])
    dfi = calc_interpolate(dfm, index, cols=cols)
    
    # convert pandas data to json
    output_json = dfi.to_json(date_format='iso')

    # calculate average
    for key in request_json['message']:
        sum = sum + key['flow']
        num = num + 1
    ave = str(sum / num)
    data = '{"average": "' + ave + '"}'
#    return data
    return output_json

・requierement.txt

# Function dependencies, for example:
# package>=version
pandas

6) ソースファイル一覧から”requirement.txt”を選んで、”main.py”でimportしている”pandas“ライブラリ名を記載します。requirement.txt に記載することでライブラリが必要であることを示します。
7) 以上が終わったら、「デプロイ」します。

5.線型補間プログラム動作確認用Node-REDの作成

Node-REDで、Cloud FunctionsにHTTP リクエストでアクセスして線型補間用のセンサデータを届けるフローを作成します。
PDH上のファイル“/home/pi/source/python_pro/interpolate/sample.csv”を関数を作成する際にトリガーをHTTPに指定した画面に出ていたURLにPOST処理します。
URLは、関数画面のトリガータブでも確認できます。
(0) injectノード、read fileノード、csvノード、functionノード、http requestノード、デバッグノードの計6つのノードをシリアルに並べて結線します。以下、順に各ノードの設定を見ていきます。

(1) injectノードは、送信のトリガーボタンとして使用するだけです。そのため、変更箇所は名前だけです。他は変更しません。
(2) read fileノードは、ファイル名を設定したファイルを読み込み、msg.payloadとして次段のノードに送ります。
ファイル名に、ここでは以下を設定しました。
“/home/pi/source/python_pro/interpolate/sample.csv”
出力形式と、文字コードはそのままです。好みで名前を変更してください。

(3) csvノードで、csvファイルをNode-REDのObjectに変換します。csvファイルの1列名に列名が記載されていますので、「1行目に列名を含む」にチェックを入れます。1列目にない場合には,列名欄に、列名を記載します。
(4) functionノードでは、jsonをネストさせます。msg.payloadを{“message”:msg.payload}に変換します。
名前も好みで変更してください。

(5) http requestノードは、URL先にHTTP送信を行います。送信メソッドは、“POST”にします。URLには、Cloud Functionsの「トリガー」タブを押して出てくるトリガーURLを入力します。出力形式は、JSONオブジェクトにします。
以上でNode-REDの設定は終了です。

6.線型補間処理の実行

Cloud Functionsで関数“first-function”のデプロイを完了し、ローカルのマシン(PDH or PC)のNode-REDでHTTPリクエストを実装しデプロイを完了させれば、準備完了です。
Node-REDのinjectノードのボタンを押してください。しばらくする(http requestノードの下の「要求中」の表示が消えると)と、線型補間処理がされた結果が返ってきます。

debugノードのプロパティで、対象を「msg.payload」からドロップメニューで「msgオブジェクト全体」に変更して、再実行してみてください。

実行結果の“payload”以外にも、statuscode=200(処理OKのコードです)や、サーバー名などの情報を閲覧できます。

著作権と条件について/ Copyright, Terms and Conditions

本ページに記載のプログラムは、下記の条件で使用ください。

English follows

本研究は、内閣府(CAO)、省庁横断的戦略的イノベーション創造推進事業(SIP)の「フィジカル空間デジタルデータ処理基盤研究開発計画」(資金提供:NEDO)の支援を受けています。

Copyright (c) 2022年 , 立命館大学 全著作権を所有
BSD-3-Clause

ソースコード形式かバイナリ形式か、変更するかしないかを問わず、以下の条件を満たす場合に限り、再頒布および使用が許可されます。
1. ソースコードを再頒布する場合、上記の著作権表示、本条件一覧、および下記免責条項を含めること。
2. バイナリ形式で再頒布する場合、頒布物に付属のドキュメント等の資料に、上記の著作権表示、本条件一覧、および下記免責条項を含めること。
3. 書面による特別の許可なしに、本ソフトウェアから派生した製品の宣伝または販売促進に、学校法人立命館の名前またはコントリビューターの名前を使用してはならない。

本ソフトウェアは、著作権者およびコントリビューターによって「現状のまま」提供されており、明示黙示を問わず、商業的な使用可能性、および特定の目的に対する適合性に関する暗黙の保証も含め、またそれに限定されない、いかなる保証もありません。著作権者もコントリビューターも、事由のいかんを問わず、 損害発生の原因いかんを問わず、かつ責任の根拠が契約であるか厳格責任であるか(過失その他の)不法行為であるかを問わず、仮にそのような損害が発生する可能性を知らされていたとしても、本ソフトウェアの使用によって発生した(代替品または代用サービスの調達、使用の喪失、データの喪失、利益の喪失、業務の中断も含め、またそれに限定されない)直接損害、間接損害、偶発的な損害、特別損害、懲罰的損害、または結果損害について、一切責任を負わないものとします。

This work was supported by the Cabinet Office (CAO), Cross-ministerial Strategic Innovation Promotion Program (SIP), “Physical space digital data processing infrastructure research and development plan” (funding agency: NEDO).

Copyright © 2022 The Ritsumeikan Trust All rights reserved.
BSD-3-Clause

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
3. Neither the name of the Ritsumeikan Trust nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.