找回密码
 立即注册
查看: 293|回复: 0

skywalking agent 初始化, 数据上报

[复制链接]
发表于 2021-10-9 18:59 | 显示全部楼层 |阅读模式
skywalking agent 初始化


核心概念
    BootService: 启动各个grpc客户端接口, 包含完整生命周期, prepare -> boot -> onComplete -> shutdown, 包含 TraceSegmentServiceClient, ServiceManagementClient 等等ServiceManager: 管理各种 BootService

入口类: SkyWalkingAgent, 通过使用 javaagent 的引导 SkyWalkingAgent#premain 方法进入, 并调用  ServiceManager.INSTANCE.boot(); 来启动各种 BootService
public static void premain(String agentArgs, Instrumentation instrumentation) throws PluginException {    ...    try {    // 简单的调用各个booService 进行 prepare -> boot -> onComplete 操作        ServiceManager.INSTANCE.boot();    } catch (Exception e) {        LOGGER.error(e, "Skywalking agent boot failure.");    }    // 注册关闭钩子    Runtime.getRuntime()            .addShutdownHook(new Thread(ServiceManager.INSTANCE::shutdown, "skywalking service shutdown thread"));    ...}java agent


使用 javaagent 需要几个步骤:
    定义一个 MANIFEST.MF 文件,必须包含 Premain-Class 选项,通常也会加入Can-Redefine-Classes 和 Can-Retransform-Classes 选项。创建一个Premain-Class 指定的类,类中包含 premain 方法,方法逻辑由用户自己确定。将 premain 的类和 MANIFEST.MF 文件打成 jar 包。使用参数 -javaagent:/jar包路径=[agentArgs 参数] 启动要代理的方法。

==premain== 方法用于 main 执行之前的预处理, 用于类的增强

字段如下描述, args 参数通过 -javaagent:xxx.jar==yyy 传入字符串, 如果需要修改字节码, 必须使用方式1, JVM 会优先加载 1 签名的方法,加载成功忽略 2,如果1 没有,加载 2 方法
// 1 public static void premain(String args, Instrumentation instrumentation)// 2public static void premain(String args)
    探秘 Java 热部署二(Java agent premain)
探针 , 服务端的通信流程


上报信息分为 "注册通信" , "数据上报" 两部分

agent 包中和服务端的相关 grpc 通信服务类
    EventReportServiceClient: 服务事件上报(服务启动, 关闭)LogReportServiceClient:  日志上报ServiceManagementClient:  服务实例信息, 心跳上报TraceSegmentServiceClient: TraceSegment 上报JVMMetricsSender:  jvm 信息上报
注册通信


用于上报服务的相关信息, 包含 服务名称, 实例信息, 核心逻辑在 ServiceManagementClient#run,

心跳定时器初始化在 prepare 中完成, 30 秒 执行一次
@Overridepublic void boot() {    heartbeatFuture = Executors.newSingleThreadScheduledExecutor(            new DefaultNamedThreadFactory("ServiceManagementClient")    ).scheduleAtFixedRate(            new RunnableWithExceptionProtection(                    this,                    t -> LOGGER.error("unexpected exception.", t)            ), 0, Config.Collector.HEARTBEAT_PERIOD,            TimeUnit.SECONDS    );}
上报主体逻辑, 心跳部分 keepAlive 每30秒执行1次, 而实例信息上报 30(collector.heartbeat_period) * 10(collector.properties_report_period_factor) 一次, 这有效避免了服务端信息丢失时, 无法收集的情况
if (Math.abs(sendPropertiesCounter.getAndAdd(1)) % Config.Collector.PROPERTIES_REPORT_PERIOD_FACTOR == 0) {    managementServiceBlockingStub        .withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)        .reportInstanceProperties(InstanceProperties.newBuilder()                                                    // 服务名称则是通过 agent.config 配置文件写入                                                    .setService(Config.Agent.SERVICE_NAME)                                                    // 实例名称如果没配置的话, 则会在 ServiceInstanceGenerator#prepare 中自动生成                                                    .setServiceInstance(Config.Agent.INSTANCE_NAME)                                                    // 服务属性通过 agent.config 配置文件写入                                                    .addAllProperties(OSUtil.buildOSInfo(                                                        Config.OsInfo.IPV4_LIST_SIZE))                                                    .addAllProperties(SERVICE_INSTANCE_PROPERTIES)                                                    .addAllProperties(LoadedLibraryCollector.buildJVMInfo())                                                    .build());} else {    final Commands commands = managementServiceBlockingStub.withDeadlineAfter(        GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS    ).keepAlive(InstancePingPkg.newBuilder()                               .setService(Config.Agent.SERVICE_NAME)                               .setServiceInstance(Config.Agent.INSTANCE_NAME)                               .build());    ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);}
实例属性的 Protobuf 消息定义
message InstanceProperties {    string service = 1;    string serviceInstance = 2;    repeated KeyStringValuePair properties = 3;}message KeyStringValuePair {    string key = 1;    string value = 2;}// grpc 服务定义service ManagementService {    // 上报实例消息    rpc reportInstanceProperties (InstanceProperties) returns (Commands) {    }    // 保持心跳    rpc keepAlive (InstancePingPkg) returns (Commands) {    }}
Endpoint收集部分呢, 8.7.0 未找到书中提及的 Endpoint 发送, 看起来是和 Trace 一起发送解析了, 这块到服务端再看下
jvm 信息上报


主体代码入口 JVMService#run, 执行流程
    JVMService#boot 中定义了名称为 JVMService-produce 的 Executor(每 1 秒执行一次) , 对应生产者定义 JVMService-consume 的 Executor(每 1 秒执行一次) , 类为 JVMMetricsSender, 对应消费者JVMService#run 收集 jvm 信息 封装成 JVMMetric, 塞到 JVMMetricsSender#LinkedBlockingQueue 中完成生产行为JVMMetricsSender#run 将消息通过 grpc发送到服务端, 完成消费行为
数据上报


TraceSegment 的上报通过 TraceSegmentServiceClient 完成, 这块上个部分已大体有说明了, 这里主要关注下 初始化部分
@Overridepublic void boot() {    lastLogTime = System.currentTimeMillis();    segmentUplinkedCounter = 0;    segmentAbandonedCounter = 0;    // 初始化, channel 为 5, buffer 为300 的 DataCarrier, 使用失败重试策略    carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE);    // 初始化 ConsumerThread 进行 TraceSegment 发送    carrier.consume(this, 1);}@Overridepublic void onComplete() {    // 添加监听器到 TracingContext, 用于 Trace 完成时通知执行 consume    TracingContext.ListenerManager.add(this);}@Overridepublic void shutdown() {    TracingContext.ListenerManager.remove(this);    carrier.shutdownConsumers();}
懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Unity开发者联盟 ( 粤ICP备20003399号 )

GMT+8, 2024-11-24 17:17 , Processed in 0.089755 second(s), 25 queries .

Powered by Discuz! X3.5 Licensed

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表