找回密码
 立即注册
查看: 227|回复: 0

Protobuf 在知乎大数据场景的应用

[复制链接]
发表于 2022-12-2 12:23 | 显示全部楼层 |阅读模式
1 背景

Protobuf(Protocol Buffers)是 Google 公司开发的一种跨语言和平台的序列化数据结构的方式,是一个灵活的、高效的用于序列化数据的协议。得益于其优秀的编解码性能与简单易用的特点,被广泛应用于各大场景,如通信协议、数据存储等。
Protobuf 在知乎大数据主要有以下两种场景:
一是 Protobuf 数据入仓,知乎目前有很大一部分数据,如服务日志,特征数据等,都是采用 Protobuf 格式进行存储。这些数据会被发送到 Kafka 或 Pulsar,再通过 Flink 进行消费,最后落入到 Hive 表中,供离线分析。但是因为 Flink 在早期版本(知乎目前的 Flink 版本是 1.13.2)没有提供通用的 Protobuf Format 解决方案,所以用户在使用 Flink 进行消费的时候,只能自己写 Jar 包程序来解析 Protobuf 的数据,再拼接成 Hive 表的数据写入 HDFS 中。
二是 Protobuf 出仓,用户将经过离线 ETL 处理的 Hive 表数据,组织成 Protobuf 的格式,再写回到在线数据库如 Redis,MySQL,TiDB或者是 Kafka/Pulsar 之类的消息系统。这种场景用户也是自己写 Spark 或者 Flink Jar 包程序来处理,用户需要自己读取 Hive 表的数据,再将表的数据组织成 Protobuf 格式。
这两类场景在使用 Protobuf 的痛点如下:

  • 都需要用户自己写 Jar 包来做 Protobuf 的编码与解码,开发门槛高;
  • Jar 包程序不容易维护,不同的用户有自己的 Protobuf 定义,每个 Protobuf 的解析都要写一个程序,导致项目冗余;
  • 每次 Protobuf 增加字段时,从发布到上线流程过长,容易出错;
  • 代码容易失传,尤其体现在交接过程中。
基于以上痛点,我们决定开发通用的 Protobuf 处理工具。
2 Flink Protobuf Format

首先是针对 Protobuf 入仓的场景,我们开发了 Flink Protobuf Format,该 Format 基于 Protobuf Message 的 Java 反射 API 实现,利用用户编译出的 Java 类解码 Protobuf Message,再拼接成 Flink 的数据结构。依托于 Flink SQL 丰富的 Connector,被解析后的 Protobuf 能够落入下游的任意数据源中,落入 Hive 自然也不在话下。
我们以下面的 Message 为例,简单描述一下 Protobuf Format 的使用:
syntax = "proto3";
package com.zhihu.platform.proto.example;
option java_package = "com.zhihu.platform.proto.example";
option java_multiple_files = true;

message SimpleTest {
  optional int64 uid = 1;
  optional string name = 2;
  optional int32 int32_test = 3;
  optional bytes bytes_test = 4;
  optional double double_test = 5;
  optional float float_test = 6;
  repeated int32 int32_array_test = 7;
  optional InnerMessageTest innerMessage = 8;
  repeated InnerMessageTest innerMessage_arr = 9;
  optional State enum_test = 10;
  map<int64, int64> simple_map_test = 11;
  map<string, InnerMessageTest> nest_map_test = 12;

  message InnerMessageTest{
    optional int64 v1 = 1;
    optional int32 v2 = 2;
  }

  enum State {
    RUNNING = 0;
    FINISHED = 1;
  }
}Format 的使用方式如下,以 Kafka Connector 为例:
CREATE TABLE `test_catalog`.`test_database`.`test_table`
(
  `uid` BIGINT,
  `name` STRING,
  `int32_test` INT,
  `bytes_test` BYTES,
  `double_test` DOUBLE,
  `float_test` FLOAT,
  `int32_array_test` ARRAY<INT>,
  `innerMessage` ROW<`v1` BIGINT, `v2` INT>,
  `innerMessage_arr` ARRAY<ROW<`v1` BIGINT, `v2` INT>>,
  `enum_test` STRING,
  `simple_map_test` MAP<BIGINT, BIGINT>,
  `nest_map_test` MAP<STRING, ROW<`v1` BIGINT, `v2` INT>>
)
WITH (
  'properties.bootstrap.servers' = 'xxx',
  'connector' = 'kafka',
  'topic' = 'xxxx',
  'format' = 'protobufV1',
  'protobufV1.class-name' = 'com.zhihu.platform.proto.example.SimpleTest',
  'protobufV1.ignore-parse-errors' = 'true'
);这里 Protobuf Format 我们命名为 protobufV1 而不用 protobuf,原因是我们期待 Flink 社区提供 Protobuf Format 的解决方案,所以将 protobuf 这个名字预留了。事实证明预留 protobuf 的名称是明智的,Flink 在高版本果然提供了 Protobuf Format,与我们的方案比较类似,感兴趣的话可以查看 Flink 的最新文档,里面有相关介绍。
2.1 解码方式的选择

Protobuf 的解码有以下两种备选方案:

  • Dynamic Message 的方式解码,这种方式的好处在于不用将 proto 文件真正编译成某种编程语言,可以做到语言无关,但是解码性能会有较大的损耗;
  • 编译 proto 文件为 Java 类,利用 Java 类做解码,这样做的好处是能够获得最高的解码效率和最大的解码性能,缺点是需要用户自行编译 proto 文件为 Java 类,对其他语言的用户不友好,需要用户有一定的 Java 基础,另外不同的 proto 文件对应的 Java 类也不同,如何让程序以一种比较优雅的形式获取到用户编译的 Protobuf Jar 包,也是一件比较困难的事情。
经过内部讨论,我们选择了方案 2,也就是利用用户编译出来的 Protobuf Java 类来解码 Protobuf 数据,主要考虑如下:

  • Protobuf Java 类解码效率最高,性能最好;
  • 知乎内部的 Protobuf 发布有比较严格的流程,CI 构建时,会同时将 proto 文件编译成 Golang 和 Java 两种语言,并且会将 Java 版本发布到内部 Maven 仓库中,所以每一个线上的 Protobuf 格式,都能够很轻易找到其对应的 Java Jar 包;
  • 知乎内部有 Flink SQL Gateway,能够很轻易的让 Flink 程序获取到用户编译的 Java 包(具体参考下面的章节)。
2.2 Protobuf Jar 包注入

Protobuf Jar 包在我们的设计中有两种注入方式:

  • 在 Flink 集群启动前,将 Jar 包下载到 Flink 的 lib 目录内,再启动集群,这样 Flink 集群内就存在 Protobuf 的 Jar 包了,我们也能很轻易的获取到 Protobuf Message 对应的 Java 类;
  • 通过 Flink SQL 里 Table 的 properties 属性,将 Jar 包的 Maven 地址传给 Flink Format,Flink Format 在接受到 Jar 包的地址后,利用 URLClassloader 加载 Protobuf Message 对应的 Java 类。
2.2.1 利用 JuiceFS 动态注入 Jar 包

首先看 Jar 包注入的第一种方式,前一段时间我们分享过 利用 JuiceFS 给 Flink 容器启动加速,简单描述就是知乎的 Flink 集群都是部署在 K8s 上的,Flink 的容器在启动前,会读取某些特定的环境变量的值,从 JuiceFS 动态下载启动所需的 Jar 包,Jar 包下载完成后,再启动 Flink 集群。
知乎内部的 Flink SQL 都是通过 Flink SQL Gateway 提交的,我们在 Flink SQL Gateway 上扩展了 ADD JAR 语法,用于添加外部的 Jar 包到 Flink 容器的环境变量中,Flink 集群在启动时,就会下载 ADD JAR 指定的 Jar 包,添加到集群中。这样用户只需要通过写 ADD JAR 命令,就能把自己的 Protobuf Message 对应的 Jar 包注入到 Flink 集群中。
具体的使用方式如下:
-- 添加自己的 protobuf 对应版本的包,防止不同版本的 Protobuf 冲突
ADD JAR 'https://zhihu-juicefs.com/protobuf-java-3.19.1.jar';
-- 添加自己编译出来的 Message 包
ADD JAR 'https://zhihu-juicefs.com/simple-test.jar';
CREATE TABLE `test_catalog`.`test_database`.`test_table`
(
  `uid` BIGINT,
  `name` STRING,
  `int32_test` INT,
  `bytes_test` BYTES,
  `double_test` DOUBLE,
  `float_test` FLOAT,
  `int32_array_test` ARRAY<INT>,
  `innerMessage` ROW<`v1` BIGINT, `v2` INT>,
  `innerMessage_arr` ARRAY<ROW<`v1` BIGINT, `v2` INT>>,
  `enum_test` STRING,
  `simple_map_test` MAP<BIGINT, BIGINT>,
  `nest_map_test` MAP<STRING, ROW<`v1` BIGINT, `v2` INT>>
)
WITH (
  'properties.bootstrap.servers' = 'xxx',
  'connector' = 'kafka',
  'topic' = 'xxxx',
  'format' = 'protobufV1',
  'protobufV1.class-name' = 'com.zhihu.platform.proto.example.SimpleTest',
  'protobufV1.ignore-parse-errors' = 'true'
);Jar 包加载的流程图如下:


2.2.2 利用 URLClassloader 动态加载 JuiceFs 上的 Jar 包

其次再看 Jar 包注入的第二种方式,利用 URLClassloader 远程加载 Jar 包。
这里要求用户在建表的时候,指定 Format 所使用的 Jar 包:
CREATE TABLE `test_catalog`.`test_database`.`test_table`
(
  `uid` BIGINT,
  `name` STRING,
  `int32_test` INT,
  `bytes_test` BYTES,
  `double_test` DOUBLE,
  `float_test` FLOAT,
  `int32_array_test` ARRAY<INT>,
  `innerMessage` ROW<`v1` BIGINT, `v2` INT>,
  `innerMessage_arr` ARRAY<ROW<`v1` BIGINT, `v2` INT>>,
  `enum_test` STRING,
  `simple_map_test` MAP<BIGINT, BIGINT>,
  `nest_map_test` MAP<STRING, ROW<`v1` BIGINT, `v2` INT>>
)
WITH (
  'properties.bootstrap.servers' = 'xxx',
  'connector' = 'kafka',
  'topic' = 'xxxx',
  'format' = 'protobufV1',
  'protobufV1.class-name' = 'com.zhihu.platform.proto.example.SimpleTest',
  'protobufV1.ignore-parse-errors' = 'true',
  'protobufV1.jar-urls' = 'https://zhihu-juicefs.com/protobuf-java-3.19.1.jar,https://zhihu-juicefs.com/simple-test.jar'
);Protobuf Format 在检查到有 jar-urls 这个属性时,会新建一个 URLClassloader,并且将该属性对应 Jar 包链接注入到 classpath 中,以供加载。
这里值得一提的是,一开始我们在 URLClassloader 里传入的是内部 Maven 上 Jar 的 URL,但是 Flink 程序在启动时,经常出现 ClassNotFoundException 的错误,加载不到用户的 Class。最后定位原因是内部的镜像仓库在负载比较高的情况下,偶尔会出现问题,而 URLClassloader 在访问这些 URL 失败的情况下,就会跳过这些 URL 对应的 Jar 包,也不会报错,等到真正要使用这些 URL 上的 Jar 包里的 Class 时,就会报 ClassNotFoundException 的错误。
镜像仓库本意是用于 CI 构建以及公共类库发布,本身就不是为高并发设计的,所以我们需要寻找另一种解决方案。我们最开始想到的解决方案就是将用户需要的 Jar 包从内部的 Maven 仓库上缓存一份到 JuiceFS 上,然后再让 URLClassloader 从 JuiceFS 上加载。因为我们之前用 JuiceFS 已经快一年了,没有出过任何问题,所以对 JuiceFS 在这个场景的可用性是非常有信心的。
在 JuiceFS 社区的帮助下,我们选择了以 webdav 的方式来访问 JuiceFS,这里不选择 s3 proxy 的方式是因为 URLClassloader 不适合使用需要认证的 URL,而 webdav 我们以 read-only 的方式启动,就能轻易被 URLClassloader 访问到,无需担心验证的问题。
Jar 包加载的流程图如下:



2.3 建表工具

Protobuf Message 可能出现多级嵌套的复杂情况,这个时候人力建表已经是几乎不可能完成的事情,为此我们开发了专门的建表工具来辅助用户创建与 Protobuf  Message 结构相匹配的 Hive 或 Flink 表。
以面给出的 proto 文件为例:


3 Protobuf Hive UDF

其次是针对 Protobuf 出仓的场景,这类场景一般是批处理,比较适合在 Hive 或 Spark 里进行。
这里我们开发了两种 Hive UDF,用于将 Hive 表转换成 Protobuf Message,用户在获得 Message 的二进制数据后,再使用我们内部的数据集成平台,将数据写入到其他存储中。
3.1 将 Message 拍平赋值

还是以上文给出的 proto 文件为例,我们开发的第一种 Protobuf UDF 使用方式如下:
SET protobuf.jar-urls=https://zhihu-juicefs.com/protobuf-java-3.19.1.jar,https://zhihu-juicefs.com/simple-test.jar;
SET protobuf.class-name=com.zhihu.platform.proto.example.SimpleTest;
SELECT to_protobuf(
               "uid", 1,
               "name", "zhangsan",
               "bytes_test", "12345",
               "double_test", cast(123.456 as double),
               "float_test", cast(123.456 as float),
               "enum_test", "running",
               "innerMessage.v1", 1,
               "innerMessage.v2", 2,
               "int32_array_test(0)", 0,
               "int32_array_test(1)", 1,
               "innerMessage_arr(0).v1", 3,
               "innerMessage_arr(0).v2", 4,
               "innerMessage_arr(1).v1", 5,
               "innerMessage_arr(1).v2", 6,
               "simple_map_test[1]", cast(1 as bigint),
               "simple_map_test[2]", cast(2 as bigint),
               "nest_map_test[a].v1", cast(7 as bigint),
               "nest_map_test[a].v2", 8,
               "nest_map_test.v1", cast(9 as bigint),
               "nest_map_test.v2", 10
           ) AS protobuf_bytes;这样用户可以直接指定 Message 内任意的属性的值:

  • 如果属性之间具有层级/嵌套关系,则用 . 分隔;
  • 数组用 () 将下标括起来,表示数组的第几个元素;
  • MAP 用 [] 将 key 括起来,表示这个 key 对应的 value。
但是这种写法也有一些缺陷:

  • 对于大数组或者大 MAP, UDF 写起来会非常复杂;
  • 对于不确定长度的数组,或者是 key 不固定的 MAP,无法赋值。
所以此 UDF 使用起来简单,但是只适合比较简单的 Protobuf Message。
3.2 利用 struct 赋值

针对第一种 UDF 的限制,我们又开发了第二种 UDF,使用方式如下:
SET protobuf.jar-urls=https://zhihu-juicefs.com/protobuf-java-3.19.1.jar,https://zhihu-juicefs.com/simple-test.jar;
SET protobuf.class-name=com.zhihu.platform.proto.example.SimpleTest;
SELECT to_protobuf(
               named_struct(
                       "uid", 1,
                       "name", "zhangsan",
                       "bytes_test", "12345",
                       "double_test", cast(123.456 as double),
                       "float_test", cast(123.456 as float),
                       "enum_test", "running",
                       "innerMessage", named_struct("v1", 1, "v2", 2),
                       "innerMessage_arr", array(named_struct("v1", 1, "v2", 2), named_struct("v1", 3, "v2", 4))
                   )
           ) AS protobuf_bytes;这样,用户可以使用 Hive 内自带的 named_struct 函数,将值拼接成与 Protobuf Message 相对应的 struct 结构,直接给整个 Message 赋值。
比如现在有一张 Hive 表,结构与 Protobuf 的 Message 十分类似:
CREATE TABLE `simple_test`(
  `uid` bigint,
  `name` string,
  `int32_test` int,
  `bytes_test` binary,
  `double_test` double,
  `float_test` float,
  `int32_array_test` array<int>,
  `innermessage` struct<v1:bigint,v2:int>,
  `innermessage_arr` array<struct<v1:bigint,v2:int>>,
  `enum_test` string,
  `simple_map_test` map<bigint,bigint>,
  `nest_map_test` map<string,struct<v1:bigint,v2:int>>
);我们就可以很简单得到 Protobuf Message:
select to_protobuf(
               named_struct("uid", uid,
                            "name", name,
                            "bytes_test", bytes_test,
                            "double_test", double_test,
                            "float_test", float_test,
                            "enum_test", enum_test,
                            "innerMessage",innerMessage,
                            "int32_array_test",int32_array_test,
                            "innerMessage_arr",innerMessage_arr,
                            "simple_map_test",simple_map_test,
                            "nest_map_test",nest_map_test
                   )
           ) AS protobuf_bytes FROM simple_test;3.3 Hive Jar 包冲突解决

说完了 UDF 的设计,接下来说一个我们在开发中十分头疼的问题,那就是 Jar 包冲突。因为 Hive 引入了 Protobuf 2 相关的依赖,所以在用户传入的 Protobuf 依赖为 Protobuf 3 时,会出现 NoSuchMehodError 等错误。
Jar 包冲突简单来说就是在不同的 Jar 包内有相同名字的 Class,但是他们的成员属性和方法不尽相同,程序在调用这个 Class 的方法时,可能因为加载到了另一个同名的 Class,导致调用出错。
Jar 包冲突最常见的解决方案是利用 Maven 的 shade 插件,将自身所引用的带冲突的 package rename 到其他的 package 下,这样原先有冲突的类因为被强制重命名了,就不是同名类,不会出现冲突了。这种方式简单粗暴,但是不适用我们的场景,主要是这种方式需要将 UDF 的 Jar 包与用户的 Protobuf 绑定到一起打包来进行统一的重命名,这样会导致每一个用户都要重新打包自己的 Protobuf,不便于维护与管理。
最后,我们参考了 Flink 的 ChildFirstClassLoader,在加载用户的 Jar 包时,使用 child-first 的加载方式。
在前面我们给出的样例中,有这样一段注入 Jar 包的 SQL:
SET protobuf.jar-urls=https://zhihu-juicefs.com/protobuf-java-3.19.1.jar,https://zhihu-juicefs.com/simple-test.jar;这里不用 Hive 的 ADD JAR 语法注入 Jar 的原因就是因为 Jar 包冲突。因此我们定义了 protobuf.jar-urls 这个属性,UDF 会从 Hive Server 的当前 Session 中读取 SessionConf 中这个属性的值,再将属性值内的这些 URL 放入 ChildFirstClassLoader,以 child-first 的方式加载,成功解决了 Jar 包冲突的问题。
这里我们 Jar 包依然采用 JuiceFS 做缓存。
3.4 Hive Server Metaspace OOM 解决

因为我们引入了 ChildFirstClassLoader,而 UDF 又没有类似 close 的接口来供我们释放 ClassLoader 的相关资源,这样会导致 Hive Server 每次在使用 Protobuf UDF 时加载进来的类都不会被释放,从而导致 Metaspace OOM。
这里我们通过研究 Hive 的 ADD JAR 语法,发现 Hive Server 的每个连接都有自己的 SessionConf 以及自己的 Session ClassLoader,这些信息都被封装到了 SessionState 对象,在连接断开时,SessionState 会释放 Session ClassLoader。而所有的 SeesionState 都封装在一个全局的 ThreadLocal 中,我们在 UDF 内能够轻易的取到。这样我们只需要将我们创建的 ChildFirstClassLoader 作为 Session Classloader,将原来的 Session Classloader 作为 ChildFirstClassLoader 的Parent,这样在连接关闭时,ChildFirstClassLoader 将会被自动关闭,释放加载的用户 Jar 包。
代码大致如下:
SessionState sessionState = SessionState.get();
ClassLoader parentClassLoader = Thread.currentThread().getContextClassLoader();
if (sessionState != null) {
  parentClassLoader = sessionState.getConf().getClassLoader();
}
ChildFirstClassLoader childFirstClassLoader = new ChildFirstClassLoader(urls, parentClassLoader,);
if (sessionState != null) {
  sessionState.getConf().setClassLoader(childFirstClassLoader);
}

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Unity开发者联盟 ( 粤ICP备20003399号 )

GMT+8, 2024-11-24 16:25 , Processed in 0.124896 second(s), 26 queries .

Powered by Discuz! X3.5 Licensed

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表