1: подготовьте источник данных
Создайте новый файл Student.txt в рамках проекта, а содержимое:
1, Чжангсан, 20 2, Лиси, 21 3, Wanger, 19 4, Fangliu, 18
Два: реализация
Java версия:
1. Сначала создайте новый объект Student Bean, реализуйте сериализацию и метод ToString (). Конкретный код заключается в следующем:
пакет com.cxd.sql; импортировать java.io.serializable; @suppresswarnings ("serial") открытый ученик ученики реализует serializable {String sid; String Sname; int sage; public String getsId () {return sid; } public void setSid (String SID) {this.sid = sid; } public String getSname () {return Sname; } public void setSname (String sname) {this.sname = sname; } public int gestage () {return sage; } public void setSage (int sage) {this.sage = sage; } @Override public String toString () {return "Student [sid =" + sid + ", sname =" + sname + ", sage =" + sage + "]"; }}2. преобразовать, конкретный код выглядит следующим образом
Пакет com.cxd.sql; импорт java.util.arraylist; import org.apache.spark.sparkconf; import org.apache.spark.api.java.javardd; import org.apache.spark.sql.dataset; import orgach.spark.sql.row; импорт org.pare.sry.sry.sry.sry.shory.sry.sry.shory. org.apache.spark.sql.savemode; import org.apache.spark.sql.sparksession; import org.apache.spark.sql.types.datatypes; импорт org.apache.spark.sql.types.struct; Txttoparquetdemo {public static void main (string [] args) {sparkConf conf = new SparkConf (). SetAppName ("txtToparquet"). Setmaster ("local"); Sparksession spark = sparksession.builder (). Config (conf) .getorCreate (); RefertTransform (Spark); // Java Refluection DynamicTransform (Spark); // Динамическое преобразование}/***преобразовать через Java Reflection*@param spark*/private static void Referthransform (sparksession spark) {javardd <string> source = spark.read (). Textfile ("stuinfo.txt"). Javardd <toold> rowrdd = source.map (line -> {String Parts [] = line.split (","); Student Stu = new Student (); Stu.setsId (Parts [0]); Stu.SetsName (части [1]); Stu.SetSage (integer.valueof (parts [2])); Набор данных <row> df = spark.createdataFrame (rowrdd, study.class); df.select ("sid", "sname", "sage"). Coalesce (1) .write (). Mode (savemode.append) .parquet ("parquet.res"); } / *** Динамическое преобразование* @param spark* / private static void dynamictransform (sparksession spark) {javardd <string> source = spark.read (). Textfile ("stuinfo.txt"). Javardd (); Javardd <row> rowrdd = source.map (line -> {string [] parts = line.split (","); string sid = parts [0]; string sname = parts [1]; int sAge = integer.parseint (части [2]); return rowfactory.create (sid, smage, sage);}); ArrayList <stractfield> fields = new ArrayList <stractfield> (); Structfield Field = null; field = dataTypes.createstructfield ("sid", dataTypes.stringtype, true); Fields.Add (Field); field = dataTypes.createstructfield ("sname", dataTypes.stringtype, true); Fields.Add (Field); field = dataTypes.createstructfield ("sage", dataTypes.integertype, true); Fields.Add (Field); Structtype schema = dataTypes.createstructType (Fields); Набор данных <row> df = spark.createdataFrame (rowrdd, schema); df.coalesce (1) .write (). Mode (savemode.append) .parquet ("parquet.res1"); }}Версия Scala:
Import org.apache.spark.sql.sparksessionImport org.apache.spark.sql.types.stringtypeimport org.apache.spark.sql.types.structfieldimport org.apache.sql.types.structtypeimport org.pach.sparepepepepes org.apache.spark.sql.types.integertypeobject rdd2dataset {case class class student (id: int, name: string, ag: int) def main (args: array [string]) {val spark = sparksession.builder (). Spark.implicits._ RefertCreate (Spark) DynamicCreate (Spark)} /**** Конвертируется через Java Reflection* @param spark* /private def RefertCreate (Spark: sparksessession): Unit = {Import Spark.Implicits._val sturdd = spark.sparkcontext.textfile ("student2 Studf = Sturdd.map (_. Split (",")). MAP (PartsœTudent (Parts (0) .trim.toint, Parts (1), Parts (2) .trim.toint)). Todf () //studf.select("ID",Name",Gage"tema. Studf.createorReplacetEmpView ("Студент") val allyf = spark.sql ("Выберите имя из студента, где возраст <20") //namedf.write.tect("result ") // Написать результат запроса в файл с именем chal.show ()} /*** sturdd = spark.sparkcontext.textfile ("student.txt") import spark.implicits._ val schemastring = "id, name, age" val fields = schemastring.split (","). Map (fieldname => structfield (FieldName, StringType, nullable = true)) val Schema = structtype (fields, stringtype, nullable = true)). rowrdd = Sturdd.map (_. Split (",")). MAP (Partsœrow (Parts (0), Parts (1), Parts (2))) val Studf = spark.createdataFrame (rowrdd, schema) studf.printschema () val tmpview = studf.createReplacetempeempeempeem (") valefview = supef.createorplempeempeempeem ("). Студент, где возраст <20 ") //namedf.write.text("result") // Записать результат запроса в файл с именем with.show ()}}Примечание:
1. Все вышеперечисленные коды были проверены, а испытательная среда - Spark2.1.0 и JDK1.8.
2. Этот код не применяется к версиям до Spark2.0.
Приведенная выше резюме двух методов преобразования Spark RDD в DataFrame с помощью Java и Scala - это все контент, которым я делюсь с вами. Я надеюсь, что вы можете дать вам ссылку, и я надеюсь, что вы сможете поддержать Wulin.com больше.