Cloud Dataflowを使ってGrovePi+のセンサデータをウィンドウ集計(Java編)
↓の記事でRaspberryPi3からCloud IoTへ送信したGrovePi+のセンサデータを、Cloud Dataflowでウィンドウ集計してみた。
kmth23.hatenablog.com
Cloud Dataflow のJavaのクイックスタートを参考に進める。
Java と Apache Maven を使用したクイックスタート | Cloud Dataflow のドキュメント | Google Cloud
まず、mavenでプロジェクトを作成。
mvn archetype:generate \
-DgroupId=com.example \
-DartifactId=dev-dataflow \
-Dversion="0.1" \
-DinteractiveMode=false \
-Dpackage=com.example
サンプルを参考にしつつ、pom.xmlは下記のようにした。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>dev-dataflow</artifactId>
<packaging>jar</packaging>
<version>0.1</version>
<name>dev-dataflow</name>
<url>http://maven.apache.org</url>
<properties>
<gson.version>2.8.2</gson.version>
<dataflow.version>2.2.0</dataflow.version>
<maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
<guava.version>20.0</guava.version>
<pubsub.version>v1-rev10-1.22.0</pubsub.version>
</properties>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>${maven-exec-plugin.version}</version>
<configuration>
<cleanupDaemonThreads>false</cleanupDaemonThreads>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<dependencies>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>${dataflow.version}</version>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-pubsub</artifactId>
<version>${pubsub.version}</version>
<exclusions>
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava-jdk5</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Javaのコードは下記。
Cloud IoTへ送信し、Cloud Pub/Subへ入ったセンサデータをDataflowから取得し、そこから気温データを抜き出す。
この気温データについて、2分毎の合計値を、2分毎にログ出力させた。(特に目的はないが。。お試しで)
package com.example;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import com.google.gson.Gson;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class App
{
public static void main( String[] args )
{
DataflowPipelineOptions options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.create()
.as(DataflowPipelineOptions.class);
options.setStreaming(true);
options.setJobName("test");
Pipeline p = Pipeline.create(options);
PCollection<KV<String, Double>> scores = p.apply(PubsubIO.readStrings().fromTopic("projects/project-id/topics/raspi3"))
.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(2))))
.apply(ParDo.of(new PubSubToNumFn()))
.apply(Sum.<String>doublesPerKey())
.apply(ParDo.of(new LoggingFn()));
p.run();
}
public static class LoggingFn extends DoFn<KV<String, Double>, KV<String, Double>> {
private static final Logger LOG = LoggerFactory.getLogger(LoggingFn.class);
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, Double> kv = c.element();
double sum = kv.getValue();
LOG.info(String.valueOf(sum));
c.output(kv);
}
}
}
package com.example;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.values.KV;
import com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PubSubToNumFn extends DoFn<String, KV<String, Double>> {
private final Logger LOG = LoggerFactory.getLogger(PubSubToNumFn.class);
@ProcessElement
public void processElement(ProcessContext c) {
String json = c.element();
Gson gson = new Gson();
LOG.info(json);
SensorData data = gson.fromJson(json, SensorData.class);
if (data == null) {
LOG.error("data is null");
} else {
LOG.info(data.getTemperature());
}
c.output(KV.of("temperature", Double.valueOf(data.getTemperature())));
}
}
package com.example;
public class SensorData {
public String temperature;
public SensorData(String temperature) {
this.temperature = temperature;
}
public String getTemperature() {
return this.temperature;
}
}
実行は↓
mvn compile exec:java -Dexec.mainClass=com.example.App -Dexec.args="--project=<<project-id>> --tempLocation=gs://path/to/temp/ --stagingLocation=gs://path/to/staging/ --runner=DataflowRunner"
maven exec:javaのコマンドライン引数で、「--project=」のような形でクラウドの情報を入れて実行すると、コードの中でPipelineOptionsFactoryに引き渡され、Pipelineをrun()するとCloud Dataflowに自動でデプロイしてくれる。
Cloud Dataflowへデプロイし、コンソールで実行が確認出来たら、センサーデータを送信する。
センサデータの送信は↓でやった通りの手順。30秒単位で送信するため、setIntervalの第二引数は30000として実行した。
kmth23.hatenablog.com
Cloud Dataflowのコンソール上で、気温の合計値が、2分毎にログ出力されれば成功。
過去のQiitaのBeamの記事などを見ながらだと、SDKのバージョンが違うため仕様が変わっており、コンパイルエラーになることが多かった。 公式のドキュメントを見ながら進めるのが、結局最短で学習できるかもしれない。
RaspberryPi3からGoogle Cloud IoTへGrovePi+のセンサデータを送信
RaspberryPi3につないだGrovePi+のセンサデータをGCP Cloud IoTへ送信した。
以下、Cloud IoTのQuickStart(https://cloud.google.com/iot/docs/quickstart?hl=ja)に従って進める。
※Rasberry Pi3はModel B(+ではない)。OSはraspbian。
cat /etc/debian_version =>8.0
事前準備
プロジェクト作成(既存でもOK)、課金を有効、Cloud IoT Core and Cloud Pub/Sub API(複数)を有効にする。
デバイス側の設定
Google Cloud SDKインストール
手順:https://cloud.google.com/sdk/docs/?hl=ja#deb
「Debian、Ubuntu」タブの内容に従いインストールする。
※毎回exportするのは面倒なので、今回は、~/.profileに追記した。また、追加コンポーネントのインストールはなし。
echo "export CLOUD_SDK_REPO=\"cloud-sdk-$(lsb_release -c -s)\"" >> ~/.profile exec bash -l echo "deb http://packages.cloud.google.com/apt $CLOUD_SDK_REPO main" | sudo tee -a /etc/apt/sources.list.d/google-cloud-sdk.list curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add - sudo apt-get update && sudo apt-get install google-cloud-sdk gcloud init
gcloud initを実行すると、ブラウザが開いてGoogleへのログインを求められる。
事前準備で作成したプロジェクトのアカウントでログインすると、Google Cloud SDKが許可を求める画面に遷移するので、「許可」ボタンを押すと認証完了。
コンソールに戻ると、対話形式で、使用するプロジェクト、リージョン/ゾーンを聞かれる。
今回は、事前準備で作成したプロジェクト、asia-northeast1-b(東京リージョンのbゾーン)、を選択した。
※リージョン/ゾーンによって、マシンのスペックも変わるので注意(選択可能なリージョン/ゾーン一覧:https://cloud.google.com/compute/docs/regions-zones/regions-zones#available)。
Node.jsインストール
インストールは下記記事を参照。2018/03/22現在、最新のLTSはv8.10.0 kmth23.hatenablog.com
node -v =>v8.10.0 npm -v =>5.6.0
デバイスの登録
Cloud IoTのコンソール(https://console.cloud.google.com/iot?hl=ja)で作業。
まず「レジストリ」を作成し、「レジストリ」にデバイスを追加する。
「Create device registry.」ボタンを押して下記のように設定。
- レジストリ ID: 任意の名前
- Region: asia-east1
- プロトコル: MQTTとHTTP
- デフォルトのテレメトリーのトピック: トピックを作成⇒任意の名前
- デバイス状態のトピック: なし
- CA 証明書: 設定しない
「作成」ボタンを押す。(これでレジストリの作成が完了)
遷移したページで「端末を追加」ボタンを押して下記のように設定。
「追加」ボタンを押す。(これでデバイスの登録が完了) コンソールはこのまま開いておく。
デバイスに公開鍵を追加
opensslがない場合は、事前にインストールする。 下記のコマンドで、rsa_cert.pem(公開鍵)、rsa_private.pem(秘密鍵)を作成する。
cd /tmp openssl req -x509 -newkey rsa:2048 -keyout rsa_private.pem -nodes -out rsa_cert.pem -subj "/CN=unused"
開いたままにしておいたコンソールで「公開鍵を追加」ボタンを押し、rsa_cert.pemの内容をコピーして張り付ける。
- 入力方法: 手動で入力
- 公開鍵の形式: RS256_X509
- 公開鍵の有効期限: 設定しない
「追加」ボタンを押す。
サンプルで動作確認
サンプルをgit cloneで取得する。gitがない場合は、事前にインストールする。
下記コマンドについては、これまでに作成した情報に従い、<<PROJECT_ID>>をプロジェクトID、<<TOPIC_NAME>>をトピック名、<<REGISTRY_ID>>をレジストリID、<<DEVICE_ID>>をデバイスIDに置き換えること。
<<任意のサブスクリプション名>>は、任意のサブスクリプション名に置き換えること。
また、jsファイルの実行時には、--cloudRegionの指定を忘れないこと(デフォルトはus-central1。今回はasia-east1にしたので指定が必要)。
--numMessagesで送信するデータ数を指定できる。今回はテストのため1メッセージだけ送信する。
cd 任意の作業dir
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples
cd nodejs-docs-samples/iot/mqtt_example
cp /tmp/rsa_private.pem .
npm install
gcloud pubsub subscriptions create \
projects/<<PROJECT_ID>>/subscriptions/<<任意のサブスクリプション名>> \
--topic=projects/<<PROJECT_ID>>/topics/<<TOPIC_NAME>>
node cloudiot_mqtt_example_nodejs.js \
--projectId=<<PROJECT_ID>> \
--registryId=<<REGISTRY_ID>> \
--deviceId=<<DEVICE_ID>> \
--privateKeyFile=rsa_private.pem \
--numMessages=1 \
--algorithm=RS256 \
--cloudRegion=asia-east1
gcloud pubsub subscriptions pull --auto-ack \
projects/<<PROJECT_ID>>/subscriptions/<<任意のサブスクリプション名>>
jsファイルの実行で、1つのデータがpublishされる。gcloud pubsub subscriptions pullコマンドでsubscribeできれば成功。
センサー情報の取得
AWS IoTにデータ送信したときと同じ仕組みを使う。 kmth23.hatenablog.com
まず、pythonでGrovePi+につないだセンサー情報を取得する。
次に、動作確認で使ったJavaScriptコードを参考にして、mqttでCloud IoTへデータを送信するコードを書く。
pythonコードの実行は、AWS IoTの時と同様、child_processのexecファンクションを使用する。
まずは、package.jsonを作成し、必要なライブラリをインストール。
{
"name": "grovepi-test",
"version": "0.0.1",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"jsonwebtoken": "7.4.1",
"mqtt": "2.15.0",
"yargs": "8.0.2"
},
"devDependencies": {}
}
npm install
次に、実行ファイルを、index.jsとして作成。
5分間隔でセンサーデータを取得し、Cloud IoTへ送信する。
※pythonコードは、/path/to/python/script.py。
// This software includes the work that is distributed in the Apache License 2.0
'use strict';
const fs = require('fs');
const jwt = require('jsonwebtoken');
const mqtt = require('mqtt');
const exec = require('child_process').exec;
var argv = require(`yargs`)
.options({
projectId: {
default: process.env.GCLOUD_PROJECT || process.env.GOOGLE_CLOUD_PROJECT,
description: 'The Project ID to use. Defaults to the value of the GCLOUD_PROJECT or GOOGLE_CLOUD_PROJECT environment variables.',
requiresArg: true,
type: 'string'
},
cloudRegion: {
default: 'us-central1',
description: 'GCP cloud region.',
requiresArg: true,
type: 'string'
},
registryId: {
description: 'Cloud IoT registry ID.',
requiresArg: true,
demandOption: true,
type: 'string'
},
deviceId: {
description: 'Cloud IoT device ID.',
requiresArg: true,
demandOption: true,
type: 'string'
},
privateKeyFile: {
description: 'Path to private key file.',
requiresArg: true,
demandOption: true,
type: 'string'
},
algorithm: {
description: 'Encryption algorithm to generate the JWT.',
requiresArg: true,
demandOption: true,
choices: ['RS256', 'ES256'],
type: 'string'
},
mqttBridgeHostname: {
default: 'mqtt.googleapis.com',
description: 'MQTT bridge hostname.',
requiresArg: true,
type: 'string'
},
mqttBridgePort: {
default: 8883,
description: 'MQTT bridge port.',
requiresArg: true,
type: 'number'
},
messageType: {
default: 'events',
description: 'Message type to publish.',
requiresArg: true,
choices: ['events', 'state'],
type: 'string'
}
})
.example(`node $0 cloudiot_mqtt_example_nodejs.js --projectId=blue-jet-123 \\\n\t--registryId=my-registry --deviceId=my-node-device \\\n\t--privateKeyFile=../rsa_private.pem --algorithm=RS256 \\\n\t --cloudRegion=us-central1`)
.wrap(120)
.recommendCommands()
.epilogue(`For more information, see https://cloud.google.com/iot-core/docs`)
.help()
.strict()
.argv;
function createJwt (projectId, privateKeyFile, algorithm) {
const token = {
'iat': parseInt(Date.now() / 1000),
'exp': parseInt(Date.now() / 1000) + 20 * 60, // 20 minutes
'aud': projectId
};
const privateKey = fs.readFileSync(privateKeyFile);
return jwt.sign(token, privateKey, { algorithm: algorithm });
}
const mqttClientId = `projects/${argv.projectId}/locations/${argv.cloudRegion}/registries/${argv.registryId}/devices/${argv.deviceId}`;
const mqttTopic = `/devices/${argv.deviceId}/${argv.messageType}`;
let connectionArgs = {
host: argv.mqttBridgeHostname,
port: argv.mqttBridgePort,
clientId: mqttClientId,
username: 'unused',
password: createJwt(argv.projectId, argv.privateKeyFile, argv.algorithm),
protocol: 'mqtts',
secureProtocol: 'TLSv1_2_method'
};
let client = mqtt.connect(connectionArgs);
client.subscribe(`/devices/${argv.deviceId}/config`);
client.on('connect', (success) => {
console.log('connect');
if (!success) {
console.log('Client not connected...');
} else {
setInterval(() => {
exec('python /path/to/python/script.py', (error, stdout, stderr) => {
if (error !== null) {
console.log('exec error: ' + error);
return
}
var data = stdout.replace(/\r?\n/g,"");
var datas = data.split(",")
var record = {
registryid: argv.registryId,
deviceid: argv.deviceId,
timestamp: datas[0],
temperature: Number(datas[1]),
humidity: Number(datas[2]),
moisture: Number(datas[3]),
light: Number(datas[4]),
location: datas[5] + "," + datas[6]
};
const payload = JSON.stringify(record);
console.log("Publish: " + payload);
client.publish(mqttTopic, payload, { qos: 1 });
});
return;
}, 300000);
}
});
client.on('close', () => {
console.log('close');
});
client.on('error', (err) => {
console.log('error', err);
});
client.on('message', (topic, message, packet) => {
console.log('message received: ', Buffer.from(message, 'base64').toString('ascii'));
});
client.on('packetsend', () => {
// Note: logging packet send is very verbose
});
実行コマンドは下記。rsa_private.pemはあらかじめコピーしておくこと。
cp /tmp/rsa_private.pem .
npm start \
--projectId=<<PROJECT_ID>> \
--registryId=<<REGISTRY_ID>> \
--deviceId=<<DEVICE_ID>> \
--privateKeyFile=rsa_private.pem \
--algorithm=RS256 \
--cloudRegion=asia-east1
subscribeすると、下記のようなデータが取得できるはず。
gcloud pubsub subscriptions pull --auto-ack \
projects/<<PROJECT_ID>>/subscriptions/<<任意のサブスクリプション名>>
=>{
"registryid":"<<REGISTRY_ID>>",
"deviceid":"<<DEVICE_ID>>",
"timestamp":"2018-03-23T16:19:48+09:00",
"temperature":25.2,
"humidity":30.4,
"moisture":0,
"light":206
}
ストリーム処理を勉強してみた
素人ながら興味があったので少し勉強した。
以下はその際のメモ。
基本的には、↓のqiitaとslideshareを最初のインプットにさせていただいて、自分なりに調べていった。
https://qiita.com/kimutansk/items/60e48ec15e954fa95e1c
https://www.slideshare.net/SotaroKimura/ss-72769963
ストリーム処理の歴史
slideshareより抜粋。
2011年~ Lambdaアーキテクチャ
2013年~ Kappaアーキテクチャ
2015年~ Dataflowモデル
Lambdaアーキテクチャについては、オライリー本が日本語で出ている。

スケーラブルリアルタイムデータ分析入門 ―ラムダアーキテクチャによるビッグデータ処理
- 作者: Nathan Marz,James Warren,伊藤真浩,木下哲也
- 出版社/メーカー: オライリージャパン
- 発売日: 2016/08/24
- メディア: 単行本(ソフトカバー)
- この商品を含むブログ (2件) を見る
これは近年ではあまり使われていないのかもしれないが、素人が勉強するには、知っておいて損はない情報が詰まっていると思う。
ツールとしてはApache Stormが使われており、古い感じもするが、Spark streamingやWindow集計についても、(コラム程度だが)既に触れられていた。
日本語情報で体系的に学べる数少ない資料だと思う。
Kappaアーキテクチャについては、正直よく分からない。(日本語の情報が少ない)
公式サイトが存在しており、URLは↓。
http://milinda.pathirage.org/kappa-architecture.com/
「KappaアーキテクチャはLambdaアーキテクチャからバッチ処理を除去したようなもの」と説明されている。
基本的には、Apache Kafkaのような複数サブスクライブができるキューにデータを集約し、ストリーミング処理はキューからデータを取得して実施するらしい。
ストリーム処理システムとしては、Apache Storm、Apache Spark、Kafka Stream、Apache Flinkなどが挙げられている。
Kafkaに集約して、(分散バッチ処理は行わず)ストリーム処理を行う形は、よく見かける気がするけど、もともとはKappaアーキテクチャからきているのだろうか?
また、Flinkがツールとして挙げられているので、後述のDataflowモデルのOSSであるApache Beamと連携させても良いのかもしれない。
(slideshareでも、DataflowモデルはLambdaアーキテクチャやKappaアーキテクチャを置き換えるものではないと説明されている)
Dataflowモデルについても、同じ方がqiitaにまとめてくれている。
https://qiita.com/kimutansk/items/d6daca473440462634a0
また、(情報が古いが)Cloud DataflowとApache Sparkとの比較記事も分かりやすいと思う。
https://cloud.google.com/dataflow/blog/dataflow-beam-and-spark-comparison?hl=ja
どちらの記事にもリンクがある、
The World Beyond Batch: Streaming 101
The World Beyond Batch: Streaming 102
や、
VLDB 2015 の Dataflow モデル の論文
の内容が基本的な考え方になっているよう。
Dataflowモデルを使うには、GCPのCloud Dataflowを使うのが一番早いと思う。
コードの実装には、Apache BeamがOSSとして提供されている。
Beamはエンジンとして、別のOSSのApache FlinkやApache Apexなどが利用でき、GCPのCloud Dataflowもエンジンとして使える。
そのため、Apache Beamでコードを書けば、GCPでも、オンプレミスでもストリーム処理を行うことができる。
※GCPで使うなら、Cloud Dataflow SDKを使う方が、機能が豊富になる。
https://cloud.google.com/dataflow/docs/installing-dataflow-sdk?hl=ja
Apache Beamを使った実装などの例は、情報がある程度多くあり、Window集計についての説明も色々な方がまとめてくれている印象。
実装はバージョンごとに差異もありそうなので、注意が必要そうだが、Raspberry Piのセンサデータを、Apache Beamでストリーム処理するようなことをやってみたいと思う。
RaspberryPi3にmavenをインストール
環境変数JAVA_HOMEを設定する必要がある。
Javaはインストール済みのため、JAVA_HOMEの場所を調べる。
which java =>/usr/bin/java ls -l /usr/bin/java =>/usr/bin/java -> /etc/alternatives/java ls -l /etc/alternatives/java =>/etc/alternatives/java -> /usr/lib/jvm/jdk-8-oracle-arm32-vfp-hflt/jre/bin/java
jdkの場所(/usr/lib/jvm/jdk-8-oracle-arm32-vfp-hflt)をJAVA_HOMEとして環境変数に設定する。
echo "export JAVA_HOME=/usr/lib/jvm/jdk-8-oracle-arm32-vfp-hflt" >> ~/.profile source ~/.profile
mavenをインストールする。2018/3/25現在の最新は、v3.5.3。
cd /tmp wget http://ftp.kddilabs.jp/infosystems/apache/maven/maven-3/3.5.3/binaries/apache-maven-3.5.3-bin.tar.gz tar xzvf apache-maven-3.5.3-bin.tar.gz sudo mv apache-maven-3.5.3 /opt echo "export PATH=/opt/apache-maven-3.5.3/bin:$PATH" >> ~/.profile source ~/.profile
確認
mvn -v =>Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 2018-02-25T04:49:05+09:00) Maven home: /opt/apache-maven-3.5.3 Java version: 1.8.0_65, vendor: Oracle Corporation Java home: /usr/lib/jvm/jdk-8-oracle-arm32-vfp-hflt/jre Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "4.4.8-v7+", arch: "arm", family: "unix"
Sigfox Shield for Arduino (UnaShield) を使ってセンサー情報をSORACOMへ送信
京セラコミュニケーションシステム株式会社(KCCS)がSigfoxをArduinoで使用できるシールドを発売した。
加速度センサ(MMA8451Q)、温湿度・気圧センサ(BME280)を搭載している。
ソラコムと、スイッチサイエンスで販売しているが、今回はソラコムから購入した。(ソラコムの方が400円ほど高いので注意)
いずれから購入した場合も、1年間のSigfox回線利用料が含まれている。
シールドのみの販売で、Arduino本体は別売りなので注意。また2年目以降は、別途通信料が発生する。
スイッチサイエンスから購入すると、センサー情報をSigfox社のクラウドに送信・蓄積ができる。クラウドWebは日本語対応してない?が、データは1年保存してくれる(Sigfoxクラウドについて参照)。2年目以降は別途Sigfox契約が必要らしいが、どのように契約したらいいのかはよくわからない。。(KCCSの取扱説明書と、スイッチサイエンスの商品紹介を参照)
一方、ソラコムから購入すると、ソラコムAirなどと同じように、ユーザーコンソールで利用ができ、2年目以降も継続利用可能(料金は発生する)。ただしデータ蓄積はSORACOM Harvestの利用が必要で、データ保存期間は40日のみ。SORACOM Beamなどを使えば、AWSなどにデータ転送することは可能。
安く済むのはスイッチサイエンス版かもしれないが、ソラコムのアカウントは既に持っており、コンソールも使いやすかったので、今回はソラコムにした。
品薄だったのか、発注から手元に届くまで9日かかった。ただ4日目に、発送予定日と「遅れて申し訳ない」との旨をメールで連絡はいただけた。
ソラコムから購入するには、ソラコムアカウントの作成が必要。詳しくは、SORACOM Air for Sigfox の利用方法を参照。
Sigfoxとは
フランスで設立された通信事業者のSIGFOX社が提供する、低価格・低消費電力・長距離伝送を特長とした、グローバルIoTネットワーク。日本ではKCCSが展開している。
LPWA(Low Power=省電力、Wide Area=広域エリア)ネットワークの一つ。
アンライセンス系 LPWAと言われ、類似の通信規格にはLoRaWANがある。
ライセンス系 LPWAには、LTE Cat0, M1, NB-IoTがあるそう。
このあたりはソラコムの解説(LPWA とは? Low Power Wide Area)が詳しい。
Sigfoxについてはこちらも参照。
設定方法
※シールドを取り付けるArduino本体は別途必要。今回は、Arduino Uno R3を別途購入した。
ソラコムのSigfox Shield for Arduino をセットアップするに従えば設定が完了する。
ただ、↑のページで「Sigfox デバイスの受取確認をする」とあるが、「受け取り確認」ボタンを押すとエラーが発生して出来なかった※。
そのため、「Sigfoxデバイス管理」メニューから、「SigfoxデバイスID」と「PAC(Porting Authorization Code)」を自分で登録した。
デバイスIDと、PACは、Arduinoシールド本体に貼ってあるQRコードを読み込むと分かる(KCCSの取扱説明書参照)。
ただQRコードが小さく、読み取りに苦戦。。自分のスマホでは読み取れず、手持ちの別スマホで無事読み取れた。スマホが古いのが悪かったのか読み取りアプリが悪かったのか。。
あと↑のページを見る限り、「受け取り確認」を押すと、「Sigfox-Harvest」という「Sigfoxグループ」が作られ、デバイスに自動で紐づけしてくれている?ようなのだが、これも自分で設定した。(コンソールから勘でやった)
※エラーメッセージを残すの忘れてた。。手動登録後は「サーバ内部でエラーが発生しました。メッセージ:400 Bad Request」と出る。初めは、「nullがどうたらこうたら。。」みたいなメッセージだった気がする)
シールドのセットアップは簡単。アンテナを付けて、Arduinoに差せば終わり。
Arduino 開発環境は、Arduino Desktop IDEの、Windows版インストーラーをダウンロードし、全てデフォルト設定のまま、Windows10にインストールした。
バージョンは、ARDUINO 1.8.5。
あとは、Sigfox Shield for Arduino をセットアップするのページ通りサンプルスケッチを実行すれば、気温データの送信に成功し、SORACOM Harvestで確認ができた。
データ収集・蓄積が不要であれば、SORACOM HarvestはOFFのままでよいと思う。
データの転送をしたいならば、SORACOM Beam などを使う必要がある。
RaspberryPi3にCloudPiをインストールしてNAT越え
スイッチサイエンスここを参考にした。
CloudPiの購入
CloudPiはここに購入リンクがあり、スイッチサイエンス経由かAmazon経由で購入できる。
購入すると、UIDが割り振られたカードが送付される。
このUIDを、RaspberryPi3にインストールするCloudPiモジュールの設定ファイルに記入することで、CloudPiの機能を使うことが出来る。
RaspberryPi3にインストール
$ mkdir /home/pi/cloudpi $ cd /home/pi/cloudpi
参考サイトでは、CloudPi用のサーバモジュールをwgetで取得しているが、リンク切れしているため、ここから、Raspberry Pi版のモジュール(p2ptunnel_v100.tar.gz)をダウンロードする。
ダウンロードしたモジュールは、WinSCPなどを使ってRaspberryPiに配置し、解凍する。
$ tar zxvf p2ptunnel_v100.tar.gz
解凍してできたディレクトリに移動し、次のコマンドを実行
$ cd p2ptunnel $ sudo cp cloudpi /etc/init.d/cloudpi $ sudo chmod 755 /etc/init.d/cloudpi
UIDを確認し、cloudpi.confファイルを編集してUIDとパスワードを記述します。
$ vi cloudpi.conf
以下の記述をcloudpi.confに記入する。
uid=割り振られたUID password=任意のパスワード
記入後、以下のコマンドを実行し、RaspberryPi3起動時に自動でCloudPiが起動するようにする。
$ sudo insserv cloudpi
以下のコマンドで、CloudPiを起動する。
$ service cloudpi start
クライアント側の設定
Windows PCからRaspberryPi3へ、NAT越えでssh接続する。
ここから、Windows版クライアントモジュールをダウンロードし、解凍する。
解凍したフォルダ内の「P2PTunnel.exe」を実行する。
「追加」ボタンを押すと、 任意の名前、割り振られたUID、サーバモジュールで決めたパスワード、を記述し、「ポート設定」を押す。
今回は、sshで接続を行うため、ローカルの2222番ポートに、RaspberryPi3の22番ポートを割り当てる。
設定後、「接続」ボタンを押すと、localhostの2222番が、RaspberryPi3の22番ポートとして使えるようになっている。(NAT越えも可能)
teratermなどで、localhost:2222でログインできれば成功。
RaspberryPi3上でWEBサーバなどを起動している場合も、ポートを適切に設定しておけば、localhostでWindowsPCからアクセスできるようになる。
RaspberryPi3からAWS IoTへセンサデータの送信
AWS IoT SDK for JavaScriptのインストール
githubのREADMEを参考に、AWS IoT SDKをインストール
nodejsのバージョンは
node -v v4.4.4
npm install aws-iot-device-sdk cd /home/pi (任意のディレクトリでOK) git clone https://github.com/aws/aws-iot-device-sdk-js.git cd aws-iot-device-sdk-js npm install
センサー情報の取得
センサー情報はGrove+を使って取得する
AWS IoT SDKはnode.jsを使っているけれど、センサーデータはpythonで取得する
まずはpythonのコードを作成する
#!/usr/bin/env python import sys sys.path.append('/home/pi/Desktop/GrovePi/Software/Python/') import time import datetime from grovepi import * import serial # for GPS ser = serial.Serial('/dev/ttyAMA0', 9600, timeout = 0) #Open the serial port at 9600 baud ser.flush() class GPS: inp=[] GGA=[] def read(self): while True: GPS.inp=ser.readline() if GPS.inp[:6] =='$GPGGA': # GGA data , packet 1, has all the data we need break time.sleep(0.1) try: ind=GPS.inp.index('$GPGGA',5,len(GPS.inp)) GPS.inp=GPS.inp[ind:] except ValueError: hoge="fuga" GPS.GGA=GPS.inp.split(",") return [GPS.GGA] def vals(self): time=GPS.GGA[1] lat=GPS.GGA[2] lat_ns=GPS.GGA[3] long=GPS.GGA[4] long_ew=GPS.GGA[5] fix=GPS.GGA[6] sats=GPS.GGA[7] alt=GPS.GGA[9] return [time,fix,sats,alt,lat,lat_ns,long,long_ew] # for Japan Timestamp class JapanTZ(datetime.tzinfo): def tzname(self, dt): return "JST" def utcoffset(self, dt): return datetime.timedelta(hours=9) def dst(self, dt): return datetime.timedelta(0) ptemperature = 3 pmoisture = 0 plight = 1 gps = GPS() pinMode(ptemperature,"INPUT") pinMode(pmoisture,"INPUT") pinMode(plight,"INPUT") time.sleep(1) [temperature, humidity] = dht(ptemperature, 1) moisture = analogRead(pmoisture) light = analogRead(plight) gps.read() [t,fix,sats,alt,lat,lat_ns,long,long_ew] = gps.vals() try: latitude = str(float(lat)/100) longitude = str(float(long)/100) except ValueError: latitude = 0 longitude = 0 time.sleep(.5) print("{0},{1},{2},{3},{4},{5},{6}".format(datetime.datetime.now(JapanTZ()).strftime('%Y-%m-%dT%H:%M:%S+09:00'), temperature, humidity, moisture, light, latitude, longitude)) ser.close()
次にnode.jsのコード
5分間隔でセンサーデータを取得し、AWS IoTクラウドへ送信する
private.pem.key、certificate.pem.crt、root-CA.crtは、AWS IoTのクラウドコンソールからダウンロードしておき、任意のディレクトリに配置しておく
pythonコードの実行は、child_processのexecファンクションを使用する(pythonの標準出力を、execファンクションに設定したコールバック関数の引数として受け取れる)
var awsIot = require('aws-iot-device-sdk');
var sys = require('sys')
var exec = require('child_process').exec;
var device = awsIot.device({
privateKey: './certs/private.pem.key',
clientCert: './certs/certificate.pem.crt',
caCert: './certs/root-CA.crt',
clientId: 'test_client',
region: 'ap-northeast-1'
});
device.on('connect', function() {
console.log('Connect!!');
setInterval(function() {
exec("python /path to python code", function(error, stdout, stderr) {
if (error !== null) {
console.log('exec error: ' + error);
return
}
var data = stdout.replace(/\r?\n/g,"");
var datas = data.split(",")
var record = {
deviceid: "pi_01",
timestamp: datas[0],
temperature: Number(datas[1]),
humidity: Number(datas[2]),
moisture: Number(datas[3]),
light: Number(datas[4]),
location: datas[5] + "," + datas[6]
};
var message = JSON.stringify(record);
console.log("Publish: " + message);
device.publish('pi_test', message);
});
return;
}, 300000);
});