1)选择滑动窗口
滑动窗口会导致一个时间点的数据会分布到多个window里来,其它跟滚动窗口没区别
2)滑动数据重新汇聚
计算源是单个时间窗口内预汇聚的数据,不适用于滑动窗口的计算值,比如前面单个窗口计算出来p90,那么滑动窗口的p90怎么算?
需要把前面的时间窗口的现场保留下来传到当前滑动窗口内才可以继续计算,具体代码如下:
//拿到原始数据后,重新计算 @SuppressWarnings("unchecked") public void accumulate(Object value) { if (null == value) { return; } HashMapmapVal = (HashMap ) value; //1)在历史值和当前值中设置新的max值 Object maxObj = mapVal.get("max"); Object minObj = mapVal.get("min"); Object countObj = mapVal.get("count"); Object sumObj = mapVal.get("sum"); Object afObj = mapVal.get("af"); Object ccObj = mapVal.get("cc"); if (null == maxObj || null == minObj || null == countObj || null == sumObj || null == afObj || null == ccObj) { return; } if (false == (ccObj instanceof JSONArray)) { return; } JSONArray jsonArray = (JSONArray) ccObj; int jsonSize = jsonArray.size(); if (0 == jsonSize) { return; } this.max = Math.max(this.max, ((Number) maxObj).doubleValue()); this.min = Math.min(this.min, ((Number) minObj).doubleValue()); this.count += ((Number) countObj).intValue(); this.sum += ((Number) sumObj).doubleValue(); //每次都是直接替换 this.augmentFactor = ((Number) afObj).intValue(); //5)开始汇总以便后面计算各种95线之类的值 Integer[] intArray = new Integer[jsonSize]; intArray = jsonArray.toArray(intArray); for (int index = 0; index < jsonSize; index++) { countContainer[index] += intArray[index]; } //6)over }
3)海量tag的发现
如果上传的metric tag数据非常多,怎么去重是个问题,我采取的方案是
3.1)使用采样率
@Override public void filter(String metric, TreeMaptagValues, boolean tagValuesEmpty) { Random randomGenerator = RANDOM_THREAD_LOCAL.get(); if (0 == randomGenerator.nextInt(20)) { //取5%的采样率 ReportQueue.put(ReportQueue.METRIC_TAG, new MetricAndTags(metric, tagValues, tagValuesEmpty)); //结束 } else { } }
3.2)布隆过滤器判重
//普通数据-bloom filter private static Integer SIZE = 100 * 1000 * 1000; private static Integer BITS = 20; private static Integer HASH_FUNCTION = 1; private static BloomFilter DATA_BLOOM_FILTER = new BloomFilter(SIZE, BITS, HASH_FUNCTION); //哈希code-bloom filter private static BloomFilter HASHCODE_BLOOM_FILTER = new BloomFilter(SIZE, BITS, HASH_FUNCTION); public static synchronized boolean isNewKey(String data) { Key dataKey = new Key(data.getBytes()); if (false == DATA_BLOOM_FILTER.membershipTest(dataKey)) { //不存在就是真的不存在 return true; } //再做hashcode的2次判断 if (false == HASHCODE_BLOOM_FILTER.membershipTest(hashCodeKey(data))) { //不存在就是真的不存在 return true; } //(如果2次都说存在,也没办法了,这条数据丢弃) //返回false表示不是new key return false; }
3.3)元数据幂等性保存到es
注意幂等性,之前存在的数据会被更新,而不是新增一条数据,因为我们是保存元数据
具体就是设置请求体里的upsert为true
@Datapublic class ExecutionMetricTagValue { private Boolean doc_as_upsert=true; private EsMetricTagValue doc;}
3.4)限流防对远程ES的流量冲击
这个是构建一个 Guava对象
private static final RateLimiter RATE_LIMITER = RateLimiter.create(500);//在JVM级别限流,防止对ES产生冲击RATE_LIMITER.acquire(1);
4)用户配置数据拉取
用户配置的一些规则,通过另外一个JVM级别的线程拉取到本地内存,这样就可以不影响flink的计算速度
5)报警屏蔽周期
这是为了防止报警洪灾,实现思路
String res; try { SetParams setParams = new SetParams(); setParams.nx(); setParams.ex(alarmInterval); //仅仅是当前timeSpan内有效,10s不影响30s 1m这种 res = JEDIS_CLUSTER.set(timeSpan + "_hubble_alarm_" + fullKey, "1", setParams); } catch (Exception e) { LOG.error(e.toString()); return; } if (null != res) { //LOG.info("初次插入,可以报警");
主要就是这些,很难的点没有,就是要注意各个细节
----------------------------------------------------------------------------------------------------------
其实我觉得报警系统的精髓在于阈值的设置上,傻乎乎的设置静态值是没有技术含量的,整个报警系统的精髓就在于自动设置报警阈值
所以接下来我会去研究这方面的技术,如果研究出来了我会发文章出来!
----------------------------------------------------------------------------------------------------------
下面放界面图