Spark 2.0 on EMR で Structured Streaming をやってみた

Distributed computing (Apache Hadoop, Spark, …) Advent Calendar 2016” の 12/19 担当ということで、Spark 2.0 on EMR で Spark Streaming と Structured Streaming をやってみた結果を書きます。


この記事でやること

この記事では Spark 2.0 で、現在アルファ版の Structured Streaming をやってみます。
Structured Streaming とは、Spark SQL エンジンで実現されたストリーム処理の仕組みです。

従来型の Spark Streaming は RDD を DStream とよばれる Spark Streaming 特有のモデルを導入して扱うのに対して、Structured Streaming では Spark SQL で広く使用されている Dataset/DataFrame API により同一のプログラミングモデルで扱うことができ、Spark SQL のバッチジョブと同じ書き方で Streaming 処理を実現できます。
また、”Structured” ということでスキーマをもっているため、例えばデータの到着時刻ではなく出力時刻ベースで集計することができたりします。

Spark 2.0 時点ではアルファリリースということでプロダクションへの適用は推奨されていませんが、将来的には Spark のストリーム処理の標準となっていくようです。

この記事の流れとしては、まずは従来型の Spark Streaming をやってみて、その次に Structured Streaming をやってみます。

事前準備

Spark 2.0 を準備します。
といっても、EMR 5.2.0 (現在の最新バージョン) で Application に Spark 2.0.2 を選択して起動するだけです。
今回は、マスターノード m4.xlarge、コアノード m4.xlarge x 2 の構成で検証しています。

SSH でマスターノードに接続するとこんな感じになります。

Using username "hadoop".
Authenticating with public key "imported-openssh-key"
Last login: Sat Dec 17 14:35:45 2016

       __|  __|_  )
       _|  (     /   Amazon Linux AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-ami/2016.09-release-notes/
19 package(s) needed for security, out of 30 available
Run "sudo yum update" to apply all updates.

EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRRR      RRRRRR

[hadoop@ip-172-31-28-103 ~]$

今回は REPL で検証しますので、spark-shell を起動します。

[hadoop@ip-172-31-28-103 ~]$ spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/12/17 14:52:20 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
16/12/17 14:52:31 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://172.31.28.103:4040
Spark context available as 'sc' (master = yarn, app id = application_1481792128181_0003).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.2
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_111)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

従来型の Spark Streaming

まず、spark-shell に以下のようなソースコードを投入します。

import org.apache.spark._
import org.apache.spark.streaming._

val ssc = new StreamingContext(sc, Seconds(15))
val lines = ssc.socketTextStream("ip-172-31-28-103", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()

このとき以下に注意。

  • socketTextStream の接続先ホストを “localhost” とすると、今回のように複数台クラスタで動作させているときにハマるので、ホスト名で指定する。
  • 新規の SparkConf に対して StreamingContext を定義すると複数の SparkContext が同一 JVM 上で動作している WARN が出るので、既存の SparkContext の上に StreamingContext を指定する

ここまで準備できたら、別の端末を開いて Netcat でつなぎます。

$ nc -lk 9999

では、spark-shell の端末に戻って ssc.start します。

spark-shell> ssc.start()

Netcat 側の端末で例えば “If you want to test Spark on EMR please launch EMR and choose Spark then you can get Spark environment quickly.” と入力すると、spark-shell 側の端末には以下のように表示されます。

-------------------------------------------
Time: 1481988480000 ms
-------------------------------------------
(can,1)
(quickly.,1)
(get,1)
(,1)
(EMR,2)
(please,1)
(want,1)
(test,1)
(Spark,3)
(you,2)
...

ちゃんと WordCount できてますね。

Structured Streaming

次に、本日のお題の Structured Streaming を試します。

spark-shell に以下を流し込みます。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()

import spark.implicits._

val lines = spark.readStream.format("socket").option("host", "ip-172-31-28-103").option("port", 9999).load()
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()

Spark Streaming のときと同じように Netcat でつないで文字列を入力すると、以下のようになります。

-------------------------------------------
Batch: 2
-------------------------------------------
16/12/17 15:40:10 WARN TextSocketSource: Stream closed by ip-172-31-28-103:9999
+-----------+-----+
|      value|count|
+-----------+-----+
|     launch|    1|
|         If|    1|
|   quickly.|    1|
|        you|    2|
|        can|    1|
|     please|    1|
|        EMR|    2|
|      hello|    1|
|         on|    1|
|     choose|    1|
|       want|    1|
|        and|    1|
|      world|    1|
|        get|    1|
|      Spark|    3|
|           |    2|
|       then|    1|
|environment|    1|
|       test|    1|
|         to|    1|
+-----------+-----+

WordCountできました。

まとめ

今回は従来型の Spark Streaming と Structured Streaming を試しました。
Structured Streaming は従来型の Spark Streaming の DStream を使った方法に比べて Dataset/DataFrame API で記述できる分、ソースも書きやすいし可読性も上がるしいい感じです。

まだアルファリリースとのことで、これから様々な改善が入るかと思います。
Spark JIRA の Structured Streaming の Issue はこちらから確認できますので動向をウォッチしつつ何かあれば報告したりできるとよいかと。

参考資料

  1. トラックバックはまだありません。

コメントを残す

以下に詳細を記入するか、アイコンをクリックしてログインしてください。

WordPress.com ロゴ

WordPress.com アカウントを使ってコメントしています。 ログアウト / 変更 )

Twitter 画像

Twitter アカウントを使ってコメントしています。 ログアウト / 変更 )

Facebook の写真

Facebook アカウントを使ってコメントしています。 ログアウト / 変更 )

Google+ フォト

Google+ アカウントを使ってコメントしています。 ログアウト / 変更 )

%s と連携中

%d人のブロガーが「いいね」をつけました。