1: Prepare la fuente de datos
Cree un nuevo archivo de studs.txt en el proyecto, y el contenido es:
1, Zhangsan, 20 2, Lisi, 21 3, Wanger, 19 4, Fangliu, 18
Dos: implementación
Versión de Java:
1. Primero cree un nuevo objeto de estudiante Bean, implementa la serialización y el método toString (). El código específico es el siguiente:
paquete com.cxd.sql; import java.io.serializable; @suppleswarnings ("serial") El estudiante de clase pública implementa serializable {String sid; Cadena sname; int Sage; public String getSID () {return sid; } public void setSid (String sid) {this.sid = sid; } public String getSname () {return sname; } public void setSname (string sname) {this.sname = sname; } public int getSage () {return sage; } public void setSage (int sage) {this.sage = sage; } @Override public String toString () {return "Student [sid =" + sid + ", sname =" + sname + ", sage =" + sage + "]"; }}2. Convertir, el código específico es el siguiente
paquete com.cxd.sql; import java.util.arrayList; import org.apache.spark.sparkconf; import org.apache.spark.api.java.javardd; importar org.apache.spark.sql.dataset; importar org.apache.sql.row; org.apache.spark.sql.savemode; importar org.apache.spark.sql.sparksession; import org.apache.spark.sql.types.datatypes; import org.apache.spark.sql.types.tuctfield; import org.apache.sql.types.structtype; TxtToparquetDemo {public static void main (string [] args) {sparkconf conf = new SparkConf (). SetAppName ("TxtToparquet"). SetMaster ("local"); SparkSession Spark = SparkSession.Builder (). Config (conf) .getorcreate (); reflejTransform (Spark); // Java Reflection DynamicTransform (Spark); // Conversión dinámica}/***Convertir a través de Java Reflection*@param Spark*/private static void reflejTransform (SparkSession Spark) {Javardd <String> Source = Spark.read (). TextFile ("stuinfo.txt"). Javardd (); Javardd <entudent> rowrdd = fuente.map (línea -> {string piezas [] = line.split (","); stu = new student (); stu.setsid (partes [0]); stu.setsname (piezas [1]); stu.setsage (integer.valueOf (partes [2])); regreso stu;}); DataSet <OW> df = Spark.CreateDataFrame (ROWRDD, Student.Class); df.select ("Sid", "Sname", "Sage"). Coalesce (1) .Write (). Mode (SaveMode.Append) .Parquet ("parquet.res"); } / *** Conversión dinámica* @param Spark* / private static void DynamicTransform (SparkSession Spark) {javardd <string> fuente = Spark.read (). TextFile ("stuinfo.txt"). Javardd (); Javardd <ow> rowrdd = fuente.map (línea -> {string [] Parts = Line.split (","); String sid = Parts [0]; String Sname = Parts [1]; int sage = Integer.ParseInt (Parts [2]); return RowFactory.Create (Sid, Sname, Sage);}); ArrayList <Structfield> Fields = new ArrayList <Structfield> (); Structfield campo = nulo; campo = Datatypes.CreateStructfield ("Sid", DatatyPes.StringType, True); campos.add (campo); campo = Datatypes.CreateStructfield ("Sname", DatatyPes.StringType, true); campos.add (campo); campo = Datatypes.CreateStructfield ("Sage", Datatypes.InteGertype, true); campos.add (campo); Structtype schema = Datatypes.CreateStructType (campos); DataSet <OW> df = Spark.CreateDataFrame (ROWRDD, SCHEMA); df.coaleSce (1) .Write (). MODE (SaveMode.Append) .Parquet ("Parquet.res1"); }}Versión de Scala:
importar org.apache.spark.sql.sparksessionImport org.apache.spark.sql.types.stringtypeImport org.apache.spark.sql.types.structfieldImport org.apache.spark.sql.types.TructtypeMport Org.apache.sql.typachypes.types.TructtypePort Org.apache.sql.type org.apache.spark.sql.types.IntegertypeObject rdd2DataSet {case class student (id: int, name: string, age: int) def main (args: array [string]) {val spark = sparksession.builder (). maestro ("local"). appName ("rdd2dataset"). getCrreate () importador () importar. reflejado (chispa) dynamicCreate (Spark)} /*** Convierte a través de la reflexión de Java* @param Spark* /private def reflejado (Spark: Sparksession): unit = {import spark.implicits._val sturdd = spark.sparkcontext.textfile ("student2.txt") // toDf () es una conversión implícita studf = sturdd.map (_. Split (",")). MAP (Partsrep. Student (Parts (0) .Trim.Toint, Parts (1), Parts (2) .Trim.Toint)). TODF () //studf. studf.printschema () studf.createReReplAcetEmpView ("estudiante") val namedf = spark.sql ("Seleccione el nombre del estudiante donde la edad <20") //namedf.write.text(t"result ") // Escriba el resultado de la consulta a un archivo llamado F.Show ()} /*** conversación dinámica* @param* /privado de la defensa DynamicCreate (Spark: SparkSession): unitar schema = structtype (campos) val rowrdd = sturdd.map (_. split (",")). MAP (Parts⇒Row (Parts (0), Parts (1), Parts (2))) Val Studf = Spark.CreateDataFrame (ROWRDD, Schema) Studf.PrintsChema () Val TMPVIEW val namedf = spark.sql ("Seleccione el nombre del estudiante donde la edad <20") //namedf.write.text("result ") // Escriba el resultado de la consulta en un archivo llamado F.Show ()}}Nota:
1. Todos los códigos anteriores se han probado y el entorno de prueba es Spark2.1.0 y JDK1.8.
2. Este código no se aplica a las versiones antes de Spark2.0.
El resumen anterior de los dos métodos para convertir Spark RDD en DataFrame por Java y Scala es todo el contenido que comparto con usted. Espero que pueda darle una referencia y espero que pueda apoyar más a Wulin.com.