1: Siapkan sumber data
Buat file Student.txt baru di bawah proyek, dan isinya adalah:
1, Zhangsan, 20 2, Lisi, 21 3, Wanger, 19 4, Fangliu, 18
Dua: Implementasi
Versi Java:
1. Pertama -tama buat objek kacang siswa baru, implementasikan metode serialisasi dan tostring (). Kode spesifiknya adalah sebagai berikut:
Paket com.cxd.sql; impor java.io.serializable; @suppresswarnings ("serial") siswa kelas publik mengimplementasikan serializable {string sid; SNAME string; Int Sage; string publik getSid () {return sid; } public void setSid (String Sid) {this.sid = sid; } public String getSname () {return sname; } public void setSname (string sname) {this.sname = sname; } public int getage () {return sage; } public void setSage (int sage) {this.sage = sage; } @Override Public String ToString () {return "Student [sid =" + sid + ", sname =" + sname + ", sage =" + sage + "]"; }}2. Konversi, kode spesifik adalah sebagai berikut
Paket com.cxd.sql; import java.util.arraylist; import org.apache.spark.sparkconf; import org.apache.spark.api.java.javardd; import org.apache.spark.sql.dataset; impor org.apache.spark.sql.apory; orgrow.park. org.apache.spark.sql.savemode; impor org.apache.spark.sql.sparksession; impor org.apache.spark.sql.types.datatypes; impor org.apache.spark.sql.types.structfield; impor org.apache.apache.spark.sql.types. {public static void main (string [] args) {sparkconf conf = new sparkconf (). setappname ("txttoparquet"). setMaster ("lokal"); Sparksesion spark = sparksession.builder (). Config (conf) .getorCreate (); ReflectTransform (Spark); // Java Reflection DynamicTransform (Spark); // Konversi Dinamis}/***Konversi melalui Refleksi Java*@param Spark*/Private Static Void ReflectTransform (Sparksession Spark) {Javardd <string> Sumber = Spark.Read (). TextFile ("StuInfo. Javardd <spulhition> rowrdd = source.map (line -> {string bagian [] = line.split (","); siswa Stu = siswa baru (); stu.setsid (bagian [0]); Stu.setsname (bagian [1]); stu.setsage (integer.valueof (bagian [2]))); stu.setsage (integer.valueof (bagian [2]))); stU.setsage (integer.valueof (bagian [2]))); stU.setsage (integer.valueof (bagian [2]))); Dataset <Row> df = spark.createDataFrame (rowrdd, student.class); df.select ("sid", "sname", "sage"). coalesce (1) .write (). Mode (savemode.append) .parquet ("parquet.res"); } / *** konversi dinamis* @param spark* / private static void dynamictransform (sparksession spark) {javardd <string> source = spark.read (). TextFile ("stuinfo.txt"). Javardd (); JAVARDD <Row> rowrdd = Source.map (baris -> {string [] bagian = line.split (","); string sid = bagian [0]; string sname = bagian [1]; int sage = integer.parseint (bagian [2]); return rowfactory.create (sid, sname, sage);}); ArrayList <Structfield> bidang = Daftar ArrayList baru <Structfield> (); Bidang structfield = null; field = DataTypes.CreateStructField ("SID", DataTypes.stringType, true); fields.add (bidang); field = DataTypes.CreateStructField ("Sname", DataTypes.stringType, true); fields.add (bidang); field = DataTypes.CreateStructField ("Sage", DataTypes.IntegerType, true); fields.add (bidang); Skema structtype = dataTypes.createStructType (bidang); Dataset <Row> df = spark.createDataFrame (rowrdd, skema); df.coalesce (1) .write (). Mode (saveMode.append) .parquet ("parquet.res1"); }}Versi Scala:
import org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.types.StringTypeimport org.apache.spark.sql.types.StructFieldimport org.apache.spark.sql.types.StructTypeimport org.apache.spark.sql.types.StructTypeimport org.apache.spark.sql.types.IntegerTypeObject rdd2dataset {case class student (id: int, name: string, use: int) def main (args: array [string]) {val spark = sparksession.builder (). master ("local"). appname ("sparksession.builder (). master (" local "). appname (" rdd2d2dat. ReflectCreate (Spark) DynamicCreate (Spark)} /*** Konversi melalui Refleksi Java* @param Spark* /private def CLEFLCREATE (Spark: Sparksession): unit = {impor spark.implicits._val sturdd = spark.sparkcontext.TextFile ("student22.txt") /tundf () adalah val. studf = sturdd.map (_. split (",")). Map (bagian buah student (bagian (0) .trim.toint, bagian (1), bagian (2) .trim.toint)). TODF () //studf.select("id","name", mapan").write.text studf.printschema () studf.createorreplacetempview ("student") val namedf = spark.sql ("Pilih Nama dari Siswa Di mana Umur <20") //namedf.write.Text("Result ") // Conversi Dinamis* @PLOWADF.SHOW ()} /** ** ** Dinamis* DynamicCreate (Spark: Sparksesion): unit = {val sturdd = spark.sparkcontext.TextFile ("student.txt") impor spark.implicits._ val schemastring = "id, name, usia" val fields = schemastring.split (" skema = structtype (bidang) val rowrdd = sturdd.map (_. split (",")). Map (bagian tunggal (bagian (0), bagian (1), bagian (2))) val studf = spark.createdataframe (rowrdd, skema) studf.printschema () valeplac = rowrdd = studf. bernamaf = spark.sql ("Pilih Nama dari Siswa di mana usia <20") //namedf.write.text("Result ") // Tulis hasil kueri ke file bernamaf.show ()}}}Catatan:
1. Semua kode di atas telah diuji dan lingkungan pengujian adalah Spark2.1.0 dan JDK1.8.
2. Kode ini tidak berlaku untuk versi sebelum Spark2.0.
Ringkasan di atas dari dua metode untuk mengubah Spark RDD menjadi DataFrame oleh Java dan Scala adalah semua konten yang saya bagikan dengan Anda. Saya harap Anda dapat memberi Anda referensi dan saya harap Anda dapat mendukung wulin.com lebih lanjut.