クラウドインフラ構築記

現在AWSの構築支援に携わっております。今注視しているのは、GKE、BigQuery、Google Dataflowなどサービスを展開しているGoolge Cloud Platformです。

DataFramesとDataSetを試してみました。 #ApacheSpark


Spark 1.6がリリースされたことで、DataSet、DataFrameを試してみました。

まずは、事前に下記をインストールしておきます。(CentOS7にて)

  • Java7 1.7.0_80
  • Hadoop-2.6.3のインストール※インストールはこちらを参照してください。
  • Development  Toolsをグループインストール

環境変数の設定

export SPARK_DIST_CLASSPATH=$(hadoop classpath)

VMパラメータを設定します。これをしないと、Sparkのビルド途中でout of memoryで失敗します。

export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"

Spark 1.6のビルドをします。かなり時間がかかりますので、完了まで待ちます。Scalaは、 2.10.4

build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.3 -DskipTests clean package

hadoopデーモンの起動

start-all.sh

DataSetで取り込むAWS IP Rangesをcsv形式で取り組んでおきます。Sparkはjson形式も取り込むことも可能ですが、ip-ranges.jsonそのままだとjson構造が入れ子になっていることもあり、csvにしてからSpark DataSetに取り込みました。

curl https://ip-ranges.amazonaws.com/ip-ranges.json | jq '.prefixes[] | {ip_prefix,region,service}' | jq "[.ip_prefix,.region,.service] | @csv" > ip_ranges.txt

Spark Shellを起動します。


$ spark-1.6.0/bin/spark-shell
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.6.0
/_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_80)
Type in expressions to have them evaluated.
Type :help for more information.
16/01/17 09:26:46 WARN Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 172.16.10.18 instead (on interface enp0s3)
16/01/17 09:26:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Spark context available as sc.
SQL context available as sqlContext.

scala>

DataFramesスキーマ定義を設定

scala> case class AwsIp(ip_prefix: String, region: String, service: String)

上記で取得したAWS IP rangesのcsvをDataFramesに取り組みます。

scala> val awsip = sc.textFile("/home/hdspark/ip_ranges.txt").map(_.split(",")).map(p => AwsIp(p(0),p(1),p(2))).toDF()

DataFramesスキーマ定義の確認

scala> awsip.printSchema()
root
|-- ip_prefix: string (nullable = true)
|-- region: string (nullable = true)
|-- service: string (nullable = true)

regionでグルーピング。東京リージョンのCIDR数は、オレゴンより多いとは。

scala> awsip.groupBy("region").count().show()
+--------------+-----+
| region|count|
+--------------+-----+
| eu-central-1| 18|
| cn-north-1| 10|
| us-east-1| 122|
|ap-northeast-1| 56|
|ap-southeast-1| 50|
| us-west-1| 46|
| us-west-2| 51|
|ap-southeast-2| 34|
|ap-northeast-2| 11|
| GLOBAL| 35|
| us-gov-west-1| 6|
| sa-east-1| 29|
| eu-west-1| 74|
+--------------+-----+

serviceでグルーピング

scala> awsip.groupBy("service").count().show()
+--------------------+-----+
| service|count|
+--------------------+-----+
|ROUTE53_HEALTHCHECKS| 16|
| ROUTE53| 1|
| AMAZON| 323|
| CLOUDFRONT| 17|
| EC2| 185|
+--------------------+-----+

regionが「ap-northeast-1」が合致するものを抽出し、その中で、serviceが「AMAZON」のものを抽出することも。

scala> val f1 =awsip.filter(awsip.col("region").equalTo("ap-northeast-1"))
scala> f1.filter(f1.col("service").equalTo("AMAZON")).show()
+---------------+--------------+-------+
| ip_prefix| region|service|
+---------------+--------------+-------+
| 27.0.0.0g22|ap-northeast-1| AMAZON|
| 46.51.224.0g19|ap-northeast-1| AMAZON|
| 52.68.0.0g15|ap-northeast-1| AMAZON|
| 52.92.60.0g22|ap-northeast-1| AMAZON|
| 52.94.9.0g24|ap-northeast-1| AMAZON|
| 52.95.30.0g23|ap-northeast-1| AMAZON|
| 52.95.34.0g24|ap-northeast-1| AMAZON|
| 52.95.56.0g22|ap-northeast-1| AMAZON|
| 52.95.243.0g24|ap-northeast-1| AMAZON|
|52.95.255.48g28|ap-northeast-1| AMAZON|
| 52.192.0.0g15|ap-northeast-1| AMAZON|ml
| 52.196.0.0g14|ap-northeast-1| AMAZON|
| 54.64.0.0g15|ap-northeast-1| AMAZON|
| 54.92.0.0g17|ap-northeast-1| AMAZON|
| 54.95.0.0g16|ap-northeast-1| AMAZON|
| 54.150.0.0g16|ap-northeast-1| AMAZON|
| 54.168.0.0g16|ap-northeast-1| AMAZON|
| 54.178.0.0g16|ap-northeast-1| AMAZON|
| 54.199.0.0g16|ap-northeast-1| AMAZON|
|54.231.224.0g21|ap-northeast-1| AMAZON|
+---------------+--------------+-------+
only showing top 20 rows

こんな感じで、フィルタリング機能も充実しています。DataSet 、DataFramesについては下記も参照してください。

http://spark.apache.org/docs/latest/sql-programming-guide.html

in-memoryで動くこともあるが、Hadoopより100x高速なので、これからデータプロセッシングはSparkがお勧め。

2016年内に、2.0のリリースも予定。(Rearchitecting for Mobile Platform、MLLib 2.0)

MLLib 2.0 については、https://issues.apache.org/jira/browse/SPARK-12626も。

Google Dataproc、EMRとも現状、Spark 1.6.0には対応していません(1.5.2が対応している最新版)。Google DataProcは、2015/9、2015/11と新バージョンがリリースしているなので、次は、2016/1に新バージョンがリリース?

コメントは受け付けていません。