1: Bereiten Sie die Datenquelle vor
Erstellen Sie eine neue student.txt -Datei unter dem Projekt, und der Inhalt ist:
1, Zhangsan, 20 2, Lisi, 21 3, Wanger, 19 4, Fangliu, 18
Zwei: Implementierung
Java -Version:
1. Erstellen Sie zunächst ein neues Student -Bean -Objekt, implementieren Sie Serialisierung und ToString () -Methode. Der spezifische Code lautet wie folgt:
Paket com.cxd.sql; import Java.io.Serializable; @Suppresswarnings ("Serial") öffentliche Klasse Schüler implementiert serialisierbar {String Sid; String Sname; int Salbei; public String getsid () {return Sid; } public void setsid (String sid) {this.sid = Sid; } public String GetName () {return sname; } public void setSname (String sname) {this.sname = sname; } public int getaage () {return Sage; } public void setSage (int sage) {this.Sage = sage; } @Override public String toString () {return "student [sid =" + sid + ", sname =" + sname + ", sage =" + sage + "]"; }}2. Konvertieren Sie den spezifischen Code wie folgt
Paket com.cxd.sql; import java.util.arrayList; import org.apache.spark.sparkconf; import org.apache.spark.api.java.javardd; import org.apache.spark.sql.dataset; org.apache.spark.sql.savemode; import org.apache.spark.sql.sparkSession; import org.apache.spark.sql.types.datatypes; import org.apache.spark.sql.types.structfield; TxtToparquetDemo {public static void main (String [] args) {SparkConf conf = new SparkConf (). SetAppname ("txtToparquet"). SetMaster ("local"); SparkSession Spark = SparkSession.builder (). Config (conf) .getOrcreate (); ReflectTransform (Spark); // Java Reflection DynamicTransform (Spark); // Dynamische Konvertierung}/***Durch Java Reflection*@param Spark*/private statische void -reflektransform (SparkSession Spark) {Javardd <string> Quelle = Spark.Read (). Textfile ("stuinfo.txt"). Javardd <Studenten> rowrdd = source.map (Zeile -> {String parts [] = line DataSet <Row> df = Spark.Createdataframe (rowrdd, student.class); DF.Select ("Sid", "Sname", "Sage"). Calesce (1) .Write (). } / *** Dynamische Konvertierung* @param spark* / private statische void dynamictransform (sparksesession spark) {javardd <string> source = spark.read (). Textfile ("stuinfo.txt"). Javardd (); Javardd <row> rowrdd = source.map (Zeile -> {String [] parts = line.split (","); String sid = parts [0]; String sname = parts [1]; int sage = ginnger.parseInt (parts [2]); ArrayList <Strupfield> fields = new ArrayList <Structfield> (); Structfield field = null; field = DataTypes.CreateStructField ("Sid", DataTypes.Stringtype, true); Fields.Add (Feld); field = DataTypes.CreateStructField ("Sname", DataTypes.Stringtype, true); Fields.Add (Feld); field = DataTypes.CreateStructField ("Sage", DataTypes.InegerType, true); Fields.Add (Feld); Strukturtypschema = DataTypes.CreateStructType (Felder); DataSet <Row> df = Spark.Createdataframe (RowRDD, Schema); df.coalesce (1) .Write (). modus (Savemode.Append) .Parquet ("parquet.res1"); }}Scala -Version:
import org.apache.spark.sql.sparkSessionimport org.apache.spark.sql.types.stringtypeMport org.apache.spark.sql.types.structfieldimport org.apache.spark.sql.tructtructypportypportypportierung oder GTREIMPRAGE.SPARK.SPARK.SQL.TRUCTPORTTIMPREIDE.SPARK.SQL.TRUCTTYPARTPORTLEMPRAGE.SPARK.SQL.SSTRUCTTIMPRAGE.SPARK.SPARK.SPARK.SPARK.SPARK.SPARK.SCL.TRUCTTYPACTLEMPORT oder org.apache.spark.sql.types.InegerTypeObject rdd2dataset {case class Student (ID: int, Name: String, Alter: int) def main (args: array [string]) {val spark = sparkSession.builder (). Master ("Local". ReflectCreate (Spark) Dynamiccreate (Spark)} /*** Durch Java Reflection* @param Spark* /privat def reflectCreate (Spark: SparkSession): Einheit = {Import Spark.implicits._val Sturdd = SparkContext.textFile ("Student2.txt2.TXT") // / / / ///DF () ist ein und usw (). studf = sturdd.map (_. split (",")). map (Teile ⇒ Student (Teile (0) .Trim.toint, Teile (1), Teile (2) .trim. studf.printschema () studf.createorreplacetempView ("student") val namedf = spark.sql ("Name aus dem Schüler auswählen, wobei Alter <20") //namedf.write.text(" "Result ") // das Abfragergebnis mit einem Datei namenf.show ()} /** ** ** ** *** @PARAMCUSCUSION* @PARAM CONTURTION WRITE WRECKENTET DynamicCreate (Spark: SparkSession): Einheit = {val sturdd = spark.sparkcontext.textFile ("student.txt") import spark.implicits Schema = Strukturtyp (Felder) Val Rowrdd = Sturdd.Map (_. Split (",")). MAP (Teile ⇒ ROW (Teile (0), Teile (1), Teile (2))) Val studf = Spark. namedf = spark.sql ("Name aus dem Schüler auswählen, wobei Alter <20") //Namedf.write.text("Result ") // das Abfrageergebnis in eine Datei namensf.show ()}} schreibenNotiz:
1. Alle oben genannten Codes wurden getestet und die Testumgebung ist Spark2.1.0 und JDK1.8.
2. Dieser Code gilt nicht für Versionen vor Spark2.0.
Die obige Zusammenfassung der beiden Methoden zur Konvertierung von Spark RDD in DataFrame von Java und Scala ist der gesamte Inhalt, den ich mit Ihnen teile. Ich hoffe, Sie können Ihnen eine Referenz geben und ich hoffe, Sie können wulin.com mehr unterstützen.