maltadirk 发表于 2023-2-21 08:12

python pb protobuf kafka flink java Mac

参考学习:Document
本文大概介绍一下python写入pb序列化的数据到Kafka,然后java这边在flink的基础上进行消费Kafka的数据。
先启动zk,然后启动Kafka。zk 3.4.8   kafka 2.4.1

我当时没配环境变量,所以都是切到相应目录去运行,有利于熟悉。
cd /Users/hebingrong/application/zookeeper-3.4.8/bin
./zkServer.sh start
zk占用了2181端口
https://blog.csdn.net/weixin_40283268/article/details/108722   FAILED TO WRITE PID因为创建里面的tempdir时出现问题,修改一下这个临时目录的路径即可
启动成功后,jps 可以查看   QuorumPeerMain 进程。

cd /Users/hebingrong/application/kafka_2.13-2.4.1/bin
./kafka-server-start.sh /Users/hebingrong/application/kafka_2.13-2.4.1/config/server.properties
kafka使用了9092端口启动完成后可以jps 看到 Kafka 进程
此时有

http://pic3.zhimg.com/v2-9040c473456723c9b417e49a0d20470a_b.png
这三个进程。

然后是python写入pb序列化的数据到Kafka。
先准备pb的编译器,即 protoc。这个推荐可以使用brew安装,我这安装的是3.6版本。brew 不方便安装一些历史版本的软件,所以要选定版本的时候我一般手动安装,brew维护的旧版本的软件太少了。
echo 'export PATH="/usr/local/opt/protobuf@3.6/bin:$PATH"' >> /Users/hebingrong/.bash_profile
快速导入环境变量,不过还是得定位到文件source一下。
接着是准备pb的原始对象文件。

http://pic1.zhimg.com/v2-1e2d3a14981928eaa1cc277dfdc49fe4_r.jpg
注意这里的三个名字,包名,文件名,message名,在python中不咋看得出来有啥用。
先提前看Java里面的认识一下


外部包名,类名,静态内部类的类名。    这里的message其实就是对应我们平常的pojo,也就是我们平常存数据的类。
这样我们就知道怎么填对应的属性了。还有required,optional修饰符。

required:一个格式良好的消息一定要(至少)含有1个这种字段,表示该值是必须要设置的;(也就是传输过程中如果没有这个值,那么就肯定失败)
optional:每个消息中可以包含0个或多个optional类型的字段。
repeated:在一个格式良好的消息中,这种字段可以重复任意多次(包括0次)。重复的值的顺序会被保留,表示该值可以重复,相当于java中的List,golang中的切片。
————————————————
版权声明:本文为CSDN博主「love666666shen」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:Mac端安装protobuf及其简单使用_sym的博客-CSDN博客_mac protoc

http://pic1.zhimg.com/v2-8ff57d7ef18c7b0fb660d9a7cd448ae4_r.jpg
然后就是根据那个基本文件生成在不同语言当中的基本类。   
protoc --python_out=./ ./location.proto      
接着python还要安装Kafka的客户端。注意一下对应Kafka的版本即可。
python 安装pb支持包   pip3 install protobuf==3.6.1python对应的pb版本也是3.6.1
import sys
import location_pb2
import json
from kafka import KafkaProducer



if __name__ == '__main__':

    # object.id = "1"
    # object.name = "张三"
    # object.age= 18
    # str_person=object.SerializeToString()
    # print(str_person)
    # object.ParseFromString(str_person)

    producer = KafkaProducer(
                            # value_serializer=str.encode(),我们调用SerializeToString就已经序列化好数据了,所以这里就不需要再定义序列化方式了
                            bootstrap_servers=['localhost:9092']
                         )
    for i in range (10):
      si=str(i)
      object=location_pb2.Person()
      object.id = si
      object.name="张三"+si
      object.age=18
      str_person=object.SerializeToString()
      print(str_person)
      object.ParseFromString(str_person)
      print("name="+object.name)
      print(object)
      producer.send('my_topic', str_person)
    producer.close()这里就是连接到Kafka,然后推送到my_topic这个主题下。
location_pb2.person 就像是创建一个对象一样。
然后就是SerializeToString序列化      ParseFromString反序列化了。

接着就是flink这边去消费Kafka的信息。


基本结构如上
public class PBDeserializer implements DeserializationSchema<Person> {

    @Override
    public Person deserialize(byte[] bytes) throws IOException {
      return Person.parseFrom(bytes);
    }

    @Override
    public boolean isEndOfStream(Person o) {
      return false;
    }

    @Override
    public TypeInformation<Person> getProducedType() {
      return TypeInformation.of(Person.class);
    }
}

然后是主流程类
package com.hbr.readdata;

import location.Location.Person;
import location.PBDeserializer;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class FirstRead {
    public static void main(String[] args) throws Exception {
      // 创建执行环境
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // 设置并行度1
      env.setParallelism(1);

      Properties properties = new Properties();
      properties.setProperty("bootstrap.servers", "localhost:9092");
      // 下面这些次要参数
      properties.setProperty("group.id", "consumer-group");
      properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      //properties.setProperty("auto.offset.reset", "latest");earliest
      properties.setProperty("auto.offset.reset", "earliest");

      FlinkKafkaConsumer<Person> consumer = new FlinkKafkaConsumer<>("my_topic", new PBDeserializer(), properties);
      // flink添加外部数据源
      DataStream<Person> dataStream = env.addSource(consumer);

      // 打印输出对中文输出不友好,懒得去理这个自己得打印方式了
      dataStream.print();

      //先执行print,然后再执行这个mapfunction
      dataStream.map(new MyMapFunction());

      env.execute();
    }
    public static class MyMapFunction implements MapFunction<Person,String> {

      @Override
      public String map(Person person) throws Exception {
            System.out.println("my name is "+person.getName());
            System.out.println("my age is "+person.getAge());
            System.out.println("my id is "+person.getId());
            return null;
      }
    }
}
接着注意依赖:
    <properties>
      <maven.compiler.source>8</maven.compiler.source>
      <maven.compiler.target>8</maven.compiler.target>
      <flink.version>1.11.0</flink.version>
      <scala.binary.version>2.11</scala.binary.version>
    </properties>
    <dependencies>
      <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.8.0</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
      </dependency>
    </dependencies>pb反序列化这个类依赖于flink-core这个包,它是在flink-client这个依赖当中。
protoc生成的Java基本类依赖于 protobuf-java 这个jar包。
基本的执行环境这些,比如 StreamExecutionEnvironment 这些,是在flink-streaming-java包,也是在flink-client包中。
然后flinkcustomer这个类,在flink-connector-kafka_2.11即flink连接kafka的包中。
所以这里只引入了三个jar包,flink版本是1.11.0

然后对主流程类进行大改解析
"auto.offset.reset", "earliest"   消费者还没启动时,生产者就把数据放到Kafka上,设置成这个之后,消费者启动的时候,还可以消费掉之前存储在Kafka上的数据。
"auto.offset.reset", "latest   这个就不能消费之前留存的数据,只能消费到之后发送到Kafka上的数据。

execute之后才是真正把任务提交给flink执行。
最后的截图如下:



http://pic4.zhimg.com/v2-eb152d88a8d74d8c07e546b6295c7757_r.jpg
大概先这样。
页: [1]
查看完整版本: python pb protobuf kafka flink java Mac