Spark8: Using pyspark on Linux

Installation

  • 📢📢📢 I installed this on a Linux server that already has Hadoop and Spark set up, and it runs without problems. If you install on Windows instead, you also have to configure a local Hadoop environment and stand up a local Hadoop cluster.
pip install pyspark -i https://pypi.tuna.tsinghua.edu.cn/simple
📢📢📢 One of the installed package versions also relates to flink, so I'm recording it here.
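To quickly confirm the install works, importing pyspark and spinning up a local session is enough. This check is not from the original post, just a minimal sanity test:

import pyspark
from pyspark.sql import SparkSession

# not from the original post: a minimal sanity check of the installation
print(pyspark.__version__)
spark = SparkSession.builder.master("local[1]").appName("install-check").getOrCreate()
print(spark.range(5).count())   # should print 5
spark.stop()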

Reading a txt file from HDFS

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

dirPath = "hdfs://redis01:8020/flink_input/flink-test.txt"
sc = SparkContext(conf=SparkConf().setAppName("The first example"))
rdd = sc.textFile(dirPath)
print(rdd)
# print(rdd.name())
# print(rdd.count())
# print(rdd.first())
# print(type(rdd))
# print(rdd.collect())
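Once the RDD is loaded, the usual transformations apply. Below is a minimal word-count sketch against the same file; the aggregation logic is my illustration, not part of the original example:

from pyspark import SparkContext, SparkConf

# illustration only: word count over the same hypothetical HDFS file
dirPath = "hdfs://redis01:8020/flink_input/flink-test.txt"
sc = SparkContext(conf=SparkConf().setAppName("wordcount-example"))

counts = (
    sc.textFile(dirPath)
      .flatMap(lambda line: line.split())   # split every line into words
      .map(lambda word: (word, 1))          # pair each word with a count of 1
      .reduceByKey(lambda a, b: a + b)      # sum the counts per word
)
print(counts.take(10))                      # first 10 (word, count) pairs
sc.stop()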

Reading a csv file from HDFS

File: movies.csv (on HDFS)
Code:
from pyspark import SparkContext
from pyspark.sql import SparkSession

# get the Spark context
sc = SparkContext('local', 'spark_file_conversion')
sc.setLogLevel('WARN')
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# read a file from local disk or HDFS【.load("hdfs://redis01:8020/flink_input/movies.csv")】
df = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("hdfs://redis01:8020/flink_input/movies.csv")
# print(df.dtypes)
# all rows
# print(df.collect())
# first record
# print(df.head())
# print(df.head(2))
print(df.show(2))
print(df.dtypes)
[('rank', 'int'), ('src', 'string'), ('name', 'string'), ('box_office', 'string'), ('avg_price', 'int'), ('avg_people', 'int'), ('begin_date', 'string')]
print(df.show(2))
+----+--------------------+--------+----------+---------+----------+----------+
|rank|                 src|    name|box_office|avg_price|avg_people|begin_date|
+----+--------------------+--------+----------+---------+----------+----------+
|   1|/item/%E6%88%98%E...|   战狼2|   56.83亿|       35|        38|2017.07.27|
|   2|/item/%E6%B5%81%E...|流浪地球|   46.41亿|       46|        50|2019.02.05|
+----+--------------------+--------+----------+---------+----------+----------+
print(df.printSchema())
root
 |-- rank: integer (nullable = true)
 |-- src: string (nullable = true)
 |-- name: string (nullable = true)
 |-- box_office: string (nullable = true)
 |-- avg_price: integer (nullable = true)
 |-- avg_people: integer (nullable = true)
 |-- begin_date: string (nullable = true)
print(df.select('rank').distinct().collect())
# [Row(rank=12), Row(rank=1), Row(rank=13), Row(rank=6), Row(rank=16), Row(rank=20), Row(rank=5), Row(rank=19), Row(rank=15), Row(rank=9), Row(rank=17), Row(rank=4), Row(rank=8), Row(rank=7), Row(rank=10), Row(rank=11), Row(rank=14), Row(rank=2), Row(rank=18)]
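On recent Spark versions the external com.databricks.spark.csv format is not needed; the built-in csv reader does the same job. A minimal sketch, assuming the same `spark` session and HDFS path as above:

# assumes the `spark` session and HDFS path from the snippet above
df2 = (
    spark.read
         .option("header", "true")       # first line holds the column names
         .option("inferSchema", "true")  # let Spark guess the column types
         .csv("hdfs://redis01:8020/flink_input/movies.csv")
)
df2.printSchema()
df2.show(2)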

Converting a Spark DataFrame to a pandas DataFrame

import json
import pandas as pd

# mapping of required fields to their new names
columns_json_str = '{"name":"影片名称","box_office":"票房"}'
columns_dict = json.loads(columns_json_str)

# convert the Spark DataFrame to a pandas DataFrame, selecting only the required columns here
df = pd.DataFrame(df.toPandas(), columns=columns_dict.keys())
print(df)
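Selecting the needed columns on the Spark side before calling toPandas keeps the data pulled to the driver small. A short sketch, assuming `df` and `columns_dict` from the snippet above:

# assumes `df` (the Spark DataFrame) and `columns_dict` from the snippet above
pdf = df.select(list(columns_dict.keys())).toPandas()   # filter columns before collecting to the driver
print(pdf.head())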

Converting a pandas DataFrame back to a Spark DataFrame (both the data and the column names have to be passed)

import json
import pandas as pd

# mapping of required fields to their new names
columns_json_str = '{"name":"影片名称","box_office":"票房"}'
columns_dict = json.loads(columns_json_str)

# convert the Spark DataFrame to a pandas DataFrame, selecting only the required columns here
df = pd.DataFrame(df.toPandas(), columns=columns_dict.keys())
# print(df)

data_values = df.values.tolist()
data_columns = list(df.columns)

# convert the pandas DataFrame back to a Spark DataFrame; the data and the column names are passed separately
df = spark.createDataFrame(data_values, data_columns)
# print(df)

# rename the columns according to the mapping
for key in columns_dict.keys():
    df = df.withColumnRenamed(key, columns_dict[key])

print(df.collect())
print(df.printSchema())
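spark.createDataFrame also accepts a pandas DataFrame directly, so the detour through values.tolist() is optional. A sketch under the same assumptions (`spark`, `columns_dict`, and the Spark DataFrame as originally read from movies.csv):

# assumes `df` is the Spark DataFrame as read from movies.csv (before the conversions above)
pdf = df.toPandas()[list(columns_dict.keys())]   # pick the needed columns in pandas
sdf = spark.createDataFrame(pdf)                 # column names are taken from the pandas DataFrame
for old, new in columns_dict.items():
    sdf = sdf.withColumnRenamed(old, new)        # apply the same renaming
sdf.printSchema()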

Writing the data to a file

import json
import pandas as pd

# mapping of required fields to their new names
columns_json_str = '{"name":"影片名称","box_office":"票房"}'
columns_dict = json.loads(columns_json_str)

# convert the Spark DataFrame to a pandas DataFrame, selecting only the required columns here
df = pd.DataFrame(df.toPandas(), columns=columns_dict.keys())
# print(df)

data_values = df.values.tolist()
data_columns = list(df.columns)

# convert the pandas DataFrame back to a Spark DataFrame; the data and the column names are passed separately
df = spark.createDataFrame(data_values, data_columns)
# print(df)

# rename the columns according to the mapping
for key in columns_dict.keys():
    df = df.withColumnRenamed(key, columns_dict[key])
# print(df.collect())
# print(df.printSchema())

# write the renamed data to a file
filepath = 'new_movies.csv'
df.write.format("csv").options(header='true', inferschema='true').save('hdfs://redis01:8020/flink_input/' + filepath)
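Two practical notes on the write step: save fails if the target directory already exists unless a write mode is set, and Spark emits one part file per partition, so coalesce(1) is convenient for small outputs. A sketch under the same assumptions as above:

# assumes `df` is the renamed Spark DataFrame from the snippet above
(
    df.coalesce(1)               # merge to a single partition so only one part file is written (small data only)
      .write
      .mode("overwrite")         # replace the target directory if it already exists
      .option("header", "true")  # write the column names as the first line
      .csv("hdfs://redis01:8020/flink_input/new_movies.csv")
)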

Full code

from pyspark import SparkContext
from pyspark.sql import SparkSession
import pandas as pd
import json

# get the Spark context
sc = SparkContext('local', 'spark_file_conversion')
sc.setLogLevel('WARN')
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# read a file from local disk or HDFS【.load("hdfs://redis01:8020/flink_input/movies.csv")】
df = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("hdfs://redis01:8020/flink_input/movies.csv")
# print(df.dtypes)
# print(df.collect())
# print(df.head())
# print(df.show(2))
# print(df.head(2))
# print(df.columns)
# print(df.printSchema())
# print(df.select('rank').distinct().collect())

# mapping of required fields to their new names
columns_json_str = '{"name":"影片名称","box_office":"票房"}'
columns_dict = json.loads(columns_json_str)

# convert the Spark DataFrame to a pandas DataFrame, selecting only the required columns here
df = pd.DataFrame(df.toPandas(), columns=columns_dict.keys())
# print(df)

data_values = df.values.tolist()
data_columns = list(df.columns)

# convert the pandas DataFrame back to a Spark DataFrame; the data and the column names are passed separately
df = spark.createDataFrame(data_values, data_columns)
# print(df)

# rename the columns according to the mapping
for key in columns_dict.keys():
    df = df.withColumnRenamed(key, columns_dict[key])
# print(df.collect())
# print(df.printSchema())

# write the renamed data to a file
filepath = 'new_movies.csv'
df.write.format("csv").options(header='true', inferschema='true').save('hdfs://redis01:8020/flink_input/' + filepath)