forked from grafana/metrictank
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathquery_engine.go
More file actions
169 lines (147 loc) · 5.85 KB
/
query_engine.go
File metadata and controls
169 lines (147 loc) · 5.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package api
import (
"fmt"
"sort"
"github.com/raintank/metrictank/api/models"
"github.com/raintank/metrictank/mdata"
"github.com/raintank/metrictank/util"
"github.com/raintank/worldping-api/pkg/log"
)
// archive represents one data "archive" for a metric: either the raw series,
// or one of its aggregated (rollup) series.
type archive struct {
	interval   uint32 // resolution of the archive in seconds (seconds per point)
	pointCount uint32 // number of points this archive yields over the queried time range
	chosen     bool   // set once this archive has been selected for fetching
}

// String returns a human-readable representation of the archive,
// e.g. "<archive int:10, pointCount: 360, chosen: false>".
func (b archive) String() string {
	// bugfix: the format string previously lacked the closing '>',
	// leaving the representation unbalanced.
	return fmt.Sprintf("<archive int:%d, pointCount: %d, chosen: %t>", b.interval, b.pointCount, b.chosen)
}
// archives is a list of archive options for a query, sortable by interval
// so that the highest-resolution (smallest interval) option comes first.
type archives []archive

// Len, Swap and Less implement sort.Interface, ordering by ascending interval.
func (ar archives) Len() int           { return len(ar) }
func (ar archives) Swap(i, j int)      { ar[i], ar[j] = ar[j], ar[i] }
func (ar archives) Less(i, j int) bool { return ar[i].interval < ar[j].interval }
// alignRequests updates the requests with all details for fetching, making sure
// all metrics are in the same, optimal interval.
// luckily, all metrics still use the same aggSettings, making this a bit simpler.
// note: it is assumed that all requests have the same from, to and maxdatapoints!
// note: reqs must be non-empty — reqs[0] is used as the representative request
// for the time range and maxDataPoints (would panic otherwise).
// this function ignores the TTL values. it is assumed that you've set sensible TTL's.
func alignRequests(reqs []models.Req, aggSettings []mdata.AggSetting) ([]models.Req, error) {
	// model all the archives for each requested metric
	// the 0th archive is always the raw series, with highest res (lowest interval)
	aggs := mdata.AggSettingsSpanAsc(aggSettings)
	sort.Sort(aggs)

	options := make([]archive, 1, len(aggs)+1)

	minInterval := uint32(0) // will contain the smallest rawInterval from all requested series
	rawIntervals := make(map[uint32]struct{})
	for _, req := range reqs {
		if minInterval == 0 || minInterval > req.RawInterval {
			minInterval = req.RawInterval
		}
		rawIntervals[req.RawInterval] = struct{}{}
	}
	tsRange := (reqs[0].To - reqs[0].From)

	// model the raw archive using the smallest raw interval seen.
	// note: not all series necessarily have the same raw settings, will be fixed further down
	options[0] = archive{minInterval, tsRange / minInterval, false}

	// now model the archives we get from the aggregations
	// note that during the processing, we skip non-ready aggregations for simplicity, but at the
	// end we need to convert the index back to the real index in the full (incl non-ready) aggSettings array.
	// aggRef maps an index into options to (index into aggSettings)+1; entry 0 is the raw archive.
	aggRef := []int{0}
	for j, agg := range aggs {
		if agg.Ready {
			options = append(options, archive{agg.Span, tsRange / agg.Span, false})
			aggRef = append(aggRef, j+1)
		}
	}

	// find the first, i.e. highest-res option with a pointCount <= maxDataPoints
	// if all options have too many points, fall back to the lowest-res option and apply runtime
	// consolidation
	selected := len(options) - 1
	runTimeConsolidate := true
	for i, opt := range options {
		if opt.pointCount <= reqs[0].MaxPoints {
			runTimeConsolidate = false
			selected = i
			break
		}
	}

	/*
	   do a quick calculation of the ratio between pointCount and maxDatapoints of
	   the selected option, and the option before that; if the previous option is
	   a lot closer to max points than we are, we pick that and apply some runtime
	   consolidation.
	   eg. with a time range of 1hour,
	   our options are:
	          i | span  | pointCount
	          ======================
	          0 | 10s   | 360
	          1 | 600s  | 6
	          2 | 7200s | 0

	   if maxPoints is 100, then selected will be 1, our 600s rollups.
	   We then calculate the ratio between maxPoints and our
	   selected pointCount "6" and the previous option "360".
	          belowMaxDataPointsRatio = 100/6   = 16.67
	          aboveMaxDataPointsRatio = 360/100 = 3.6

	   As the maxDataPoint requested is much closer to 360 then it is to 6,
	   we will use 360 and do runtime consolidation.
	*/
	if selected > 0 {
		belowMaxDataPointsRatio := float64(reqs[0].MaxPoints) / float64(options[selected].pointCount)
		aboveMaxDataPointsRatio := float64(options[selected-1].pointCount) / float64(reqs[0].MaxPoints)

		if aboveMaxDataPointsRatio < belowMaxDataPointsRatio {
			selected--
			runTimeConsolidate = true
		}
	}

	chosenInterval := options[selected].interval

	// if we are using raw metrics, we need to find an interval that all request intervals work with.
	// (series may have different raw intervals; the LCM is the smallest interval they all divide into)
	if selected == 0 && len(rawIntervals) > 1 {
		runTimeConsolidate = true
		keys := make([]uint32, len(rawIntervals))
		i := 0
		for k := range rawIntervals {
			keys[i] = k
			i++
		}
		chosenInterval = util.Lcm(keys)
		options[0].interval = chosenInterval
		options[0].pointCount = tsRange / chosenInterval
		// make sure that the calculated interval is not greater then the interval of the first rollup:
		// if the LCM got that coarse, the first rollup is at least as good, so prefer it.
		if len(options) > 1 && chosenInterval >= options[1].interval {
			selected = 1
			chosenInterval = options[1].interval
		}
	}

	// NOTE(review): LogLevel < 2 presumably means debug-level logging is enabled — confirm
	// against the logging setup; this dumps all modeled options and marks the chosen one.
	if LogLevel < 2 {
		options[selected].chosen = true
		for i, archive := range options {
			if archive.chosen {
				log.Debug("QE %-2d %-6d %-6d <-", i, archive.interval, tsRange/archive.interval)
			} else {
				log.Debug("QE %-2d %-6d %-6d", i, archive.interval, tsRange/archive.interval)
			}
		}
	}

	/* we now just need to update the following properties for each req:
	   archive      int    // 0 means original data, 1 means first agg level, 2 means 2nd, etc.
	   archInterval uint32 // the interval corresponding to the archive we'll fetch
	   outInterval  uint32 // the interval of the output data, after any runtime consolidation
	   aggNum       uint32 // how many points to consolidate together at runtime, after fetching from the archive
	*/
	for i := range reqs {
		req := &reqs[i]
		req.Archive = aggRef[selected] // translate back to the index in the full aggSettings array
		req.ArchInterval = options[selected].interval
		req.OutInterval = chosenInterval
		req.AggNum = 1
		if runTimeConsolidate {
			req.AggNum = aggEvery(options[selected].pointCount, req.MaxPoints)

			// options[0].{interval,pointCount} didn't necessarily reflect the actual raw archive for this request,
			// so adjust where needed: fetch at the series' own raw interval and consolidate by the
			// extra factor needed to reach the common (LCM) interval.
			if selected == 0 && chosenInterval != req.RawInterval {
				req.ArchInterval = req.RawInterval
				req.AggNum *= chosenInterval / req.RawInterval
			}

			req.OutInterval = req.ArchInterval * req.AggNum
		}
	}
	return reqs, nil
}