One: Prepare the data source
Create a new student.txt file under the project root with the following contents:
1,zhangsan,20
2,lisi,21
3,wanger,19
4,fangliu,18
Two: Implementation
Java version:
1. First create a Student bean class that implements Serializable and overrides toString(). The specific code is as follows:
package com.cxd.sql;

import java.io.Serializable;

@SuppressWarnings("serial")
public class Student implements Serializable {
    private String sid;
    private String sname;
    private int sage;

    public String getSid() {
        return sid;
    }

    public void setSid(String sid) {
        this.sid = sid;
    }

    public String getSname() {
        return sname;
    }

    public void setSname(String sname) {
        this.sname = sname;
    }

    public int getSage() {
        return sage;
    }

    public void setSage(int sage) {
        this.sage = sage;
    }

    @Override
    public String toString() {
        return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
    }
}

2. Perform the conversion. The specific code is as follows:
package com.cxd.sql;

import java.util.ArrayList;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class TxtToParquetDemo {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        reflectTransform(spark);  // conversion via Java reflection
        dynamicTransform(spark);  // dynamic conversion
    }

    /**
     * Conversion via Java reflection.
     * @param spark
     */
    private static void reflectTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("student.txt").javaRDD();
        JavaRDD<Student> rowRDD = source.map(line -> {
            String[] parts = line.split(",");
            Student stu = new Student();
            stu.setSid(parts[0]);
            stu.setSname(parts[1]);
            stu.setSage(Integer.valueOf(parts[2]));
            return stu;
        });
        Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
        df.select("sid", "sname", "sage").coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
    }

    /**
     * Dynamic conversion with an explicitly built schema.
     * @param spark
     */
    private static void dynamicTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("student.txt").javaRDD();
        JavaRDD<Row> rowRDD = source.map(line -> {
            String[] parts = line.split(",");
            String sid = parts[0];
            String sname = parts[1];
            int sage = Integer.parseInt(parts[2]);
            return RowFactory.create(sid, sname, sage);
        });

        ArrayList<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("sid", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("sname", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("sage", DataTypes.IntegerType, true));
        StructType schema = DataTypes.createStructType(fields);

        Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
        df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
    }
}

Scala version:
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType

object RDD2Dataset {

  case class Student(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
    reflectCreate(spark)
    dynamicCreate(spark)
  }

  /**
   * Conversion via reflection on a case class.
   * @param spark
   */
  private def reflectCreate(spark: SparkSession): Unit = {
    import spark.implicits._
    val stuRDD = spark.sparkContext.textFile("student.txt")
    // toDF() is an implicit conversion
    val stuDf = stuRDD.map(_.split(","))
      .map(parts => Student(parts(0).trim.toInt, parts(1), parts(2).trim.toInt))
      .toDF()
    // stuDf.select("id", "name", "age").write.text("result") // specify the column names for the output file
    stuDf.printSchema()
    stuDf.createOrReplaceTempView("student")
    val nameDf = spark.sql("select name from student where age < 20")
    // nameDf.write.text("result") // write the query result to a file
    nameDf.show()
  }

  /**
   * Dynamic conversion with an explicitly built schema.
   * @param spark
   */
  private def dynamicCreate(spark: SparkSession): Unit = {
    val stuRDD = spark.sparkContext.textFile("student.txt")
    val schemaString = "id,name,age"
    // every field is a nullable StringType here; Spark casts "age" when the SQL filter compares it to a number
    val fields = schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    val rowRDD = stuRDD.map(_.split(",")).map(parts => Row(parts(0), parts(1), parts(2)))
    val stuDf = spark.createDataFrame(rowRDD, schema)
    stuDf.printSchema()
    stuDf.createOrReplaceTempView("student")
    val nameDf = spark.sql("select name from student where age < 20")
    // nameDf.write.text("result") // write the query result to a file
    nameDf.show()
  }
}

Note:
1. All of the above code has been tested; the test environment is Spark 2.1.0 and JDK 1.8.
2. This code does not work on Spark versions before 2.0.
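If you want to double-check the conversion, you can read the Parquet output of the Java example back and print it. The following is only a minimal sketch, not part of the original write-up: the class name ParquetCheckDemo is illustrative, and it assumes the parquet.res directory produced above sits in the working directory.

package com.cxd.sql;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ParquetCheckDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("ParquetCheck")
                .getOrCreate();
        // read back the Parquet files written by TxtToParquetDemo and print schema and rows
        Dataset<Row> df = spark.read().parquet("parquet.res");
        df.printSchema();
        df.show();
        spark.stop();
    }
}

The same check works for parquet.res1 by changing the path.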
This concludes the summary of the two ways to convert a Spark RDD into a DataFrame in Java and Scala. I hope it gives you a useful reference, and I hope you will continue to support Wulin.com.