2016年1月17日
から hiruta
DataFramesとDataSetを試してみました。 #ApacheSpark はコメントを受け付けていません
Spark 1.6がリリースされたことで、DataSet、DataFrameを試してみました。
まずは、事前に下記をインストールしておきます。(CentOS7にて)
- Java7 1.7.0_80
- Hadoop-2.6.3のインストール※インストールはこちらを参照してください。
- Development Toolsをグループインストール
環境変数の設定
export SPARK_DIST_CLASSPATH=$(hadoop classpath) |
VMパラメータを設定します。これをしないと、Sparkのビルド途中でout of memoryで失敗します。
export MAVEN_OPTS= "-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" |
Spark 1.6のビルドをします。かなり時間がかかりますので、完了まで待ちます。Scalaは、 2.10.4
build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.3 -DskipTests clean package |
hadoopデーモンの起動
DataSetで取り込むAWS IP Rangesをcsv形式で取り組んでおきます。Sparkはjson形式も取り込むことも可能ですが、ip-ranges.jsonそのままだとjson構造が入れ子になっていることもあり、csvにしてからSpark DataSetに取り込みました。
Spark Shellを起動します。
$ spark-1.6.0/bin/spark-shell |
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). |
log4j:WARN Please initialize the log4j system properly. |
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties |
To adjust logging level use sc.setLogLevel( "INFO" ) |
/___/ .__/\_,_/_/ /_/\_\ version 1.6.0 |
Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_80) |
Type in expressions to have them evaluated. |
Type :help for more information. |
16/01/17 09:26:46 WARN Utils: Your hostname , localhost.localdomain resolves to a loopback address: 127.0.0.1; using 172.16.10.18 instead (on interface enp0s3) |
16/01/17 09:26:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address |
Spark context available as sc. |
SQL context available as sqlContext. |
DataFramesスキーマ定義を設定
scala> case class AwsIp(ip_prefix: String, region: String, service: String) |
上記で取得したAWS IP rangesのcsvをDataFramesに取り組みます。
scala> val awsip = sc.textFile( "/home/hdspark/ip_ranges.txt" ).map(_. split ( "," )).map(p => AwsIp(p(0),p(1),p(2))).toDF() |
DataFramesスキーマ定義の確認
scala> awsip.printSchema() |
|-- ip_prefix: string (nullable = true ) |
|-- region: string (nullable = true ) |
|-- service: string (nullable = true ) |
regionでグルーピング。東京リージョンのCIDR数は、オレゴンより多いとは。
scala> awsip.groupBy( "region" ).count().show() |
serviceでグルーピング
scala> awsip.groupBy( "service" ).count().show() |
+--------------------+-----+ |
+--------------------+-----+ |
|ROUTE53_HEALTHCHECKS| 16| |
+--------------------+-----+ |
regionが「ap-northeast-1」が合致するものを抽出し、その中で、serviceが「AMAZON」のものを抽出することも。
scala> val f1 =awsip.filter(awsip.col( "region" ).equalTo( "ap-northeast-1" )) |
scala> f1.filter(f1.col( "service" ).equalTo( "AMAZON" )).show() |
+---------------+--------------+-------+ |
| ip_prefix| region|service| |
+---------------+--------------+-------+ |
| 27.0.0.0g22|ap-northeast-1| AMAZON| |
| 46.51.224.0g19|ap-northeast-1| AMAZON| |
| 52.68.0.0g15|ap-northeast-1| AMAZON| |
| 52.92.60.0g22|ap-northeast-1| AMAZON| |
| 52.94.9.0g24|ap-northeast-1| AMAZON| |
| 52.95.30.0g23|ap-northeast-1| AMAZON| |
| 52.95.34.0g24|ap-northeast-1| AMAZON| |
| 52.95.56.0g22|ap-northeast-1| AMAZON| |
| 52.95.243.0g24|ap-northeast-1| AMAZON| |
|52.95.255.48g28|ap-northeast-1| AMAZON| |
| 52.192.0.0g15|ap-northeast-1| AMAZON|ml |
| 52.196.0.0g14|ap-northeast-1| AMAZON| |
| 54.64.0.0g15|ap-northeast-1| AMAZON| |
| 54.92.0.0g17|ap-northeast-1| AMAZON| |
| 54.95.0.0g16|ap-northeast-1| AMAZON| |
| 54.150.0.0g16|ap-northeast-1| AMAZON| |
| 54.168.0.0g16|ap-northeast-1| AMAZON| |
| 54.178.0.0g16|ap-northeast-1| AMAZON| |
| 54.199.0.0g16|ap-northeast-1| AMAZON| |
|54.231.224.0g21|ap-northeast-1| AMAZON| |
+---------------+--------------+-------+ |
こんな感じで、フィルタリング機能も充実しています。DataSet 、DataFramesについては下記も参照してください。
http://spark.apache.org/docs/latest/sql-programming-guide.html
in-memoryで動くこともあるが、Hadoopより100x高速なので、これからデータプロセッシングはSparkがお勧め。
2016年内に、2.0のリリースも予定。(Rearchitecting for Mobile Platform、MLLib 2.0)
MLLib 2.0 については、https://issues.apache.org/jira/browse/SPARK-12626も。
Google Dataproc、EMRとも現状、Spark 1.6.0には対応していません(1.5.2が対応している最新版)。Google DataProcは、2015/9、2015/11と新バージョンがリリースしているなので、次は、2016/1に新バージョンがリリース?