微服务链路追踪SkyWalking第五课 SkyWalking中Trace落地实现方案
第12讲:剖析 Trace 在 SkyWalking 中的落地实现方案(上)
通过前面几课时的学习,我们已经了解 SkyWalking Agent 启动的基本流程、插件增强代码的基本逻辑以及核心 BootService 实现的功能。从本课时开始,我们将深入分析 SkyWalking Agent 中 Trace 相关的基础组件。
在 04 课时中我们介绍了 OpenTracing 的基本概念,SkyWalking 中 Trace 的相关概念以及实现类与 OpenTracing 中的概念基本类似,像 Trace、Span、Tags、Logs 等核心概念,在 SkyWalking Agent 中都有对应实现,只是在细微实现上略有区别的,其中最重要的是: SkyWalking 的设计在 Trace 级别和 Span 级别之间加了一个 Segment 概念,用于表示一个服务实例内的 Span 集合。
Trace ID
在分布式链路追踪系统中,用户请求的处理过程会形成一条 Trace 。Trace ID 作为 Trace 数据的唯一标识,在面对海量请求的时候,需要保证其唯一性。与此同时,还要保证生成 Trace ID 不会带来过多开销,所以在业务场景中依赖数据库(自增键或是类似 Meituan-Dianping/Leaf 的 ID 生成方式)都不适合 Trace 的场景。
这种要求快速、高性能生成唯一 ID 的需求场景,一般会将 snowflake 算法与实际的场景集合进行改造。
snowflake 算法是 Twitter 开源的分布式 ID 生成算法 。snowflake 算法的核心思想是将一个 ID(long类型)的 64 个 bit 进行切分,其中使用 41 个 bit 作为毫秒数,10 个 bit 作为机器的 ID( 5 个 bit 记录数据中心的 ID,5 个 bit 记录机器的 ID ),12 bit 作为毫秒内的自增 ID,还有一个 bit 位永远是 0。snowflake 算法生成的 ID 结构如下图所示:
snowflake 算法的好处是 ID 可以直接靠算法在内存中产生,内存内的锁控制并发,不需依赖 MySQL 这样的外部依赖,无维护成本。缺点就是每个机器节点在每毫秒内只可以产生 4096 个 ID,超出这个范围就会溢出。另外,如果机器回拨了时间,就会生成重复的 ID。
ID 类是 SkyWalking 中对全局唯一标识的抽象,其生成策略与 snowflake 算法类似。SkyWalking ID 由三个 long 类型的字段(part1、part2、part3)构成,分别记录了 ServiceInstanceId、Thread ID 和 Context 生成序列。Context 生成序列的格式是:
${时间戳} * 10000 + 线程自增序列([0, 9999])
ID 对象序列化之后的格式是将 part1、part2、part3 三部分用“.”分割连接起来 :
${ServiceInstanceId}.${Thread ID}.(${时间戳} * 10000 + 线程自增序列([0, 9999]))
GlobalIdGenerator 是 Agent 中用来生成全局唯一 ID 的基础工具类,在 generate() 方法中的实现如下:
public static ID generate() {
// THREAD_ID_SEQUENCE是 ThreadLocal<IDContext>类型,即每个线程
// 维护一个 IDContext对象
IDContext context = THREAD_ID_SEQUENCE.get();
return new ID(SERVICE_INSTANCE_ID, // service_intance_id
Thread.currentThread().getId(), // 当前线程的ID
context.nextSeq() // 线程内生成的序列号
);
}
IDContext.nextSeq() 方法的实现如下,其中 timestamp() 方法在返回时间戳的时候,会处理时间回拨的场景(使用 Random 随机生成一个时间戳),nextThreadSeq() 方法的返回值在 [0 , 9999] 这个范围内循环:
private long nextSeq() {
return timestamp() * 10000 + nextThreadSeq();
}
GlobalIdGenerator 不仅用于生成 Trace ID ,其他需要唯一 ID 的地方也会通过其 nextSeq() 方法生成。
SkyWalking 中使用 DistributedTraceId 类来抽象 Trace ID,其中封装了一个 ID 类型的字段。DistributedTraceId 有两个实现类,如下图所示:
其中,NewDistirbutedTraceId 负责生成新 Trace ID,请求刚刚进入系统时,会创建 NewDistirbutedTraceId 对象,其构造方法内部会调用 GlobalIdGenerator.generate() 方法生成 ID 对象。
PropagatedTraceId 负责处理 Trace 传播过程中的 TraceId。PropagatedTraceId 的构造方法接收一个 String 类型参数(也就是在跨进程传播时序列化后的 Trace ID),解析之后得到 ID 对象。
在后面的介绍中还会涉及另一个与 Trace ID 相关的类 —— DistributedTraceIds,它表示多个 Trace ID 的集合,其底层封装了一个 LinkedList<DistributedTraceId> 集合,用于记录相关的 Trace ID。
TraceSegment
在 SkyWalking 中,TraceSegment 是一个介于 Trace 与 Span 之间的概念,它是一条 Trace 的一段,可以包含多个 Span。在微服务架构中,一个请求基本都会涉及跨进程(以及跨线程)的操作,例如, RPC 调用、通过 MQ 异步执行、HTTP 请求远端资源等,处理一个请求就需要涉及到多个服务的多个线程。TraceSegment 记录了一个请求在一个线程中的执行流程(即 Trace 信息)。将该请求关联的 TraceSegment 串联起来,就能得到该请求对应的完整 Trace。
下面我们先来介绍 TraceSegment 的核心字段:
- traceSegmentId(ID 类型):TraceSegment 的全局唯一标识,是由前面介绍的 GlobalIdGenerator 生成的。
- refs(List<TraceSegmentRef> 类型):它指向父 TraceSegment。在我们常见的 RPC 调用、HTTP 请求等跨进程调用中,一个 TraceSegment 最多只有一个父 TraceSegment,但是在一个 Consumer 批量消费 MQ 消息时,同一批内的消息可能来自不同的 Producer,这就会导致 Consumer 线程对应的 TraceSegment 有多个父 TraceSegment 了,当然,该 Consumer TraceSegment 也就属于多个 Trace 了。
- relatedGlobalTraces(DistributedTraceIds 类型):记录当前 TraceSegment 所属 Trace 的 Trace ID。
- spans(List<AbstractTracingSpan> 类型):当前 TraceSegment 包含的所有 Span。
- ignore(boolean 类型):ignore 字段表示当前 TraceSegment 是否被忽略。主要是为了忽略一些问题 TraceSegment(主要是对只包含一个 Span 的 Trace 进行采样收集)。
- isSizeLimited(boolean 类型):这是一个容错设计,例如业务代码出现了死循环 Bug,可能会向相应的 TraceSegment 中不断追加 Span,为了防止对应用内存以及后端存储造成不必要的压力,每个 TraceSegment 中 Span 的个数是有上限的(默认值为 300),超过上限之后,就不再添加 Span了。
下图展示了一个 TraceSegment 的核心结构:
Span
TraceSegment 是由多个 Span 构成的,AbstractSpan 抽象类是 SkyWalking 对 Span 概念的抽象,下图是 Span 的继承关系:
首先需要明确的是,我们最终直接使用的 Span 分为 3 类:
- EntrySpan:当请求进入服务时会创建 EntrySpan 类型的 Span,它也是 TraceSegment 中的第一个 Span。例如,HTTP 服务、RPC 服务、MQ-Consumer 等入口服务的插件在接收到请求时都会创建相应的 EntrySpan。
- LocalSpan:它是在本地方法调用时可能创建的 Span 类型,在后面介绍 @Trace 注解的时候我们还会看到 LocalSpan。
- ExitSpan:当请求离开当前服务、进入其他服务时会创建 ExitSpan 类型的 Span。例如, Http Client 、RPC Client 发起远程调用或是 MQ-producer 生产消息时,都会产生该类型的 Span。
下面我们按照 Span 的继承结构,自顶层接口开始逐个向下介绍。首先,AsyncSpan 接口定义了一个异步 Span 的基本行为:
- prepareForAsync() 方法:Span 在当前线程结束了,但是未被彻底关闭,依然是存活的。
- asyncFinish()方法:当前 Span 真正关闭。它与 prepareForAsync() 方法成对出现。
这两个方法在异步框架的插件中会见到。
AbstractSpan 也是一个接口,其中定义了 Span 的基本行为,其中的方法比较重要:
- getSpanId() 方法:用来获得当前 Span 的 ID,Span ID 是一个 int 类型的值,在其所属的 TraceSegment 中唯一,在创建 Span 对象时生成,从 0 开始自增。
- setOperationName()/setOperationId() 方法:用来设置 operation 名称(或 operation ID),这两个信息是互斥的。它们在 AbstractSpan 的具体实现(即 AbstractTracingSpan)中,分别对应 operationId 和 operationName 两个字段,两者只能有一个字段有值。
operationName 即前文介绍的 EndpointName,可以是任意字符串,例如,在 Tomcat 插件中 operationName 就是 URI 地址,Dubbo 插件中 operationName 为 URL + 接口方法签名。
- setComponent() 方法:用于设置组件类型。它有两个重载,在 AbstractTracingSpan 实现中,有 componentId 和 componentName 两个字段,两个重载分别用于设置这两个字段。在 ComponentsDefine 中可以找到 SkyWalking 目前支持的组件类型。
- setLayer() 方法:用于设置 SpanLayer,也就是当前 Span 所处的位置。SpanLayer 是个枚举,可选项有 DB、RPC_FRAMEWORK、HTTP、MQ、CACHE。
- tag(AbstractTag, String) 方法:用于为当前 Span 添加键值对的 Tags。一个 Span 可以有多个 Tags。AbstractTag 中不仅包含了 String 类型的 Key 值,还包含了 Tag 的 ID 以及 canOverwrite 标识。AbstractTracingSpan 实现通过维护一个 List<TagValuePair> 集合(tags 字段)来记录 Tag 信息,TagValuePair 中则封装了 AbstractTag 类型的 Key 以及 String 类型的 Value。
- log() 方法:用于向当前 Span 中添加 Log,一个 Span 可以包含多条日志。在 AbstractTracingSpan 实现中通过维护一个 List<LogDataEntity> 集合(logs 字段)来记录 Log。LogDataEntity 会记录日志的时间戳以及 KV 信息,以异常日志为例,其中就会包含一个 Key 为“stack”的 KV,其 value 为异常堆栈。
- start() 方法:开启 Span,其中会设置当前 Span 的开始时间以及调用层级等信息。
- isEntry() 方法:判断当前是否是 EntrySpan。EntrySpan 的具体实现后面详细介绍。
- isExit() 方法:判断当前是否是 ExitSpan。ExitSpan 的具体实现后面详细介绍。
- ref() 方法:用于设置关联的 TraceSegment 。
AbstractTracingSpan 实现了 AbstractSpan 接口,定义了一些 Span 的公共字段,其中的部分字段在介绍 AbstractSpan 接口时已经提到了,下面简单介绍一下前面未涉及的字段含义:
protected int spanId; // span的ID
protected int parentSpanId; // 记录父Span的ID
protected List<TagValuePair> tags; // 记录Tags的集合
protected long startTime, endTime; // Span的起止时间
protected boolean errorOccurred = false; // 标识该Span中是否发生异常
protected List<TraceSegmentRef> refs; // 指向所属TraceSegment
// context字段指向TraceContext,TraceContext与当前线程绑定,与TraceSegment
// 一一对应
protected volatile AbstractTracerContext context;
AbstractTracingSpan 中提供的方法也比较简单,基本都是上述字段的 getter/setter 方法,这些方法不再展开赘述。这里需要注意两个方法:
- finish(TraceSegment) 方法:该方法会关闭当前 Span ,具体行为是用 endTime 字段记录当前时间,并将当前 Span 记录到所属 TraceSegment 的 spans 集合中。
- transform() 方法:该方法会在 Agent 上报 TraceSegment 数据之前调用,它会将当前 AbstractTracingSpan 对象转换成 SpanObjectV2 对象。SpanObjectV2 是在 proto 文件中定义的结构体,后面 gRPC 上报 TraceSegment 数据时会将其序列化。
StackBasedTracingSpan 在继承 AbstractTracingSpan 存储 Span 核心数据能力的同时,还引入了栈的概念,这种 Span 可以多次调用 start() 方法和 end() 方法,但是两者调用次数必须要配对,类似出栈和入栈的操作。
下面以 EntrySpan 为例说明为什么需要“栈”这个概念,EntrySpan 表示的是一个服务的入口 Span,是 TraceSegment 的第一个 Span,出现在服务提供方的入口,例如,Dubbo Provider、Tomcat、Spring MVC,等等。 那么为什么 EntrySpan 继承 StackBasedTracingSpan 呢? 从前面对 SkyWalking Agent 的分析来看,Agent 插件只会拦截指定类的指定方法并对其进行增强,例如,Tomcat、Spring MVC 等插件的增强逻辑中就包含了创建 EntrySpan 的逻辑(后面在分析具体插件实现的时候,会看到具体的实现代码)。很多 Web 项目会同时使用到这两个插件,难道一个 TraceSegment 要有两个 EntrySpan 吗?显然不行。
SkyWalking 的处理方式是让 EntrySpan 继承了 StackBasedTracingSpan,多个插件同时使用时,整个架构如下所示:
其中,请求相应的 EntrySpan 处理流程如下:
- 当请求经过 Tomcat 插件时(即图中 ① 处),会创建 EntrySpan 并第一次调用 start() 方法,启动该 EntrySpan。
在 start() 方法中会有下面几个操作:
- 将 stackDepth 字段(定义在 StackBasedTracingSpan 中)加 1,stackDepth 表示当前所处的插件栈深度 。
- 更新 currentMaxDepth 字段(定义在 EntrySpan 中),currentMaxDepth 会记录该EntrySpan 到达过的插件栈的最深位置。
- 此时第一次启动 EntrySpan 时会更新 startTime 字段,记录请求开始时间。
此时插件栈(这是为了方便理解而虚拟出来一个栈结构,实际上只有 stackDepth、currentMaxDepth 两个字段,并不会用到栈结构,也不会记录请求经过的插件)的状态如下图所示:
- 当请求经过 Spring MVC 插件时(即图中 ② 处),不会再创建新的 EntrySpan 了,而重新调用该 EntrySpan 的 start() 方法,其中会继续将 stackDepth 以及 currentMaxDepth 字段加 1 。注意,再次调用 start() 方法时不会更新 startTime 字段了,因为请求已经开始处理了。此时插件栈的状态如下图:
- 当请求经过业务逻辑处理完成之后,开始进入 Spring MVC 插件的后置处理逻辑时(即图中 ③ 处),会第 1 次调用 EntrySpan.finish() 方法,其中会将 stackDepth 减 1,即 Spring MVC 插件出栈,此时插件栈的状态如下图:
- 最后进入 Tomcat 插件的后置处理逻辑(即图中 ④ 处),其中会第 2 次调用 finish() 方法,此时 stackDepth 再次减 1,此时 stackDepth 减到了 0 ,整个插件栈已经空了,会调用父类 AbstractTracingSpan 的 finish() 方法将当前 EntrySpan 添加到关联的 TraceSegment 中。
这里需要注意两个点,一是在调用 start() 方法时,会将之前设置的 component、Tags、Log 等信息全部清理掉(startTime不会清理),上例中请求到 Spring MVC 插件之前(即 ② 处之前)设置的这些信息都会被清理掉。二是 stackDepth 与 currentMaxDepth 不相等时(上例中 ③ 处),无法记录上述字段的信息。通过这两点,我们知道 EntrySpan 实际上只会记录最贴近业务侧的 Span 信息。
StackBasedTracingSpan 除了将“栈”概念与 EntrySpan 结合之外,还添加了 peer(以及 peerId)字段来记录远端地址,在发送远程调用时创建的 ExitSpan 会将该记录用于对端地址。
ExitSpan 表示的是出口 Span,如果在一个调用栈里面出现多个插件嵌套的场景,也需要通过“栈”的方式进行处理,与上述逻辑类似,只会在第一个插件中创建 ExitSpan,后续调用的 ExitSpan.start() 方法并不会更新 startTime,只会增加栈的深度。当然,在设置 Tags、Log 等信息时也会进行判断,只有 stackDepth 为 1 的时候,才会能正常写入相应字段。也就是说,ExitSpan 中只会记录最贴近当前服务侧的 Span 信息。
一个 TraceSegment 可以有多个 ExitSpan,例如,Dubbo A 服务在处理一个请求时,会调用 Dubbo B 服务,在得到响应之后,会紧接着调用 Dubbo C 服务,这样,该 TraceSegment 就有了两个完全独立的 ExitSpan。
LocalSpan 则比较简单,它表示一个本地方法调用。LocalSpan 直接继承了 AbstractTracingSpan,由于它未继承 StackBasedTracingSpan,所以也不能 start 或 end 多次,在后面介绍 @Trace 注解的相关实现时,还会看到 LocalSpan 的身影。
第13讲:剖析 Trace 在 SkyWalking 中的落地实现方案(下)
TraceSegmentRef
TraceSegment 中除了 Span 之外,还有另一个需要介绍的重要依赖 —— TraceSegmentRef,TraceSegment 通过 refs 集合记录父 TraceSegment 的信息,它的核心字段大概可以分为 3 类:
父 Span 信息
traceSegmentId(ID 类型):父 TraceSegment 的 ID。
spanId(int 类型):父 Span 的 ID,与 traceSegmentId 结合就可以确定父 Span。
type(SegmentRefType 类型):SegmentRefType 是个枚举,可选值有:CROSS_PROCESS、CROSS_THREAD,分别表示跨进程调用和跨线程调用。
父应用(或者说,上游调用方)信息
peerId 和 peerHost:父应用(即上游调用方)的地址信息。
parentServiceInstanceId(int 类型):父应用(即上游应用)的 ServiceInstanceId。
parentEndpointName 和 parentEndpointId:父应用的(即上游应用)的 Endpoint 信息。
入口信息(在整条 Trace 中都会传递该信息)
entryServiceInstanceId:入口应用的 ServiceInstanceId。
entryEndpointName 和 entryEndpointId:入口 Endpoint 信息。
Context
SkyWalking 中的每个 TraceSegment 都与一个 Context 上下文对象一对一绑定,Context 上下文不仅记录了 TraceSegment 的上下文信息,还提供了管理 TraceSegment 生命周期、创建 Span 以及跨进程(跨线程)传播相关的功能。
AbstractTracerContext 是对上下文概念的抽象,其中定义了 Context 上下文的基本行为:
inject(ContextCarrier) 方法:在跨进程调用之前,调用方会通过 inject() 方法将当前 Context 上下文记录的全部信息注入到 ContextCarrier 参数中,Agent 后续会将 ContextCarrier 序列化并随远程调用进行传播。ContextCarrier 的具体实现在后面会详细分析。
extract(ContextCarrier) 方法:跨进程调用的接收方会反序列化得到 ContextCarrier 对象,然后通过 extract() 方法从 ContextCarrier 中读取上游传递下来的 Trace 信息并记录到当前的 Context 上下文中。
ContextSnapshot capture() 方法:在跨线程调用之前,SkyWalking Agent 会通过 capture() 方法将当前 Context 进行快照,然后将快照传递给其他线程。
continued(ContextSnapshot) 方法:跨线程调用的接收方会从收到的 ContextSnapshot 中读取 Trace 信息并填充到当前 Context 上下文中。
getReadableGlobalTraceId() 方法: 用于获取当前 Context 关联的 TraceId。
createEntrySpan()、createLocalSpan() 方法、createExitSpan() 方法:用于创建 Span。
activeSpan() 方法:用于获得当前活跃的 Span。在 TraceSegment 中,Span 也是按照栈的方式进行维护的,因为 Span 的生命周期符合栈的特性,即:先创建的 Span 后结束。
stopSpan(AbstractSpan) 方法:用于停止指定 Span。
AbstractTraceContext 有两个实现类,如下图所示:
IgnoredTracerContext 表示该 Trace 将会被丢失,所以其中不会记录任何信息,里面所有方法也都是空实现。这里重点来看 TracingContext,其核心字段如下:
samplingService(SamplingService 类型):负责完成 Agent 端的 Trace 采样,后面会展开介绍具体的采样逻辑。
segment(TraceSegment 类型):它是与当前 Context 上下文关联的 TraceSegment 对象,在 TracingContext 的构造方法中会创建该对象。
activeSpanStack(LinkedList<AbstractSpan> 类型):用于记录当前 TraceSegment 中所有活跃的 Span(即未关闭的 Span)。实际上 activeSpanStack 字段是作为栈使用的,TracingContext 提供了 push() 、pop() 、peek() 三个标准的栈方法,以及 first() 方法来访问栈底元素。
spanIdGenerator(int 类型):它是 Span ID 自增序列,初始值为 0。该字段的自增操作都是在一个线程中完成的,所以无需加锁。
管理 Span
一般情况下,在 Agent 插件的前置处理逻辑中,会调用 createEntrySpan() 方法创建 EntrySpan,在 TracingContext 的实现中,会检测 EntrySpan 是否已创建,如果是,则不会创建新的 EntrySpan,只是重新调用一下其 start() 方法即可。TracingContext.createEntrySpan() 方法的大致实现如下:
public AbstractSpan createEntrySpan(final String operationName) { if (isLimitMechanismWorking()) { // 前面提到过,默认配置下,每个TraceSegment只能放300个Span NoopSpan span = new NoopSpan(); // 超过300就放 NoopSpan return push(span); // 将Span记录到activeSpanStack这个栈中 } AbstractSpan entrySpan; final AbstractSpan parentSpan = peek(); // 读取栈顶Span,即当前Span final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId(); if (parentSpan != null && parentSpan.isEntry()) { // 更新 operationId(省略operationName的处理逻辑),省略 // EndpointNameDictionary 的处理,其核心逻辑在前面的小节已经介绍过了。 entrySpan = parentSpan.setOperationId(operationId); // 重新调用 start()方法,前面提到过,start()方法会重置 // operationId(以及或operationName)之外的其他字段 return entrySpan.start(); } else { // 新建 EntrySpan对象,spanIdGenerator生成Span ID并递增 entrySpan = new EntrySpan(spanIdGenerator++, parentSpanId, operationId); // 调用 start()方法,第一次调用start()方法时会设置startTime entrySpan.start(); // 将新建的Span添加到activeSpanStack栈的栈顶 return push(entrySpan); } }
前面通过 demo-webapp 示例介绍了多次调用 EntrySpan.start() 方法中栈相关的概念,这里依旧通过 demo-webapp 示例简单介绍一下 activeSpanStack 这个栈的工作原理,示例 Trace 如下图所示:
当请求经过 Tomcat 插件时会创建 EntrySpan(调用 start() 方法)并入栈到 activeSpanStack 中;请求经过 Spring MVC 插件时不会创建新的 EntrySpan,只会重新调用 start() 方法。接下来在调用 first() 方法时会创建相应的 LocalSpan 并入栈,first() 方法调用结束之后会将该 LocalSpan 出栈;调用 second() 方法时与 Span 出入栈逻辑相同;最后在通过 Dubbo 远程调用 HelloService.say() 方法的时候,会创建相应的 ExitSpan 并入栈,结束 Dubbo 调用之后其相应的 ExitSpan 会出栈,此时整个 activeSpanStack 栈空了,TraceSegment 也就结束了。整个过程如下图所示:
createLocalSpan() 方法负责创建 LocalSpan 对象并添加到 activeSpanStack 集合中,LocalSpan 的 start() 方法中没有栈的概念,存在多次调用的情况,只在这里调用一次即可。
createExitSpan() 方法负责创建 ExitSpan,与 createEntrySpan() 方法类似:
public AbstractSpan createExitSpan(String operationName, String remotePeer) { AbstractSpan exitSpan; // 从activeSpanStack栈顶获取当前Span AbstractSpan parentSpan = peek(); if (parentSpan != null && parentSpan.isExit()) { // 当前Span已经是ExitSpan,则不再新建ExitSpan,而是调用其start()方法 exitSpan = parentSpan; } else { // 当前Span不是 ExitSpan,就新建一个ExitSpan final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId(); exitSpan = new ExitSpan(spanIdGenerator++, parentSpanId, operationId, peerId); push(exitSpan); // 将新建的ExitSpan入栈 } exitSpan.start();// 调用start()方法 return exitSpan; }
了解了 TracingContext 创建以及维护 3 类 Span 的实现之后,我们来看关闭 Span 的方法 —— stopSpan() 方法,它会将当前 activeSpanStack 栈顶的 Span 关闭并出栈,同时在整个 activeSpanStack 栈空了之后,会尝试关闭当前 TraceSegment,具体实现如下:
public boolean stopSpan(AbstractSpan span) { AbstractSpan lastSpan = peek(); // 获取当前栈顶的Span对象 if (lastSpan == span) { // 只能关闭当前活跃Span对象,否则抛异常 if (lastSpan instanceof AbstractTracingSpan) { if (lastSpan.finish(segment)) { // 尝试关闭Span //当Span完全关闭之后,会将其出栈(即从activeSpanStack中删除) pop(); } } else { pop(); // 针对NoopSpan类型Span的处理 } } else { throw new IllegalStateException("Stopping the unexpected..."); } // TraceSegment中全部Span都关闭(且异步状态的Span也关闭了),则当前 // TraceSegment也会关闭,该关闭会触发TraceSegment上传操作,后面详述 if (checkFinishConditions()) { finish(); } return activeSpanStack.isEmpty(); }
跨进程(跨线程)传播
在开始介绍 Context 与跨进程传播相关的实现之前,需要先介绍一下它们的参数 —— ContextCarrier。从类名就可以看出 ContextCarrier 是 Context 上下文的搬运工(Carrier),它实现了 Serializable 接口,负责在进程之间搬运 TracingContext 的一些基本信息,跨进程调用涉及 Client 和 Server 两个系统,所以 ContextCarrier 中的字段 Client 和 Server 含义不同:
traceSegmentId(ID 类型):它记录了 Client 中 TraceSegment ID;从 Server 角度看,记录的是父 TraceSegment 的 ID。
spanId(int 类型):从 Client 角度看,它记录了当前 ExitSpan 的 ID;从 Server 角度,看记录的是父 Span ID。
parentServiceInstanceId(int 类型):它记录的是 Client 服务实例的 ID。
peerHost(String 类型):它记录了 Server 端的地址(这里 peerName 和 peerId 共用了同一个字段)。以 "#" 开头时记录的是 peerName,否则记录的是 peerId,在 inject() 方法(或 extract() 方法)中填充(或读取)该字段时会专门判断处理开头的"#"字符。
entryEndpointName(String 类型):它记录整个 Trace 的入口 EndpointName,该值在整个 Trace 中传播。
parentEndpointName(String 类型):它记录了 Client 入口 EndpointName(或 EndpointId)。以 "#" 开头的时候,记录的是 EndpointName,否则记录的是 EndpointId。
primaryDistributedTraceId(DistributedTraceId 类型):它记录了当前 Trace ID。
entryServiceInstanceId(int 类型):它记录了当前 Trace 的入口服务实例 ID。
跨进程传播 Context 上下文信息的核心流程大致为:远程调用的 Client 端会调用 inject(ContextCarrier) 方法,将当前 TracingContext 中记录的 Trace 上下文信息填充到传入的 ContextCarrier 对象。后续 Client 端的插件会将 ContextCarrier 对象序列化成字符串并将其作为附加信息添加到请求中,这样,ContextCarrier 字符串就会和请求一并到达 Server 端。Server 端的入口插件会检查请求中是否携带了 ContextCarrier 字符串,如果存在 ContextCarrier 字符串,就会将其进行反序列化,然后调用 extract() 方法从 ContextCarrier 对象中取出 Context 上下文信息,填充到当前 TracingContext(以及 TraceSegmentRef) 中。
例如在 demo-webapp 和 demo-provider 的示例中,ContextCarrier 的传播过程如图所示,序列化之后的 ContextCarrier 字符串会放到 RpcContext 中:
这里需要深入介绍一下 ContextCarrier 序列化之后的格式,具体实现在其 serialize() 方法中:
// 有多个版本的结构,这里只关注最新的V2版本 String serialize(HeaderVersion version) { return StringUtil.join('-', "1", Base64.encode(this.getPrimaryDistributedTraceId().encode()), Base64.encode(this.getTraceSegmentId().encode()), this.getSpanId() + "", this.getParentServiceInstanceId() + "", this.getEntryServiceInstanceId() + "", Base64.encode(this.getPeerHost()), Base64.encode(this.getEntryEndpointName()), Base64.encode(this.getParentEndpointName())); }
ContextCarrier 序列化之后得到的字符串分为 9 个部分,每个部分通过"-"(中划线)连接。在 deserialize() 方法中实现了 ContextCarrier 反序列化的逻辑,即将上述字符串进行切分并赋值到对应的字段中,具体逻辑为 serialize() 方法的逆操作,这里不再展开分析。
下面来看 TracingContext 对跨线程传播的支持,这里涉及 capture() 方法和 continued() 方法。跨线程传播时使用 ContextSnapshot 为 Context 上下文创建快照,因为是在一个 JVM 中,所以 ContextSnapshot 不涉及序列化的问题,也无需携带服务实例 ID 以及 peerHost 信息,其他核心字段与 ContextCarrier 类似,这里不再展开介绍。
总结
这个课时我们主要学习了 SkyWalking 对 Trace 基本概念的实现,首先介绍了 Trace ID 的实现结构,之后分析了 TraceSegment 如何维护底层 Span 集合以及父子关系,接下来深入剖析了 3 种类型的 Span 以及 StackBasedTracingSpan 引入的栈的概念。最后剖析了与 TraceSegment 相对应的 TracingContext 的实现,它管理着 3 类 Span 的生命周期,提供了跨进程/跨线程传播的基本方法。
在后面的课时中,我们将深入学习与 Trace 相关的 BootService 实现,分析 SkyWalking Agent 如何在这些基础组件上有条不紊的收集并发送 Trace 数据。
第14讲:收集、发送 Trace 核心原理,Agent 与 OAP 的大动脉
在前面的课时中,我们深入介绍了 SkyWalking 对 Trace 基本概念的实现。本课时我们将继续深入学习 Trace 相关的 BootService 接口实现类以及 Trace 收集和发送的核心逻辑。Trace 相关的 BootService 接口实现类如下图所示:
ContextManager
ContextManager 的主要职责就是管理前文介绍的 TracingContext,它会通过 ThreadLocal 将 TracingContext 对象与当前线程进行绑定,这样就实现了 TraceSegment、TracingContext 和 线程三方之间的关联。
ContextManager 有三个核心字段:
- CONTEXT(ThreadLocal 类型)
:通过该字段可以将一个 TracingContext 对象与一个线程进行关联。 - RUNTIME_CONTEXT(ThreadLocal 类型)
:RuntimeContext 底层封装了一个 ConcurrentHashMap 集合,可以为当前 TracingContext 记录一些附加信息。 - EXTEND_SERVICE(ContextManagerExtendService 类型):ContextManagerExtendService 也实现了 BootService 接口,它主要负责创建 TracingContext 对象。
虽然 ContextManager 实现了 BootService 接口,但是其 prepare()、boot()、onComplete() 方法都为空实现。ContextManager 提供了与 TracingContext 对应的几乎所有方法,基本实现都是委托给当前线程绑定的 TracingContext 对象,这里以 createEntrySpan() 方法为例进行介绍:
public static AbstractSpan createEntrySpan(String operationName,
ContextCarrier carrier) {
SamplingService samplingService = ServiceManager.INSTANCE
.findService(SamplingService.class); // 采样相关
AbstractSpan span;
AbstractTracerContext context;
// 检测ContextCarrier是否合法,其实就是检查它的核心字段是否已填充好
if (carrier != null && carrier.isValid()) {
samplingService.forceSampled();
// 获取当前线程绑定的TracingContext
context = getOrCreate(operationName, true);
// 委托给当前线程绑定的TracingContext来创建EntrySpan
span = context.createEntrySpan(operationName);
// 从ContextCarrier提取上游服务传播过来的Trace信息
context.extract(carrier);
} else { // 没有上游服务的场景
context = getOrCreate(operationName, false);
span = context.createEntrySpan(operationName);
}
return span;
}
getOrCreate() 方法会从 CONTEXT 字段中获取当前线程绑定的 TracingContext 对象,如果当前线程没有关联 TracingContext 上下文,则会通过 ContextManagerExtendService 新建并绑定。
stopSpan() 方法在关闭 Span 的同时,还会检查当前 TraceSegment 是否结束,TraceSegment 结束时会将存储在 CONTEXT 中的 TracingContext 对象以及 RUNTIME_CONTEXT 中的附加信息一并清除,这也是为了防止内存泄露的一步重要操作。
Context 生成与采样
如果不做任何限制,每个请求都应该生成一条完整的 Trace。在面对海量请求时如果也同时产生海量 Trace,就会给网络和存储带来双倍的压力,浪费很多资源。为了解决这个问题,几乎所有的 Trace 系统都会支持采样的功能。SamplingService 就是用来实现采样功能的 BootService 实现。
SamplingService 的采样逻辑依赖 samplingFactorHolder 字段(AtomicInteger 类型)的自增。ContextManagerExtendService 是负责创建 TracingContext 的 BootService 实现,在 ContextManagerExtendService 创建 TracingContext 时,会调用 SamplingService 的 trySampling() 方法递增 samplingFactorHolder 字段(CAS 操作),当增加到阈值(默认值为 3,可以通过 agent.sample_n_per_3_secs 配置进行修改)时会返回 false,表示采样失败,这时 ContextManagerExtendService 就会生成 IgnoredTracerContext,IgnoredTracerContext 是个空 Context 实现,不会记录 Trace 信息。
另外,SamplingService 中会启动一个定时任务,每秒都会将 samplingFactorHolder 字段清零,这样就实现了每秒采样指定条数的 Trace 数据,如下图所示:
Trace 的收集
这里我们先来回顾一个知识点,当 TracingContext 通过 stopSpan() 方法关闭最后一个 Span 时,会调用 finish() 方法关闭相应的 TraceSegment,与此同时,还会调用 TracingContext.ListenerManager.notifyFinish() 方法通知所有监听 TracingContext 关闭事件的监听器 —— TracingContextListener。TracingContext.finish() 方法的相关实现如下:
private void finish() {
TraceSegment finishedSegment =
segment.finish(isLimitMechanismWorking());
TracingContext.ListenerManager.notifyFinish(finishedSegment);
}
TraceSegmentServiceClient 是 TracingContextListener 接口的唯一实现,其主要功能就是在 TraceSegment 结束时对其进行收集,并发送到后端的 OAP 集群。
下图展示了 TraceSegmentServiceClient 的核心结构:
TraceSegmentServiceClient 底层维护了一个 DataCarrier 对象,其底层 Channels 默认有 5 个 Buffer,每个 Buffer 长度为 300,使用的是 IF_POSSIBLE 阻塞写入策略,底层会启动一个 ConsumerThread 线程。
TraceSegmentServiceClient 作为一个 TracingContextListener 接口的实现,会在 notifyFinish() 方法中,将刚刚结束的 TraceSegment 写入到 DataCarrier 中缓存。同时,TraceSegmentServiceClient 实现了前面介绍的 IConsumer 接口,封装了消费 Channels 中数据的逻辑,在 consume() 方法中会首先将消费到的 TraceSegment 对象序列化,然后通过 gRPC 请求发送到后端 OAP 集群。该过程涉及的 gRPC 接口定义如下:
service TraceSegmentReportService {
rpc collect (stream UpstreamSegment) returns (Commands) {
}
}
该 gRPC 请求中用到的 UpstreamSegment 结构体包含了 Trace ID 以及 TraceSegment 序列化之后的字节数组,定义如下所示:
message UpstreamSegment {
repeated UniqueId globalTraceIds = 1;
bytes segment = 2; // TraceSegment信息
}
这个过程中,TraceSegment 对象会转换成相应的 proto 结构体实例,下图展示了 UpstreamSegment 中包含的具体信息:
既然要发送 gRPC 请求,就必然要依赖网络连接,TraceSegmentServiceClient 实现了 GRPCChannelListener 接口,可以监听底层网络连接的变化情况。在 prepare() 方法中可将其作为 Listener 注册到前文介绍的 GRPCChannelManager 中。
明确了发送 Trace 时的具体数据,以及其涉及的 gRPC 请求和接口定义,我们再来看 consume() 方法的具体实现:
public void consume(List<TraceSegment> data) {
if (CONNECTED.equals(status)) { // 根据底层网络连接的状态决定是否发送
// 创建GRPCStreamServiceStatus对象
final GRPCStreamServiceStatus status =
new GRPCStreamServiceStatus(false);
StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver
= serviceStub.collect(new StreamObserver<Commands>() {
public void onNext(Commands commands) {}
public void onError(Throwable throwable) {
// 发生异常会调用 finished()方法,停止等待
status.finished();
// 通知GRPCChannelManager重新创建网络连接
ServiceManager.INSTANCE.findService(
GRPCChannelManager.class).reportError(throwable);
}
public void onCompleted() {
// 发送成功之后,会调用finished()方法结束等待
status.finished();
}
});
for (TraceSegment segment : data) {
// 将TraceSegment转换成UpstreamSegment对象,然后才能进行序列化以
// 及发送操作transform()方法实现的转换逻辑并不复杂,填充字段而已
UpstreamSegment upstreamSegment = segment.transform();
upstreamSegmentStreamObserver.onNext(upstreamSegment);
}
upstreamSegmentStreamObserver.onCompleted();
status.wait4Finish(); // 等待全部TraceSegment数据发送结束
segmentUplinkedCounter += data.size(); // 统计发送的数据量
} else { // 网络连接断开时,只进行简单统计,数据将被直接抛弃
segmentAbandonedCounter += data.size();
}
printUplinkStatus(); // 每隔 30s打印一下发送日志
}
注意,TraceSegmentServiceClient 在批量发送完 UpstreamSegment 数据之后,会通过 GRPCStreamServiceStatus 进行自旋等待,直至该批 UpstreamSegment 全部发送完毕。
最后总结一下,TraceSegmentServiceClient 同时实现了 BootService、IConsumer、GRPCChannelListener、TracingContextListener 四个接口,如下图所示,这四个接口的实现相互依赖,共同完成 Trace 数据的收集和发送:
总结
本课时我们重点介绍了 Trace 相关的 BootService 接口实现。首先介绍了 ContextManager 的核心实现,理清了它是如何将 TracingContext 与当前线程关联起来的。接下来介绍了 SamplingService 实现客户端 Trace 采样的逻辑。最后介绍了上报 Trace 的 gRPC 接口,深入分析了 TraceSegmentServiceClient 收集和上报 Trace 数据的核心逻辑。
Concern: 第二点是,同样的代码改动,在粗代码粒度下,可能被认定为“修改”;在细代码粒度下,可能又被认定为“扩展”。 意思是在同样的代码改动,不同的代码粒度得出的结论可能不同?
坡爱吃坡: 步骤不全吧
Han_Lin_: 请问博主有相关的虚拟机或者原始数据嘛
weixin_39765413: 请问原文在哪呀
王哈哈哈.: 你好源码有吗