找回密码
 立即注册
查看: 223|回复: 0

离线计算组件篇-Spark-读写Hbase

[复制链接]
发表于 2022-12-8 15:29 | 显示全部楼层 |阅读模式
5).Spark-core读写HBase的数据


  • 配置Spark
    把HBase的lib目录下的一些jar文件拷贝到Spark中,这些都是编程时需要引入的jar包,需要拷贝的jar文件包括:所有hbase开头的jar文件、guava-12.0.1.jar、htrace-core-3.1.0-incubating.jar和protobuf-java-2.5.0.jar
<!--此时需要的Maven依赖 --><?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <groupId>wyq</groupId>    <artifactId>ScalaReadHBase</artifactId>    <version>1.0-SNAPSHOT</version><properties>        <spark.version>2.0.0</spark.version>        <scala.version>2.11</scala.version>    </properties>    <dependencies>        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-core_${scala.version}</artifactId>            <version>${spark.version}</version>        </dependency>        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-streaming_${scala.version}</artifactId>            <version>${spark.version}</version>        </dependency>        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-sql_${scala.version}</artifactId>            <version>${spark.version}</version>        </dependency>        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-hive_${scala.version}</artifactId>            <version>${spark.version}</version>        </dependency>        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-mllib_${scala.version}</artifactId>            <version>${spark.version}</version>        </dependency>        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-common -->        <dependency>            <groupId>org.apache.hbase</groupId>            <artifactId>hbase-common</artifactId>            <version>1.2.11</version>        </dependency>        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->        <dependency>            <groupId>org.apache.hbase</groupId>            <artifactId>hbase-client</artifactId>            <version>1.2.11</version>        </dependency>        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->        <dependency>            <groupId>org.apache.hbase</groupId>            <artifactId>hbase-server</artifactId>            <version>1.2.11</version>        </dependency>    </dependencies>    <build>         <plugins>             <!--当前插件是用来让maven能够编译、测试、运行scala项目的 -->            <plugin>                <groupId>org.scala-tools</groupId>                <artifactId>maven-scala-plugin</artifactId>                <version>2.15.2</version>                <executions>                    <execution>                        <goals>                            <goal>compile</goal>                            <goal>testCompile</goal>                        </goals>                    </execution>                </executions>            </plugin>        </plugins>    </build></project># 进入spark的安装目录下,有一个专门放jar包的路径(在spark-2.10之后)cd /usr/local/spark/jars# 将HBase的jar包拷贝到当前目录下cp /usr/local/hbase/lib/hbase*.jar ./ # 将hbase开头的所有jar包全部导入到jars目录下cp /usr/local/hbase/lib/guava-12.0.1.jar ./cp /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar ./cp /usr/local/hbase/lib/protobuf-java-2.5.0.jar ./cp /usr/local/hbase/lib/metrics-core-2.2.0.jar ./
    编写Spark程序并读取HBase数据
import org.apache.hadoop.hbase._import org.apache.hadoop.hbase.client._import org.apache.hadoop.hbase.io.ImmutableBytesWritableimport org.apache.hadoop.hbase.mapreduce.TableInputFormatimport org.apache.hadoop.hbase.util.Bytesimport org.apache.spark.SparkContextimport org.apache.spark.SparkConfobject SparkOperateHBase {def main(args: Array[String]) {    // 创建HBase的配置文件对象    val conf = HBaseConfiguration.create()    // 初始化一个SparkContext对象,传入对应的SparkConf()对象    val sc = new SparkContext(new SparkConf())    // 设置查询的表名    conf.set(TableInputFormat.INPUT_TABLE, "student")    // 传入配置文件对象,其余的是固定的    val stuRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],  classOf[ImmutableBytesWritable],  classOf[Result])    // 查询当前数据集中有多少行数据    val count = stuRDD.count()    // 测试打印输出    println("Students RDD Count:" + count)    // 将数据持久化到内存,一般是直接调用cache()方法,它会默认调用persist(MEMORY_ONLY);防止在这之前调用了行动操作,然后会从头执行,在之后的行动操作中可以直接调用内存中保存的变量,而不用再从头执行    stuRDD.cache()    // 遍历输出    /*    1. case (_,result):中的"_"是占位符,表示其他的参数,是没有实际作用,都是给系统的变量,我们只用后边的result变量,其中保存了HBase中的数据    2.result.getRow:取出所有的行键    3.result.getValue(列族,某一列);    参数必须是字节数组对象,可以使用 getBytes方法转换为    */    stuRDD.foreach({ case (_,result) =>        val key = Bytes.toString(result.getRow)        val name = Bytes.toString(result.getValue("info".getBytes,"name".getBytes))        val gender = Bytes.toString(result.getValue("info".getBytes,"gender".getBytes))        val age = Bytes.toString(result.getValue("info".getBytes,"age".getBytes))        println("Row key:"+key+" Name:"+name+" Gender:"+gender+" Age:"+age)    })}}

    打包运行
    可以使用任意已经测试通过的打包方式进行打包,然后在运行的时候必须使用“--driver-class-path”参数指定依赖JAR包的路径,而且必须把“/usr/local/hbase/conf”也加到路径中
spark-submit \--class SparkOperateHBase \/usr/local/spark/mycode/simple-project_2.11-1.0.jar# 该命令执行的时候会自动的读取jars目录下的所有包,如果是在jars文件夹下又单独建立的文件存储hbase的jar包,则需要使用以下参数进行指定--driver-class-path /usr/local/spark/jars/hbase/*:/usr/local/hbase/conf

    编写Spark程序并将数据写入HBase


    HBase写入数据格式.png

import org.apache.hadoop.hbase.client.{Put, Result}import org.apache.hadoop.hbase.io.ImmutableBytesWritableimport org.apache.hadoop.hbase.mapreduce.TableOutputFormatimport org.apache.hadoop.hbase.util.Bytesimport org.apache.hadoop.mapreduce.Jobimport org.apache.spark.{SparkConf, SparkContext}object SparkWriteHBase {  def main(args: Array[String]): Unit = {    val conf = new SparkConf()    val sc = new SparkContext(conf)    val tableName = "student"    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tableName)    val job = new Job(sc.hadoopConfiguration)    job.setOutputKeyClass(classOf[ImmutableBytesWritable])    job.setOutputValueClass(classOf[Result])    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])    val indataRDD = sc.makeRDD(Array("3,dufu,M,26","4,xingzhesun,M,27"))    val rdd = indataRDD.map(_.split(',')).map{arr=>{      // 行健的值      val put = new Put(Bytes.toBytes(arr(0)))      // info:name列的值      put.add(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))      // info:gender列的值      put.add(Bytes.toBytes("info"),Bytes.toBytes("gender"),Bytes.toBytes(arr(2)))      // info:age列的值      put.add(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(arr(3)))      // 表示返回数据类型,以及数据(put对象)      (new ImmutableBytesWritable, put)    }}    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())  }}


  • 查看HBase中数据表的内容
    scan 'student'

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Unity开发者联盟 ( 粤ICP备20003399号 )

GMT+8, 2024-11-24 16:24 , Processed in 0.089724 second(s), 26 queries .

Powered by Discuz! X3.5 Licensed

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表