windowns使用PySpark环境配置和基本操作的解决办法
感兴趣的小伙伴,下面一起跟随php教程的雯雯来看看吧
这篇文章主要为大家详细介绍了windowns使用PySpark环境配置和基本操作的简单示例,具有一定的参考价值,可以用来参考一下。
感兴趣的小伙伴,下面一起跟随php教程的雯雯来看看吧!
下载依赖
首先需要下载hadoop和spark,解压,然后设置环境变量。hadoop清华源下载spark清华源下载
代码如下:
1 2 3 | <code> HADOOP_HOME => /path/hadoop SPARK_HOME => /path/spark</code> |
windowns使用PySpark环境配置和基本操作
安装pyspark。
代码如下:
1 2 | <code> pip install pyspark</code> |
windowns使用PySpark环境配置和基本操作
基本使用
可以在shell终端,输入pyspark,有如下回显:
输入以下指令进行测试,并创建SparkContext,SparkContext是任何spark功能的入口点。
代码如下:
1 2 3 4 5 | <code> >>> from pyspark import SparkContext >>> sc = SparkContext( "local" , "First App" ) </code> |
windowns使用PySpark环境配置和基本操作
如果以上不会报错,恭喜可以开始使用pyspark编写代码了。不过,我这里使用IDE来编写代码,首先我们先在终端执行以下代码关闭SparkContext。
代码如下:
1 2 | <code> >>> sc.stop()</code> |
windowns使用PySpark环境配置和基本操作
下面使用pycharm编写代码,如果修改了环境变量需要先重启pycharm。在pycharm运行如下程序,程序会起本地模式的spark计算引擎,通过spark统计abc.txt文件中a和b出现行的数量,文件路径需要自己指定。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 | <code> from pyspark import SparkContext sc = SparkContext( "local" , "First App" ) logFile = "abc.txt" logData = sc.textFile(logFile).cache() numAs = logData.filter(lambda s: 'a' in s). count () numBs = logData.filter(lambda s: 'b' in s). count () print ( "Line with a:%i,line with b:%i" % (numAs, numBs)) </code> |
windowns使用PySpark环境配置和基本操作
运行结果如下:
20/03/11 16:15:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicableUsing Spark's default log4j profile: org/apache/spark/log4j-defaults.propertiesSetting default log level to "WARN".To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).20/03/11 16:15:58 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.Line with a:3,line with b:1
这里说一下,同样的工作使用python可以做,spark也可以做,使用spark主要是为了高效的进行分布式计算。戳pyspark教程戳spark教程
RDD
RDD代表Resilient Distributed Dataset,它们是在多个节点上运行和操作以在集群上进行并行处理的元素,RDD是spark计算的操作对象。一般,我们先使用数据创建RDD,然后对RDD进行操作。对RDD操作有两种方法:Transformation(转换) - 这些操作应用于RDD以创建新的RDD。例如filter,groupBy和map。Action(操作) - 这些是应用于RDD的操作,它指示Spark执行计算并将结果发送回驱动程序,例如count,collect等。
创建RDD
parallelize是从列表创建RDD,先看一个例子:
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | <code> from pyspark import SparkContext sc = SparkContext( "local" , "count app" ) words = sc.parallelize( [ "scala" , "java" , "hadoop" , "spark" , "akka" , "spark vs hadoop" , "pyspark" , "pyspark and spark" ]) print (words)</code> |
windowns使用PySpark环境配置和基本操作
结果中我们得到一个对象,就是我们列表数据的RDD对象,spark之后可以对他进行操作。
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
Count
count方法返回RDD中的元素个数。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | <code> from pyspark import SparkContext sc = SparkContext( "local" , "count app" ) words = sc.parallelize( [ "scala" , "java" , "hadoop" , "spark" , "akka" , "spark vs hadoop" , "pyspark" , "pyspark and spark" ]) print (words) counts = words. count () print ( "Number of elements in RDD -> %i" % counts)</code> |
windowns使用PySpark环境配置和基本操作
返回结果:
Number of elements in RDD -> 8
Collect
collect返回RDD中的所有元素。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | <code> from pyspark import SparkContext sc = SparkContext( "local" , "collect app" ) words = sc.parallelize( [ "scala" , "java" , "hadoop" , "spark" , "akka" , "spark vs hadoop" , "pyspark" , "pyspark and spark" ]) coll = words.collect() print ( "Elements in RDD -> %s" % coll) </code> |
windowns使用PySpark环境配置和基本操作
返回结果:
Elements in RDD -> ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']
foreach
每个元素会使用foreach内的函数进行处理,但是不会返回任何对象。下面的程序中,我们定义的一个累加器accumulator,用于储存在foreach执行过程中的值。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | <code> from pyspark import SparkContext sc = SparkContext( "local" , "ForEach app" ) accum = sc.accumulator(0) data = [1, 2, 3, 4, 5] rdd = sc.parallelize(data) def increment_counter(x): print (x) accum.add(x) return 0 s = rdd. foreach (increment_counter) print (s) # None print ( "Counter value: " , accum) </code> |
windowns使用PySpark环境配置和基本操作
返回结果:
NoneCounter value: 15
filter
返回一个包含元素的新RDD,满足过滤器的条件。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | <code> from pyspark import SparkContext sc = SparkContext( "local" , "Filter app" ) words = sc.parallelize( [ "scala" , "java" , "hadoop" , "spark" , "akka" , "spark vs hadoop" , "pyspark" , "pyspark and spark" ] ) words_filter = words.filter(lambda x: 'spark' in x) filtered = words_filter.collect() print ( "Fitered RDD -> %s" % (filtered)) Fitered RDD -> [ 'spark' , 'spark vs hadoop' , 'pyspark' , 'pyspark and spark' ]</code> |
windowns使用PySpark环境配置和基本操作
也可以改写成这样:
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | <code> from pyspark import SparkContext sc = SparkContext( "local" , "Filter app" ) words = sc.parallelize( [ "scala" , "java" , "hadoop" , "spark" , "akka" , "spark vs hadoop" , "pyspark" , "pyspark and spark" ] ) def g(x): for i in x: if "spark" in x: return i words_filter = words.filter(g) filtered = words_filter.collect() print ( "Fitered RDD -> %s" % (filtered))</code> |
windowns使用PySpark环境配置和基本操作
map
将函数应用于RDD中的每个元素并返回新的RDD。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | <code> from pyspark import SparkContext sc = SparkContext( "local" , "Map app" ) words = sc.parallelize( [ "scala" , "java" , "hadoop" , "spark" , "akka" , "spark vs hadoop" , "pyspark" , "pyspark and spark" ] ) words_map = words.map(lambda x: (x, 1, "_{}" .format(x))) mapping = words_map.collect() print ( "Key value pair -> %s" % (mapping))</code> |
windowns使用PySpark环境配置和基本操作
返回结果:
Key value pair -> [('scala', 1, '_scala'), ('java', 1, '_java'), ('hadoop', 1, '_hadoop'), ('spark', 1, '_spark'), ('akka', 1, '_akka'), ('spark vs hadoop', 1, '_spark vs hadoop'), ('pyspark', 1, '_pyspark'), ('pyspark and spark', 1, '_pyspark and spark')]
Reduce
执行指定的可交换和关联二元操作后,然后返回RDD中的元素。
代码如下:
1 2 3 4 5 6 7 8 9 | <code> from pyspark import SparkContext from operator import add sc = SparkContext( "local" , "Reduce app" ) nums = sc.parallelize([1, 2, 3, 4, 5]) adding = nums.reduce(add) print ( "Adding all the elements -> %i" % (adding))</code> |
windowns使用PySpark环境配置和基本操作
这里的add是python内置的函数,可以使用ide查看:
代码如下:
1 2 3 4 | <code> def add(a, b): "Same as a + b." return a + b</code> |
windowns使用PySpark环境配置和基本操作
reduce会依次对元素相加,相加后的结果加上其他元素,最后返回结果(RDD中的元素)。
Adding all the elements -> 15
Join
返回RDD,包含两者同时匹配的键,键包含对应的所有元素。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | <code> from pyspark import SparkContext sc = SparkContext( "local" , "Join app" ) x = sc.parallelize([( "spark" , 1), ( "hadoop" , 4), ( "python" , 4)]) y = sc.parallelize([( "spark" , 2), ( "hadoop" , 5)]) print ( "x =>" , x.collect()) print ( "y =>" , y.collect()) joined = x.join(y) final = joined.collect() print ( "Join RDD -> %s" % ( final )) </code> |
windowns使用PySpark环境配置和基本操作
返回结果:
x => [('spark', 1), ('hadoop', 4), ('python', 4)]y => [('spark', 2), ('hadoop', 5)]Join RDD -> [('hadoop', (4, 5)), ('spark', (1, 2))]
到此这篇关于windowns使用PySpark环境配置和基本操作的文章就介绍到这了,更多相关PySpark环境配置 内容请搜索php教程以前的文章或继续浏览下面的相关文章希望大家以后多多支持php教程!
注:关于windowns使用PySpark环境配置和基本操作的简单示例的内容就先介绍到这里,更多相关文章的可以留意