Cloud Dataflowを使ってGrovePi+のセンサデータをウィンドウ集計(Java編)
↓の記事でRaspberryPi3からCloud IoTへ送信したGrovePi+のセンサデータを、Cloud Dataflowでウィンドウ集計してみた。
kmth23.hatenablog.com
Cloud Dataflow のJavaのクイックスタートを参考に進める。
Java と Apache Maven を使用したクイックスタート | Cloud Dataflow のドキュメント | Google Cloud
まず、mavenでプロジェクトを作成。
mvn archetype:generate \ -DgroupId=com.example \ -DartifactId=dev-dataflow \ -Dversion="0.1" \ -DinteractiveMode=false \ -Dpackage=com.example
サンプルを参考にしつつ、pom.xmlは下記のようにした。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>dev-dataflow</artifactId> <packaging>jar</packaging> <version>0.1</version> <name>dev-dataflow</name> <url>http://maven.apache.org</url> <properties> <gson.version>2.8.2</gson.version> <dataflow.version>2.2.0</dataflow.version> <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version> <guava.version>20.0</guava.version> <pubsub.version>v1-rev10-1.22.0</pubsub.version> </properties> <build> <pluginManagement> <plugins> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> <version>${maven-exec-plugin.version}</version> <configuration> <cleanupDaemonThreads>false</cleanupDaemonThreads> </configuration> </plugin> </plugins> </pluginManagement> </build> <dependencies> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>${gson.version}</version> </dependency> <dependency> <groupId>com.google.cloud.dataflow</groupId> <artifactId>google-cloud-dataflow-java-sdk-all</artifactId> <version>${dataflow.version}</version> </dependency> <dependency> <groupId>com.google.apis</groupId> <artifactId>google-api-services-pubsub</artifactId> <version>${pubsub.version}</version> <exclusions> <!-- Exclude an old version of guava that is being pulled in by a transitive dependency of google-api-client --> <exclusion> <groupId>com.google.guava</groupId> <artifactId>guava-jdk5</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${guava.version}</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> </dependencies> </project>
Javaのコードは下記。
Cloud IoTへ送信し、Cloud Pub/Subへ入ったセンサデータをDataflowから取得し、そこから気温データを抜き出す。
この気温データについて、2分毎の合計値を、2分毎にログ出力させた。(特に目的はないが。。お試しで)
package com.example; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import com.google.gson.Gson; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class App { public static void main( String[] args ) { DataflowPipelineOptions options = PipelineOptionsFactory .fromArgs(args) .withValidation() .create() .as(DataflowPipelineOptions.class); options.setStreaming(true); options.setJobName("test"); Pipeline p = Pipeline.create(options); PCollection<KV<String, Double>> scores = p.apply(PubsubIO.readStrings().fromTopic("projects/project-id/topics/raspi3")) .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(2)))) .apply(ParDo.of(new PubSubToNumFn())) .apply(Sum.<String>doublesPerKey()) .apply(ParDo.of(new LoggingFn())); p.run(); } public static class LoggingFn extends DoFn<KV<String, Double>, KV<String, Double>> { private static final Logger LOG = LoggerFactory.getLogger(LoggingFn.class); @ProcessElement public void processElement(ProcessContext c) { KV<String, Double> kv = c.element(); double sum = kv.getValue(); LOG.info(String.valueOf(sum)); c.output(kv); } } }
package com.example; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.ProcessContext; import org.apache.beam.sdk.values.KV; import com.google.gson.Gson; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PubSubToNumFn extends DoFn<String, KV<String, Double>> { private final Logger LOG = LoggerFactory.getLogger(PubSubToNumFn.class); @ProcessElement public void processElement(ProcessContext c) { String json = c.element(); Gson gson = new Gson(); LOG.info(json); SensorData data = gson.fromJson(json, SensorData.class); if (data == null) { LOG.error("data is null"); } else { LOG.info(data.getTemperature()); } c.output(KV.of("temperature", Double.valueOf(data.getTemperature()))); } }
package com.example; public class SensorData { public String temperature; public SensorData(String temperature) { this.temperature = temperature; } public String getTemperature() { return this.temperature; } }
実行は↓
mvn compile exec:java -Dexec.mainClass=com.example.App -Dexec.args="--project=<<project-id>> --tempLocation=gs://path/to/temp/ --stagingLocation=gs://path/to/staging/ --runner=DataflowRunner"
maven exec:javaのコマンドライン引数で、「--project=」のような形でクラウドの情報を入れて実行すると、コードの中でPipelineOptionsFactoryに引き渡され、Pipelineをrun()するとCloud Dataflowに自動でデプロイしてくれる。
Cloud Dataflowへデプロイし、コンソールで実行が確認出来たら、センサーデータを送信する。
センサデータの送信は↓でやった通りの手順。30秒単位で送信するため、setIntervalの第二引数は30000として実行した。
kmth23.hatenablog.com
Cloud Dataflowのコンソール上で、気温の合計値が、2分毎にログ出力されれば成功。
過去のQiitaのBeamの記事などを見ながらだと、SDKのバージョンが違うため仕様が変わっており、コンパイルエラーになることが多かった。 公式のドキュメントを見ながら進めるのが、結局最短で学習できるかもしれない。