Cloud Dataflowを使ってGrovePi+のセンサデータをウィンドウ集計(Java編)
↓の記事でRaspberryPi3からCloud IoTへ送信したGrovePi+のセンサデータを、Cloud Dataflowでウィンドウ集計してみた。
kmth23.hatenablog.com
Cloud Dataflow のJavaのクイックスタートを参考に進める。
Java と Apache Maven を使用したクイックスタート | Cloud Dataflow のドキュメント | Google Cloud
まず、mavenでプロジェクトを作成。
mvn archetype:generate \ -DgroupId=com.example \ -DartifactId=dev-dataflow \ -Dversion="0.1" \ -DinteractiveMode=false \ -Dpackage=com.example
サンプルを参考にしつつ、pom.xmlは下記のようにした。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>dev-dataflow</artifactId> <packaging>jar</packaging> <version>0.1</version> <name>dev-dataflow</name> <url>http://maven.apache.org</url> <properties> <gson.version>2.8.2</gson.version> <dataflow.version>2.2.0</dataflow.version> <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version> <guava.version>20.0</guava.version> <pubsub.version>v1-rev10-1.22.0</pubsub.version> </properties> <build> <pluginManagement> <plugins> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> <version>${maven-exec-plugin.version}</version> <configuration> <cleanupDaemonThreads>false</cleanupDaemonThreads> </configuration> </plugin> </plugins> </pluginManagement> </build> <dependencies> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>${gson.version}</version> </dependency> <dependency> <groupId>com.google.cloud.dataflow</groupId> <artifactId>google-cloud-dataflow-java-sdk-all</artifactId> <version>${dataflow.version}</version> </dependency> <dependency> <groupId>com.google.apis</groupId> <artifactId>google-api-services-pubsub</artifactId> <version>${pubsub.version}</version> <exclusions> <!-- Exclude an old version of guava that is being pulled in by a transitive dependency of google-api-client --> <exclusion> <groupId>com.google.guava</groupId> <artifactId>guava-jdk5</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${guava.version}</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> </dependencies> </project>
Javaのコードは下記。
Cloud IoTへ送信し、Cloud Pub/Subへ入ったセンサデータをDataflowから取得し、そこから気温データを抜き出す。
この気温データについて、2分毎の合計値を、2分毎にログ出力させた。(特に目的はないが。。お試しで)
package com.example; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import com.google.gson.Gson; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class App { public static void main( String[] args ) { DataflowPipelineOptions options = PipelineOptionsFactory .fromArgs(args) .withValidation() .create() .as(DataflowPipelineOptions.class); options.setStreaming(true); options.setJobName("test"); Pipeline p = Pipeline.create(options); PCollection<KV<String, Double>> scores = p.apply(PubsubIO.readStrings().fromTopic("projects/project-id/topics/raspi3")) .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(2)))) .apply(ParDo.of(new PubSubToNumFn())) .apply(Sum.<String>doublesPerKey()) .apply(ParDo.of(new LoggingFn())); p.run(); } public static class LoggingFn extends DoFn<KV<String, Double>, KV<String, Double>> { private static final Logger LOG = LoggerFactory.getLogger(LoggingFn.class); @ProcessElement public void processElement(ProcessContext c) { KV<String, Double> kv = c.element(); double sum = kv.getValue(); LOG.info(String.valueOf(sum)); c.output(kv); } } }
package com.example; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.ProcessContext; import org.apache.beam.sdk.values.KV; import com.google.gson.Gson; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PubSubToNumFn extends DoFn<String, KV<String, Double>> { private final Logger LOG = LoggerFactory.getLogger(PubSubToNumFn.class); @ProcessElement public void processElement(ProcessContext c) { String json = c.element(); Gson gson = new Gson(); LOG.info(json); SensorData data = gson.fromJson(json, SensorData.class); if (data == null) { LOG.error("data is null"); } else { LOG.info(data.getTemperature()); } c.output(KV.of("temperature", Double.valueOf(data.getTemperature()))); } }
package com.example; public class SensorData { public String temperature; public SensorData(String temperature) { this.temperature = temperature; } public String getTemperature() { return this.temperature; } }
実行は↓
mvn compile exec:java -Dexec.mainClass=com.example.App -Dexec.args="--project=<<project-id>> --tempLocation=gs://path/to/temp/ --stagingLocation=gs://path/to/staging/ --runner=DataflowRunner"
maven exec:javaのコマンドライン引数で、「--project=」のような形でクラウドの情報を入れて実行すると、コードの中でPipelineOptionsFactoryに引き渡され、Pipelineをrun()するとCloud Dataflowに自動でデプロイしてくれる。
Cloud Dataflowへデプロイし、コンソールで実行が確認出来たら、センサーデータを送信する。
センサデータの送信は↓でやった通りの手順。30秒単位で送信するため、setIntervalの第二引数は30000として実行した。
kmth23.hatenablog.com
Cloud Dataflowのコンソール上で、気温の合計値が、2分毎にログ出力されれば成功。
過去のQiitaのBeamの記事などを見ながらだと、SDKのバージョンが違うため仕様が変わっており、コンパイルエラーになることが多かった。 公式のドキュメントを見ながら進めるのが、結局最短で学習できるかもしれない。
RaspberryPi3からGoogle Cloud IoTへGrovePi+のセンサデータを送信
RaspberryPi3につないだGrovePi+のセンサデータをGCP Cloud IoTへ送信した。
以下、Cloud IoTのQuickStart(https://cloud.google.com/iot/docs/quickstart?hl=ja)に従って進める。
※Rasberry Pi3はModel B(+ではない)。OSはraspbian。
cat /etc/debian_version =>8.0
事前準備
プロジェクト作成(既存でもOK)、課金を有効、Cloud IoT Core and Cloud Pub/Sub API(複数)を有効にする。
デバイス側の設定
Google Cloud SDKインストール
手順:https://cloud.google.com/sdk/docs/?hl=ja#deb
「Debian、Ubuntu」タブの内容に従いインストールする。
※毎回exportするのは面倒なので、今回は、~/.profileに追記した。また、追加コンポーネントのインストールはなし。
echo "export CLOUD_SDK_REPO=\"cloud-sdk-$(lsb_release -c -s)\"" >> ~/.profile exec bash -l echo "deb http://packages.cloud.google.com/apt $CLOUD_SDK_REPO main" | sudo tee -a /etc/apt/sources.list.d/google-cloud-sdk.list curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add - sudo apt-get update && sudo apt-get install google-cloud-sdk gcloud init
gcloud initを実行すると、ブラウザが開いてGoogleへのログインを求められる。
事前準備で作成したプロジェクトのアカウントでログインすると、Google Cloud SDKが許可を求める画面に遷移するので、「許可」ボタンを押すと認証完了。
コンソールに戻ると、対話形式で、使用するプロジェクト、リージョン/ゾーンを聞かれる。
今回は、事前準備で作成したプロジェクト、asia-northeast1-b(東京リージョンのbゾーン)、を選択した。
※リージョン/ゾーンによって、マシンのスペックも変わるので注意(選択可能なリージョン/ゾーン一覧:https://cloud.google.com/compute/docs/regions-zones/regions-zones#available)。
Node.jsインストール
インストールは下記記事を参照。2018/03/22現在、最新のLTSはv8.10.0 kmth23.hatenablog.com
node -v =>v8.10.0 npm -v =>5.6.0
デバイスの登録
Cloud IoTのコンソール(https://console.cloud.google.com/iot?hl=ja)で作業。
まず「レジストリ」を作成し、「レジストリ」にデバイスを追加する。
「Create device registry.」ボタンを押して下記のように設定。
- レジストリ ID: 任意の名前
- Region: asia-east1
- プロトコル: MQTTとHTTP
- デフォルトのテレメトリーのトピック: トピックを作成⇒任意の名前
- デバイス状態のトピック: なし
- CA 証明書: 設定しない
「作成」ボタンを押す。(これでレジストリの作成が完了)
遷移したページで「端末を追加」ボタンを押して下記のように設定。
「追加」ボタンを押す。(これでデバイスの登録が完了) コンソールはこのまま開いておく。
デバイスに公開鍵を追加
opensslがない場合は、事前にインストールする。 下記のコマンドで、rsa_cert.pem(公開鍵)、rsa_private.pem(秘密鍵)を作成する。
cd /tmp openssl req -x509 -newkey rsa:2048 -keyout rsa_private.pem -nodes -out rsa_cert.pem -subj "/CN=unused"
開いたままにしておいたコンソールで「公開鍵を追加」ボタンを押し、rsa_cert.pemの内容をコピーして張り付ける。
- 入力方法: 手動で入力
- 公開鍵の形式: RS256_X509
- 公開鍵の有効期限: 設定しない
「追加」ボタンを押す。
サンプルで動作確認
サンプルをgit cloneで取得する。gitがない場合は、事前にインストールする。
下記コマンドについては、これまでに作成した情報に従い、<<PROJECT_ID>>をプロジェクトID、<<TOPIC_NAME>>をトピック名、<<REGISTRY_ID>>をレジストリID、<<DEVICE_ID>>をデバイスIDに置き換えること。
<<任意のサブスクリプション名>>は、任意のサブスクリプション名に置き換えること。
また、jsファイルの実行時には、--cloudRegionの指定を忘れないこと(デフォルトはus-central1。今回はasia-east1にしたので指定が必要)。
--numMessagesで送信するデータ数を指定できる。今回はテストのため1メッセージだけ送信する。
cd 任意の作業dir git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples cd nodejs-docs-samples/iot/mqtt_example cp /tmp/rsa_private.pem . npm install gcloud pubsub subscriptions create \ projects/<<PROJECT_ID>>/subscriptions/<<任意のサブスクリプション名>> \ --topic=projects/<<PROJECT_ID>>/topics/<<TOPIC_NAME>> node cloudiot_mqtt_example_nodejs.js \ --projectId=<<PROJECT_ID>> \ --registryId=<<REGISTRY_ID>> \ --deviceId=<<DEVICE_ID>> \ --privateKeyFile=rsa_private.pem \ --numMessages=1 \ --algorithm=RS256 \ --cloudRegion=asia-east1 gcloud pubsub subscriptions pull --auto-ack \ projects/<<PROJECT_ID>>/subscriptions/<<任意のサブスクリプション名>>
jsファイルの実行で、1つのデータがpublishされる。gcloud pubsub subscriptions pullコマンドでsubscribeできれば成功。
センサー情報の取得
AWS IoTにデータ送信したときと同じ仕組みを使う。 kmth23.hatenablog.com
まず、pythonでGrovePi+につないだセンサー情報を取得する。
次に、動作確認で使ったJavaScriptコードを参考にして、mqttでCloud IoTへデータを送信するコードを書く。
pythonコードの実行は、AWS IoTの時と同様、child_processのexecファンクションを使用する。
まずは、package.jsonを作成し、必要なライブラリをインストール。
{ "name": "grovepi-test", "version": "0.0.1", "description": "", "main": "index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, "author": "", "license": "ISC", "dependencies": { "jsonwebtoken": "7.4.1", "mqtt": "2.15.0", "yargs": "8.0.2" }, "devDependencies": {} }
npm install
次に、実行ファイルを、index.jsとして作成。
5分間隔でセンサーデータを取得し、Cloud IoTへ送信する。
※pythonコードは、/path/to/python/script.py。
// This software includes the work that is distributed in the Apache License 2.0 'use strict'; const fs = require('fs'); const jwt = require('jsonwebtoken'); const mqtt = require('mqtt'); const exec = require('child_process').exec; var argv = require(`yargs`) .options({ projectId: { default: process.env.GCLOUD_PROJECT || process.env.GOOGLE_CLOUD_PROJECT, description: 'The Project ID to use. Defaults to the value of the GCLOUD_PROJECT or GOOGLE_CLOUD_PROJECT environment variables.', requiresArg: true, type: 'string' }, cloudRegion: { default: 'us-central1', description: 'GCP cloud region.', requiresArg: true, type: 'string' }, registryId: { description: 'Cloud IoT registry ID.', requiresArg: true, demandOption: true, type: 'string' }, deviceId: { description: 'Cloud IoT device ID.', requiresArg: true, demandOption: true, type: 'string' }, privateKeyFile: { description: 'Path to private key file.', requiresArg: true, demandOption: true, type: 'string' }, algorithm: { description: 'Encryption algorithm to generate the JWT.', requiresArg: true, demandOption: true, choices: ['RS256', 'ES256'], type: 'string' }, mqttBridgeHostname: { default: 'mqtt.googleapis.com', description: 'MQTT bridge hostname.', requiresArg: true, type: 'string' }, mqttBridgePort: { default: 8883, description: 'MQTT bridge port.', requiresArg: true, type: 'number' }, messageType: { default: 'events', description: 'Message type to publish.', requiresArg: true, choices: ['events', 'state'], type: 'string' } }) .example(`node $0 cloudiot_mqtt_example_nodejs.js --projectId=blue-jet-123 \\\n\t--registryId=my-registry --deviceId=my-node-device \\\n\t--privateKeyFile=../rsa_private.pem --algorithm=RS256 \\\n\t --cloudRegion=us-central1`) .wrap(120) .recommendCommands() .epilogue(`For more information, see https://cloud.google.com/iot-core/docs`) .help() .strict() .argv; function createJwt (projectId, privateKeyFile, algorithm) { const token = { 'iat': parseInt(Date.now() / 1000), 'exp': parseInt(Date.now() / 1000) + 20 * 60, // 20 minutes 'aud': projectId }; const privateKey = fs.readFileSync(privateKeyFile); return jwt.sign(token, privateKey, { algorithm: algorithm }); } const mqttClientId = `projects/${argv.projectId}/locations/${argv.cloudRegion}/registries/${argv.registryId}/devices/${argv.deviceId}`; const mqttTopic = `/devices/${argv.deviceId}/${argv.messageType}`; let connectionArgs = { host: argv.mqttBridgeHostname, port: argv.mqttBridgePort, clientId: mqttClientId, username: 'unused', password: createJwt(argv.projectId, argv.privateKeyFile, argv.algorithm), protocol: 'mqtts', secureProtocol: 'TLSv1_2_method' }; let client = mqtt.connect(connectionArgs); client.subscribe(`/devices/${argv.deviceId}/config`); client.on('connect', (success) => { console.log('connect'); if (!success) { console.log('Client not connected...'); } else { setInterval(() => { exec('python /path/to/python/script.py', (error, stdout, stderr) => { if (error !== null) { console.log('exec error: ' + error); return } var data = stdout.replace(/\r?\n/g,""); var datas = data.split(",") var record = { registryid: argv.registryId, deviceid: argv.deviceId, timestamp: datas[0], temperature: Number(datas[1]), humidity: Number(datas[2]), moisture: Number(datas[3]), light: Number(datas[4]), location: datas[5] + "," + datas[6] }; const payload = JSON.stringify(record); console.log("Publish: " + payload); client.publish(mqttTopic, payload, { qos: 1 }); }); return; }, 300000); } }); client.on('close', () => { console.log('close'); }); client.on('error', (err) => { console.log('error', err); }); client.on('message', (topic, message, packet) => { console.log('message received: ', Buffer.from(message, 'base64').toString('ascii')); }); client.on('packetsend', () => { // Note: logging packet send is very verbose });
実行コマンドは下記。rsa_private.pemはあらかじめコピーしておくこと。
cp /tmp/rsa_private.pem . npm start \ --projectId=<<PROJECT_ID>> \ --registryId=<<REGISTRY_ID>> \ --deviceId=<<DEVICE_ID>> \ --privateKeyFile=rsa_private.pem \ --algorithm=RS256 \ --cloudRegion=asia-east1
subscribeすると、下記のようなデータが取得できるはず。
gcloud pubsub subscriptions pull --auto-ack \ projects/<<PROJECT_ID>>/subscriptions/<<任意のサブスクリプション名>> =>{ "registryid":"<<REGISTRY_ID>>", "deviceid":"<<DEVICE_ID>>", "timestamp":"2018-03-23T16:19:48+09:00", "temperature":25.2, "humidity":30.4, "moisture":0, "light":206 }
ストリーム処理を勉強してみた
素人ながら興味があったので少し勉強した。
以下はその際のメモ。
基本的には、↓のqiitaとslideshareを最初のインプットにさせていただいて、自分なりに調べていった。
https://qiita.com/kimutansk/items/60e48ec15e954fa95e1c
https://www.slideshare.net/SotaroKimura/ss-72769963
ストリーム処理の歴史
slideshareより抜粋。
2011年~ Lambdaアーキテクチャ
2013年~ Kappaアーキテクチャ
2015年~ Dataflowモデル
Lambdaアーキテクチャについては、オライリー本が日本語で出ている。
スケーラブルリアルタイムデータ分析入門 ―ラムダアーキテクチャによるビッグデータ処理
- 作者: Nathan Marz,James Warren,伊藤真浩,木下哲也
- 出版社/メーカー: オライリージャパン
- 発売日: 2016/08/24
- メディア: 単行本(ソフトカバー)
- この商品を含むブログ (2件) を見る
これは近年ではあまり使われていないのかもしれないが、素人が勉強するには、知っておいて損はない情報が詰まっていると思う。
ツールとしてはApache Stormが使われており、古い感じもするが、Spark streamingやWindow集計についても、(コラム程度だが)既に触れられていた。
日本語情報で体系的に学べる数少ない資料だと思う。
Kappaアーキテクチャについては、正直よく分からない。(日本語の情報が少ない)
公式サイトが存在しており、URLは↓。
http://milinda.pathirage.org/kappa-architecture.com/
「KappaアーキテクチャはLambdaアーキテクチャからバッチ処理を除去したようなもの」と説明されている。
基本的には、Apache Kafkaのような複数サブスクライブができるキューにデータを集約し、ストリーミング処理はキューからデータを取得して実施するらしい。
ストリーム処理システムとしては、Apache Storm、Apache Spark、Kafka Stream、Apache Flinkなどが挙げられている。
Kafkaに集約して、(分散バッチ処理は行わず)ストリーム処理を行う形は、よく見かける気がするけど、もともとはKappaアーキテクチャからきているのだろうか?
また、Flinkがツールとして挙げられているので、後述のDataflowモデルのOSSであるApache Beamと連携させても良いのかもしれない。
(slideshareでも、DataflowモデルはLambdaアーキテクチャやKappaアーキテクチャを置き換えるものではないと説明されている)
Dataflowモデルについても、同じ方がqiitaにまとめてくれている。
https://qiita.com/kimutansk/items/d6daca473440462634a0
また、(情報が古いが)Cloud DataflowとApache Sparkとの比較記事も分かりやすいと思う。
https://cloud.google.com/dataflow/blog/dataflow-beam-and-spark-comparison?hl=ja
どちらの記事にもリンクがある、
The World Beyond Batch: Streaming 101
The World Beyond Batch: Streaming 102
や、
VLDB 2015 の Dataflow モデル の論文
の内容が基本的な考え方になっているよう。
Dataflowモデルを使うには、GCPのCloud Dataflowを使うのが一番早いと思う。
コードの実装には、Apache BeamがOSSとして提供されている。
Beamはエンジンとして、別のOSSのApache FlinkやApache Apexなどが利用でき、GCPのCloud Dataflowもエンジンとして使える。
そのため、Apache Beamでコードを書けば、GCPでも、オンプレミスでもストリーム処理を行うことができる。
※GCPで使うなら、Cloud Dataflow SDKを使う方が、機能が豊富になる。
https://cloud.google.com/dataflow/docs/installing-dataflow-sdk?hl=ja
Apache Beamを使った実装などの例は、情報がある程度多くあり、Window集計についての説明も色々な方がまとめてくれている印象。
実装はバージョンごとに差異もありそうなので、注意が必要そうだが、Raspberry Piのセンサデータを、Apache Beamでストリーム処理するようなことをやってみたいと思う。
RaspberryPi3にmavenをインストール
環境変数JAVA_HOMEを設定する必要がある。
Javaはインストール済みのため、JAVA_HOMEの場所を調べる。
which java =>/usr/bin/java ls -l /usr/bin/java =>/usr/bin/java -> /etc/alternatives/java ls -l /etc/alternatives/java =>/etc/alternatives/java -> /usr/lib/jvm/jdk-8-oracle-arm32-vfp-hflt/jre/bin/java
jdkの場所(/usr/lib/jvm/jdk-8-oracle-arm32-vfp-hflt)をJAVA_HOMEとして環境変数に設定する。
echo "export JAVA_HOME=/usr/lib/jvm/jdk-8-oracle-arm32-vfp-hflt" >> ~/.profile source ~/.profile
mavenをインストールする。2018/3/25現在の最新は、v3.5.3。
cd /tmp wget http://ftp.kddilabs.jp/infosystems/apache/maven/maven-3/3.5.3/binaries/apache-maven-3.5.3-bin.tar.gz tar xzvf apache-maven-3.5.3-bin.tar.gz sudo mv apache-maven-3.5.3 /opt echo "export PATH=/opt/apache-maven-3.5.3/bin:$PATH" >> ~/.profile source ~/.profile
確認
mvn -v =>Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 2018-02-25T04:49:05+09:00) Maven home: /opt/apache-maven-3.5.3 Java version: 1.8.0_65, vendor: Oracle Corporation Java home: /usr/lib/jvm/jdk-8-oracle-arm32-vfp-hflt/jre Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "4.4.8-v7+", arch: "arm", family: "unix"
Sigfox Shield for Arduino (UnaShield) を使ってセンサー情報をSORACOMへ送信
京セラコミュニケーションシステム株式会社(KCCS)がSigfoxをArduinoで使用できるシールドを発売した。
加速度センサ(MMA8451Q)、温湿度・気圧センサ(BME280)を搭載している。
ソラコムと、スイッチサイエンスで販売しているが、今回はソラコムから購入した。(ソラコムの方が400円ほど高いので注意)
いずれから購入した場合も、1年間のSigfox回線利用料が含まれている。
シールドのみの販売で、Arduino本体は別売りなので注意。また2年目以降は、別途通信料が発生する。
スイッチサイエンスから購入すると、センサー情報をSigfox社のクラウドに送信・蓄積ができる。クラウドWebは日本語対応してない?が、データは1年保存してくれる(Sigfoxクラウドについて参照)。2年目以降は別途Sigfox契約が必要らしいが、どのように契約したらいいのかはよくわからない。。(KCCSの取扱説明書と、スイッチサイエンスの商品紹介を参照)
一方、ソラコムから購入すると、ソラコムAirなどと同じように、ユーザーコンソールで利用ができ、2年目以降も継続利用可能(料金は発生する)。ただしデータ蓄積はSORACOM Harvestの利用が必要で、データ保存期間は40日のみ。SORACOM Beamなどを使えば、AWSなどにデータ転送することは可能。
安く済むのはスイッチサイエンス版かもしれないが、ソラコムのアカウントは既に持っており、コンソールも使いやすかったので、今回はソラコムにした。
品薄だったのか、発注から手元に届くまで9日かかった。ただ4日目に、発送予定日と「遅れて申し訳ない」との旨をメールで連絡はいただけた。
ソラコムから購入するには、ソラコムアカウントの作成が必要。詳しくは、SORACOM Air for Sigfox の利用方法を参照。
Sigfoxとは
フランスで設立された通信事業者のSIGFOX社が提供する、低価格・低消費電力・長距離伝送を特長とした、グローバルIoTネットワーク。日本ではKCCSが展開している。
LPWA(Low Power=省電力、Wide Area=広域エリア)ネットワークの一つ。
アンライセンス系 LPWAと言われ、類似の通信規格にはLoRaWANがある。
ライセンス系 LPWAには、LTE Cat0, M1, NB-IoTがあるそう。
このあたりはソラコムの解説(LPWA とは? Low Power Wide Area)が詳しい。
Sigfoxについてはこちらも参照。
設定方法
※シールドを取り付けるArduino本体は別途必要。今回は、Arduino Uno R3を別途購入した。
ソラコムのSigfox Shield for Arduino をセットアップするに従えば設定が完了する。
ただ、↑のページで「Sigfox デバイスの受取確認をする」とあるが、「受け取り確認」ボタンを押すとエラーが発生して出来なかった※。
そのため、「Sigfoxデバイス管理」メニューから、「SigfoxデバイスID」と「PAC(Porting Authorization Code)」を自分で登録した。
デバイスIDと、PACは、Arduinoシールド本体に貼ってあるQRコードを読み込むと分かる(KCCSの取扱説明書参照)。
ただQRコードが小さく、読み取りに苦戦。。自分のスマホでは読み取れず、手持ちの別スマホで無事読み取れた。スマホが古いのが悪かったのか読み取りアプリが悪かったのか。。
あと↑のページを見る限り、「受け取り確認」を押すと、「Sigfox-Harvest」という「Sigfoxグループ」が作られ、デバイスに自動で紐づけしてくれている?ようなのだが、これも自分で設定した。(コンソールから勘でやった)
※エラーメッセージを残すの忘れてた。。手動登録後は「サーバ内部でエラーが発生しました。メッセージ:400 Bad Request」と出る。初めは、「nullがどうたらこうたら。。」みたいなメッセージだった気がする)
シールドのセットアップは簡単。アンテナを付けて、Arduinoに差せば終わり。
Arduino 開発環境は、Arduino Desktop IDEの、Windows版インストーラーをダウンロードし、全てデフォルト設定のまま、Windows10にインストールした。
バージョンは、ARDUINO 1.8.5。
あとは、Sigfox Shield for Arduino をセットアップするのページ通りサンプルスケッチを実行すれば、気温データの送信に成功し、SORACOM Harvestで確認ができた。
データ収集・蓄積が不要であれば、SORACOM HarvestはOFFのままでよいと思う。
データの転送をしたいならば、SORACOM Beam などを使う必要がある。
RaspberryPi3にCloudPiをインストールしてNAT越え
スイッチサイエンスここを参考にした。
CloudPiの購入
CloudPiはここに購入リンクがあり、スイッチサイエンス経由かAmazon経由で購入できる。
購入すると、UIDが割り振られたカードが送付される。
このUIDを、RaspberryPi3にインストールするCloudPiモジュールの設定ファイルに記入することで、CloudPiの機能を使うことが出来る。
RaspberryPi3にインストール
$ mkdir /home/pi/cloudpi $ cd /home/pi/cloudpi
参考サイトでは、CloudPi用のサーバモジュールをwgetで取得しているが、リンク切れしているため、ここから、Raspberry Pi版のモジュール(p2ptunnel_v100.tar.gz)をダウンロードする。
ダウンロードしたモジュールは、WinSCPなどを使ってRaspberryPiに配置し、解凍する。
$ tar zxvf p2ptunnel_v100.tar.gz
解凍してできたディレクトリに移動し、次のコマンドを実行
$ cd p2ptunnel $ sudo cp cloudpi /etc/init.d/cloudpi $ sudo chmod 755 /etc/init.d/cloudpi
UIDを確認し、cloudpi.confファイルを編集してUIDとパスワードを記述します。
$ vi cloudpi.conf
以下の記述をcloudpi.confに記入する。
uid=割り振られたUID password=任意のパスワード
記入後、以下のコマンドを実行し、RaspberryPi3起動時に自動でCloudPiが起動するようにする。
$ sudo insserv cloudpi
以下のコマンドで、CloudPiを起動する。
$ service cloudpi start
クライアント側の設定
Windows PCからRaspberryPi3へ、NAT越えでssh接続する。
ここから、Windows版クライアントモジュールをダウンロードし、解凍する。
解凍したフォルダ内の「P2PTunnel.exe」を実行する。
「追加」ボタンを押すと、 任意の名前、割り振られたUID、サーバモジュールで決めたパスワード、を記述し、「ポート設定」を押す。
今回は、sshで接続を行うため、ローカルの2222番ポートに、RaspberryPi3の22番ポートを割り当てる。
設定後、「接続」ボタンを押すと、localhostの2222番が、RaspberryPi3の22番ポートとして使えるようになっている。(NAT越えも可能)
teratermなどで、localhost:2222でログインできれば成功。
RaspberryPi3上でWEBサーバなどを起動している場合も、ポートを適切に設定しておけば、localhostでWindowsPCからアクセスできるようになる。
RaspberryPi3からAWS IoTへセンサデータの送信
AWS IoT SDK for JavaScriptのインストール
githubのREADMEを参考に、AWS IoT SDKをインストール
nodejsのバージョンは
node -v v4.4.4
npm install aws-iot-device-sdk cd /home/pi (任意のディレクトリでOK) git clone https://github.com/aws/aws-iot-device-sdk-js.git cd aws-iot-device-sdk-js npm install
センサー情報の取得
センサー情報はGrove+を使って取得する
AWS IoT SDKはnode.jsを使っているけれど、センサーデータはpythonで取得する
まずはpythonのコードを作成する
#!/usr/bin/env python import sys sys.path.append('/home/pi/Desktop/GrovePi/Software/Python/') import time import datetime from grovepi import * import serial # for GPS ser = serial.Serial('/dev/ttyAMA0', 9600, timeout = 0) #Open the serial port at 9600 baud ser.flush() class GPS: inp=[] GGA=[] def read(self): while True: GPS.inp=ser.readline() if GPS.inp[:6] =='$GPGGA': # GGA data , packet 1, has all the data we need break time.sleep(0.1) try: ind=GPS.inp.index('$GPGGA',5,len(GPS.inp)) GPS.inp=GPS.inp[ind:] except ValueError: hoge="fuga" GPS.GGA=GPS.inp.split(",") return [GPS.GGA] def vals(self): time=GPS.GGA[1] lat=GPS.GGA[2] lat_ns=GPS.GGA[3] long=GPS.GGA[4] long_ew=GPS.GGA[5] fix=GPS.GGA[6] sats=GPS.GGA[7] alt=GPS.GGA[9] return [time,fix,sats,alt,lat,lat_ns,long,long_ew] # for Japan Timestamp class JapanTZ(datetime.tzinfo): def tzname(self, dt): return "JST" def utcoffset(self, dt): return datetime.timedelta(hours=9) def dst(self, dt): return datetime.timedelta(0) ptemperature = 3 pmoisture = 0 plight = 1 gps = GPS() pinMode(ptemperature,"INPUT") pinMode(pmoisture,"INPUT") pinMode(plight,"INPUT") time.sleep(1) [temperature, humidity] = dht(ptemperature, 1) moisture = analogRead(pmoisture) light = analogRead(plight) gps.read() [t,fix,sats,alt,lat,lat_ns,long,long_ew] = gps.vals() try: latitude = str(float(lat)/100) longitude = str(float(long)/100) except ValueError: latitude = 0 longitude = 0 time.sleep(.5) print("{0},{1},{2},{3},{4},{5},{6}".format(datetime.datetime.now(JapanTZ()).strftime('%Y-%m-%dT%H:%M:%S+09:00'), temperature, humidity, moisture, light, latitude, longitude)) ser.close()
次にnode.jsのコード
5分間隔でセンサーデータを取得し、AWS IoTクラウドへ送信する
private.pem.key、certificate.pem.crt、root-CA.crtは、AWS IoTのクラウドコンソールからダウンロードしておき、任意のディレクトリに配置しておく
pythonコードの実行は、child_processのexecファンクションを使用する(pythonの標準出力を、execファンクションに設定したコールバック関数の引数として受け取れる)
var awsIot = require('aws-iot-device-sdk'); var sys = require('sys') var exec = require('child_process').exec; var device = awsIot.device({ privateKey: './certs/private.pem.key', clientCert: './certs/certificate.pem.crt', caCert: './certs/root-CA.crt', clientId: 'test_client', region: 'ap-northeast-1' }); device.on('connect', function() { console.log('Connect!!'); setInterval(function() { exec("python /path to python code", function(error, stdout, stderr) { if (error !== null) { console.log('exec error: ' + error); return } var data = stdout.replace(/\r?\n/g,""); var datas = data.split(",") var record = { deviceid: "pi_01", timestamp: datas[0], temperature: Number(datas[1]), humidity: Number(datas[2]), moisture: Number(datas[3]), light: Number(datas[4]), location: datas[5] + "," + datas[6] }; var message = JSON.stringify(record); console.log("Publish: " + message); device.publish('pi_test', message); }); return; }, 300000); });