1 : 데이터 소스를 준비하십시오
프로젝트에서 새 Student.txt 파일을 만들면 내용이 다음과 같습니다.
1, Zhangsan, 20 2, Lisi, 21 3, Wanger, 19 4, Fangliu, 18
둘 : 구현
자바 버전 :
1. 먼저 새 학생 Bean 객체를 만들고 직렬화 및 ToString () 메소드를 구현하십시오. 특정 코드는 다음과 같습니다.
패키지 com.cxd.sql; import java.io.serializable; @suppresswarnings ( "Serial") 공개 클래스 학생을 구현하십시오. 문자열 sname; int Sage; public String getSid () {return sid; } public void setSid (String Sid) {this.sid = sid; } public String getSname () {return sname; } public void setsname (String Sname) {this.sname = sname; } public int getSage () {return sage; } public void setsage (int sage) {this.sage = sage; } @override public String toString () {return "return"study [sid = " + sid +", sname = " + sname +", sage = " + sage +"]; }}2. 변환, 특정 코드는 다음과 같습니다
package com.cxd.sql; import java.util.arraylist; import org.apache.spark.sparkconf; import org.apache.spark.api.java.javardd; import org.apache.spark.sql.dataset; import org.apache.spark.sql.row; import org.spark.skl.row org.apache.spark.sql.savemode; import org.apache.spark.sql.sparksession; import org.apache.spark.sql.types.datatypes; import org.apache.spark.sql.types.Struct field; import org.apache.spark.sql.types.Structpepes txtToparquetDemo {public static void main (String [] args) {sparkConf conf = new sparkConf (). setAppName ( "txtToparquet"). setmaster ( "local"); sparksession spark = sparksession.builder (). config (conf) .getorCreate (); RELCINGTRANSFORM (SPARK); // Java Reflection DynamicTransform (Spark); // 동적 변환}/***Java 반사를 통해 변환*@param spark*/private static void recelftransform (sparksession spark) {javardd> spark.read (). textfile ( "stuinfo.txt"). javardd <tudling> rowrdd = source.map (line-> {string parts [] = line.split ( ","); Student stu = new Student (); stu.setsid (parts [0]); stu.setsname (parts [1]); stu.setsage (integer.valueof (part])); DataSet <Row> df = spark.createdataframe (rowrdd, within.class); df.select ( "sid", "sname", "sage"). Coalesce (1) .write (). mode (savemode.append) .parquet ( "parquet.res"); } / *** 동적 변환* @param spark* / private static void dynamictransform (sparksession spark) {javardd <string> source = spark.read (). textFile ( "stuinfo.txt"). javardd (); javardd <줄> rowrdd = source.map (line-> {string [] part [] parts = line.split ( ","); String sid = parts [0]; String Sname = parts [1]; int sage = integer.parseint (parts [2]); return rowactory.create (sid, sname, sage);}; ArrayList <structfield> fields = new ArrayList <structfield> (); Structfield 필드 = NULL; field = datatypes.createstructfield ( "sid", datatypes.stringtype, true); fields.add (필드); Field = DataTypes.CreatesTructField ( "Sname", Datatypes.stringType, True); fields.add (필드); Field = DataTypes.CreatesTructField ( "Sage", Datatypes.IntegerType, true); fields.add (필드); structtype schema = datatypes.createstructtype (필드); DataSet <Row> df = spark.createdataframe (RowRDD, Schema); df.coalesce (1) .write (). mode (savemode.append) .parquet ( "parquet.res1"); }}스칼라 버전 :
import org.apache.spark.sql.sql.sparksessionimport org.apache.spark.sql.types.stringtypeimport org.apache.spark.sql.types.structfieldimport org.apache.spark.sql.sql.sql.sql.sql.sql.typestuct org.apache.spark.sql.types.integertypeobject rdd2dataset {case class student (id : int, name : string : string, ags : array [string]) {val spark = sparksession.builder (). master ( "local"). Reflectreate (Spark) DynamicCreate (Spark)} /*** Java 반사를 통해 변환* @Param Spark* /Private Def Recelscreate (Spark : Sparksession) : init = {import spark.implichits._val sturdd = spark.sparkContext.TextFile ( "Student2.txt") // todf (todf)입니다. studf = sturdd.map (_. split ( ",")).지도 (partiestudent (partion (0) .trim.toint, parts (1), parts (2) .trim.toint). todf () //studf.select("id","name" 짓밟는다. Studf.printschema () studf.createorreplacetempview ( "학생") val namedf = spark.sql ( "<20") //namedf.write.text("Result ") // 쿼리를 쓴 F.Show ()}* @parmam spamp* /private def DynamicCreate (spark : sparksession) : iting = {val sturdd = spark.sparkcontext.textfile ( "withy.txt") import spark.implichits._ val schemastring = "id, name, age"val fields = schemastring.split ( ","). schema = structtype (필드) val rowrdd = sturdd.map (_. split ( ","). map (part십시오 (0), parts (1), parts (1), parts (2))) val studf = spark.createdataframe (rowrdd, schema) schem.printschema () val tmpview = valceteoreorplecetempview. namedf = spark.sql ( "나이 <20")의 학생에서 이름을 선택하십시오.메모:
1. 위의 모든 코드가 테스트되었으며 테스트 환경은 Spark2.1.0 및 JDK1.8입니다.
2.이 코드는 Spark2.0 이전 버전에는 적용되지 않습니다.
Java와 Scala에 의해 Spark RDD를 데이터 프레임으로 변환하는 두 가지 방법의 위의 요약은 내가 공유하는 모든 컨텐츠입니다. 나는 당신이 당신에게 참조를 줄 수 있기를 바랍니다. 그리고 당신이 wulin.com을 더 지원할 수 있기를 바랍니다.