forked from yjp123456/SparkDemo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspark_note.txt
341 lines (290 loc) · 19.3 KB
/
spark_note.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
1、spark在window上部署比较麻烦点,不能直接通过运行start-all.sh,因为它会执行命令
C:\Program Files\Java\jdk1.8.0_91\bin\java -cp E:\spark-1.6.2-bin-hadoop2.6/conf\;
E:\spark-1.6.2-bin-hadoop2.6/lib/spark-assembly-1.6.2-hadoop2.6.0.jar;
E:\spark-1.6.2-bin-hadoop2.6\lib\datanucleus-api-jdo-3.2.6.jar;
E:\spark-1.6.2-bin-hadoop2.6\lib\datanucleus-core-3.2.10.jar;
E:\spark-1.6.2-bin-hadoop2.6\lib\datanucleus-rdbms-3.2.9.ja
r -Xms1g -Xmx1g org.apache.spark.deploy.master.Master --ip nj-jalen-yang1 --port 7077 --webui-port 8080
这个命令直接在命令行运行会报错,需要把前面的C:\Program Files\Java\jdk1.8.0_91\bin\去掉才行,因此可以把去掉后的命令在命令行
在命令行直接运行
下面分别是启动master和slave的命令,可以通过git下执行start-all.sh和start-slave.sh 192.168.181.128:7077(master ip)来获取
java -cp E:\spark-1.6.2-bin-hadoop2.6/conf\;
E:\spark-1.6.2-bin-hadoop2.6/lib/spark-assembly-1.6.2-hadoop2.6.0.jar;
E:\spark-1.6.2-bin-hadoop2.6\lib\datanucleus-api-jdo-3.2.6.jar;
E:\spark-1.6.2-bin-hadoop2.6\lib\datanucleus-core-3.2.10.jar;
E:\spark-1.6.2-bin-hadoop2.6\lib\datanucleus-rdbms-3.2.9.jar
-Xms1g -Xmx1g org.apache.spark.deploy.master.Master --ip nj-jalen-yang1 --port 7077 --webui-port 8080
java -cp E:\spark-1.6.2-bin-hadoop2.6/conf\;E:\spark-1.6.2-bin-hadoop2.6/lib/spark-assembly-1.6.2-hadoop2.6.0.jar;
E:\spark-1.6.2-bin-hadoop2.6\lib\datanucleus-api-jdo-3.2.6.jar;
E:\spark-1.6.2-bin-hadoop2.6\lib\datanucleus-core-3.2.10.jar;
E:\spark-1.6.2-bin-hadoop2.6\lib\datanucleus-rdbms-3.2.9.jar
-Xms1g -Xmx1g org.apache.spark.deploy.worker.Worker --webui-port 8081 10.64.34.64:7077
2、window要想连接本地的ssh,需要cygwin配置sshd服务才行,配置后可以通过ssh localhost来验证
3、启动spark命令
cd /Project/spark && bin/spark-submit --master spark://192.168.1.3:7077 --driver-class-path "/Project/extract_jar/sqljdbc4.jar:/Project/extract_jar/postgresql-9.3-1103.jdbc3.jar:/Project/extract_jar:/Project/extract_jar/sqljdbc4.jar:/Project/extract_jar/postgresql-9.3-1103.jdbc3.jar:/Project/extract_jar:" spark-scripts/calcRentionRateByGeo2to14.py >> stat_retentionrate2-14.log
4、spark引用外部的Jar包时需要加上路径,最好是配置SPARK_CLASSPATH,代码中不要配置spark.executor.extraClassPath
5、spark排序功能:df.sort(df.age.desc()).collect();df.orderBy(df.age.desc()).collect()
6、python时间戳转时间:
timestamp = 1467269590
dt = datetime.utcfromtimestamp(timestamp)
7、spark有两种方式设置memory per node,一个是代码中SparkConf.set("spark.executor.memory","2g")
另一种是启动时命令行设置--executor-memory 2G
命令选项有:
-c CORES, --cores CORES Number of cores to use
-m MEM, --memory MEM Amount of memory to use (e.g. 1000M, 2G)
-d DIR, --work-dir DIR Directory to run apps in (default: SPARK_HOME/work)
-i HOST, --ip IP Hostname to listen on (deprecated, please use --host or -h)
-h HOST, --host HOST Hostname to listen on
-p PORT, --port PORT Port to listen on (default: random)
--webui-port PORT Port for web UI (default: 8081)
--properties-file FILE Path to a custom Spark properties file.
Default is conf/spark-defaults.conf.
8、spark DataFrame.write.json和DataFrame.read.json都是分布式方式进行的,写的话会生成多个小文件,读的话可以读整个文件夹
9、pycharm要想实现本地调试,需要把spark-1.6.2-bin-hadoop2.6\python\pyspark拷贝到Python27\Lib\site-packages目录下
10、master机器通过ssh访问每台worker。通过ssh访问worker是并行进行的,需要设置无密码访问(使用私钥)。如果你不设置无密码访问,
你可以设置SPARK_SSH_FOREGROUND环境变量,然后一个一个地为worker输入密码。
11、spark可以将一个DataFrame注册成临时表,如:
NewUser.registerTempTable("test")//spark2.0用createOrReplaceTempView替换了
这个表的生命周期与sqlContext有关,如果要创建全局的临时表,可以用
df.createGlobalTempView("people")//spark 2.1,针对所有SparkSession都能访问
不过这样的话查询表时需要加上固定的前缀global_temp,如:
spark.sql("SELECT * FROM global_temp.people").show()
12、spark2.0开始使用SparkSession替代SqlContext和HiveContext,
from pyspark.conf import SparkConf
spark = SparkSession.builder.config(conf=SparkConf())
13、sparkContext加载txt文件转化为RDD:
# Load a text file and convert each line to a Row.
sc = spark.sparkContext
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
最后需要用SparkSession来创建DataFrame
14、DataFrame保存数据:
peopleDF.write.parquet("people.parquet")
parquetFile = spark.read.parquet("people.parquet")
15、合并schema,如果同一个目录下存在两个不同schema格式的数据,那么可以通过设置mergeSchema为true来合并这些schema:
//test_table目录下存在data/test_table/key=1和data/test_table/key=2两个文件,它们拥有不同schema
mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
合并后会增加一个key属性,不过数据并不会因为具有相同属性而合并,而是两个文件的分别列出来
16、spark读写数据库:
jdbcurl = "jdbc:postgresql://10.206.132.19:5432/testapp?user=Administrator&password=mac8.6"
apphistory_processed.write.jdbc(jdbcurl, "myapphistory_bak", mode="overwrite")
ipDF = sqlContext.read.jdbc(jdbcurl, 'ip').dropDuplicates()//ip是表名
不过可以用python自带的方式连接数据库:(这种方式比上面的快,因为spark采用分布式会花费额外的时间整理从worker上传的数据)
conn = psycopg2.connect("dbname='PromotionDB_New' user='Administrator' host='10.206.132.19' password='mac8.6'")
cur=conn.cursor()
strsql = "INSERT INTO ga_activeusermonth_version (version, activeuser, inserttime,statdate,appname) VALUES (%s, %s, %s, %s,%s) "
try:
cur.executemany(strsql,inResults)
conn.commit()
17、spark下载地址:http://spark.apache.org/downloads.html
18、spark2.0开始用start-all.sh生成的命令会出现一个--host选项,但是后面的值却是空的,需要在spark-env.sh里面配置
SPARK_MASTER_HOST,它的值就是本机的ip地址,如下:
java -cp "E:\spark-2.1.0-bin-hadoop2.6/conf\;E:\spark-2.1.0-bin-hadoop2.6\jars\*" -Xmx1g org.apache.spark.deploy.master.Master --host 10.64.20.29 --port 7077 --webui-port 8080
worker的启动会漏洞一个master的端口号,需要手动加上去:
java -cp "E:\spark-2.1.0-bin-hadoop2.6/conf\;E:\spark-2.1.0-bin-hadoop2.6\jars\*" -Xmx1g org.apache.spark.deploy.worker.Worker --webui-port 8081 10.64.20.29:7077
19、spark2.0开始,spark-2.1.0-bin-hadoop2.6目录下需要自己创建一个\launcher\target\scala-2.11目录来指定scala版本或者在
spark-env里面设置SPARK_SCALA_VERSION属性,不过window下还是用前一种比较好,后一种不一定用的了,如果没指定scala版本的话
就无法让worker工作;另外spark2.0开始master不分配任务给自己,必须有至少一个worker才能工作
20、spark2.0开始DataFrame没有map方法了,只有RDD才有,所以必须通过DataFrame.rdd.map实现
21、spark在ubuntu中搭建好后可以通过命令jps来查看所有java进程中是否包含worker或者master
22、spark设置spark.executor.memory参数时要小心,如果超出机器本身的大小将会报错:
WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
因此要正确配置其大小,先用命令 free -m 查看可用内存大小(-m代表以MB显示),然后再设置这个参数
23、ubuntu中修改/etc/profile只有针对login shell才不用每次执行source /etc/profile,其他的都要执行一次才能生效
24、spark分配给每个worker的总内存由num-executors乘以executor-memory来计算的,所以机器的剩余内存必须大于这个值
25、Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,
那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题
26、spark.default.parallelism参数用于设置每个stage的默认task数量。这个参数极为重要,
如果不设置可能会直接影响你的Spark作业性能,Spark作业的默认task数量为num-executors * executor-cores的2~3倍较为合适,
如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃
27、spark.storage.memoryFraction参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6,
如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。
避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能
28、spark.shuffle.memoryFraction参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,
进行聚合操作时能够使用的Executor内存的比例,默认是0.2
29、spark分布式搭建步骤:http://www.cnblogs.com/purstar/p/6293605.html
1)配置ssh无秘钥登录
ssh-keygen -t rsa -P '' #一路回车直到生成公钥
scp /home/jieping/.ssh/id_rsa.pub 10.64.66.215:/home/jieping/.ssh/id_rsa.pub.master //将master生成的pub拷贝到worker上
cat /home/jieping/.ssh/id_rsa.pub >> /home/jieping/.ssh/authorized_keys //将自己生成的pub添加到授权列表里
cat /home/jieping/.ssh/id_rsa.pub.master >> /home/jieping/.ssh/authorized_keys //将master的pub添加到授权列表里
2)配置etc/profile
export HADOOP_HOME=/home/hadoop/hadoop-2.6.1/
export PATH="$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH"
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_HOME=/opt/spark-2.1.0-bin-hadoop2.7/
export PATH="$SPARK_HOME/bin:$PATH"
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64
执行source /etc/profile让配置生效
3)配置hadoop和spark的slaves文件
最好使用机器名作为节点名,下面会提到hadoop这方面的问题
如果使用机器名的话,还需要配置/etc/hosts文件将对应的机器名和ip绑定
4)配置hadoop
进入$HADOOP_HOME/etc/hadoop/hadoop-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_112/
$HADOOP_HOME/etc/hadoop/core-site.xml:
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://master:9000</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>131072</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/hadoop-2.7.3/tmp</value>
</property>
</configuration>
$HADOOP_HOME/etc/hadoop/hdfs-site.xml:
<configuration>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>master:50090</value>
</property>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/opt/hadoop-2.7.3/hdfs/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:/opt/hadoop-2.7.3/hdfs/data</value>
</property>
<property>
<name>dfs.namenode.datanode.registration.ip-hostname-check</name>
//在Hadoop 2中,不可以被NameNode解析的DataNode将会被拒绝通信,如果没配置这个必须
hadoop/etc/slaves里面用机器名取代ip地址,然后在/etc/hosts里面加上这个机器名对应的ip地址
<value>false</value>
</property>
</configuration>
$HADOOP_HOME/etc/hadoop/mapred-site.xml
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.jobhistory.address</name>
<value>master:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.address</name>
<value>master:19888</value>
</property>
</configuration>
$HADOOP_HOME/etc/hadoop/yarn-site.xml
<!-- Site specific YARN configuration properties -->
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.resourcemanager.address</name>
<value>master:8032</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>master:8030</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>master:8031</value>
</property>
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>master:8033</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>master:8088</value>
</property>
</configuration>
最后格式化一下节点:
hdfs namenode -format -force //进行强制的格式化会同时格式化namenode和datanode
5)配置spark-env.sh //最好根据该文件里注释的属性来判断是否存在这个属性
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64
export HADOOP_HOME=/home/hadoop/hadoop-2.6.1
export SPARK_MASTER_HOST=10.64.66.215 //SPARK_MASTER_IP在2.0已经不再使用,不设置这个值将使用机器名,比较危险
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_MERMORY=2G
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_SCALA_VERSION=2.11
6)拷贝相关文件到worker节点:
先要修改worker节点的/home目录为可写权限
scp -r /home/hadoop/hadoop-2.6.1 10.64.66.127:/home/hadoop
scp -r /home/spark/spark-2.1.0-bin-hadoop2.7 10.64.66.127:/home/spark
7)创建hdfs文件系统
hadoop fs -mkdir -p /Hadoop/Input
hadoop fs -put wordcount.txt /Hadoop/Input
8)编辑测试代码
val file=sc.textFile("hdfs://master:9000/Hadoop/Input/wordcount.txt")
val rdd = file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
rdd.collect()
rdd.foreach(println)
9)hdfs是hadoop的一个核心设计,利用这个,我们可以在window上设计spark master,ubuntu上设立worker,然后可以正常访问
hdfs上的文件,把工作仍交给worker,window上的master就不会有太大负担,这样就能让开发变得简便起来
30、spark大致流程是:
1)提交一个应用
2)创建对应的driver进程
3)driver进程将代码分成多个stage,每个stage分成多个task,然后分配给各个worker
4)driver进程向ResourceManager申请资源,ResourceManager根据NodeManager汇报上来的每台机器的资源状况,分配一定的
CPU和内存给应用
5)应用开始执行
31、hadoop如果没有退出安全模式,那么使用hbase时,hmaster就会报错
ERROR: org.apache.hadoop.hbase.ipc.ServerNotRunningYetException: Server is not running yet
查看 hbase master log, 发现
2014-07-14 23:31:51,270 INFO [master:192.168.126.8:60000] util.FSUtils: Waiting for dfs to exit safe mode...
原因是hadoop提供的NameNode在安全模式下是只读模式的
解决办法是退出 hadoop 安全模式:hdfs dfsadmin -safemode leave
32、部署hadoop和hbase的时候,机器名千万不要对应127.0.0.1,这样的话hadoop就会在本地的9000端口监听,导致
datanode访问不了,因为机器名必须唯一对应ip,另外hbase在设置zookeeper的时候会根据设置的值找到对方的机器名,
因此如果你不把机器名作为参数传进来,那么在/etc/hosts里面就得重复设置一个对方的机器名和ip映射,所以最好使用机器名
而不是随便取的一个名字,设置机器名可以在/etc/hostname里面修改,然后在/etc/hosts里面写上对应ip
33、如果想使用hbase自带的zookeeper,那么需要在hbase-env.sh中配置export HBASE_MANAGES_ZK=true
34、hmaster的webui默认在端口16010,默认连接端口是60000
具体配置查看:http://blog.csdn.net/ningxuezhu/article/details/50547970
35、window上编写调用hbase代码时,本地的hosts文件里必须配置hmaster和zookeeper机器名和ip的映射规则,否则
会报unknown host异常
36、如果采用java编写spark,那么自定义的类必须打包成一个jar包,然后调用
SparkConf conf;
conf.setJars(new String[]{"/E:/hbase.jar"})
来指向这个jar包,原因是当spark把任务分给各个节点时,每个节点运行时都会去找自定义的类,如果你没把jar包给它,
那么它就会报错找不到类,也就是最后提示java.lang.ClassCastException,而setJars方法会把设置的所有路径下的jar包都拷贝给各个节点
这样每个节点就能正常运行了,另外注意的是spark下的netty-all-4.0.42.Final.jar会和hbase下的同样一个jar包冲突,因此导入的
时候去掉hbase的那个jar包就行了
37、要使用hbase,需要设置
SPARK_CLASSPATH=$HBASE_HOME/lib/hbase-protocol-1.2.6.jar:$HBASE_HOME/lib/hbase-common-1.2.6.jar:$HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar:$HBASE_HOME/lib/hbase-server-1.2.6.jar:$HBASE_HOME/lib/hbase-client-1.2.6.jar:$HBASE_HOME/lib/metrics-core-2.2.0.jar
38、使用java开发spark,要记住一定要通过setJars指明当前项目生成的jar包位置,因为worker节点接到任务时首先通过
反射机制加载任务代码所在的类,通过这个类来寻找指定的rdd等对象,然后调用这些对象执行计算,
这时候如果找不到这个类就会报错,因此项目中所有用到的类必须都指定出来,这样worker节点才能顺利执行任务
39、当某一行没有要过滤的字段时,SingleColumnValueFilter是默认这一行符合过滤条件的,可以通过下面设置解决
filter.setFilterIfMissing(true);//true 跳过改行;false 通过该行
40、HQuorumPeer进程是Zookeeper的一个实例
41、下面是hbase与对应的hadoop支持版本
"S" = supported
"X" = not supported
"NT" = Not tested
HBase-1.1.x HBase-1.2.x HBase-1.3.x HBase-2.0.x
Hadoop-2.0.x-alpha X X X X
Hadoop-2.1.0-beta X X X X
Hadoop-2.2.0 NT X X X
Hadoop-2.3.x NT X X X
Hadoop-2.4.x S S S X
Hadoop-2.5.x S S S X
Hadoop-2.6.0 X X X X
Hadoop-2.6.1+ NT S S S
Hadoop-2.7.0 X X X X
Hadoop-2.7.1+ NT S S S
Hadoop-2.8.0 X X X X
Hadoop-3.0.0-alphax NT NT NT NT
42、hbase新版本中使用下面方式获取表格
connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TableName.valueOf(tableName));
43、hbase文档链接:http://hbase.apache.org/book.html#client.connection.pooling