1: Préparez la source de données
Créez un nouveau fichier Student.txt dans le cadre du projet, et le contenu est:
1, Zhangsan, 20 2, Lisi, 21 3, Wanger, 19 4, Fangliu, 18
Deux: implémentation
Version Java:
1. Créez d'abord un nouvel objet Student Bean, implémentez la sérialisation et la méthode toString (). Le code spécifique est le suivant:
package com.cxd.sql; import java.io.serializable; @SuppressWarnings ("Serial") La classe publique implémente Serializable {String Sid; String Sname; int sage; public String getSID () {return sid; } public void setsid (string sid) {this.sid = sid; } public String getSname () {return sname; } public void setsName (String Sname) {this.sname = sname; } public int getSage () {return sage; } public void setage (int sage) {this.sage = sage; } @Override public String toString () {return "Student [sid =" + sid + ", sname =" + sname + ", sage =" + sage + "]"; }}2. Convertir, le code spécifique est le suivant
package com.cxd.sql; importer java.util.arraylist; import org.apache.spark.sparkconf; import org.apache.spark.api.java.javardd; import org.apache.spark.sql.dataset; import org.apache.sql.sql.row; import org.apache.spark.sql.row org.apache.spark.sql.saveMode; import org.apache.spark.sql.sparkcession; import org.apache.spark.sql.types.statypes; import org.apache.sql.types.structfield; TxtToparquetDemo {public static void main (String [] args) {sparkConf conf = new SparkConf (). SetAppName ("txtToParquet"). Setmaster ("local"); SPARKSESSE SPARK = SPARKSESSE.BUILDER (). Config (conf) .getorCreate (); ReflectTransform (Spark); // Java Reflection DynamicTransform (Spark); // Conversion dynamique} / ** * Converti via Java Reflection * @param Spark * / private static void reflevTransform (SparkSession Spark) {javardd <string> Source = Spark.read (). Javardd <Student> rowrdd = source.map (line -> {String Parts [] = line.split (","); Student Stu = new Student (); stu.setsid (parties [0]); stu.setsName (parties [1]); Stu.setsage (Integer.Valueof (Parts [2]); DataSet <Row> df = Spark.CreateDataFrame (Rowrdd, Student.class); DF.Select ("Sid", "Sname", "Sage"). Coalesce (1) .Write (). Mode (SaveMode.Apend) .Parquet ("Parquet.res"); } / ** * Conversion dynamique * @param spark * / private static void dynamictransform (sparkcession Spark) {javardd <string> source = spark.read (). Textfile ("stuinfo.txt"). Javardd (); Javardd <row> rowrdd = source.map (line -> {string [] parti parties = line.split (","); string sid = parties [0]; string sname = parties [1]; int sage = Integer.parseint (parties [2]); return rowfactory.create (sid, sname, sage);}); ArrayList <StructField> Fields = new ArrayList <StructField> (); StructField Field = NULL; field = dataTypes.CreatestructField ("Sid", datatypes.stringType, true); fields.add (champ); field = datatypes.createstructField ("Sname", datatypes.stringType, true); fields.add (champ); field = dataTypes.CreatestructField ("sage", datatypes.integerType, true); fields.add (champ); StructType schema = datatypes.CreateSructType (champs); DataSet <Row> df = Spark.CreateDataFrame (Rowrdd, schéma); DF.COALESCE (1) .WRITE (). MODE (SAVEMODE.APPEND) .PARQUET ("Parquet.res1"); }}Version Scala:
import org.apache.spark.sql.sparkssemport org.apache.spark.sql.types.stringtypeimport org.apache.spark.sql.types.structFieldImport org.apache.spark.sql.types.structtypeImport Org.apache.Spark.sql.types.Structs org.apache.spark.sql.types.integerTypeObject rdd2Dataset {Case class étudiant (id: int, name: String, Âge: int) def main (args: array [string]) {val spark = sparkession.builder (). maître ("local"). Spark.Implicits._ ReflectCreate (Spark) DynamicCreate (Spark)} / ** * Converti via Java Reflection * @param Spark * / Private Def ReflectReate (Spark: Sparkcession): unit = {Importer Spark.Implicits._Val Sturdd = Spark.SparkContex Studf = Sturdd.map (_. Split (",")). Map (partiesfordStudent (parties (0) .trim.toint, parties (1), parties (2) .trim.toint)). Studf.printSchema () Studf.CreateorReplaceTempView ("Student") Val Namedf = Spark.Sql ("SELECT Name From Student Where Age <20") //Namedf.Write.Text("Result ") // Écrivez le résultat query à un fichier nommé def. DynamicCreate (Spark: Sparkcession): Unit = {Val Sturdd = Spark.SparkContext.TextFile ("Student.txt") Import Spark.Implicits._ Val Schemastrring = "ID, Name, Age" Val Fields = ScheMastrring.Split ("). Map (FieldName => Struried (FieldName, Stringtype, Nullable = Nullable) schéma = structType (champs) val rowrdd = Sturdd.map (_. Split (",")). map (partiesfordrow (parties (0), parties (1), parties (2))) val Studf = Spark.CreatedataFrame (Rowrdd, schéma) Studf.printchema () Val tmpVive namedf = spark.sql ("Sélectionner le nom de l'élève où l'âge <20") //namedf.write.text("Result ") // écrivez le résultat de la requête dans un fichier nommé Namedf.show ()}}Note:
1. Tous les codes ci-dessus ont été testés et l'environnement de test est Spark2.1.0 et JDK1.8.
2. Ce code ne s'applique pas aux versions avant Spark2.0.
Le résumé ci-dessus des deux méthodes de conversion de Spark RDD en DataFrame par Java et Scala est tout le contenu que je partage avec vous. J'espère que vous pourrez vous faire référence et j'espère que vous pourrez soutenir Wulin.com plus.