1:データソースを準備します
プロジェクトの下に新しいsustent.txtファイルを作成すると、内容は次のとおりです。
1、Zhangsan、20 2、Lisi、21 3、Wanger、19 4、Fangliu、18
2:実装
Javaバージョン:
1.最初に新しい学生Beanオブジェクトを作成し、SerializationとToString()メソッドを実装します。特定のコードは次のとおりです。
パッケージcom.cxd.sql; import java.io.serializable; @suppresswarnings( "serial")パブリッククラスの学生はserializable {string sid;文字列スナム; int sage; public string getsid(){return sid; } public void setSid(string sid){this.sid = sid; } public string getSname(){return sname; } public void setsname(string sname){this.sname = sname; } public int getsage(){return sage; } public void setSage(int sage){this.sage = sage; } @Override public String toString(){return "sudent [sid =" + sid + "、sname =" + sname + "、sage =" + sage + "]"; }}2。変換、特定のコードは次のとおりです
パッケージcom.cxd.sql; Import java.util.arraylist; import org.apache.spark.sparkconf; import org.apache.spark.api.java.javardd; import org.apache.spark.sql.dataset; import org.apache.spark.spark.spark.spark.spark.spark.spark.spark.spark.spark.spark.spark.spark.spark.spark.spark.spark.spark.spark.spark.spark.spark.spark.spark.spark. org.apache.spark.savemode; import org.apache.spark.sql.sparksession; import org.apache.spark.sql.types.datatypes; import org.apache.spark.sql.types.structfield; import org.apach.tl.types.tlucttype txtttoparquetdemo {public static void main(string [] args){sparkconf conf = new SparkConf()。setAppname( "txtttoparquet")。setmaster( "local"); Sparksession spark = sparks.builder()。config(conf).getorcreate();反射トランスフォーム(Spark); // Java Reflection DynamicTransform(Spark); // Dynamic Conversion}/***Java Reflection*@Param Spark*/private static void refrect Transform(Sparks spark){javardd <string> source = spark.read()textfile( "stuinfo.txt()")。 Javardd <Student> rowrdd = source.map(line-> {string parts [] = line.split( "、"); suste stu = new sustent(); stu.setsid(parts [0]); stu.setsname(parts [1]); stu.setsage(integer.valueof(parts [2]); ratrun stu; stu;}); DataSet <Row> df = spark.createdataframe(rowrdd、sudent.class); df.select( "sid"、 "sname"、 "sage")。coalesce(1).write()。mode(savemode.append).parquet( "parquet.res"); } / ***ダイナミック変換* @param Spark* / private static void dynamictransform(sparksession spark){javardd <string> source = spark.read()。 Javardd <Row> rowrdd = source.map(line-> {string [] parts = line.split( "、"); string sid = parts [0]; string sname = parts [1]; int sage = integer.parseint(parts [2]); return rowfactory.create(sid、sname、sage);}); arrayList <structfield> fields = new ArrayList <structfield>(); structfield field = null; field = dataTypes.createstructfield( "sid"、datAtypes.stringType、true); fields.add(field); field = dataTypes.createstructfield( "sname"、datAtypes.stringType、true); fields.add(field); field = dataTypes.createstructfield( "sage"、datAtypes.integertype、true); fields.add(field); structtype schema = datatypes.createstructType(fields); DataSet <Row> df = spark.createdataframe(rowrdd、schema); df.coalesce(1).write()。mode(savemode.append).parquet( "parquet.res1"); }}Scalaバージョン:
org.apache.spark.sql.sparksessionimport org.apache.spark.sql.types.stringtypeimport org.apache.spark.sql.types.Structfieldimport org.apache.spark.sql.tl.types.structtypeimport org.apach.spark.spark.spepepes.Stark.spepepes.Typepes.TypePES.THIMPORT org.apache.spark.sql.types.integertypeobject rdd2dataset {ケースクラスの学生(ID:int、name、string、age:int)def main(args:array [string]){val spark = sparksession.builder()。マスター( "local")。 reffermcleate(spark)dynamiccreate(spark)} /*** java reflectionを介して変換* @param spark* /private def refrectcreate(spark:sparksession):unit = {import spark.implicits._val sturdd = spark.sparkcontext.textfile( "dustear2.txt") studf = sturdd.map(_。split( "、"))。マップ(パート(パート(0).trim.toint、parts(1)、parts(2).trim.toint)。 Studf.createorreplaceTempview( "desute")val namedf = spark.sql( "select name from student from age age <20")//namedf.write.text( "result") sturdd = spardd.sparkcontext.textfile( "sustent.txt")Import spark.implicits._ val schemastring = "id、name、age" val fields = schemastring.split( "、")。 rowrdd = sturdd.map(_。split( "、")) namedf = spark.sql( "年齢<20")//namedf.write.text( "result")//クエリ結果をファイルにfile namedf.show()}}}に書き込みます注記:
1.上記のすべてのコードがテストされており、テスト環境はSpark2.1.0およびJDK1.8です。
2。このコードは、Spark2.0の前にバージョンには適用されません。
JavaとScalaのSpark Rddをデータフレームに変換する2つの方法の上記の概要は、私があなたと共有するすべてのコンテンツです。参照を提供できることを願っています。wulin.comをもっとサポートできることを願っています。