Getting Started with Apache Spark (DataFrame + Hive + SparkSQL)

http://spark.apache.org/docs/latest/quick-start.html

Spark Shell

➜ spark-1.4.0-bin-hadoop2.6 ./bin/spark-shell

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_25)
15/06/28 10:36:07 INFO ui.SparkUI: Started SparkUI at http://127.0.0.1:4040
15/06/28 10:36:07 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
15/06/28 10:36:08 INFO hive.HiveContext: Initializing execution hive, version 0.13.1
15/06/28 10:36:23 INFO repl.SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

Basic RDD Operation

First example: count the number of lines in a text file.
Calling sc.textFile(fileName) creates a MapPartitionsRDD:

scala> val textFile = sc.textFile("README.md")
15/06/28 10:36:45 INFO storage.MemoryStore: ensureFreeSpace(63424) called with curMem=0, maxMem=278019440
15/06/28 10:36:45 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 61.9 KB, free 265.1 MB)
15/06/28 10:36:45 INFO storage.MemoryStore: ensureFreeSpace(20061) called with curMem=63424, maxMem=278019440
15/06/28 10:36:45 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 19.6 KB, free 265.1 MB)
15/06/28 10:36:45 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:58638 (size: 19.6 KB, free: 265.1 MB)
15/06/28 10:36:45 INFO spark.SparkContext: Created broadcast 0 from textFile at <console>:21
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

Calling count() on the textFile RDD created above triggers an action:

scala> textFile.count()
java.net.ConnectException: Call From hadoop/127.0.0.1 to localhost:9000 failed on connection exception: java.net.ConnectException: 拒绝连接; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    ...
Caused by: java.net.ConnectException: 拒绝连接
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    ...

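Hadoop is already installed on this machine in pseudo-distributed mode, so Spark reads the Hadoop configuration and resolves README.md against HDFS. The connection is refused (拒绝连接) because Hadoop is not running.
For now we leave Hadoop stopped and read the file in local mode, which requires the file:/// prefix and an absolute path.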
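
Why the bare path failed can be sketched in plain Scala. This only illustrates scheme resolution and is not Hadoop's actual code: `qualify`, `defaultFs`, and `workingDir` are hypothetical names, and the default file system value is an assumption based on the `localhost:9000` address in the error above.

```scala
import java.net.URI

// Hypothetical helper: mimics how a path with no scheme is resolved against
// the configured default file system, while an explicit scheme is kept as-is.
def qualify(path: String, defaultFs: String, workingDir: String): String = {
  if (new URI(path).getScheme != null) path          // e.g. file:///... or hdfs://...
  else if (path.startsWith("/")) defaultFs + path    // absolute path on the default FS
  else defaultFs + workingDir + "/" + path           // relative path under the working dir
}

// Assumed values, for illustration only.
val defaultFs  = "hdfs://localhost:9000"
val workingDir = "/user/hadoop"

val bare  = qualify("README.md", defaultFs, workingDir)
val local = qualify("file:///home/hadoop/soft/spark-1.4.0-bin-hadoop2.6/README.md", defaultFs, workingDir)
```

Under these assumptions `bare` points at HDFS, which is not running, while `local` keeps its `file` scheme and never touches HDFS.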

scala> textFile
res1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

Rebuild the textFile RDD so that it reads the local text file:

scala> val textFile = sc.textFile("file:///home/hadoop/soft/spark-1.4.0-bin-hadoop2.6/README.md")
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21

Trigger an RDD action: count

scala> textFile.count()
15/06/28 10:44:07 INFO scheduler.DAGScheduler: Job 0 finished: count at <console>:24, took 0.275609 s
res2: Long = 98

Another RDD action: return the first line of the text file

scala> textFile.first()
15/06/28 10:44:27 INFO scheduler.DAGScheduler: Job 1 finished: first at <console>:24, took 0.017917 s
res3: String = # Apache Spark

More RDD Operations

1. Count how many lines contain the word Spark

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at filter at <console>:23
scala> textFile.filter(line => line.contains("Spark")).count()

2. Find the largest number of words on any single line of the file

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
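
The reduce in the command above keeps the larger of two counts at every step. A minimal sketch on a local collection (the sample lines are made up), using `math.max` in place of the explicit if/else:

```scala
// Made-up sample lines standing in for the contents of README.md.
val sampleLines = Seq(
  "# Apache Spark",
  "Spark is a fast and general cluster computing system",
  ""
)

// Words per line, then keep the maximum.
// math.max(a, b) is equivalent to `if (a > b) a else b`.
val wordsPerLine = sampleLines.map(line => line.split(" ").size)
val maxWords     = wordsPerLine.reduce((a, b) => math.max(a, b))
```

Note that splitting an empty line yields a single empty token, so an empty line counts as 1 rather than 0.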

3. MapReduce WordCount

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:23

scala>  wordCounts.collect()
res6: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (have,1), (pre-built,1), (locally.,1), (locally,2), (changed,1), (sc.parallelize(1,1), (only,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (YARN,,1), (graph,1), (Hive,2), (first,1), (["Specifying,1), ("yarn-client",1), (page](http://spark.apache.org/documentation.html),1), ([params]`.,1), (application,1), ([project,2), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (distribution.,1), (are,1), (params,1), (scala>,1), (systems.,1...
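
To see what the flatMap / map / reduceByKey pipeline computes, here is a sketch of the same steps on a local Scala collection. The input is made up, and `groupBy` is only a local stand-in for the shuffle that `reduceByKey` performs on the cluster.

```scala
// Made-up input; in the shell this would be the lines of the textFile RDD.
val lines = Seq("to be or", "not to be")

val localCounts: Map[String, Int] =
  lines
    .flatMap(line => line.split(" "))       // one element per word
    .map(word => (word, 1))                 // pair each word with a count of 1
    .groupBy { case (word, _) => word }     // local stand-in for the shuffle by key
    .map { case (word, pairs) =>
      (word, pairs.map(_._2).reduce((a, b) => a + b))   // same reducer as reduceByKey
    }
```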

4. Cache

scala>  linesWithSpark.cache()
res7: linesWithSpark.type = MapPartitionsRDD[4] at filter at <console>:23

scala> linesWithSpark.count()
15/06/28 10:47:11 INFO scheduler.DAGScheduler: Job 5 finished: count at <console>:26, took 0.054036 s
res8: Long = 19

scala> linesWithSpark.count()
15/06/28 10:47:14 INFO scheduler.DAGScheduler: Job 6 finished: count at <console>:26, took 0.016638 s
res9: Long = 19
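
The second count above finishes faster because the cached data is reused instead of being recomputed. The following is only a plain-Scala analogy of "compute on first use, then reuse"; `CachedDataset` is a hypothetical class and does not reflect how Spark implements caching.

```scala
// Analogy only: a dataset that computes its elements on first use and keeps them.
final class CachedDataset[A](compute: () => Seq[A]) {
  private var stored: Option[Seq[A]] = None
  var computations: Int = 0               // how many times `compute` actually ran

  private def materialize(): Seq[A] = stored match {
    case Some(data) => data               // already stored: reuse it
    case None =>
      computations += 1
      val data = compute()
      stored = Some(data)
      data
  }

  def count(): Long = materialize().size.toLong
}

val cachedLines = new CachedDataset(() =>
  Seq("Apache Spark", "Hadoop", "Spark SQL").filter(_.contains("Spark")))

val firstCount  = cachedLines.count()     // runs the filter
val secondCount = cachedLines.count()     // reuses the stored result
```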

Recap of the commands above. Each action is labeled with its Job Id; transformations are unlabeled:

val textFile = sc.textFile("file:///home/hadoop/soft/spark-1.4.0-bin-hadoop2.6/README.md")
⓪ textFile.count()
① textFile.first()
val linesWithSpark = textFile.filter(line => line.contains("Spark"))
② textFile.filter(line => line.contains("Spark")).count()
③ textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
④ wordCounts.collect()
linesWithSpark.cache()
⑤ linesWithSpark.count()
⑥ linesWithSpark.count()
⑦ linesWithSpark.count()

Spark Shell UI

http://127.0.0.1:4040

Jobs, Stages, Storage

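Jobs: each numbered action above corresponds to a Job Id on the Jobs page.

Stages: there are 8 Jobs above but 9 Stages, because the collect in ④ has two stages.

Storage: populated only once an RDD has been cached.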
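
The job and stage totals can be checked with a little arithmetic: every action submits one job, and a job has one stage plus one more for each shuffle boundary. A sketch, where `Action` is a hypothetical bookkeeping class and the list mirrors the actions run in the shell session:

```scala
// Hypothetical bookkeeping: one entry per action, with the number of shuffles it needs.
case class Action(label: String, shuffles: Int)

val actions = Seq(
  Action("textFile.count()", 0),
  Action("textFile.first()", 0),
  Action("filter(...).count()", 0),
  Action("map(...).reduce(...)", 0),
  Action("wordCounts.collect()", 1),      // reduceByKey introduces a shuffle
  Action("linesWithSpark.count()", 0),
  Action("linesWithSpark.count()", 0),
  Action("linesWithSpark.count()", 0)
)

val jobCount   = actions.size                          // one job per action
val stageCount = actions.map(a => 1 + a.shuffles).sum  // one extra stage per shuffle
```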

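Inspecting Stages

On the Jobs page, click the collect with Job Id=4 (the job that outputs the WordCount result). The list underneath shows 2 Stages.
Look closely at the last two columns of the list, Shuffle Read and Shuffle Write:
the map stage performs the Shuffle Write, and the collect stage performs the Shuffle Read.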
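
What the two stages exchange can be sketched in plain Scala. This is a simplified local model, not Spark's shuffle code: `partitionFor`, `shuffleWrite`, and `shuffleRead` are hypothetical names, and the partitioning rule (non-negative `hashCode` modulo the partition count) is the usual hash-partitioning scheme.

```scala
// Hash partitioning: non-negative key.hashCode modulo the partition count.
def partitionFor(key: String, numPartitions: Int): Int = {
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw
}

// "Shuffle write": one map task groups its (word, 1) pairs into per-partition buckets.
def shuffleWrite(pairs: Seq[(String, Int)], numPartitions: Int): Map[Int, Seq[(String, Int)]] =
  pairs.groupBy { case (word, _) => partitionFor(word, numPartitions) }

// "Shuffle read": one reduce task fetches its bucket from every map task and sums per word.
def shuffleRead(mapOutputs: Seq[Map[Int, Seq[(String, Int)]]], partition: Int): Map[String, Int] =
  mapOutputs
    .flatMap(output => output.getOrElse(partition, Seq.empty[(String, Int)]))
    .groupBy { case (word, _) => word }
    .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

// Two hypothetical map tasks and two reduce partitions.
val mapTask1 = shuffleWrite(Seq(("spark", 1), ("hive", 1), ("spark", 1)), 2)
val mapTask2 = shuffleWrite(Seq(("hive", 1), ("sql", 1)), 2)
val merged   = (0 until 2).map(p => shuffleRead(Seq(mapTask1, mapTask2), p)).reduce(_ ++ _)
```

Because a given word always hashes to the same partition, each reduce task sees every occurrence of its words, and the per-partition results can simply be concatenated.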

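Click the map with Stage Id=4. Its DAG visualization matches the left half of the job overview above.

The Spark Web UI also provides an Event Timeline, which shows clearly how much time each phase takes.

Go back and click the collect with Stage Id=5.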


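Spark Standalone Cluster Installation

Preparation:

1. Set up passwordless SSH from the master to the slaves (append the master's public key to authorized_keys on every slave)
2. Disable the firewall on every node (chkconfig iptables off)
3. Install scala-2.10 and configure it in ~/.bashrc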

cd $SPARK_HOME
vi conf/spark-env.sh

export JAVA_HOME=/usr/java/jdk1.7.0_51
export SCALA_HOME=/usr/install/scala-2.10.5
export HADOOP_HOME=/usr/install/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_MASTER_IP=dp0652
export MASTER=spark://dp0652:7077
#export SPARK_LOCAL_IP=dp0652
export SPARK_LOCAL_DIRS=/usr/install/spark-1.4.0-bin-hadoop2.6
export SPARK_MASTER_WEBUI_PORT=8082
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=1
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=8g

vi conf/slaves

dp0652
dp0653
dp0655
dp0656
dp0657

Distribute the Spark directory to the other nodes in the cluster:

cd ..
scp -r $SPARK_HOME dp0653:/usr/install
scp -r $SPARK_HOME dp0655:/usr/install
scp -r $SPARK_HOME dp0656:/usr/install
scp -r $SPARK_HOME dp0657:/usr/install

Because dp0652 and dp0653 have more memory than the other nodes, we changed spark-env.sh on those two nodes:

export SPARK_WORKER_INSTANCES=2
export SPARK_WORKER_MEMORY=20g
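
The cluster capacity that results from these settings can be worked out as follows. SPARK_WORKER_CORES and SPARK_WORKER_MEMORY apply to each worker instance, so a node with two instances contributes twice. `NodeConf` is a hypothetical helper type; the numbers come from the two spark-env.sh variants above.

```scala
// Hypothetical helper type holding the per-node worker settings.
case class NodeConf(host: String, instances: Int, coresPerWorker: Int, memoryGbPerWorker: Int)

val nodes = Seq(
  NodeConf("dp0652", 2, 1, 20),
  NodeConf("dp0653", 2, 1, 20),
  NodeConf("dp0655", 1, 1, 8),
  NodeConf("dp0656", 1, 1, 8),
  NodeConf("dp0657", 1, 1, 8)
)

val totalWorkers  = nodes.map(_.instances).sum
val totalCores    = nodes.map(n => n.instances * n.coresPerWorker).sum
val totalMemoryGb = nodes.map(n => n.instances * n.memoryGbPerWorker).sum
```

This gives 7 workers, 7 cores, and 104 GB of worker memory in total.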

Start the cluster. Running the script on the master is enough.

[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.master.Master-1-dp0652.out
dp0656: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0656.out
dp0655: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0655.out
dp0657: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0657.out
dp0652: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0652.out
dp0653: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0653.out
dp0652: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-2-dp0652.out
dp0653: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-2-dp0653.out

Check the Spark processes on the master and the slaves:

[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ jps -lm
40708 org.apache.spark.deploy.master.Master --ip dp0652 --port 7077 --webui-port 8082
41095 org.apache.spark.deploy.worker.Worker --webui-port 8082 spark://dp0652:7077
40926 org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://dp0652:7077
[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ ssh dp0653
Last login: Thu Jul  2 09:07:17 2015 from 192.168.6.140
[qihuang.zheng@dp0653 ~]$ jps -lm
27153 org.apache.spark.deploy.worker.Worker --webui-port 8082 spark://dp0652:7077
27029 org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://dp0652:7077
[qihuang.zheng@dp0653 ~]$ exit
logout
Connection to dp0653 closed.
[qihuang.zheng@dp0652 logs]$ ssh dp0655
Last login: Thu Jul  2 08:55:05 2015 from 192.168.6.140
[qihuang.zheng@dp0655 ~]$ jps -lm
8766 org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://dp0652:7077
[qihuang.zheng@dp0655 ~]$

Open the web UI on the master: http://dp0652:8082/

Problems Encountered

1. If SPARK_LOCAL_IP is set but has not been changed on each slave to that slave's own IP address, the Worker fails to start:

15/07/02 09:04:08 ERROR netty.NettyTransport: failed to bind to /192.168.6.52:0, shutting down Netty transport
Exception in thread "main" java.net.BindException: Failed to bind to: /192.168.6.52:0: Service 'sparkWorker' failed after 16 retries!
        at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
        at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393)
        at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389)
        at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
        at scala.util.Try$.apply(Try.scala:161)
        at scala.util.Success.map(Try.scala:206)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
        at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
        at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
        at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
        at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/07/02 09:04:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/07/02 09:04:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/07/02 09:04:09 INFO util.Utils: Shutdown hook called

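Cause: SPARK_LOCAL_IP is the IP address of the local machine, so after spark-env.sh is distributed it must be edited on every node to hold that node's own address.
With many nodes this is tedious. The spark-env.sh shown earlier therefore leaves SPARK_LOCAL_IP commented out and sets SPARK_LOCAL_DIRS. Note that SPARK_LOCAL_DIRS only sets Spark's scratch directories; it is leaving SPARK_LOCAL_IP unset that avoids the bind error.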
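
A quick way to confirm this kind of misconfiguration is to check whether the configured address belongs to the machine at all. A minimal sketch using the JDK's `java.net` classes; `isLocalAddress` is a hypothetical helper name:

```scala
import java.net.{InetAddress, NetworkInterface}

// True when `ip` is assigned to one of this machine's network interfaces,
// i.e. when a process on this machine could bind to it.
def isLocalAddress(ip: String): Boolean =
  NetworkInterface.getByInetAddress(InetAddress.getByName(ip)) != null
```

Run on a slave, `isLocalAddress("192.168.6.52")` would be expected to return false, which matches the "failed to bind" error in the log above.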

2. If MASTER is not exported, the workers report connection-refused (拒绝连接) errors:

15/07/02 08:40:51 INFO worker.Worker: Retrying connection to master (attempt # 12)
15/07/02 08:40:51 INFO worker.Worker: Connecting to master akka.tcp://sparkMaster@dp0652:7077/user/Master...
15/07/02 08:40:51 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@dp0652:7077].
Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters.
Reason: 拒绝连接: dp0652/192.168.6.52:7077
15/07/02 08:41:23 ERROR worker.Worker: RECEIVED SIGNAL 15: SIGTERM
15/07/02 08:41:23 INFO util.Utils: Shutdown hook called

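As a result, the Worker processes start on all the slaves (visible with jps), but no workers show up on the Master. At this point, check the log on the Master.
On a successful start the master log shows spark@dp0652:7077, whereas the log above shows sparkMaster@dp0652:7077, so MASTER should be exported manually.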

3. The cluster finally started successfully. The log on the Master:

Spark Command: /usr/java/jdk1.7.0_51/bin/java -cp /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../conf/:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/spark-assembly-1.4.0-hadoop2.6.0.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/usr/install/hadoop/etc/hadoop/:/usr/install/hadoop/etc/hadoop/ -Xms512m -Xmx512m -XX:MaxPermSize=128m org.apache.spark.deploy.master.Master --ip dp0652 --port 7077 --webui-port 8082
========================================
15/07/02 09:27:49 INFO master.Master: Registered signal handlers for [TERM, HUP, INT]
15/07/02 09:27:50 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/07/02 09:27:50 INFO spark.SecurityManager: Changing view acls to: qihuang.zheng
15/07/02 09:27:50 INFO spark.SecurityManager: Changing modify acls to: qihuang.zheng
15/07/02 09:27:50 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(qihuang.zheng); users with modify permissions: Set(qihuang.zheng)
15/07/02 09:27:51 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/07/02 09:27:51 INFO Remoting: Starting remoting
15/07/02 09:27:51 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkMaster@dp0652:7077]
15/07/02 09:27:51 INFO util.Utils: Successfully started service 'sparkMaster' on port 7077.
15/07/02 09:27:51 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/07/02 09:27:51 INFO server.AbstractConnector: Started SelectChannelConnector@dp0652:6066
15/07/02 09:27:51 INFO util.Utils: Successfully started service on port 6066.
15/07/02 09:27:51 INFO rest.StandaloneRestServer: Started REST server for submitting applications on port 6066
15/07/02 09:27:51 INFO master.Master: Starting Spark master at spark://dp0652:7077
15/07/02 09:27:51 INFO master.Master: Running Spark version 1.4.0
15/07/02 09:27:51 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/07/02 09:27:51 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8082
15/07/02 09:27:51 INFO util.Utils: Successfully started service 'MasterUI' on port 8082.
15/07/02 09:27:51 INFO ui.MasterWebUI: Started MasterWebUI at http://192.168.6.52:8082
15/07/02 09:27:52 INFO master.Master: I have been elected leader! New state: ALIVE
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.52:35398 with 1 cores, 20.0 GB RAM
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.56:60106 with 1 cores, 8.0 GB RAM
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.55:50995 with 1 cores, 8.0 GB RAM
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.53:55994 with 1 cores, 20.0 GB RAM
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.57:34020 with 1 cores, 8.0 GB RAM
15/07/02 09:27:56 INFO master.Master: Registering worker 192.168.6.52:55912 with 1 cores, 20.0 GB RAM
15/07/02 09:27:56 INFO master.Master: Registering worker 192.168.6.53:35846 with 1 cores, 20.0 GB RAM

The log of one of the Workers on dp0653:

Spark Command: /usr/java/jdk1.7.0_51/bin/java -cp /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../conf/:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/spark-assembly-1.4.0-hadoop2.6.0.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/usr/install/hadoop/etc/hadoop/:/usr/install/hadoop/etc/hadoop/ -Xms512m -Xmx512m -XX:MaxPermSize=128m org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://dp0652:7077
========================================
15/07/02 09:27:52 INFO worker.Worker: Registered signal handlers for [TERM, HUP, INT]
15/07/02 09:27:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/07/02 09:27:52 INFO spark.SecurityManager: Changing view acls to: qihuang.zheng
15/07/02 09:27:52 INFO spark.SecurityManager: Changing modify acls to: qihuang.zheng
15/07/02 09:27:52 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(qihuang.zheng); users with modify permissions: Set(qihuang.zheng)
15/07/02 09:27:53 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/07/02 09:27:53 INFO Remoting: Starting remoting
15/07/02 09:27:54 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkWorker@192.168.6.53:55994]
15/07/02 09:27:54 INFO util.Utils: Successfully started service 'sparkWorker' on port 55994.
15/07/02 09:27:54 INFO worker.Worker: Starting Spark worker 192.168.6.53:55994 with 1 cores, 20.0 GB RAM
15/07/02 09:27:54 INFO worker.Worker: Running Spark version 1.4.0
15/07/02 09:27:54 INFO worker.Worker: Spark home: /usr/install/spark-1.4.0-bin-hadoop2.6
15/07/02 09:27:54 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/07/02 09:27:54 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8081
15/07/02 09:27:54 INFO util.Utils: Successfully started service 'WorkerUI' on port 8081.
15/07/02 09:27:54 INFO ui.WorkerWebUI: Started WorkerWebUI at http://192.168.6.53:8081
15/07/02 09:27:54 INFO worker.Worker: Connecting to master akka.tcp://sparkMaster@dp0652:7077/user/Master...
15/07/02 09:27:54 INFO worker.Worker: Successfully registered with master spark://dp0652:7077

Spark Shell

[qihuang.zheng@dp0653 spark-1.4.0-bin-hadoop2.6]$ bin/spark-shell --master spark://dp0652:7077 --executor-memory 4g
[qihuang.zheng@dp0653 spark-1.4.0-bin-hadoop2.6]$ bin/spark-submit --master spark://dp0652:7077 --class org.apache.spark.examples.SparkPi --executor-memory 4g --total-executor-cores 2 lib/spark-examples-1.4.0-hadoop2.6.0.jar 1000

SparkSQL

SparkSQL Table Operation with ParquetFile

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("/user/qihuang.zheng/sparktest/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)

// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
// saveAsParquetFile is deprecated as of Spark 1.4.0; write.parquet below is its replacement.
people.saveAsParquetFile("/user/qihuang.zheng/sparktest/people.parquet")
people.write.parquet("/user/qihuang.zheng/sparktest/people2.parquet")

// Read in the parquet file created above.  Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.read.parquet("/user/qihuang.zheng/sparktest/people.parquet")

// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// JOIN TABLE
val jointbls = sqlContext.sql("SELECT people.name FROM people join parquetFile where people.name=parquetFile.name")
jointbls.map(t => "Name: " + t(0)).collect().foreach(println)
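
The parsing and filtering in the listing above can be tried without a cluster. This sketch applies the same logic to a local collection; the sample rows are an assumption about what people.txt contains ("name, age" per line), and `parsePerson` is a hypothetical helper.

```scala
case class Person(name: String, age: Int)

// Same parsing as in the listing: split on the comma, trim the age, convert to Int.
def parsePerson(line: String): Person = {
  val fields = line.split(",")
  Person(fields(0), fields(1).trim.toInt)
}

// Assumed sample rows in the "name, age" format.
val sampleRows = Seq("Michael, 29", "Andy, 30", "Justin, 19")

// Local equivalent of: SELECT name FROM people WHERE age >= 13 AND age <= 19
val teenagerNames = sampleRows
  .map(parsePerson)
  .filter(p => p.age >= 13 && p.age <= 19)
  .map(_.name)
```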

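If memory runs out while executing cache, the current shell exits. The workaround is to prefix the spark-shell command with SPARK_SUBMIT_OPTS="-XX:MaxPermSize=1g", which raises the JVM PermGen limit.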

Spark Hive

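If Spark has not been built with Hive support and hive-site.xml is simply copied into the conf directory of the Spark cluster, starting spark-sql fails with a permission-denied (权限不够) error: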

[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ bin/spark-sql
Exception in thread "main" java.lang.RuntimeException: java.io.IOException: 权限不够
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:330)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:109)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: 权限不够
    at java.io.UnixFileSystem.createFileExclusively(Native Method)
    at java.io.File.createNewFile(File.java:1006)
    at java.io.File.createTempFile(File.java:1989)
    at org.apache.hadoop.hive.ql.session.SessionState.createTempFile(SessionState.java:432)
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:328)
    ... 11 more
15/07/03 08:42:33 INFO util.Utils: Shutdown hook called
15/07/03 08:42:33 INFO util.Utils: Deleting directory /tmp/spark-831ff199-cf80-4d49-a22f-824736065289

This is because every Worker in the Spark cluster needs Hive support, but Hive is not installed on all of the Worker nodes. Spark also needs to be built with the Hive-enabled package.
Rebuilding Spark with Hive support takes a long time, though. Can the Hive installation that already exists on the cluster be used directly? Yes. See:
http://lxw1234.com/archives/2015/06/294.htm
http://shiyanjun.cn/archives/1113.html
http://www.cnblogs.com/hseagle/p/3758922.html

1. Add the following to spark-env.sh:

export HIVE_HOME=/usr/install/apache-hive-0.13.1-bin
export SPARK_CLASSPATH=$HIVE_HOME/lib/mysql-connector-java-5.1.34.jar:$SPARK_CLASSPATH

2. Distribute apache-hive-0.13.1-bin to every node in the cluster (every node that runs a Spark Worker):

cd install
scp -r apache-hive-0.13.1-bin dp0653:/usr/install/

3. Copy apache-hive-0.13.1-bin/conf/hive-site.xml to $SPARK_HOME/conf:

scp apache-hive-0.13.1-bin/conf/hive-site.xml dp0653:/usr/install/spark-1.4.0-bin-hadoop2.6/conf

4. Restart the Spark cluster:

sbin/stop-all.sh
sbin/start-all.sh

5. Test spark-sql:

SPARK_CLASSPATH was detected (set to '/usr/install/apache-hive-0.13.1-bin/lib/mysql-connector-java-5.1.34.jar:').
This is deprecated in Spark 1.0+.
Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath

15/07/03 10:00:56 WARN spark.SparkConf: Setting 'spark.executor.extraClassPath' to '/usr/install/apache-hive-0.13.1-bin/lib/mysql-connector-java-5.1.34.jar:' as a work-around.
15/07/03 10:00:56 WARN spark.SparkConf: Setting 'spark.driver.extraClassPath' to '/usr/install/apache-hive-0.13.1-bin/lib/mysql-connector-java-5.1.34.jar:' as a work-around.

15/07/03 10:01:00 INFO hive.metastore: Trying to connect to metastore with URI thrift://192.168.6.53:9083
15/07/03 10:01:00 INFO hive.metastore: Connected to metastore.
15/07/03 10:01:00 INFO session.SessionState: No Tez session required at this point. hive.execution.engine=mr.
SET spark.sql.hive.version=0.13.1
SET spark.sql.hive.version=0.13.1
spark-sql> show databases;
default
test
spark-sql> use test;
Time taken: 2.045 seconds
spark-sql> show tables;
koudai    false
...
spark-sql> select count(*) from koudai;
311839
Time taken: 12.443 seconds, Fetched 1 row(s)
spark-sql>

Tips & TODO

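  1. For metric analysis at a fixed interval such as daily or weekly, save the results as a persistent table or to HDFS so that others can use them.

  2. Production data sets are usually large, so use partitions when querying.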

