1.1 Spark 读取数据的统一入口:
spark.read.format(指定的格式).load(文件的路径)或者spark.read.格式的名称(文件的路径)
1.2 Spark 写出数据的统一出口:
Dataframe.write.format(保存为什么格式).save(保存到哪个路径)或者Dataframe.write.保存的格式(保存到哪个路径)
1.3 Spark 写出数据有 4 种方式:
append:如果数据源或者表已经存在,继续追加overwrite:如果数据源已经存在,覆盖写出ignore:如果数据源已经存在,将忽略(ignore) Dataframe中的数据,如果不存在,则创建并写出。官网的比喻是类似这条SQL语句:create table if not existserrorifexists:如果数据源(可以理解为文件)已经存在,将抛出异常
2.1 SparkSQL读取数据的综合案例:
from pyspark.sql import SparkSessionif __name__ == '__main__': # 创建上下文对象 spark = SparkSession.builder.appName('test').master('local[*]').config('spark.sql.shuffle.partitions', '6').getOrCreate() # 1.1-读取json文件并打印 print('json:') df1=spark.read.format('json').load('file:///export/pyworkspace/pyspark_sz27/pyspark_sql/data/sql/people.json') df1.printSchema() df1.show() # 1.2-写法二 df2=spark.read.json('file:///export/pyworkspace/pyspark_sz27/pyspark_sql/data/sql/people.json') df2.printSchema() df2.show() # 2-读取csv文件并打印 print('csv:') df3=spark.read.option('sep',';').option('header',True).option('inferSchema',True).csv('file:///export/pyworkspace/pyspark_sz27/pyspark_sql/data/sql/people.csv') df3.printSchema() df3.show() # 3-读取parquet文件并打印 print('parquet:') # 下面3种写法效果一样 df4=spark.read.format('parquet').load('file:///export/pyworkspace/pyspark_sz27/pyspark_sql/data/sql/users.parquet') df4.printSchema() df4.show() df5=spark.read.load('file:///export/pyworkspace/pyspark_sz27/pyspark_sql/data/sql/users.parquet') df5.printSchema() df5.show() df6=spark.read.parquet('file:///export/pyworkspace/pyspark_sz27/pyspark_sql/data/sql/users.parquet') df6.printSchema() df6.show() # 4-读取jdbc-mysql文件并打印 print('jdbc:') df7=spark.read.format('jdbc') .option('url','jdbc:mysql://node1:3306') .option('user','root') .option('password','123456') .option('dbtable','bigdata.people') .load() df7.printSchema() df7.show()
2.2 SparkSQL写出数据的综合案例:
from pyspark.sql import SparkSessionif __name__ == '__main__': # 创建上下文对象 spark=SparkSession.builder.appName('test').master('local[*]').config('spark.sql.shuffle.partitions','6').getOrCreate() sc=spark.sparkContext # 使用SparkContext,读取txt形成RDD,转换成Dataframe rdd1=sc.textFile('file:///root/1.txt') rdd2=rdd1.map(lambda str:(str.split(',')[0],int(str.split(',')[1].strip()))) df=rdd2.toDF(['name','age']) # 并打印schema和数据 df.printSchema() df.show() # 1.1-将Dataframe保存成为json文件, df.coalesce(1).write.format('json').mode('overwrite').save('file:///root/out/json1') # 1.2-写法二 df.coalesce(1).write.mode('overwrite').json('file:///root/out/json2') # 2-将Dataframe保存成为csv文件 df.coalesce(1).write.mode('overwrite') .option('header',True) .option('sep',';') .csv('file:///root/out/csv1') # 3-将Dataframe保存成为parquet文件(SparkSQL模块中默认读取数据文件格式就是parquet列式存储数据) df.coalesce(1).write.mode('overwrite').save('file:///root/out/parquet1') # 4-将Dataframe保存成为jdbc-mysql数据 df.write.format('jdbc').mode('overwrite') .option('url', 'jdbc:mysql://node1:3306') .option('user', 'root') .option('password', '123456') .option('dbtable', 'bigdata.people') .save()