1: เตรียมแหล่งข้อมูล
สร้างไฟล์ student.txt ใหม่ภายใต้โครงการและเนื้อหาคือ:
1, Zhangsan, 20 2, Lisi, 21 3, Wanger, 19 4, Fangliu, 18
สอง: การดำเนินการ
เวอร์ชัน Java:
1. ก่อนสร้างวัตถุถั่วนักเรียนใหม่ใช้วิธีการทำให้เป็นอนุกรมและ toString () รหัสเฉพาะมีดังนี้:
แพ็คเกจ com.cxd.sql; นำเข้า java.io.serializable; @suppresswarnings ("อนุกรม") นักเรียนชั้นเรียนสาธารณะใช้ serializable {String sid; Sname String; Int Sage; สตริงสาธารณะ getSid () {return sid; } โมฆะสาธารณะ setSID (สตริง sid) {this.sid = sid; } สตริงสาธารณะ getSname () {return sname; } โมฆะสาธารณะ setSname (String sname) {this.sname = sname; } public int getSage () {return sage; } โมฆะสาธารณะ setSage (int sage) {this.sage = sage; } @Override สตริงสาธารณะ toString () {return "นักเรียน [sid =" + sid + ", sname =" + sname + ", sage =" + sage + "]"; -2. แปลงรหัสเฉพาะมีดังนี้
แพ็คเกจ com.cxd.sql; นำเข้า java.util.arraylist; นำเข้า org.apache.spark.sparkconf; นำเข้า org.apache.spark.api.java.javardd; นำเข้า org.apache.spark.sql.dataset; import org.apache.pache.sqlow.sql org.apache.spark.sql.savemode; นำเข้า org.apache.spark.sql.sparksession; นำเข้า org.apache.spark.sql.types.datatypes; นำเข้า org.apache.park.sql.types.structfield; txttoparquetDemo {โมฆะคงที่สาธารณะหลัก (สตริง [] args) {SparkConf conf = new SparkConf (). setAppName ("txttoparquet"). setmaster ("local"); Sparksession Spark = Sparksession.builder (). config (conf) .getorCreate (); ReflectTransform (Spark); // Java Reflection DynamicTransform (Spark); // Dynamic Conversion}/***แปลงผ่าน Java Reflection*@param Spark*/โมฆะคงที่ส่วนตัวสะท้อนให้เห็น javardd <Tudent> rowrdd = source.map (บรรทัด -> {ชิ้นส่วนสตริง [] = line.split (","); นักเรียน stu = นักเรียนใหม่ (); stu.setsid (ส่วน [0]); stu.setsname (ส่วน [1]); stu.setsage ชุดข้อมูล <Wow> DF = Spark.CreatedataFrame (Rowrdd, Student.Class); df.select ("sid", "sname", "sage"). coalesce (1) .write (). โหมด (savemode.append) .parquet ("parquet.res"); } / *** การแปลงแบบไดนามิก* @param Spark* / โมฆะส่วนตัวคงที่ DynamicTransform (Sparksession Spark) {Javardd <String> Source = spark.read (). textFile ("stuinfo.txt"). javardd (); Javardd <Wow> rowrdd = source.map (บรรทัด -> {string [] ส่วน = line.split (","); สตริง sid = ส่วน [0]; สตริง sname = ส่วน [1]; int sage = integer.parseint (ส่วน [2]); returnkatory.create ArrayList <Clustfield> ฟิลด์ = new ArrayList <Clustfield> (); Structfield Field = NULL; field = datatypes.createstructfield ("sid", datatypes.stringtype, true); Fields.add (ฟิลด์); field = datatypes.createstructfield ("sname", datatypes.stringType, true); Fields.add (ฟิลด์); field = datatypes.createstructfield ("sage", datatypes.integertype, true); Fields.add (ฟิลด์); structType schema = datatypes.createstructType (ฟิลด์); ชุดข้อมูล <Wow> DF = Spark.CreatedataFrame (Rowrdd, Schema); df.coalesce (1) .write (). โหมด (savemode.append) .parquet ("parquet.res1"); -เวอร์ชันสกาล่า:
นำเข้า org.apache.spark.sql.SparksessionImport org.apache.spark.sql.types.stringtypeimport org.apache.spark.sql.types.structfieldimport org.apache.sql.types.structtypeimport org.apache.spark.sql.types.integertypeobject rdd2dataset {นักเรียนชั้นเรียน (ID: int, ชื่อ: สตริง, อายุ: int) def main (args: array [string]) {val spark = sparks.builder () ReflectCreate (Spark) DynamicCreate (Spark)} /*** แปลงผ่าน Java Reflection* @param Spark* /Private def ReflectCreate (Spark: Sparksession): UNIT = {Import Spark.implicits._val sturdd = spark.sparkContext.TextFile ("student2.txt" studf = sturdd.map (_. split (",")). แผนที่ (ส่วนที่นักเรียน (ส่วน (0) .Trim.toint, ชิ้นส่วน (1), ชิ้นส่วน (2) .Trim.toint)). todf () //studf.select("id", studf.createorreplacetempview ("นักเรียน") val namedf = spark.sql ("เลือกชื่อจากนักเรียนที่อายุ <20") //namedf.write.text("Result ") // เขียนผลลัพธ์การสืบค้นไปยังไฟล์ชื่อ sshow ()} /*** sturdd = spark.sparkcontext.textfile ("student.txt") นำเข้า spark.implicits._ val schemastring = "id, ชื่อ, อายุ" เขตข้อมูลวาล = schemastring.split (","). แผนที่ (FieldName => structfield rowrdd = sturdd.map (_. split (",")). map (ส่วนที่⇒ row (ส่วน (0), ชิ้นส่วน (1), ชิ้นส่วน (2))) val studf = spark.createdataframe (rowrdd, schema) studf.printschema () นักเรียนที่อายุ <20 ") //namedf.write.text("Result") // เขียนผลการสอบถามไปยังไฟล์ที่ชื่อว่า f.show ()}}บันทึก:
1. รหัสข้างต้นทั้งหมดได้รับการทดสอบและสภาพแวดล้อมการทดสอบคือ Spark2.1.0 และ JDK1.8
2. รหัสนี้ใช้ไม่ได้กับเวอร์ชันก่อน Spark2.0
บทสรุปข้างต้นของสองวิธีในการแปลง Spark RDD เป็น DataFrame โดย Java และ Scala เป็นเนื้อหาทั้งหมดที่ฉันแบ่งปันกับคุณ ฉันหวังว่าคุณจะให้ข้อมูลอ้างอิงและฉันหวังว่าคุณจะสนับสนุน wulin.com มากขึ้น