|
skywalking agent 初始化
核心概念
BootService: 启动各个grpc客户端接口, 包含完整生命周期, prepare -> boot -> onComplete -> shutdown, 包含 TraceSegmentServiceClient, ServiceManagementClient 等等ServiceManager: 管理各种 BootService
入口类: SkyWalkingAgent, 通过使用 javaagent 的引导 SkyWalkingAgent#premain 方法进入, 并调用 ServiceManager.INSTANCE.boot(); 来启动各种 BootService
public static void premain(String agentArgs, Instrumentation instrumentation) throws PluginException { ... try { // 简单的调用各个booService 进行 prepare -> boot -> onComplete 操作 ServiceManager.INSTANCE.boot(); } catch (Exception e) { LOGGER.error(e, "Skywalking agent boot failure."); } // 注册关闭钩子 Runtime.getRuntime() .addShutdownHook(new Thread(ServiceManager.INSTANCE::shutdown, "skywalking service shutdown thread")); ...}java agent
使用 javaagent 需要几个步骤:
定义一个 MANIFEST.MF 文件,必须包含 Premain-Class 选项,通常也会加入Can-Redefine-Classes 和 Can-Retransform-Classes 选项。创建一个Premain-Class 指定的类,类中包含 premain 方法,方法逻辑由用户自己确定。将 premain 的类和 MANIFEST.MF 文件打成 jar 包。使用参数 -javaagent:/jar包路径=[agentArgs 参数] 启动要代理的方法。
==premain== 方法用于 main 执行之前的预处理, 用于类的增强
字段如下描述, args 参数通过 -javaagent:xxx.jar==yyy 传入字符串, 如果需要修改字节码, 必须使用方式1, JVM 会优先加载 1 签名的方法,加载成功忽略 2,如果1 没有,加载 2 方法
// 1 public static void premain(String args, Instrumentation instrumentation)// 2public static void premain(String args)
探秘 Java 热部署二(Java agent premain)
探针 , 服务端的通信流程
上报信息分为 "注册通信" , "数据上报" 两部分
agent 包中和服务端的相关 grpc 通信服务类
EventReportServiceClient: 服务事件上报(服务启动, 关闭)LogReportServiceClient: 日志上报ServiceManagementClient: 服务实例信息, 心跳上报TraceSegmentServiceClient: TraceSegment 上报JVMMetricsSender: jvm 信息上报
注册通信
用于上报服务的相关信息, 包含 服务名称, 实例信息, 核心逻辑在 ServiceManagementClient#run,
心跳定时器初始化在 prepare 中完成, 30 秒 执行一次
@Overridepublic void boot() { heartbeatFuture = Executors.newSingleThreadScheduledExecutor( new DefaultNamedThreadFactory("ServiceManagementClient") ).scheduleAtFixedRate( new RunnableWithExceptionProtection( this, t -> LOGGER.error("unexpected exception.", t) ), 0, Config.Collector.HEARTBEAT_PERIOD, TimeUnit.SECONDS );}
上报主体逻辑, 心跳部分 keepAlive 每30秒执行1次, 而实例信息上报 30(collector.heartbeat_period) * 10(collector.properties_report_period_factor) 一次, 这有效避免了服务端信息丢失时, 无法收集的情况
if (Math.abs(sendPropertiesCounter.getAndAdd(1)) % Config.Collector.PROPERTIES_REPORT_PERIOD_FACTOR == 0) { managementServiceBlockingStub .withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS) .reportInstanceProperties(InstanceProperties.newBuilder() // 服务名称则是通过 agent.config 配置文件写入 .setService(Config.Agent.SERVICE_NAME) // 实例名称如果没配置的话, 则会在 ServiceInstanceGenerator#prepare 中自动生成 .setServiceInstance(Config.Agent.INSTANCE_NAME) // 服务属性通过 agent.config 配置文件写入 .addAllProperties(OSUtil.buildOSInfo( Config.OsInfo.IPV4_LIST_SIZE)) .addAllProperties(SERVICE_INSTANCE_PROPERTIES) .addAllProperties(LoadedLibraryCollector.buildJVMInfo()) .build());} else { final Commands commands = managementServiceBlockingStub.withDeadlineAfter( GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS ).keepAlive(InstancePingPkg.newBuilder() .setService(Config.Agent.SERVICE_NAME) .setServiceInstance(Config.Agent.INSTANCE_NAME) .build()); ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);}
实例属性的 Protobuf 消息定义
message InstanceProperties { string service = 1; string serviceInstance = 2; repeated KeyStringValuePair properties = 3;}message KeyStringValuePair { string key = 1; string value = 2;}// grpc 服务定义service ManagementService { // 上报实例消息 rpc reportInstanceProperties (InstanceProperties) returns (Commands) { } // 保持心跳 rpc keepAlive (InstancePingPkg) returns (Commands) { }}
Endpoint收集部分呢, 8.7.0 未找到书中提及的 Endpoint 发送, 看起来是和 Trace 一起发送解析了, 这块到服务端再看下
jvm 信息上报
主体代码入口 JVMService#run, 执行流程
JVMService#boot 中定义了名称为 JVMService-produce 的 Executor(每 1 秒执行一次) , 对应生产者定义 JVMService-consume 的 Executor(每 1 秒执行一次) , 类为 JVMMetricsSender, 对应消费者JVMService#run 收集 jvm 信息 封装成 JVMMetric, 塞到 JVMMetricsSender#LinkedBlockingQueue 中完成生产行为JVMMetricsSender#run 将消息通过 grpc发送到服务端, 完成消费行为
数据上报
TraceSegment 的上报通过 TraceSegmentServiceClient 完成, 这块上个部分已大体有说明了, 这里主要关注下 初始化部分
@Overridepublic void boot() { lastLogTime = System.currentTimeMillis(); segmentUplinkedCounter = 0; segmentAbandonedCounter = 0; // 初始化, channel 为 5, buffer 为300 的 DataCarrier, 使用失败重试策略 carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE); // 初始化 ConsumerThread 进行 TraceSegment 发送 carrier.consume(this, 1);}@Overridepublic void onComplete() { // 添加监听器到 TracingContext, 用于 Trace 完成时通知执行 consume TracingContext.ListenerManager.add(this);}@Overridepublic void shutdown() { TracingContext.ListenerManager.remove(this); carrier.shutdownConsumers();} |
|