1: Prepare a fonte de dados
Crie um novo arquivo do Student.txt sob o projeto, e o conteúdo é:
1, Zhangsan, 20 2, Lisi, 21 3, Wanger, 19 4, Fangliu, 18
Dois: implementação
Versão Java:
1. Primeiro, crie um novo objeto de feijão para estudantes, implemente o método de serialização e tostring (). O código específico é o seguinte:
pacote com.cxd.sql; importar java.io.Serializable; @suppresswarnings ("serial") classe pública implementa o aluno serializável {string sid; String snam; int sage; public string getsid () {return sid; } public void Setsid (String sid) {this.sid = sid; } public string getsName () {return sname; } public void SetsName (string sname) {this.sname = sname; } public int getSage () {return sage; } public void Setsage (int sage) {this.sage = sage; } @Override public string tostring () {return "Student [sid =" + sid + ", sname =" + sname + ", sage =" + sage + "]"; }}2. Converter, o código específico é o seguinte
pacote com.cxd.sql; importar java.util.ArrayList; importar org.apache.spark.sparkConf; importar org.apache.spark.api.java.javardd; importar.apache.spark.sql.dataset; importação.aPachepAchek.sqLey; org.apache.spark.sql.SaveMode; importar org.apache.spark.sql.sparkSession; importar org.apache.spark.sql.types.datatypes; importação org.aPACHACTACK.SSQL.TYPES.STRUCTIFFIEL; TxttoparquetDemo {public static void main (string [] args) {sparkconf conf = new SparkConf (). SetappName ("txttoparquet"). Setmaster ("local"); SparkSession Spark = SparkSession.Builder (). Config (conf) .GetorCreate (); refleteTransform (Spark); // Java Reflexão Dynamictransform (Spark); // Conversão dinâmica}/***Converta através da reflexão Java*@param Spark*/private estático void reflettransform (SparkSession Spark) {JAVARDD <String> fonte. JAVARDD <DAULD> ROWRDD = fonte.map (linha -> {string partes [] = line.split (","); Student Stu = new Student (); Stu.SetsId (partes [0]); Stu.SetsName (Parts [1]); Stu.Setsage (integger.Valueof (2]; 2]; DataSet <LOW> df = Spark.createTataframe (Rowrdd, Student.class); df.Select ("Sid", "Snome", "Sage"). Coalesce (1) .Write (). Modo (Savemode.append) .parquet ("parquet.res"); } / *** Conversão dinâmica* @param Spark* / private estático void dynamictransform (SparkSession Spark) {javardd <string> fonte = spark.read (). Textfile ("stuinfo.txt"). Javardd (); JAVARDD <LOW> rowrdd = source.map (linha -> {string [] partes = line.split (","); string sid = partes [0]; string snam = partes [1]; int sage = integer.parseint (partes [2]); retornar linefactory.create (sid, sname, sage);}); ArrayList <tructfield> campos = new ArrayList <tructfield> (); Campo structfield = nulo; field = dados de dados.createstructfield ("sid", dados de dados.stringType, true); campos.add (campo); field = Datatypes.createstructfield ("sname", datatypes.stringType, true); campos.add (campo); field = dados de dados.createstructfield ("sage", dados de dados.integertype, true); campos.add (campo); Esquema structtype = dados de dados.createstructtype (campos); DataSet <LOW> df = Spark.createTataframe (rowrdd, esquema); df.coalesce (1) .Write (). Modo (Savemode.append) .Parquet ("parquet.res1"); }}Versão do Scala:
importar org.apache.spark.sql.sparkSessionImport org.apache.spark.sql.types.stringtypeimport org.apache.spark.sql.tutcttypeimport orgache.spark.sql.types.structypeimport org.apache.spark.sysplyes.spimpimpimsp.pache.spk.spk.sql.types.structtypeimport org.apache.spark.sk.syts.sttypeimport org.apache.spark.sql.types.structypeimport org.apache.spark.s. org.apache.spark.sql.types.integertypeObject rdd2dataSet {Case classe Student (id: int, nome: string, idade: int) def main (args: Array [string]) {valsks2data = sparkSession.builder (). master ("local). refleteCreate (Spark) DynamicCreate (Spark)} /*** Converta através da reflexão Java* @param Spark* /private def refletCreate (Spark: SparkSession): Unit = {importar Spark.implicits._Val Sturdd = Spark.SparkContext.textFile ("Student2.TXT") // TODF () ISPLOST ISMPROSST IS ANGRICK IS PRAIRST. studf = sturdd.map (_. split (",")). mapa (partes postes (partes (0) .Trim.Toint, partes (1), partes (2) .Trim.toint). studf.printschema () studf.createReplacetempview ("aluno") val chamadof = spark.sql ("Selecione o nome do aluno onde idade <20") //namedf.write.text("Result ") // Escreva o resultado da consulta para um arquivo denominado dynamicCreate(spark:SparkSession):Unit={ val stuRDD=spark.sparkContext.textFile("student.txt") import spark.implicits._ val schemaString="id,name,age" val fields=schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true)) val esquema = structype (campos) val rowrdd = sturdd.map (_. split (",")). mapa (peçasculas. tmpview = studf.createReplacetempView ("Student") val chamadof = spark.sql ("Selecione o nome do aluno onde idade <20") //namedf.write.text("Result ") // Escreva o resultado da consulta em um arquivo denominadof.show ()}}}}}}Observação:
1. Todos os códigos acima foram testados e o ambiente de teste é Spark2.1.0 e JDK1.8.
2. Este código não se aplica às versões antes do Spark2.0.
O resumo acima dos dois métodos de conversão de Spark RDD em DataFrame por Java e Scala é todo o conteúdo que compartilho com você. Espero que você possa lhe dar uma referência e espero que você possa apoiar mais o wulin.com.