趣味のログ

自分用の作業ログ。。

Cloud Dataflowを使ってGrovePi+のセンサデータをウィンドウ集計(Java編)

↓の記事でRaspberryPi3からCloud IoTへ送信したGrovePi+のセンサデータを、Cloud Dataflowでウィンドウ集計してみた。
kmth23.hatenablog.com

Cloud Dataflow のJavaのクイックスタートを参考に進める。

Java と Apache Maven を使用したクイックスタート  |  Cloud Dataflow のドキュメント  |  Google Cloud

まず、mavenでプロジェクトを作成。

mvn archetype:generate \
      -DgroupId=com.example \
      -DartifactId=dev-dataflow \
      -Dversion="0.1" \
      -DinteractiveMode=false \
      -Dpackage=com.example

サンプルを参考にしつつ、pom.xmlは下記のようにした。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>dev-dataflow</artifactId>
  <packaging>jar</packaging>
  <version>0.1</version>
  <name>dev-dataflow</name>
  <url>http://maven.apache.org</url>

  <properties>
    <gson.version>2.8.2</gson.version>
    <dataflow.version>2.2.0</dataflow.version>
    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
    <guava.version>20.0</guava.version>
    <pubsub.version>v1-rev10-1.22.0</pubsub.version>
  </properties>

  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>${maven-exec-plugin.version}</version>
          <configuration>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <dependencies>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>${gson.version}</version>
    </dependency>

    <dependency>
      <groupId>com.google.cloud.dataflow</groupId>
      <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
      <version>${dataflow.version}</version>
    </dependency>

    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-pubsub</artifactId>
      <version>${pubsub.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>${guava.version}</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

Javaのコードは下記。
Cloud IoTへ送信し、Cloud Pub/Subへ入ったセンサデータをDataflowから取得し、そこから気温データを抜き出す。
この気温データについて、2分毎の合計値を、2分毎にログ出力させた。(特に目的はないが。。お試しで)

package com.example;

import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

import com.google.gson.Gson;

import org.joda.time.Duration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class App 
{
    public static void main( String[] args )
    {
        DataflowPipelineOptions options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .create()
                .as(DataflowPipelineOptions.class);
        options.setStreaming(true);
        options.setJobName("test");

        Pipeline p = Pipeline.create(options);
        PCollection<KV<String, Double>> scores = p.apply(PubsubIO.readStrings().fromTopic("projects/project-id/topics/raspi3"))
            .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(2))))
            .apply(ParDo.of(new PubSubToNumFn()))
            .apply(Sum.<String>doublesPerKey())
            .apply(ParDo.of(new LoggingFn()));

        p.run();
    }

    public static class LoggingFn extends DoFn<KV<String, Double>, KV<String, Double>> {

        private static final Logger LOG = LoggerFactory.getLogger(LoggingFn.class);

        @ProcessElement
        public void processElement(ProcessContext c) {
            KV<String, Double> kv = c.element();
            double sum = kv.getValue();

            LOG.info(String.valueOf(sum));
            c.output(kv);
        }
    }
}
package com.example;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.values.KV;
import com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubSubToNumFn extends DoFn<String, KV<String, Double>> {

    private final Logger LOG = LoggerFactory.getLogger(PubSubToNumFn.class);

    @ProcessElement
    public void processElement(ProcessContext c) {
        String json = c.element();
        Gson gson = new Gson();
        LOG.info(json);
        SensorData data = gson.fromJson(json, SensorData.class);
        if (data == null) {
            LOG.error("data is null");
        } else {
            LOG.info(data.getTemperature());
        }
        c.output(KV.of("temperature", Double.valueOf(data.getTemperature())));
    }
}
package com.example;

public class SensorData {
    public String temperature;
    public SensorData(String temperature) {
        this.temperature = temperature;
    }
    public String getTemperature() {
        return this.temperature;
    }
}

実行は↓

mvn compile exec:java -Dexec.mainClass=com.example.App -Dexec.args="--project=<<project-id>> --tempLocation=gs://path/to/temp/ --stagingLocation=gs://path/to/staging/ --runner=DataflowRunner"

maven exec:javaコマンドライン引数で、「--project=」のような形でクラウドの情報を入れて実行すると、コードの中でPipelineOptionsFactoryに引き渡され、Pipelineをrun()するとCloud Dataflowに自動でデプロイしてくれる。 Cloud Dataflowへデプロイし、コンソールで実行が確認出来たら、センサーデータを送信する。
センサデータの送信は↓でやった通りの手順。30秒単位で送信するため、setIntervalの第二引数は30000として実行した。 kmth23.hatenablog.com

Cloud Dataflowのコンソール上で、気温の合計値が、2分毎にログ出力されれば成功。

過去のQiitaのBeamの記事などを見ながらだと、SDKのバージョンが違うため仕様が変わっており、コンパイルエラーになることが多かった。 公式のドキュメントを見ながら進めるのが、結局最短で学習できるかもしれない。