1: إعداد مصدر البيانات
قم بإنشاء ملف student.txt جديد ضمن المشروع ، والمحتويات هي:
1 ، Zhangsan ، 20 2 ، Lisi ، 21 3 ، Wanger ، 19 4 ، Fangliu ، 18
الثاني: التنفيذ
نسخة جافا:
1. قم أولاً بإنشاء كائن فول طالب جديد ، وتنفيذ طريقة التسلسل و toString (). الرمز المحدد كما يلي:
package com.cxd.sql ؛ import java.io.serializable ؛ suppressWarnings ("Serial") طالب الطبقة العامة يطبق serializable {string sid ؛ سلسلة SNAME int sage السلسلة العامة getSid () {return sid ؛ } public void setSid (String Sid) {this.sid = sid ؛ } السلسلة العامة getName () {return sname ؛ } public void setSname (String sname) {this.sname = sname ؛ } public int getSage () {return sage ؛ } setSage public void (int sage) {this.sage = sage ؛ } Override public string toString () {return "student [sid =" + sid + "، sname =" + sname + "، sage =" + sage + "]" ؛ }}2. تحويل ، الرمز المحدد كما يلي
package com.cxd.sql ؛ import java.util.arraylist ؛ import org.apache.spark.sparkconf ؛ import org.apache.spark.java.javardd ؛ import org.apache.spark.sql.dataset ؛ import org.pache.spark.sql.row ؛ org.apache.spark.sql.savemode ؛ import org.apache.spark.sql.sparksession ؛ import org.apache.spark.sql.types.datatypes ؛ import org.apache.spark.sql.types.structfield ؛ txttoparquetdemo {public static void main (string [] args) {SparkConf conf = new SparkConf (). setAppName ("txttoparquet"). Sparksession Spark = Sparksession.builder (). config (conf) .getorcreate () ؛ عكس نقل (Spark) ؛ // java Reflection DynamicTransform (Spark) ؛ // التحويل الديناميكي}/***تحويل من خلال انعكاس Java*param Spark*/private static void respecttransform (sparksession spark) {javardd <string> source = spark.read (). textfile ("stuinfo.txt"). Javardd <Tudent> rowrdd = source.map (line -> {String Parts [] = line.split ("،" ، "stu stu = new student () ؛ stu.setsid (parts [0]) DataSet <row> df = Spark.CreatedAtaFrame (Rowrdd ، student.class) ؛ df.select ("SID" ، "sname" ، "sage"). coalesce (1) .write (). mode (savemode.append) .Parquet ("parquet.res") ؛ } / *** تحويل ديناميكي* param Spark* / private static void dynamictransform (Sparksession Spark) {javardd <string> source = spark.read (). textfile ("stuinfo.txt"). javardd () ؛ javardd <row> rowrdd = source.map (line -> {string [] parts = line.split ("،" ، "string sid = parts [0] ؛ String sname = parts [1] ؛ int sage = integer.parseint (parts [2]) ArrayList <TructField> fields = new ArrayList <TructField> () ؛ حقل هيكليته = فارغ ؛ Field = dataTypes.Createstructfield ("SID" ، dataTypes.StringType ، true) ؛ Fields.Add (Field) ؛ Field = dataTypes.Createstructfield ("sname" ، dataTypes.StringType ، true) ؛ Fields.Add (Field) ؛ Field = datatypes.createstructfield ("sage" ، dataTypes.IntegerType ، true) ؛ Fields.Add (Field) ؛ schema structtype = datatypes.createstructType (الحقول) ؛ DataSet <row> df = Spark.CreatedataFrame (Rowrdd ، Schema) ؛ df.coalesce (1) .write (). mode (savemode.append) .Parquet ("parquet.res1") ؛ }}نسخة سكالا:
استيراد org.apache.spark.sql.sparksessionimport org.apache.spark.sql.types.stringtypeimport org.apache.sql.types.structfieldimport org.pache.spark.sql.types.structtypeimport org.spark.sql.struct.structtruttypes.structtruttypes. org.apache.spark.sql.types.integerTypeObject rdd2dataset {case class student (int: int ، string ، age: int) def main (args: array [string]) {val Spark = sparksession.builder (). master ("local". عكس create (Spark) DynamicCreate (Spark)} /*** تحويل من خلال انعكاس Java* param Spark* /private def reflectcreate (Spark: Sparksession): unit = {import spark.implicits._val sturdd = spark.sparkcontext.textfile ("student2 studf = sturdd.map (_. split ("،))). خريطة (parts⇒student (الأجزاء (0) .trim.toint ، الأجزاء (1) ، الأجزاء (2) .trim.toint)). todf () //studf.selectorctorctored(id"،name"viage"e.write.text (rect plugul studf.printschema () studf.createRreplacetempView ("student") val namedf = spark.sql ("حدد اسم الطالب حيث العمر <20") DynamicCreate (Spark: Sparksession): unit = {val sturdd = Spark.SparkContext.textFile ("student.txt") استيراد Spark.Implicits._ val scleastring = "id ، name ، age" vields = schemastring.split ("،"). map (fieldname = schema = structtype (الحقول) val rawrdd = sturdd.map (_. split ("،"). map (parts⇒row (parts (0) ، parts (1) ، parts (2)) namedf = spark.sql ("حدد الاسم من الطالب حيث العمر <20") //namedf.write.text("Result ") // اكتب نتيجة الاستعلام إلى ملف اسمهملحوظة:
1. تم اختبار جميع الرموز المذكورة أعلاه وبيئة الاختبار هي Spark2.1.0 و JDK1.8.
2. لا ينطبق هذا الرمز على الإصدارات قبل Spark2.0.
الملخص أعلاه للطريقتين لتحويل Spark RDD إلى DataFrame بواسطة Java و Scala هو كل المحتوى الذي أشاركه معك. آمل أن تتمكن من إعطائك مرجعًا وآمل أن تتمكن من دعم wulin.com أكثر.