import concurrent.futures
import json
import os
import queue
import random
import socket
import string
import threading
import time
from collections import Counter
from typing import Any

from influxdb import InfluxDBClient

from feapder import setting
from feapder.utils.log import log
from feapder.utils.tools import aio_wrap, ensure_float, ensure_int

_inited_pid = None
# this thread pool should stop running in the forked process
_executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=1, thread_name_prefix="metrics"
)


class MetricsEmitter:
    def __init__(
        self,
        influxdb,
        *,
        batch_size=10,
        max_timer_seq=0,
        emit_interval=10,
        retention_policy=None,
        ratio=1.0,
        debug=False,
        add_hostname=False,
        max_points=10240,
        default_tags=None,
    ):
        """
        Args:
            influxdb: InfluxDB client instance
            batch_size: batch size for writing points
            max_timer_seq: maximum number of timer-type points collected per
                interval; 0 means unlimited
            emit_interval: maximum time to wait before points must be written
            retention_policy: retention policy to write to
            ratio: sampling ratio for store and timer points, e.g. 0.1 keeps
                only 10% of the points
            debug: whether to print debug logs
            add_hostname: whether to add the hostname as a tag
            max_points: maximum number of points buffered locally
        """
        self.pending_points = queue.Queue()
        self.batch_size = batch_size
        self.influxdb: InfluxDBClient = influxdb
        self.tagkv = {}
        self.max_timer_seq = max_timer_seq
        self.lock = threading.Lock()
        self.hostname = socket.gethostname()
        self.last_emit_ts = time.time()  # time of the last write
        self.emit_interval = emit_interval  # write interval
        self.max_points = max_points
        self.retention_policy = retention_policy  # custom retention policy
        self.debug = debug
        self.add_hostname = add_hostname
        self.ratio = ratio
        self.default_tags = default_tags or {}
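
    # A minimal construction sketch. The host, port, and database below are
    # illustrative assumptions, not values from this module:
    #
    #   client = InfluxDBClient(host="127.0.0.1", port=8086, database="metrics")
    #   emitter = MetricsEmitter(client, batch_size=100, emit_interval=10)
    #   emitter.emit(emitter.get_counter_point("spider", key="requests"))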

    def define_tagkv(self, tagk, tagvs):
        self.tagkv[tagk] = set(tagvs)

    def _point_tagset(self, p):
        return f"{p['measurement']}-{sorted(p['tags'].items())}-{p['time']}"

    def _make_time_to_ns(self, _time):
        """
        Convert a timestamp to a nanosecond-level timestamp by padding it
        to 19 digits with random digits.

        Args:
            _time: timestamp, by default at second precision

        Returns:
            int: a 19-digit pseudo-nanosecond timestamp
        """
        time_len = len(str(_time))
        random_str = "".join(random.sample(string.digits, 19 - time_len))
        return int(str(_time) + random_str)
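
    # Worked example: a 10-digit second-level timestamp gets 9 random digits
    # appended. Note random.sample draws without replacement from the ten
    # characters of string.digits, so at most 10 padding digits are available:
    #
    #   self._make_time_to_ns(1609459200)  # -> e.g. 1609459200358172649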

    def _accumulate_points(self, points):
        """
        Aggregate points that share the same key:
        - counter points with the same key have their `_count` values summed
        - store points are left untouched; influxdb overwrites them itself
        - timer points get a `_seq` tag so each distinct point is kept
        """
        counters = {}  # temporarily holds counter-type points
        timer_seqs = Counter()  # timer sequence numbers per key
        new_points = []
        for point in points:
            point_type = point["tags"].get("_type", None)
            tagset = self._point_tagset(point)
            # counter points are all aggregated; none are dropped
            if point_type == "counter":
                if tagset not in counters:
                    counters[tagset] = point
                else:
                    counters[tagset]["fields"]["_count"] += point["fields"]["_count"]
            elif point_type == "timer":
                if self.max_timer_seq and timer_seqs[tagset] > self.max_timer_seq:
                    continue
                # roll the dice; only sufficiently lucky points are kept
                if self.ratio < 1.0 and random.random() > self.ratio:
                    continue
                # add a _seq tag to distinguish otherwise identical points
                point["tags"]["_seq"] = timer_seqs[tagset]
                point["time"] = self._make_time_to_ns(point["time"])
                timer_seqs[tagset] += 1
                new_points.append(point)
            else:
                if self.ratio < 1.0 and random.random() > self.ratio:
                    continue
                point["time"] = self._make_time_to_ns(point["time"])
                new_points.append(point)
        # add the aggregated counter points
        for point in counters.values():
            # pad the counter point's timestamp to 19 digits so it looks like
            # a nanosecond timestamp, preventing influxdb from overwriting
            # points that fall within the same second
            point["time"] = self._make_time_to_ns(point["time"])
            new_points.append(point)
        return new_points
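
    # Aggregation sketch: two counter points with identical measurement, tags,
    # and time collapse into a single point whose _count is the sum:
    #
    #   p1 = {"measurement": "m", "tags": {"_type": "counter"},
    #         "fields": {"_count": 2}, "time": 1609459200}
    #   p2 = {"measurement": "m", "tags": {"_type": "counter"},
    #         "fields": {"_count": 3}, "time": 1609459200}
    #   self._accumulate_points([p1, p2])  # -> one point with _count == 5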

    def _get_ready_emit(self, force=False):
        """
        Aggregate the currently pending points and return them.
        """
        if self.debug:
            log.info("got %s raw points", self.pending_points.qsize())
        # read points from pending, with an upper bound so that fetching
        # cannot run forever while new points keep arriving
        points = []
        while len(points) < self.max_points or force:
            try:
                points.append(self.pending_points.get_nowait())
            except queue.Empty:
                break
        # aggregate the points
        points = self._accumulate_points(points)
        if self.debug:
            log.info("got %s points", len(points))
            log.info(json.dumps(points, indent=4))
        return points

    def emit(self, point=None, force=False):
        """
        1. Add the new point to pending.
        2. If the conditions are met, aggregate the points and write them.
        3. Update the last write time.

        :param point: point to enqueue
        :param force: force-write all points, False by default
        :return:
        """
        if point:
            self.pending_points.put(point)
        # decide whether to write, based on: 1. count 2. interval 3. forced
        if not (
            force
            or self.pending_points.qsize() >= self.max_points  # noqa: W503
            or time.time() - self.last_emit_ts > self.emit_interval  # noqa: W503
        ):
            return
        # a write is due; read the writable points, making sure only one
        # thread compacts them at a time
        with self.lock:
            points = self._get_ready_emit(force=force)
            if not points:
                return
            try:
                # h(hour), m(minutes), s(seconds), ms(milliseconds),
                # u(microseconds), n(nanoseconds)
                self.influxdb.write_points(
                    points,
                    batch_size=self.batch_size,
                    time_precision="n",
                    retention_policy=self.retention_policy,
                )
            except Exception:
                log.exception("error writing points")
            self.last_emit_ts = time.time()

    def flush(self):
        if self.debug:
            log.info("start draining points %s", self.pending_points.qsize())
        self.emit(force=True)

    def close(self):
        self.flush()
        try:
            self.influxdb.close()
        except Exception as e:
            log.exception(e)

    def make_point(self, measurement, tags: dict, fields: dict, timestamp=None):
        """
        The default timestamp is at second precision.
        """
        assert measurement, "measurement can't be null"
        tags = tags.copy() if tags else {}
        tags.update(self.default_tags)
        fields = fields.copy() if fields else {}
        if timestamp is None:
            timestamp = int(time.time())
        # allow a custom hostname
        if self.add_hostname and "hostname" not in tags:
            tags["hostname"] = self.hostname
        point = dict(measurement=measurement, tags=tags, fields=fields, time=timestamp)
        if self.tagkv:
            for tagk, tagv in tags.items():
                if tagv not in self.tagkv[tagk]:
                    raise ValueError(
                        "tag value = %s not in %s" % (tagv, self.tagkv[tagk])
                    )
        return point
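
    # The returned dict follows the influxdb-python point format, e.g.:
    #
    #   {"measurement": "spider", "tags": {"status": "ok"},
    #    "fields": {"_count": 1}, "time": 1609459200}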

    def get_counter_point(
        self,
        measurement: str,
        key: str = None,
        count: int = 1,
        tags: dict = None,
        timestamp: int = None,
    ):
        """
        Counter points are aggregated and must not be overwritten.
        """
        tags = tags.copy() if tags else {}
        if key is not None:
            tags["_key"] = key
        tags["_type"] = "counter"
        count = ensure_int(count)
        fields = dict(_count=count)
        point = self.make_point(measurement, tags, fields, timestamp=timestamp)
        return point

    def get_store_point(
        self,
        measurement: str,
        key: str = None,
        value: Any = 0,
        tags: dict = None,
        timestamp=None,
    ):
        tags = tags.copy() if tags else {}
        if key is not None:
            tags["_key"] = key
        tags["_type"] = "store"
        fields = dict(_value=value)
        point = self.make_point(measurement, tags, fields, timestamp=timestamp)
        return point

    def get_timer_point(
        self,
        measurement: str,
        key: str = None,
        duration: float = 0,
        tags: dict = None,
        timestamp=None,
    ):
        tags = tags.copy() if tags else {}
        if key is not None:
            tags["_key"] = key
        tags["_type"] = "timer"
        fields = dict(_duration=ensure_float(duration))
        point = self.make_point(measurement, tags, fields, timestamp=timestamp)
        return point

    def emit_any(self, *args, **kwargs):
        point = self.make_point(*args, **kwargs)
        self.emit(point)

    def emit_counter(self, *args, **kwargs):
        point = self.get_counter_point(*args, **kwargs)
        self.emit(point)

    def emit_store(self, *args, **kwargs):
        point = self.get_store_point(*args, **kwargs)
        self.emit(point)

    def emit_timer(self, *args, **kwargs):
        point = self.get_timer_point(*args, **kwargs)
        self.emit(point)


_emitter: MetricsEmitter = None
_measurement: str = None


def init(
    *,
    influxdb_host=None,
    influxdb_port=None,
    influxdb_udp_port=None,
    influxdb_database=None,
    influxdb_user=None,
    influxdb_password=None,
    influxdb_measurement=None,
    retention_policy=None,
    retention_policy_duration="180d",
    emit_interval=60,
    batch_size=100,
    debug=False,
    use_udp=False,
    timeout=22,
    ssl=False,
    retention_policy_replication: str = "1",
    set_retention_policy_default=True,
    **kwargs,
):
    """
    Initialize metrics reporting.

    Args:
        influxdb_host:
        influxdb_port:
        influxdb_udp_port:
        influxdb_database:
        influxdb_user:
        influxdb_password:
        influxdb_measurement: measurement (table) to write to; can also be
            given per point
        retention_policy: retention policy
        retention_policy_duration: retention policy expiry duration
        emit_interval: maximum interval between writes
        batch_size: batch size for writing points
        debug: whether to enable debugging
        use_udp: whether to write points over UDP
        timeout: timeout for connecting to influxdb
        ssl: whether to use https
        retention_policy_replication: number of replicas for the retention
            policy, for reliability and high availability; if one node fails,
            the others keep serving, avoiding data loss and downtime
        set_retention_policy_default: whether to make this the default
            retention policy; only takes effect when the retention_policy is
            first created
        **kwargs: extra arguments passed through to MetricsEmitter

    Returns:
    """
    global _inited_pid, _emitter, _measurement
    if _inited_pid == os.getpid():
        return
    influxdb_host = influxdb_host or setting.INFLUXDB_HOST
    influxdb_port = influxdb_port or setting.INFLUXDB_PORT
    influxdb_udp_port = influxdb_udp_port or setting.INFLUXDB_UDP_PORT
    influxdb_database = influxdb_database or setting.INFLUXDB_DATABASE
    influxdb_user = influxdb_user or setting.INFLUXDB_USER
    influxdb_password = influxdb_password or setting.INFLUXDB_PASSWORD
    _measurement = influxdb_measurement or setting.INFLUXDB_MEASUREMENT
    retention_policy = (
        retention_policy or f"{influxdb_database}_{retention_policy_duration}"
    )
    if not all(
        [
            influxdb_host,
            influxdb_port,
            influxdb_udp_port,
            influxdb_database,
            influxdb_user,
            influxdb_password,
        ]
    ):
        return
    influxdb_client = InfluxDBClient(
        host=influxdb_host,
        port=influxdb_port,
        udp_port=influxdb_udp_port,
        database=influxdb_database,
        use_udp=use_udp,
        timeout=timeout,
        username=influxdb_user,
        password=influxdb_password,
        ssl=ssl,
    )
    # create the database
    if influxdb_database:
        try:
            influxdb_client.create_database(influxdb_database)
            influxdb_client.create_retention_policy(
                retention_policy,
                retention_policy_duration,
                replication=retention_policy_replication,
                default=set_retention_policy_default,
            )
        except Exception as e:
            log.error("metrics init failed: {}".format(e))
            return
    _emitter = MetricsEmitter(
        influxdb_client,
        debug=debug,
        batch_size=batch_size,
        retention_policy=retention_policy,
        emit_interval=emit_interval,
        **kwargs,
    )
    _inited_pid = os.getpid()
    log.info("metrics initialized successfully")


def emit_any(
    tags: dict,
    fields: dict,
    *,
    classify: str = "",
    measurement: str = None,
    timestamp=None,
):
    """
    Raw point, written without any extra processing.

    Args:
        tags: influxdb tag names and values
        fields: influxdb field names and values
        classify: category of the point
        measurement: measurement (table) to write to
        timestamp: timestamp of the point; defaults to now

    Returns:
    """
    if not _emitter:
        return
    tags = tags or {}
    tags["_classify"] = classify
    measurement = measurement or _measurement
    _emitter.emit_any(measurement, tags, fields, timestamp)


def emit_counter(
    key: str = None,
    count: int = 1,
    *,
    classify: str = "",
    tags: dict = None,
    measurement: str = None,
    timestamp: int = None,
):
    """
    Aggregated point: counts within a time window are summed, and a single
    point with the total is written.

    Args:
        key: key bound to the point
        count: number of counts
        classify: category of the point
        tags: influxdb tag names and values
        measurement: measurement (table) to write to
        timestamp: timestamp of the point; defaults to now

    Returns:
    """
    if not _emitter:
        return
    tags = tags or {}
    tags["_classify"] = classify
    measurement = measurement or _measurement
    _emitter.emit_counter(measurement, key, count, tags, timestamp)


def emit_timer(
    key: str = None,
    duration: float = 0,
    *,
    classify: str = "",
    tags: dict = None,
    measurement: str = None,
    timestamp=None,
):
    """
    Timer point, used to monitor run durations and the like; one point per
    duration, never overwritten.

    Args:
        key: key bound to the point
        duration: duration value
        classify: category of the point
        tags: influxdb tag names and values
        measurement: measurement (table) to write to
        timestamp: timestamp of the point; defaults to now

    Returns:
    """
    if not _emitter:
        return
    tags = tags or {}
    tags["_classify"] = classify
    measurement = measurement or _measurement
    _emitter.emit_timer(measurement, key, duration, tags, timestamp)


def emit_store(
    key: str = None,
    value: Any = 0,
    *,
    classify: str = "",
    tags: dict = None,
    measurement: str = None,
    timestamp=None,
):
    """
    Direct point, written without extra processing.

    Args:
        key: key bound to the point
        value: value of the point
        classify: category of the point
        tags: influxdb tag names and values
        measurement: measurement (table) to write to
        timestamp: timestamp of the point; defaults to now

    Returns:
    """
    if not _emitter:
        return
    tags = tags or {}
    tags["_classify"] = classify
    measurement = measurement or _measurement
    _emitter.emit_store(measurement, key, value, tags, timestamp)
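
# Usage sketch for the module-level helpers (assumes init() has been called;
# the keys and classify values are illustrative):
#
#   emit_counter("downloaded", count=1, classify="spider")
#   emit_timer("parse", duration=0.42, classify="spider")
#   emit_store("queue_size", value=128, classify="spider")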


def flush():
    """
    Force-write pending points to influxdb.

    Returns:
    """
    if not _emitter:
        return
    _emitter.flush()


def close():
    """
    Flush and close the connection.

    Returns:
    """
    if not _emitter:
        return
    _emitter.close()


# coroutine-friendly wrappers for emitting points
aemit_counter = aio_wrap(executor=_executor)(emit_counter)
aemit_store = aio_wrap(executor=_executor)(emit_store)
aemit_timer = aio_wrap(executor=_executor)(emit_timer)
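
# Async usage sketch, assuming aio_wrap turns the sync emitters into
# coroutines that run on the metrics thread pool:
#
#   import asyncio
#
#   async def report():
#       await aemit_counter("downloaded", count=1)
#
#   asyncio.run(report())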