RDD转换成DataFrame的两种方式

类型: 大数据 作者: chengxiaj 时间: 2018-11-03 阅读数: 75 删除 审核 反审核 站点推荐
一、概述

spark sql支持两种不同的方式将rdd转换为dataframe。第一种是使用反射来推断包含特定类型对象的rdd的模式,这种基于反射的方式可以提供更简洁的代码,如果在编写spark应用程序时,已经明确了schema,可以使用这种方式。第二种方式是通过可编程接口来构建schema,然后将其应用于现有的rdd。此方式编写的代码更冗长,但在不知道colum及其type的情况下,可以使用这种方式。

下面案例的数据集如下people.txt:

michael, 29 andy, 30 justin, 19 二、rdd转dataframe案例 1.通过反射的方式

spark sql的scala接口支持自动将包含样例类的rdd转换为dataframe。样例类定义表的schema。通过反射读取样例类的参数名称,并映射成column的名称。

package com.company.sparksql import org.apache.log4j.{level, logger} import org.apache.spark.sql.sparksession object rdd2df_m1 { //创建样例类 case class person(name: string, age: int) def main(args: array[string]): unit = { val spark = sparksession .builder() .appname("rdd2df_m1") .master("local") .getorcreate() logger.getlogger("org.apache.spark").setlevel(level.off) logger.getlogger("org.apache.hadoop").setlevel(level.off) runrdd2df(spark) } private def runrdd2df(spark: sparksession) = { //导入隐式转换,用于rdd转为dataframe import spark.implicits._ //从文本文件中创建rdd,并将其转换为dataframe val peopledf = spark.sparkcontext .textfile("file:///e:/people.txt") .map(_.split(",")) .map(attributes => person(attributes(0), attributes(1).trim.toint)) .todf() //将dataframe注册成临时视图 peopledf.createorreplacetempview("people") // 运行sql语句 val teenagersdf = spark.sql("select name, age from people where age between 13 and 19") // 使用字段索引访问列 teenagersdf.map(teenager => "name: " + teenager(0)).show() // +------------+ // | value| // +------------+ // |name: justin| // +------------+ // 通过字段名访问列 teenagersdf.map(teenager => "name: " + teenager.getas[string]("name")).show() // +------------+ // | value| // +------------+ // |name: justin| // +------------+ } } 2.通过构建schema的方式

通过构建schema的方式创建dataframe主要包括三步:

(1)从原始rdd创建row类型的rdd (2)使用structtype,创建schema (3)通过createdataframe方法将schema应用于row类型的rdd

package com.company.sparksql import org.apache.log4j.{level, logger} import org.apache.spark.sql.types.{integertype, stringtype, structfield, structtype} import org.apache.spark.sql.{row, sparksession} object rdd2df_m2 { def main(args: array[string]): unit = { val spark = sparksession .builder() .appname("rdd2df_m1") .master("local") .getorcreate() logger.getlogger("org.apache.spark").setlevel(level.off) logger.getlogger("org.apache.hadoop").setlevel(level.off) runrdd2df(spark) } private def runrdd2df(spark: sparksession) = { //导入隐式转换,用于rdd转为dataframe import spark.implicits._ //创建原始rdd val peoplerdd = spark.sparkcontext.textfile("file:///e:/people.txt") //step 1 将原始rdd转换为row类型的rdd val rowrdd = peoplerdd .map(_.split(",")) .map(attributes => row(attributes(0), attributes(1).trim.toint)) //step 2 创建schema val schema = structtype(array( structfield("name", stringtype, true), structfield("age", integertype, true) )) //step 3 创建df val peopledf = spark.createdataframe(rowrdd, schema) // 将dataframe注册成临时视图 peopledf.createorreplacetempview("people") // 运行sql语句 val results = spark.sql("select name from people") // 使用字段索引访问列 results.map(attributes => "name: " + attributes(0)).show() // +-------------+ // | value| // +-------------+ // |name: michael| // | name: andy| // | name: justin| // +-------------+ } }

来自Lujishu-lujishu.net的文章,链接:http://article.lujishu.net/viewarticle/1283135801180。本文章由作者发布,不代表本站观点,如有侵权,请联系我们撤下该文章。

在此注入作者简介

注册时间: 2016-05-03

  • 文章量
    7
  • 访问量
    311

站点推荐

地摊真的会影响商业地产吗

管理员

关于Spark集群的启动命令大全

thincladt

在Easyui中怎么设置datagrid来自适应屏幕的宽度 你知道吗

nongzuo

大数据平台定义及技术方案和路线

laurustin

如何用Python 将图片和矩阵进行互相转换

langdang

在matlab中图像如何用imwrite()保存

jinpu

计算各种标准差的代码怎么写

outdoersa

在python中修改Dataframe列名,怎么做

jinghuhui

人工智能-人脸识别之人脸姿态

consuwpso

linux 如何再次获取 dhcp 的ip地址

incestsih