calcite parser代码生成详解#

本文代码均已上传到gitee
calcite的parser代码生成分为如下两个步骤

calcite-parser-code-generate-process

生成Parse.jj#

文件目录如下

1
2
3
4
5
6
7
8
9
10
├── pom.xml
└── src
├── main
│   ├── codegen
│   │   ├── config.fmpp
│   │   ├── includes
│   │   │   ├── compoundIdentifier.ftl
│   │   │   └── parserImpls.ftl
│   │   └── templates
│   │   └── Parser.jj

添加calcite dependency

1
2
3
4
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
</dependency>

配置drill-fmpp-maven-plugin插件如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<plugin>
<groupId>org.apache.drill.tools</groupId>
<artifactId>drill-fmpp-maven-plugin</artifactId>
<executions>
<execution>
<configuration>
<config>src/main/codegen/config.fmpp</config>
<output>${project.build.directory}/generated-sources/fmpp</output>
<templates>src/main/codegen/templates</templates>
</configuration>
<id>generate-fmpp-sources</id>
<phase>validate</phase>
<goals>
<goal>generate</goal>
</goals>
</execution>
</executions>
</plugin>

codegen 模块的文件都拷贝自对应版本的calclite core/src/main/codegen路径 https://github.com/apache/calcite/tree/main/core/src/main/codegen

然后把https://github.com/apache/calcite/blob/main/core/src/main/codegen/default_config.fmpp 中的parser属性与config.fmpp中的parser属性合并。就可以通过mvn package命令生成Parser.jj了。当然,如果有定制化修改的需求,也可以在这个阶段修改config.fmpp

calcite-parser-code-generator-fmpp

Parser.jj生成java代码#

文件目录如下

1
2
3
4
5
├── pom.xml
├── src
│   ├── main
│   │   ├── codegen
│   │   │   └── Parser.jj

Parser.jj就是我们上一步生成的Parser.jj,如果有什么想要的定制化修改,也可以在这个步骤改入到Parser.jj中。

添加calcite dependency

1
2
3
4
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
</dependency>

配置javacc-maven-plugin如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
<executions>
<execution>
<id>javacc</id>
<goals>
<goal>javacc</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/codegen</sourceDirectory>
<includes>
<include>**/Parser.jj</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>

生成代码

calcite-parser-code-generator-javacc

无Parser.jj定制化修改,一步生成#

如果不需要对Parser.jj进行定制化修改,那么可以通过连续运行两个插件来生成代码,这里给出pom文件样例,不再赘述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
<plugin>
<groupId>org.apache.drill.tools</groupId>
<artifactId>drill-fmpp-maven-plugin</artifactId>
<executions>
<execution>
<configuration>
<config>src/main/codegen/config.fmpp</config>
<output>${project.build.directory}/generated-sources/fmpp</output>
<templates>src/main/codegen/templates</templates>
</configuration>
<id>generate-fmpp-sources</id>
<phase>validate</phase>
<goals>
<goal>generate</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
<executions>
<execution>
<id>javacc</id>
<goals>
<goal>javacc</goal>
</goals>
<configuration>
<sourceDirectory>${project.build.directory}/generated-sources/fmpp</sourceDirectory>
<includes>
<include>**/Parser.jj</include>
</includes>
<lookAhead>2</lookAhead>
<isStatic>false</isStatic>
</configuration>
</execution>
<execution>
<id>javacc-test</id>
<phase>generate-test-sources</phase>
<goals>
<goal>javacc</goal>
</goals>
<configuration>
<sourceDirectory>${project.build.directory}/generated-test-sources/fmpp</sourceDirectory>
<outputDirectory>${project.build.directory}/generated-test-sources/javacc</outputDirectory>
<includes>
<include>**/Parser.jj</include>
</includes>
<isStatic>false</isStatic>
<ignoreCase>true</ignoreCase>
<unicodeInput>true</unicodeInput>
</configuration>
</execution>
</executions>
</plugin>

lvs是做什么的#

lvs通常用做tcp/udp协议的四层负载均衡

lvs-brief

相比也可以用于四层负载的Nginx组件,Lvs因为运行在内核态,性能高是它的主要优势,同样,因为运行在内核态中,无法像Nginx那样,对四层的tls做卸载等动作。

lvs性能相关指标(用户视角)#

客户端的连接数#

  • UDP模式下,按连接超时时间计算(根据业务需求决定)。可通过ipvsadm -l --timeout来查看udp超时时间
  • TCP模式下,即为tcp连接数

客户端请求流量#

即client与lvs、lvs与RS之间交互的流量

客户端请求平均包大小#

即client与lvs、lvs与RS之间的平均包大小

lvs性能相关参数#

会话超时时间#

查看#

1
ipvsadm -l --timeout

修改#

1
ipvsadm --set ${tcptimeout} ${tcpfintimeout} ${udptimeout}

vm conntrack最大个数#

查看#

1
sysctl -a |grep net.netfilter.nf_conntrack_max

查看当前nf_conntrack个数#

1
2
3
4
# 方式一
conntrack -C
# 方式二
cat /proc/net/nf_conntrack | wc -l

修改#

1
sysctl -w net.netfilter.nf_conntrack_max=1024

hashsize#

什么是hashsize#

hashsize也就是nf_conntrack_buckets,如果不手动指定。linux会根据机器的内存计算。如果要支持海量的nf_conntrack,则可以适当调大。

1
2
3
4
5
6
7
8
9
10
11
12
  // nf_conntrack_core.c
nf_conntrack_htable_size
= (((nr_pages << PAGE_SHIFT) / 16384)
/ sizeof(struct hlist_head));
if (BITS_PER_LONG >= 64 &&
nr_pages > (4 * (1024 * 1024 * 1024 / PAGE_SIZE)))
nf_conntrack_htable_size = 262144;
else if (nr_pages > (1024 * 1024 * 1024 / PAGE_SIZE))
nf_conntrack_htable_size = 65536;

if (nf_conntrack_htable_size < 1024)
nf_conntrack_htable_size = 1024;

hlist_head的大小在64位的机器下大小为16

查看#

1
cat /sys/module/nf_conntrack/parameters/hashsize

修改 (方式一)#

1
echo 65536 > /sys/module/nf_conntrack/parameters/hashsize

修改(方式二)永久生效#

1
2
3
4
5
6
# exmaple file, you can modify this config if exists. File name doesn't matter.
# 样例文件,你可以修改已存在的这个文件。文件名称并不重要。
touch /etc/modprobe.d/lvs.conf
echo "options nf_conntrack hashsize=65536" >> /etc/modprobe.d/lvs.conf
# then you need reboot
# 需要重试来使配置生效

文件句柄数#

查看#

1
ulimit -n

修改#

不同的linux发行版,修改方式不太一样,以RedHat为例

1
2
num=`ulimit -n`
sed -i "s|$num|65536|g" /etc/security/limits.d/*-nofile.conf

lvs性能瓶颈#

虚拟机内存#

contnrack使用slab分配内存,可以通过slabtop命令查看nf_conntrack模块占用的内存。当连接数较高时,Lvs的内存瓶颈在于会话管理。

conntrack最大理论内存占用为

1
max_mem_used = conntrack * max * sizeof (struct nf_conntrack) + conntrack_buckets * sizeof (struct list_head)

使用如下python代码计算

1
2
3
4
5
6
7
8
9
10
import ctypes

# 这个是nf_conntrack的动态库所在路径
# libnetfilter git地址 git://git.netfilter.org/libnetfilter_conntrack
LIBNETFILTER_CONNTRACK = '/usr/lib/aarch64-linux-gnu/libnetfilter_conntrack.so.3.7.0'
nfct = ctypes.CDLL(LIBNETFILTER_CONNTRACK)
print("max size of struct nf_conntrack:")
print(nfct.nfct_maxsize())
print("sizeof(struct list_head):")
print(ctypes.sizeof(ctypes.c_void_p) * 2)

其中nfct_maxsize出自于git://git.netfilter.org/libnetfilter_conntrack中的src/conntrack/api.c

1
2
3
/**
* nfct_maxsize - return the maximum size in bytes of a conntrack object
*/

在如下操作系统下

1
2
uname -a
> Linux primary 5.4.0-122-generic #138-Ubuntu SMP Wed Jun 22 15:05:39 UTC 2022 aarch64 aarch64 aarch64 GNU/Linux

以100万conntrack_max,65536buckets为例,占用的内存为

1_000_000 * 392 + 65536 * 16 约等于 373.84 + 1 为374M内存

网卡流量#

最大进出带宽。在云上,通常由云厂商限制。如果你将lvs上面的浮动Ip通过EIP的方式暴露出去(这很常见),还需要考虑EIP自身的带宽

网卡进出包个数(PPS)#

最大进出包个数

虚拟机能支持的最大网络连接数#

ECS上可以支持的最大网络连接数。在云上,通常由云厂商限制

Lvs监控&扩容#

cpu使用率#

可在超过百分之80的时候告警。处理方式:

  • 如果内存还没有到达瓶颈,可以通过扩大hashsize的方式,降低hash链上元素的个数,减少匹配消耗的cpu
  • 如果内存水位也较高。对CPU进行扩容

内存使用率#

可在超过内存容量百分之80的时候告警。处理方式:扩容内存

conntrack个数#

通过conntrack -Ccat /proc/net/nf_conntrack | wc -l, 定期进行统计,使用sysctl -w net.netfilter.nf_conntrack_max进行扩容

网卡流量、网卡进出包个数#

可以利用云厂商的监控或nicstat命令查看。处理方式:扩容网卡

最大网络连接数#

可以利用云厂商的监控或netstat -an|egrep "tcp|udp"|grep -v "LISTEN"|wc -lss -tun state all | grep -v LISTEN | wc -l查看。处理方式:扩容ECS规格

EIP带宽#

通过云厂商的指标来监控。处理方式,扩容EIP的BGP带宽

prometheus 磁盘布局#

采集到的数据每两个小时形成一个block。每个block由一个目录组成,并存放在data路径下。该目录包含一个包含该时间窗口的所有时间序列样本的块子目录、一个元数据文件和一个索引文件(将metric_name和label索引到目录下的时间序列)。 chunks 目录中的样本默认组合成一个或多个段文件,每个段文件最大为 512MB。 当通过 API 删除系列时,删除记录存储在单独的 tombstone 文件中(而不是立即从块段中删除数据)。

当前正在写入的块保存在内存中,没有完全持久化。通过WAL日志来防止崩溃丢失数据。预写日志分为数节(segments)保存在wal文件夹中。这些文件包含尚未压缩的原始数据; 因此它们比常规块文件大得多。 Prometheus 将至少保留三个预写日志文件。在高流量下,会保留三个以上的 WAL 文件,以便保留至少两个小时的原始数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
./data
├── 01BKGV7JBM69T2G1BGBGM6KB12
│ └── meta.json
├── 01BKGTZQ1SYQJTR4PB43C8PD98
│ ├── chunks
│ │ └── 000001
│ ├── tombstones
│ ├── index
│ └── meta.json
├── 01BKGTZQ1HHWHV8FBJXW1Y3W0K
│ └── meta.json
├── 01BKGV7JC0RY8A6MACW02A2PJD
│ ├── chunks
│ │ └── 000001
│ ├── tombstones
│ ├── index
│ └── meta.json
├── chunks_head
│ └── 000001
└── wal
├── 000000002
└── checkpoint.00000001
└── 00000000

prometheus概念#

  • Label: 标签,string格式的kv组合
  • series: 时间序列,label的组合
  • chunk: 时间,value的数据

prometheus索引格式#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
┌────────────────────────────┬─────────────────────┐
│ magic(0xBAAAD700) <4b> │ version(1) <1 byte> │
├────────────────────────────┴─────────────────────┤
│ ┌──────────────────────────────────────────────┐ │
│ │ Symbol Table │ │
│ ├──────────────────────────────────────────────┤ │
│ │ Series │ │
│ ├──────────────────────────────────────────────┤ │
│ │ Postings 1 │ │
│ ├──────────────────────────────────────────────┤ │
│ │ ... │ │
│ ├──────────────────────────────────────────────┤ │
│ │ Postings N │ │
│ ├──────────────────────────────────────────────┤ │
│ │ Postings Offset Table │ │
│ ├──────────────────────────────────────────────┤ │
│ │ TOC │ │
│ └──────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────┘

写入索引时,可以在上面列出的主要部分之间添加任意数量的0字节作为填充。顺序扫描文件时,必须跳过部分间的任意0字节。

下面描述的大部分部分都以 len 字段开头。 它总是指定就在尾随 CRC32 校验和之前的字节数。 校验和就计算这些字节的校验和(不包含len字段)

符号表#

符号表包含已存储序列的标签对中出现的重复数据删除字符串的排序列表。 它们可以从后续部分中引用,并显着减少总索引大小。

该部分包含一系列字符串entry,每个entry都以字符串的原始字节长度为前缀。 所有字符串均采用 utf-8 编码。 字符串由顺序索引引用。 字符串按字典顺序升序排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
┌────────────────────┬─────────────────────┐
│ len <4b> │ #symbols <4b> │
├────────────────────┴─────────────────────┤
│ ┌──────────────────────┬───────────────┐ │
│ │ len(str_1) <uvarint> │ str_1 <bytes> │ │
│ ├──────────────────────┴───────────────┤ │
│ │ . . . │ │
│ ├──────────────────────┬───────────────┤ │
│ │ len(str_n) <uvarint> │ str_n <bytes> │ │
│ └──────────────────────┴───────────────┘ │
├──────────────────────────────────────────┤
│ CRC32 <4b> │
└──────────────────────────────────────────┘

序列 series#

保存一个具体的时间序列,其中包含系列的label集合和block中的chunks。

每个series都是16字节对齐。series的id为偏移量除以16。series ID 的排序列表也就是series label的字典排序列表。

1
2
3
4
5
6
7
8
9
┌───────────────────────────────────────┐
│ ┌───────────────────────────────────┐ │
│ │ series_1 │ │
│ ├───────────────────────────────────┤ │
│ │ . . . │ │
│ ├───────────────────────────────────┤ │
│ │ series_n │ │
│ └───────────────────────────────────┘ │
└───────────────────────────────────────┘

每一个series先保存label的数量,然后是包含label键值对的引用。 标签对按字典顺序排序。然后是series涉及的索引块的个数,然后是一系列元数据条目,其中包含块的最小 (mint) 和最大 (maxt) 时间戳以及对其在块文件中位置的引用。mint 是第一个样本的时间,maxt 是块中最后一个样本的时间。 在索引中保存时间范围数据, 允许按照时间范围删除数据时,如果时间范围匹配,不需要直接访问时间数据。

空间大小优化: 第一个块的 mint 被存储,它的 maxt 被存储为一个增量,并且 mintmaxt 被编码为后续块的前一个时间的增量。 类似的,第一个chunk的引用被存储,下一个引用被存储为前一个chunk的增量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
┌──────────────────────────────────────────────────────────────────────────┐
│ len <uvarint> │
├──────────────────────────────────────────────────────────────────────────┤
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ labels count <uvarint64> │ │
│ ├──────────────────────────────────────────────────────────────────────┤ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ ref(l_i.name) <uvarint32> │ │ │
│ │ ├────────────────────────────────────────────┤ │ │
│ │ │ ref(l_i.value) <uvarint32> │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ │ ... │ │
│ ├──────────────────────────────────────────────────────────────────────┤ │
│ │ chunks count <uvarint64> │ │
│ ├──────────────────────────────────────────────────────────────────────┤ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ c_0.mint <varint64> │ │ │
│ │ ├────────────────────────────────────────────┤ │ │
│ │ │ c_0.maxt - c_0.mint <uvarint64> │ │ │
│ │ ├────────────────────────────────────────────┤ │ │
│ │ │ ref(c_0.data) <uvarint64> │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ c_i.mint - c_i-1.maxt <uvarint64> │ │ │
│ │ ├────────────────────────────────────────────┤ │ │
│ │ │ c_i.maxt - c_i.mint <uvarint64> │ │ │
│ │ ├────────────────────────────────────────────┤ │ │
│ │ │ ref(c_i.data) - ref(c_i-1.data) <varint64> │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ │ ... │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
├──────────────────────────────────────────────────────────────────────────┤
│ CRC32 <4b> │
└──────────────────────────────────────────────────────────────────────────┘

Posting#

Posting这一节存放着关于series引用的单调递增列表,简单来说就是存放id和时间序列的对应关系

1
2
3
4
5
6
7
8
9
10
11
12
13
┌────────────────────┬────────────────────┐
│ len <4b> │ #entries <4b> │
├────────────────────┴────────────────────┤
│ ┌─────────────────────────────────────┐ │
│ │ ref(series_1) <4b> │ │
│ ├─────────────────────────────────────┤ │
│ │ ... │ │
│ ├─────────────────────────────────────┤ │
│ │ ref(series_n) <4b> │ │
│ └─────────────────────────────────────┘ │
├─────────────────────────────────────────┤
│ CRC32 <4b> │
└─────────────────────────────────────────┘

Posting sections的顺序由postings offset table决定。

Posting Offset Table#

postings offset table包含着一系列posting offset entry,根据label的名称和值排序。每一个posting offset entry存放着label的键值对以及在posting sections中其series列表的偏移量。用来跟踪posting sections。当index文件加载时,它们将部分加载到内存中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
┌─────────────────────┬──────────────────────┐
│ len <4b> │ #entries <4b> │
├─────────────────────┴──────────────────────┤
│ ┌────────────────────────────────────────┐ │
│ │ n = 2 <1b> │ │
│ ├──────────────────────┬─────────────────┤ │
│ │ len(name) <uvarint> │ name <bytes> │ │
│ ├──────────────────────┼─────────────────┤ │
│ │ len(value) <uvarint> │ value <bytes> │ │
│ ├──────────────────────┴─────────────────┤ │
│ │ offset <uvarint64> │ │
│ └────────────────────────────────────────┘ │
│ . . . │
├────────────────────────────────────────────┤
│ CRC32 <4b> │
└────────────────────────────────────────────┘

TOC#

table of contents是整个索引的入口点,并指向文件中的各个部分。 如果引用为零,则表示相应的部分不存在,查找时应返回空结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
┌─────────────────────────────────────────┐
│ ref(symbols) <8b> │
├─────────────────────────────────────────┤
│ ref(series) <8b> │
├─────────────────────────────────────────┤
│ ref(label indices start) <8b> │
├─────────────────────────────────────────┤
│ ref(label offset table) <8b> │
├─────────────────────────────────────────┤
│ ref(postings start) <8b> │
├─────────────────────────────────────────┤
│ ref(postings offset table) <8b> │
├─────────────────────────────────────────┤
│ CRC32 <4b> │
└─────────────────────────────────────────┘

chunks 磁盘格式#

chunks文件创建在block中的chunks/目录中。 每个段文件的最大大小为 512MB。
文件中的chunk由uint64的索引组织,索引低四位为文件内偏移,高四位为段序列号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
┌──────────────────────────────┐
│ magic(0x85BD40DD) <4 byte> │
├──────────────────────────────┤
│ version(1) <1 byte> │
├──────────────────────────────┤
│ padding(0) <3 byte> │
├──────────────────────────────┤
│ ┌──────────────────────────┐ │
│ │ Chunk 1 │ │
│ ├──────────────────────────┤ │
│ │ ... │ │
│ ├──────────────────────────┤ │
│ │ Chunk N │ │
│ └──────────────────────────┘ │
└──────────────────────────────┘

chunks中的Chunk格式#

1
2
3
┌───────────────┬───────────────────┬──────────────┬────────────────┐
│ len <uvarint> │ encoding <1 byte> │ data <bytes> │ CRC32 <4 byte> │
└───────────────┴───────────────────┴──────────────┴────────────────┘

查询数据#

code#

查询的prometheus方法签名

1
Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet

支持从block中,remote等各种地方查询获取数据

prometheus会在内存中维护一个数据结构

1
2
3
// Map of LabelName to a list of some LabelValues's position in the offset table.
// The first and last values for each name are always present.
postings map[string][]postingOffset

在内存中,保留每个label name,并且每n个保存label值,降低内存的占用。但是第一个和最后一个值总是保存在内存中。

查询数据流程#

prometheus-tsdb-index

参考资料#

什么是sidecar?#

kubernetes-sidecar-what-is

sidecar,直译为边车。 如上图所示,边车就是加装在摩托车旁来达到拓展功能的目的,比如行驶更加稳定,可以拉更多的人和货物,坐在边车上的人可以给驾驶员指路等。边车模式通过给应用服务加装一个“边车”来达到控制逻辑的分离的目的。

对于微服务来讲,我们可以用边车模式来做诸如 日志收集、服务注册、服务发现、限流、鉴权等不需要业务服务实现的控制面板能力。通常和边车模式比较的就是像spring-cloud那样的sdk模式,像上面提到的这些能力都通过sdk实现。

kubernetes-sidecar-what-can-do

这两种实现模式各有优劣,sidecar模式会引入额外的性能损耗以及延时,但传统的sdk模式会让代码变得臃肿并且升级复杂,控制面能力和业务面能力不能分开升级。

本文的代码已经上传到gitee

sidecar 实现原理#

介绍了sidecar的诸多功能,但是,sidecar是如何做到这些能力的呢?

原来,在kubernetes中,一个pod是部署的最小单元,但一个pod里面,允许运行多个container(容器),多个container(容器)之间共享存储卷和网络栈。这样子,我们就可以多container来做sidecar,或者init-container(初始化容器)来调整挂载卷的权限

kubernetes-sidecar-inside

日志收集sidecar#

日志收集sidecar的原理是利用多个container间可以共用挂载卷的原理实现的,通过将应用程序的日志路径挂出,用另一个程序访问路径下的日志来实现日志收集,这里用cat来替代了日志收集,部署yaml模板如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
apiVersion: v1
kind: Pod
metadata:
name: webserver
spec:
volumes:
- name: shared-logs
emptyDir: {}

containers:
- name: nginx
image: ttbb/nginx:mate
volumeMounts:
- name: shared-logs
mountPath: /opt/sh/openresty/nginx/logs

- name: sidecar-container
image: ttbb/base
command: ["sh","-c","while true; do cat /opt/sh/openresty/nginx/logs/nginx.pid; sleep 30; done"]
volumeMounts:
- name: shared-logs
mountPath: /opt/sh/openresty/nginx/logs

使用kubectl create -f 创建pod,通过kubectl logs命令就可以看到sidecar-container打印的日志输出

1
kubectl logs webserver sidecar-container

转发请求sidecar#

这一节我们来实现,一个给应用程序转发请求的sidecar,应用程序代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
use std::io::prelude::*;
use std::net::{TcpListener, TcpStream};

fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

for stream in listener.incoming() {
let stream = stream.unwrap();

handle_connection(stream);
}
println!("Hello, world!");
}

fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 1024];

stream.read(&mut buffer).unwrap();

let contents = "Hello";

let response = format!(
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
contents.len(),
contents
);

println!("receive a request!");
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
}

我们再来写一个sidecar,它会每15秒向应用程序发出请求

1
2
3
4
5
6
7
8
9
10
use std::thread;
use std::time::Duration;

fn main() {
loop {
thread::sleep(Duration::from_secs(15));
let response = reqwest::blocking::get("http://localhost:7878").unwrap();
println!("{}", response.text().unwrap())
}
}

通过仓库下的intput/build.sh脚本构造镜像,运行yaml如下

1
2
3
4
5
6
7
8
9
10
11
apiVersion: v1
kind: Pod
metadata:
name: webserver
spec:
containers:
- name: input-server
image: sidecar-examples:input-http-server

- name: input-sidecar
image: sidecar-examples:sidecar-input

通过查看kubectl logs input input-http-server可以看到input-http-server收到了请求

1
2
receive a request!
receive a request!

拦截请求sidecar#

应用程序代码,它会每15s向localhost发出请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.shoothzj.sidecar

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

object HttpClient {
def main(args: Array[String]): Unit = {
while (true) {
Thread.sleep(15_000L)
implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "SingleRequest")
// needed for the future flatMap/onComplete in the end
implicit val executionContext: ExecutionContextExecutor = system.executionContext

val responseFuture: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "http://localhost:7979/hello"))

responseFuture
.onComplete {
case Success(res) => println(res)
case Failure(_) => sys.error("something wrong")
}
}
}
}

我们再来写一个sidecar,它会拦截http请求并打印日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.shoothzj.sidecar

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._

import scala.concurrent.ExecutionContextExecutor
import scala.io.StdIn

object HttpServer {

def main(args: Array[String]): Unit = {

implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "my-system")
// needed for the future flatMap/onComplete in the end
implicit val executionContext: ExecutionContextExecutor = system.executionContext

val route =
path("hello") {
get {
println("receive a request")
complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
}
}

val bindingFuture = Http().newServerAt("localhost", 7979).bind(route)
while (true) {
Thread.sleep(15_000L)
}
}
}

通过仓库下的output/build.sh脚本构造镜像,运行yaml如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
apiVersion: v1
kind: Pod
metadata:
name: output
spec:
volumes:
- name: shared-logs
emptyDir: {}

containers:
- name: output-workload
image: sidecar-examples:output-workload
imagePullPolicy: Never

- name: sidecar-output
image: sidecar-examples:sidecar-output
imagePullPolicy: Never

通过查看kubectl logs output output-workload可以看到output-sidecar收到了请求

1
2
3
4
5
6
7
8
HttpResponse(200 OK,List(Server: akka-http/10.2.9, Date: Tue, 29 Mar 2022 00:15:47 GMT),HttpEntity.Strict(text/html; charset=UTF-8,31 bytes total),HttpProtocol(HTTP/1.1))
HttpResponse(200 OK,List(Server: akka-http/10.2.9, Date: Tue, 29 Mar 2022 00:16:02 GMT),HttpEntity.Strict(text/html; charset=UTF-8,31 bytes total),HttpProtocol(HTTP/1.1))
HttpResponse(200 OK,List(Server: akka-http/10.2.9, Date: Tue, 29 Mar 2022 00:16:17 GMT),HttpEntity.Strict(text/html; charset=UTF-8,31 bytes total),HttpProtocol(HTTP/1.1))
HttpResponse(200 OK,List(Server: akka-http/10.2.9, Date: Tue, 29 Mar 2022 00:16:32 GMT),HttpEntity.Strict(text/html; charset=UTF-8,31 bytes total),HttpProtocol(HTTP/1.1))
HttpResponse(200 OK,List(Server: akka-http/10.2.9, Date: Tue, 29 Mar 2022 00:16:47 GMT),HttpEntity.Strict(text/html; charset=UTF-8,31 bytes total),HttpProtocol(HTTP/1.1))
HttpResponse(200 OK,List(Server: akka-http/10.2.9, Date: Tue, 29 Mar 2022 00:17:02 GMT),HttpEntity.Strict(text/html; charset=UTF-8,31 bytes total),HttpProtocol(HTTP/1.1))
HttpResponse(200 OK,List(Server: akka-http/10.2.9, Date: Tue, 29 Mar 2022 00:17:17 GMT),HttpEntity.Strict(text/html; charset=UTF-8,31 bytes total),HttpProtocol(HTTP/1.1))
HttpResponse(200 OK,List(Server: akka-http/10.2.9, Date: Tue, 29 Mar 2022 00:17:32 GMT),HttpEntity.Strict(text/html; charset=UTF-8,31 bytes total),HttpProtocol(HTTP/1.1))

maven checkstyle#

添加maven plugin依赖#

1
2
3
4
5
6
7
8
9
10
11
12
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.1.2</version>
<configuration>
<configLocation>config/checkstyle.xml</configLocation>
<suppressionsLocation>config/suppressions.xml</suppressionsLocation>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<encoding>UTF-8</encoding>
<excludes>**/proto/*</excludes>
</configuration>
</plugin>
  • configLocation 存放checkstyle的规则配置文件,附录有样例内容
  • SuppressionsLocation 存放屏蔽规则配置文件,附录有样例内容
  • includeTestSourceDirectory 是否检测测试文件夹,建议配置为true

maven-lint-checkstyle-config

结束#

最后就可以通过mvn checkstyle:check来检查您的工程啦。如果有违反了checkstyle的地方,命令行会提示出错的地方和违反的规则,如下图所示

maven-lint-checkstyle-fail1

maven-lint-checkstyle-fail2

附录#

规则配置文件举例#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE module PUBLIC
"-//Puppy Crawl//DTD Check Configuration 1.3//EN"
"https://checkstyle.org/dtds/configuration_1_3.dtd">

<!-- This is a checkstyle configuration file. For descriptions of
what the following rules do, please see the checkstyle configuration
page at http://checkstyle.sourceforge.net/config.html -->
<module name="Checker">
<module name="FileTabCharacter">
<!-- Checks that there are no tab characters in the file. -->
</module>

<!-- All Java AST specific tests live under TreeWalker module. -->
<module name="TreeWalker">

<module name="SuppressionCommentFilter">
<property name="offCommentFormat" value="CHECKSTYLE.OFF\: ([\w\|]+)"/>
<property name="onCommentFormat" value="CHECKSTYLE.ON\: ([\w\|]+)"/>
<property name="checkFormat" value="$1"/>
</module>

<module name="SuppressWarningsHolder" />

<!--

IMPORT CHECKS

-->

<module name="RedundantImport">
<!-- Checks for redundant import statements. -->
<property name="severity" value="error"/>
<message key="import.redundancy"
value="Redundant import {0}."/>
</module>

<module name="AvoidStarImport">
<property name="severity" value="error"/>
</module>

<module name="RedundantModifier">
<!-- Checks for redundant modifiers on various symbol definitions.
See: http://checkstyle.sourceforge.net/config_modifier.html#RedundantModifier
-->
<property name="tokens" value="METHOD_DEF, VARIABLE_DEF, ANNOTATION_FIELD_DEF, INTERFACE_DEF, CLASS_DEF, ENUM_DEF"/>
</module>

<!--
IllegalImport cannot blacklist classes, and c.g.api.client.util is used for some shaded
code and some useful code. So we need to fall back to Regexp.
-->
<module name="RegexpSinglelineJava">
<property name="format" value="com\.google\.api\.client\.util\.(ByteStreams|Charsets|Collections2|Joiner|Lists|Maps|Objects|Preconditions|Sets|Strings|Throwables)"/>
</module>

<!--
Require static importing from Preconditions.
-->
<module name="RegexpSinglelineJava">
<property name="format" value="^import com.google.common.base.Preconditions;$"/>
<property name="message" value="Static import functions from Guava Preconditions"/>
</module>

<module name="UnusedImports">
<property name="severity" value="error"/>
<property name="processJavadoc" value="true"/>
<message key="import.unused"
value="Unused import: {0}."/>
</module>

<!--

JAVADOC CHECKS

-->

<!-- Checks for Javadoc comments. -->
<!-- See http://checkstyle.sf.net/config_javadoc.html -->
<module name="JavadocMethod">
<property name="scope" value="protected"/>
<property name="severity" value="error"/>
<property name="allowMissingParamTags" value="true"/>
<property name="allowMissingReturnTag" value="true"/>
</module>

<!-- Check that paragraph tags are used correctly in Javadoc. -->
<!-- <module name="JavadocParagraph"/>-->

<module name="JavadocType">
<property name="scope" value="protected"/>
<property name="severity" value="error"/>
<property name="allowMissingParamTags" value="true"/>
</module>

<module name="JavadocStyle">
<property name="severity" value="error"/>
<property name="checkHtml" value="true"/>
</module>

<!--

NAMING CHECKS

-->

<!-- Item 38 - Adhere to generally accepted naming conventions -->

<module name="PackageName">
<!-- Validates identifiers for package names against the
supplied expression. -->
<!-- Here the default checkstyle rule restricts package name parts to
seven characters, this is not in line with common practice at Google.
-->
<property name="format" value="^[a-z]+(\.[a-z][a-z0-9]{1,})*$"/>
<property name="severity" value="error"/>
</module>

<module name="TypeNameCheck">
<!-- Validates static, final fields against the
expression "^[A-Z][a-zA-Z0-9]*$". -->
<metadata name="altname" value="TypeName"/>
<property name="severity" value="error"/>
</module>

<module name="ConstantNameCheck">
<!-- Validates non-private, static, final fields against the supplied
public/package final fields "^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$". -->
<metadata name="altname" value="ConstantName"/>
<property name="applyToPublic" value="true"/>
<property name="applyToProtected" value="true"/>
<property name="applyToPackage" value="true"/>
<property name="applyToPrivate" value="false"/>
<property name="format" value="^([A-Z][A-Za-z0-9_]*|FLAG_.*)$"/>
<message key="name.invalidPattern"
value="Variable ''{0}'' should be in ALL_CAPS (if it is a constant) or be private (otherwise)."/>
<property name="severity" value="error"/>
</module>

<module name="StaticVariableNameCheck">
<!-- Validates static, non-final fields against the supplied
expression "^[a-z][a-zA-Z0-9]*_?$". -->
<metadata name="altname" value="StaticVariableName"/>
<property name="applyToPublic" value="true"/>
<property name="applyToProtected" value="true"/>
<property name="applyToPackage" value="true"/>
<property name="applyToPrivate" value="true"/>
<property name="format" value="^[a-z][a-zA-Z0-9]*_?$"/>
<property name="severity" value="error"/>
</module>

<module name="MemberNameCheck">
<!-- Validates non-static members against the supplied expression. -->
<metadata name="altname" value="MemberName"/>
<property name="applyToPublic" value="true"/>
<property name="applyToProtected" value="true"/>
<property name="applyToPackage" value="true"/>
<property name="applyToPrivate" value="true"/>
<property name="format" value="^[a-z][a-zA-Z0-9]*$"/>
<property name="severity" value="error"/>
</module>

<module name="MethodNameCheck">
<!-- Validates identifiers for method names. -->
<metadata name="altname" value="MethodName"/>
<property name="format" value="(^[a-z][a-zA-Z0-9]*(_[a-zA-Z0-9]+)*$|Void)"/>
<property name="severity" value="error"/>
</module>

<module name="ParameterName">
<!-- Validates identifiers for method parameters against the
expression "^[a-z][a-zA-Z0-9]*$". -->
<property name="severity" value="error"/>
</module>

<module name="LocalFinalVariableName">
<!-- Validates identifiers for local final variables against the
expression "^[a-z][a-zA-Z0-9]*$". -->
<property name="severity" value="error"/>
</module>

<module name="LocalVariableName">
<!-- Validates identifiers for local variables against the
expression "^[a-z][a-zA-Z0-9]*$". -->
<property name="severity" value="error"/>
</module>

<!-- Type parameters must be either one of the four blessed letters
T, K, V, W, X or else be capital-case terminated with a T,
such as MyGenericParameterT -->
<module name="ClassTypeParameterName">
<property name="format" value="^(((T|K|V|W|X|R)[0-9]*)|([A-Z][a-z][a-zA-Z]*))$"/>
<property name="severity" value="error"/>
</module>

<module name="MethodTypeParameterName">
<property name="format" value="^(((T|K|V|W|X|R)[0-9]*)|([A-Z][a-z][a-zA-Z]*T))$"/>
<property name="severity" value="error"/>
</module>

<module name="InterfaceTypeParameterName">
<property name="format" value="^(((T|K|V|W|X|R)[0-9]*)|([A-Z][a-z][a-zA-Z]*T))$"/>
<property name="severity" value="error"/>
</module>

<module name="LeftCurly">
<!-- Checks for placement of the left curly brace ('{'). -->
<property name="severity" value="error"/>
</module>

<module name="RightCurly">
<!-- Checks right curlies on CATCH, ELSE, and TRY blocks are on
the same line. e.g., the following example is fine:
<pre>
if {
...
} else
</pre>
-->
<!-- This next example is not fine:
<pre>
if {
...
}
else
</pre>
-->
<property name="option" value="same"/>
<property name="severity" value="error"/>
</module>

<!-- Checks for braces around if and else blocks -->
<module name="NeedBraces">
<property name="severity" value="error"/>
<property name="tokens" value="LITERAL_IF, LITERAL_ELSE, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO"/>
</module>

<module name="UpperEll">
<!-- Checks that long constants are defined with an upper ell.-->
<property name="severity" value="error"/>
</module>

<module name="FallThrough">
<!-- Warn about falling through to the next case statement. Similar to
javac -Xlint:fallthrough, but the check is suppressed if a single-line comment
on the last non-blank line preceding the fallen-into case contains 'fall through' (or
some other variants that we don't publicized to promote consistency).
-->
<property name="reliefPattern"
value="fall through|Fall through|fallthru|Fallthru|falls through|Falls through|fallthrough|Fallthrough|No break|NO break|no break|continue on"/>
<property name="severity" value="error"/>
</module>

<!-- Checks for over-complicated boolean expressions. -->
<module name="SimplifyBooleanExpression"/>

<!-- Detects empty statements (standalone ";" semicolon). -->
<module name="EmptyStatement"/>

<!--

WHITESPACE CHECKS

-->

<module name="WhitespaceAround">
<!-- Checks that various tokens are surrounded by whitespace.
This includes most binary operators and keywords followed
by regular or curly braces.
-->
<property name="tokens" value="ASSIGN, BAND, BAND_ASSIGN, BOR,
BOR_ASSIGN, BSR, BSR_ASSIGN, BXOR, BXOR_ASSIGN, COLON, DIV, DIV_ASSIGN,
EQUAL, GE, GT, LAND, LE, LITERAL_CATCH, LITERAL_DO, LITERAL_ELSE,
LITERAL_FINALLY, LITERAL_FOR, LITERAL_IF, LITERAL_RETURN,
LITERAL_SYNCHRONIZED, LITERAL_TRY, LITERAL_WHILE, LOR, LT, MINUS,
MINUS_ASSIGN, MOD, MOD_ASSIGN, NOT_EQUAL, PLUS, PLUS_ASSIGN, QUESTION,
SL, SL_ASSIGN, SR_ASSIGN, STAR, STAR_ASSIGN"/>
<property name="severity" value="error"/>
</module>

<module name="WhitespaceAfter">
<!-- Checks that commas, semicolons and typecasts are followed by
whitespace.
-->
<property name="tokens" value="COMMA, SEMI, TYPECAST"/>
</module>

<module name="NoWhitespaceAfter">
<!-- Checks that there is no whitespace after various unary operators.
Linebreaks are allowed.
-->
<property name="tokens" value="BNOT, DEC, DOT, INC, LNOT, UNARY_MINUS,
UNARY_PLUS"/>
<property name="allowLineBreaks" value="true"/>
<property name="severity" value="error"/>
</module>

<module name="NoWhitespaceBefore">
<!-- Checks that there is no whitespace before various unary operators.
Linebreaks are allowed.
-->
<property name="tokens" value="SEMI, DOT, POST_DEC, POST_INC"/>
<property name="allowLineBreaks" value="true"/>
<property name="severity" value="error"/>
</module>

<module name="OperatorWrap">
<!-- Checks that operators like + and ? appear at newlines rather than
at the end of the previous line.
-->
<property name="option" value="NL"/>
<property name="tokens" value="BAND, BOR, BSR, BXOR, DIV, EQUAL,
GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, LT, MINUS, MOD,
NOT_EQUAL, PLUS, QUESTION, SL, SR, STAR "/>
</module>

<module name="OperatorWrap">
<!-- Checks that assignment operators are at the end of the line. -->
<property name="option" value="eol"/>
<property name="tokens" value="ASSIGN"/>
</module>

<module name="ParenPad">
<!-- Checks that there is no whitespace before close parens or after
open parens.
-->
<property name="severity" value="error"/>
</module>

<module name="ModifierOrder"/>

</module>
</module>

屏蔽规则配置文件举例#

1
2
3
4
5
6
7
8
9
10
11
<?xml version="1.0"?>
<!DOCTYPE suppressions PUBLIC
"-//Puppy Crawl//DTD Suppressions 1.1//EN"
"https://checkstyle.org/dtds/configuration_1_3.dtd">

<suppressions>
<!-- suppress all checks in the generated directories -->
<suppress checks=".*" files=".+[\\/]generated[\\/].+\.java"/>
<suppress checks=".*" files=".+[\\/]generated-sources[\\/].+\.java"/>
<suppress checks=".*" files=".+[\\/]generated-test-sources[\\/].+\.java"/>
</suppressions>

maven dependency-check#

引入dependnecy-check插件#

项目中原有的依赖是这样的

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.41.Final</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<plugin>
<groupId>org.owasp</groupId>
<artifactId>dependency-check-maven</artifactId>
<version>${dependency-check-maven.version}</version>
<configuration>
<suppressionFiles>
<suppressionFile>src/owasp-dependency-check-suppressions.xml</suppressionFile>
</suppressionFiles>
<failBuildOnCVSS>7</failBuildOnCVSS>
<msbuildAnalyzerEnabled>false</msbuildAnalyzerEnabled>
<nodeAnalyzerEnabled>false</nodeAnalyzerEnabled>
<yarnAuditAnalyzerEnabled>false</yarnAuditAnalyzerEnabled>
<pyDistributionAnalyzerEnabled>false</pyDistributionAnalyzerEnabled>
<pyPackageAnalyzerEnabled>false</pyPackageAnalyzerEnabled>
<pipAnalyzerEnabled>false</pipAnalyzerEnabled>
<pipfileAnalyzerEnabled>false</pipfileAnalyzerEnabled>
<retireJsAnalyzerEnabled>false</retireJsAnalyzerEnabled>
<msbuildAnalyzerEnabled>false</msbuildAnalyzerEnabled>
<mixAuditAnalyzerEnabled>false</mixAuditAnalyzerEnabled>
<nugetconfAnalyzerEnabled>false</nugetconfAnalyzerEnabled>
<assemblyAnalyzerEnabled>false</assemblyAnalyzerEnabled>
<skipSystemScope>true</skipSystemScope>
</configuration>
<executions>
<execution>
<goals>
<goal>aggregate</goal>
</goals>
</execution>
</executions>
</plugin>

然后可以通过mvn clean install verify -DskipTests来检测。这个demo下,会输出

1
2
3
4
5
[ERROR] One or more dependencies were identified with vulnerabilities that have a CVSS score greater than or equal to '7.0': 
[ERROR]
[ERROR] netty-all-4.1.41.Final.jar: CVE-2019-16869(7.5), CVE-2021-37136(7.5), CVE-2020-11612(7.5), CVE-2021-37137(7.5), CVE-2019-20445(9.1), CVE-2019-20444(9.1), CVE-2020-7238(7.5)
[ERROR]
[ERROR] See the dependency-check report for more details.

实际使用时,由于dependency-check检查相对耗时,一般通过单独的profile来控制开关

屏蔽CVE漏洞#

如果出现dependency-check误报或者是评估该漏洞不涉及,可以通过supression file来屏蔽

屏蔽单一CVE漏洞#

1
2
3
4
5
6
7
<suppress>
<notes><![CDATA[
file name: zookeeper-prometheus-metrics-3.8.0.jar
]]></notes>
<sha1>849e8ece2845cb0185d721233906d487a7f1e4cf</sha1>
<cve>CVE-2021-29425</cve>
</suppress>

通过文件正则来屏蔽CVE漏洞#

1
2
3
4
5
<suppress>
<notes>CVE-2011-1797 FP, see https://github.com/jeremylong/DependencyCheck/issues/4154</notes>
<filePath regex="true">.*netty-tcnative-boringssl-static.*\.jar</filePath>
<cve>CVE-2011-1797g</cve>
</suppress>

本篇文章探讨镜像仓库registry的高可用

镜像仓库高可用无单点故障涉及那些场景#

镜像仓库对外提供访问无单点故障#

镜像仓库对外提供的访问点保持高可用

镜像仓库的数据存储高可用#

存储在镜像仓库中的数据都得是高可用的

镜像仓库无单点故障技术关键点#

镜像仓库对外提供访问无单点故障#

和上一篇文章一样,如果IaaS能提供ELB,我们最好是使用ELB,或者使用浮动IP的方式替换

镜像仓库的数据存储高可用#

  • 配置镜像仓库使用IaaS的S3存储
  • 配置镜像仓库使用本地存储,通过共享文件路径存储来实现高可用,如Glusterfs
  • 配置镜像仓库使用S3存储,自建兼容S3 API的存储Server

通常会使用共享存储来做到镜像仓库存储的高可用

方案概述#

那么其实镜像仓库的高可用方案就是对上面方案的组合,下面我们举几个例子

镜像仓库依赖组件部署方式#

MinIoKeepAlivedregistry都推荐使用容器部署,方便运维管理,但是镜像推荐内置到虚拟机中,不依赖镜像仓库或其他组件,避免循环依赖

使用IaaS的S3存储 + 负载均衡组件#

这是最简单的方案,得益于云厂商提供的S3存储和负载均衡组件,我们可以进行很简单的配置,并部署一台以上的registry,如下图所示

kubernetes-registry-ha-s3

自建兼容S3存储 + KeepAlived浮动Ip#

我们可以自己搭建MinIo集群来作为兼容S3存储,由于MINIO最低部署4个节点,我们需要根据故障域机器来选择部署MINIO的数目,比如,故障域是三台物理机,我们部署4节点就不妥。原因是,4节点,总会有一台物理机上会部署2个minio节点,如果这台物理机挂掉,就会导致单点故障。所以,如果故障域为三台物理机,我们最好部署6节点,可容忍一台物理机宕机。其他的节点,读者也可以自行测算。

下图是假设三台物理机,minio6副本场景下的部署示意图

kubernetes-registry-ha-minio

注,为了图的美观,并未画出所有的连线

MINIO关键配置#

  • 节点数目6个
  • EC2

k8s高可用无单点故障涉及那些场景#

k8s 节点添加、pod添加等增删查改无单点故障#

需要元数据的存储和处理能力高可用

k8s对外的apiServer(如worker)无单点故障#

worker node和其他组件访问apiServer路径高可用

k8s无单点故障技术关键点#

元数据存储#

通过etcd存储元数据,etcd三节点集群保证高可用

元数据处理#

通过多个kube-controllerkube-scheduler节点来保证高可用

worker节点请求数据通过多ip或负载均衡来保证#

节点请求通信通过多Ip或负载均衡来保证高可用,这里也有几种方式

IaaS厂商可提供负载均衡的场景下#

如下图所示,可将worker node的访问地址指向负载均衡的地址

kubernetes-ha-iaas-lb

私有化部署KeepAlived#

私有化部署场景常用keepAlived提供浮动IP来给worker node或其他组件访问,如下图所示

kubernetes-ha-keepalived

私有化部署加上负载均衡组件#

如果你觉得同一时刻只有单个apiServer工作会成瓶颈,也可以使用KeepAlivedNginxHaProxy来对ApiServer做负载均衡

kubernetes-ha-keepalived-nginx

为了简化图像,只画出了master1上的Nginx向后转发的场景。

至于Nginx和KeepAlived如何部署,推荐采用容器化的部署模式,方便进行监控和运维;但是镜像不从镜像仓库拉取,而是保存在master节点上,这样虽然升级复杂一点,但是这样子kubernetes的高可用就不依赖镜像仓库了,不会和镜像仓库形成循环依赖,更不会影响镜像仓库的高可用方案,大大简化了后续的技术方案。(因为镜像仓库可能会占据较大的存储空间,可能会和master节点分离部署,这时会作为worker节点连接master节点)。

有很多场景需要我们的代码检测一个进程是否存在,常用的一种方式是通过调用脚本通过ps -ef的方式查看,然而其实这种做法并不怎么高效,会fork一个进程出来,还会影响go协程的调度

一种更好的方式是可以通过解析/proc文件夹来得到想要的信息,其实可以通过strace命令查看,ps -ef也是读取了这个路径下的信息

linux-ps-ef-strace

下面分别是java和go的轮子示例

使用正则表达式[0-9]+的原因是/proc路径下还有一些其他文件,其中pid都是数字。

java#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private static final Pattern numberPattern = Pattern.compile("[0-9]+");

public static boolean processExists(String processName) throws Exception {
final File procFile = new File("/proc");
if (!procFile.isDirectory()) {
throw new Exception("why proc dir is not directory");
}
final File[] listFiles = procFile.listFiles();
if (listFiles == null) {
return false;
}
final List<File> procDir = Arrays.stream(listFiles).filter(f -> numberPattern.matcher(f.getName()).matches()).collect(Collectors.toList());
// find the proc cmdline
for (File file : procDir) {
try {
final byte[] byteArray = FileUtils.readFileToByteArray(new File(file.getCanonicalPath() + File.separator + "cmdline"));
final byte[] bytes = new byte[byteArray.length];
for (int i = 0; i < byteArray.length; i++) {
if (byteArray[i] != 0x00) {
bytes[i] = byteArray[i];
} else {
bytes[i] = (byte) 0x20;
}
}
final String cmdLine = new String(bytes, StandardCharsets.UTF_8);
if (cmdLine.contains(processName)) {
return true;
}
} catch (IOException e) {
// the proc may end during the loop, ignore it
log.error("read file exception ", e);
}
}
return false;
}

go#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func ProcessExists(processName string) (bool, error) {
result := false
fileInfos, err := ioutil.ReadDir("/proc")
if err != nil {
return false, err
}
for _, info := range fileInfos {
name := info.Name()
matched, err := regexp.MatchString("[0-9]+", name)
if err != nil {
return false, err
}
if !matched {
continue
}
cmdLine, err := parseCmdLine("/proc/" + info.Name() + "/cmdline")
if err != nil {
glog.Error("read cmd line failed ", err)
// the proc may end during the loop, ignore it
continue
}
if strings.Contains(cmdLine, processName) {
result = true
}
}
return result, err
}

func parseCmdLine(path string) (string, error) {
cmdData, err := ioutil.ReadFile(path)
if err != nil {
return "", err
}
if len(cmdData) < 1 {
return "", nil
}

split := strings.Split(string(bytes.TrimRight(cmdData, string("\x00"))), string(byte(0)))
return strings.Join(split, " "), nil
}

java 根据线程统计CPU#

设计思路#

java的ThreadMXBean可以获取每个线程CPU执行的nanoTime,那么可以以这个为基础,除以中间系统经过的纳秒数,就获得了该线程的CPU占比

编码#

首先,我们定义一个结构体,用来存放一个线程上次统计时的纳秒数和当时的系统纳秒数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import lombok.Data;

@Data
public class ThreadMetricsAux {

private long usedNanoTime;

private long lastNanoTime;

public ThreadMetricsAux() {
}

public ThreadMetricsAux(long usedNanoTime, long lastNanoTime) {
this.usedNanoTime = usedNanoTime;
this.lastNanoTime = lastNanoTime;
}

}

然后我们在SpringBoot中定义一个定时任务,它将定时地统计计算每个线程的CPU信息,并输出到MeterRegistry,当你调用SpringActuator的接口时,你将能获取到这个指标。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import com.google.common.util.concurrent.AtomicDouble;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.HashMap;

@Slf4j
@Service
public class ThreadMetricService {

@Autowired
private MeterRegistry meterRegistry;

private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();

private final HashMap<Long, ThreadMetricsAux> map = new HashMap<>();

private final HashMap<Meter.Id, AtomicDouble> dynamicGauges = new HashMap<>();

/**
* one minutes
*/
@Scheduled(cron = "0 * * * * ?")
public void schedule() {
final long[] allThreadIds = threadBean.getAllThreadIds();
for (long threadId : allThreadIds) {
final ThreadInfo threadInfo = threadBean.getThreadInfo(threadId);
if (threadInfo == null) {
continue;
}
final long threadNanoTime = getThreadCPUTime(threadId);
if (threadNanoTime == 0) {
// 如果threadNanoTime为0,则识别为异常数据,不处理,并清理历史数据
map.remove(threadId);
}
final long nanoTime = System.nanoTime();
ThreadMetricsAux oldMetrics = map.get(threadId);
// 判断是否有历史的metrics信息
if (oldMetrics != null) {
// 如果有,则计算CPU信息并上报
double percent = (double) (threadNanoTime - oldMetrics.getUsedNanoTime()) / (double) (nanoTime - oldMetrics.getLastNanoTime());
handleDynamicGauge("jvm.threads.cpu", "threadName", threadInfo.getThreadName(), percent);
}
map.put(threadId, new ThreadMetricsAux(threadNanoTime, nanoTime));
}
}

// meter Gauge相关代码
private void handleDynamicGauge(String meterName, String labelKey, String labelValue, double snapshot) {
Meter.Id id = new Meter.Id(meterName, Tags.of(labelKey, labelValue), null, null, Meter.Type.GAUGE);

dynamicGauges.compute(id, (key, current) -> {
if (current == null) {
AtomicDouble initialValue = new AtomicDouble(snapshot);
meterRegistry.gauge(key.getName(), key.getTags(), initialValue);
return initialValue;
} else {
current.set(snapshot);
return current;
}
});
}

long getThreadCPUTime(long threadId) {
long time = threadBean.getThreadCpuTime(threadId);
/* thread of the specified ID is not alive or does not exist */
return time == -1 ? 0 : time;
}

}

其他配置#

依赖配置#

pom文件中

1
2
3
4
5
6
7
8
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>

Prometheus接口配置#

application.yaml

1
2
3
4
5
management:
endpoints:
web:
exposure:
include: health,info,prometheus

效果#

通过curl命令调用curl localhost:20001/actuator/prometheus|grep cpu

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
jvm_threads_cpu{threadName="RMI Scheduler(0)",} 0.0
jvm_threads_cpu{threadName="http-nio-20001-exec-10",} 0.0
jvm_threads_cpu{threadName="Signal Dispatcher",} 0.0
jvm_threads_cpu{threadName="Common-Cleaner",} 3.1664628758074733E-7
jvm_threads_cpu{threadName="http-nio-20001-Poller",} 7.772143763853949E-5
jvm_threads_cpu{threadName="http-nio-20001-Acceptor",} 8.586978352515361E-5
jvm_threads_cpu{threadName="DestroyJavaVM",} 0.0
jvm_threads_cpu{threadName="Monitor Ctrl-Break",} 0.0
jvm_threads_cpu{threadName="AsyncHttpClient-timer-8-1",} 2.524386571545477E-4
jvm_threads_cpu{threadName="Attach Listener",} 0.0
jvm_threads_cpu{threadName="scheduling-1",} 1.2269694160981585E-4
jvm_threads_cpu{threadName="container-0",} 1.999795692406262E-6
jvm_threads_cpu{threadName="http-nio-20001-exec-9",} 0.0
jvm_threads_cpu{threadName="http-nio-20001-exec-7",} 0.0
jvm_threads_cpu{threadName="http-nio-20001-exec-8",} 0.0
jvm_threads_cpu{threadName="http-nio-20001-exec-5",} 0.0
jvm_threads_cpu{threadName="Notification Thread",} 0.0
jvm_threads_cpu{threadName="http-nio-20001-exec-6",} 0.0
jvm_threads_cpu{threadName="http-nio-20001-exec-3",} 0.0
jvm_threads_cpu{threadName="http-nio-20001-exec-4",} 0.0
jvm_threads_cpu{threadName="Reference Handler",} 0.0
jvm_threads_cpu{threadName="http-nio-20001-exec-1",} 0.0012674719289349648
jvm_threads_cpu{threadName="http-nio-20001-exec-2",} 6.542541277148053E-5
jvm_threads_cpu{threadName="RMI TCP Connection(idle)",} 1.3998786340454562E-6
jvm_threads_cpu{threadName="Finalizer",} 0.0
jvm_threads_cpu{threadName="Catalina-utility-2",} 7.920883054498174E-5
jvm_threads_cpu{threadName="RMI TCP Accept-0",} 0.0
jvm_threads_cpu{threadName="Catalina-utility-1",} 6.80101662787773E-5

Java计算磁盘使用率#

https://support.huaweicloud.com/bestpractice-bms/bms_bp_2009.html

华为云文档上的材料值得学习。

翻阅资料

1
2
3
https://www.kernel.org/doc/Documentation/ABI/testing/procfs-diskstats

13 - time spent doing I/Os (ms)

这就意味着如果我想统计一个磁盘在一定周期内的利用率,只需要对这两个数字做差,除以统计的间隔,即就是这段时间内磁盘的利用率

1
2
3
cat /proc/diskstats
253 0 vda 24046 771 2042174 180187 20689748 21411881 527517532 18028256 0 14610513 18201352
253 1 vda1 23959 771 2038022 180153 20683957 21411881 527517532 18028066 0 14610312 18201129

样例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.github.shoothzj.demo.metrics;

import com.github.shoothzj.demo.base.module.ShellResult;
import com.github.shoothzj.demo.base.util.LogUtil;
import com.github.shoothzj.demo.base.util.ShellUtil;
import com.github.shoothzj.demo.base.util.StringUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* @author hezhangjian
*/
@Slf4j
public class DiskUtilizationMetrics {

private static final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

private static long lastTime = -1;

public static void main(String[] args) {
LogUtil.configureLog();
String diskName = "vda1";
scheduledExecutor.scheduleAtFixedRate(() -> metrics(diskName), 0, 10, TimeUnit.SECONDS);
}

private static void metrics(String diskName) {
//假设统计vda磁盘
String[] cmd = {
"/bin/bash",
"-c",
"cat /proc/diskstats |grep " + diskName + "|awk '{print $13}'"
};
ShellResult shellResult = ShellUtil.executeCmd(cmd);
String timeStr = shellResult.getInputContent().substring(0, shellResult.getInputContent().length() - 1);
long time = Long.parseLong(timeStr);
if (lastTime == -1) {
log.info("first time cal, usage time is [{}]", time);
} else {
double usage = (time - lastTime) / (double) 10_000;
log.info("usage time is [{}]", usage);
}
lastTime = time;
}

}

打印CPU使用#

1
2
3
4
5
private static void printCpuUsage() {
final com.sun.management.OperatingSystemMXBean platformMXBean = ManagementFactory.getPlatformMXBean(com.sun.management.OperatingSystemMXBean.class);
double cpuLoad = platformMXBean.getProcessCpuLoad();
System.out.println(cpuLoad);
}

打印线程堆栈#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private static void printThreadDump() {
final StringBuilder dump = new StringBuilder();
final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
// 100代表线程堆栈的层级
final ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
for (ThreadInfo threadInfo : threadInfos) {
dump.append('"');
dump.append(threadInfo.getThreadName());
dump.append("\" ");
final Thread.State state = threadInfo.getThreadState();
dump.append("\n java.lang.Thread.State: ");
dump.append(state);
final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
for (final StackTraceElement stackTraceElement : stackTraceElements) {
dump.append("\n at ");
dump.append(stackTraceElement);
}
dump.append("\n\n");
}
System.out.println(dump);
}

打印内存统计信息#

引入依赖

1
2
3
4
5
<dependency>
<groupId>com.jerolba</groupId>
<artifactId>jmnemohistosyne</artifactId>
<version>0.2.3</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
private static void printClassHisto() {
Histogramer histogramer = new Histogramer();
MemoryHistogram histogram = histogramer.createHistogram();

HistogramEntry arrayList = histogram.get("java.util.ArrayList");
System.out.println(arrayList.getInstances());
System.out.println(arrayList.getSize());

for (HistogramEntry entry : histogram) {
System.out.println(entry);
}
}

打印死锁#

javadoc中指出,这是一个开销较大的操作

1
2
3
4
5
6
7
8
private static void printDeadLock() {
final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
final long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
for (long deadlockedThread : deadlockedThreads) {
final ThreadInfo threadInfo = threadMXBean.getThreadInfo(deadlockedThread);
System.out.println(threadInfo + "deadLocked");
}
}

中间件在很多系统中都存在#

在一个系统里面,或多或少地都会有中间件的存在,总会有数据库吧,其他的如消息队列,缓存,大数据组件。即使是基于公有云构筑的系统,公有云厂商只提供广泛使用的中间件,假如你的系统里面有很多组件没那么泛用,那么就只能自己维护,如ZooKeeperEtcdPulsarPrometheusLvs

什么是中间件adapter#

中间件adapter指的是和中间件运行在一起(同一个物理机或同一个容器),使得中间件和商用系统中已有的组件进行对接,最终使得该中间件达到在该系统商用的标准。像Prometheus的众多exporter,就是将中间件和已有的监控系统(Prometheus)进行对接的adpater

为什么不修改中间件源码直接集成#

原因可以有很多,这里我列出几点

源码修改容易,维护困难#

很多时候不是社区通用需求,无法合并到社区主干。后续每次中间件版本升级,源码的修改就要重新进行一次。社区大版本代码重构,有的甚至不知道如何修改下去。并且对研发人员的技能要求高。

源码与团队技术栈不同,修改困难#

这是最常见的,像java团队维护erlang写的rabbitmq

和其他系统对接,有语言要求#

XX监控系统,只能使用X语言接入,但中间件使用Y语言写的,怎么办?adapter的能力就体现出来了。

为什么在商用系统中中间件做不到开箱即用#

在商用系统中,对一个新引入的中间件,往往有如下能力上的诉求,原生的中间件很难满足

  • 适配原有的监控系统
  • 适配原有的告警系统
  • 适配原有的证书系统
  • 适配原有的备份系统(如果该中间件有状态)
  • 适配原有的容灾系统(如果该中间件有状态)
  • 自动化能力(适配部署、账号创建、权限策略创建)
  • 对外暴露时封装一层接口
  • 应用程序和中间件的服务发现

有时候,业务也会根据业务的需求对中间件做一些能力增强,这部分需求比较定制,这里无法展开讨论了。

我们来逐一讨论上面列出的能力诉求,凡是adapter能实现的功能,对中间件做修改也能实现,只不过因为上一节列出的原因,选择不在中间件处侵入式修改。

适配原有的监控系统#

监控系统获取数据,往往是推拉两种模式,如果该中间件原生不支持和该监控系统对接。我们就可以让adapter先从中间件处取得监控数据,再和监控系统对接

适配原有的告警系统#

如果中间件发生了不可恢复的错误,如写事务文件失败,操作ZooKeeper元数据失败,可以通过adapter来识别中间件是否发生了上述不可恢复的错误,并和告警系统对接,发出告警。

适配原有的证书系统#

这一点也很关键,开源的中间件,根据我的了解,几乎没有项目做了动态证书轮换的方案,证书基本都不支持变更。而出色的商用系统是一定要支持证书轮换的。不过很遗憾的是,这些涉及到TLS握手的关键流程,adapter无法干涉这个流程,只能对中间件进行侵入式修改。

适配原有的备份系统#

通过adapter对中间件进行定期备份、按照配置中心的策略备份、备份文件自动上传到文件服务器等。

适配原有的容灾系统#

这个视中间件而定,有些中间件如Pulsar原生支持跨地域容灾的话,我们可能做一做配置就好了。另外一些,像mysqlmongo这种,可能我们还需要通过adapter来进行数据同步。不过这个时候adapter负责的职责就大了,还包括了容灾能力。

自动化能力#

自动化部署#

比如ZooKeeperKafkafilebeat在安装的时候,要求填写配置文件,我们就可以让adapter来自动化生成配置或更新配置

账号和策略的创建更新#

kubernetesmysqlmongo,我们可以在安装的时候通过adapter来自动化创建或更新

对外暴露时封装一层接口#

封装接口常用于中间件的提供者,出于种种原因,如中间件原本接口能力太大、中间件原本接口未做权限控制、中间件原本接口未适配期望的权限框架等。我们可以用adapter封装实现一层新的接口对外暴露。

应用程序和中间件的服务发现#

应用程序发现中间件#

应用程序与中间件的连接,说的简单一点就是如何获取Ip,如果是基于kubernetes的部署,那么不推荐配置Ip,最好是配置域名,因为Ip会跟着容器的生命周期变化。首先,你的应用程序并不会因为中间件的一个容器重启了来重建客户端,往往是通过一个简单重连的方式连接到新的中间件容器继续工作。其次,我们的运维人员也不会每时每刻盯着容器Ip是否变化来进行配置吧。以下图为例,域名的配置要优于Ip的配置。

application-discover-middleware

截止到目前,我们只需要一个静态配置,使得应用程序可以连接到中间件。最好这个配置是可以修改的,这样我们还可以继承蓝绿、灰度发布的能力。

中间件到业务程序的发现#

这个模式常用于负载均衡中间件如LvsNginx自动维护后端列表,我们可以通过adapter来从注册中心获取后端服务的实例信息,并实时更新。

总结#

在商用系统中,中间件并没有想象中的那么开箱即用,本文讲述了一些中间件集成到商用系统中需要具备的能力。在对中间件侵入式修改没有技术能力或不想对中间件进行侵入式修改的场景。选用团队常用的、占用资源少的语言来开发中间件adapter应该是更好的选择。

0%