【发布时间】:2016-06-07 19:03:13
【问题描述】:
我需要从 Kairosdb 中的所有指标值创建一个 csv 文件。
kairosdb UI 已经具有另存为功能,但在导出的文件中没有指标名称。我们也不能将多个指标导出到一个文件中。
我面临的问题是匹配多个指标的时间戳。例如,一个指标可能会返回 5 个时间戳值。另一个指标可能会返回 10 个时间戳值,这些值可能与之前的指标匹配。
所以我需要生成如下的 csv:
tmestamp,metric1,metric2,tmetric3\n
0,1,,2\n
1,,2,\n
2,1,3,6\n
3,5,5, \n
4,,,5\n
查询返回的值可能超过 10000 个数据点。我该如何解决这个问题。我可以在 spark 集群中运行这个程序吗?
我试过的代码:
package com.example;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.kairosdb.client.builder.DataPoint;
public class Test {
private static Map<MetricMap, String> metricMaps = new HashMap<>();
public static void main(String args[]) {
Map<String, List<DataPoint>> metriDps = new HashMap<>();
String[] metricNames = new String[] { "m1", "m2", "m3" };
List<DataPoint> dataPoints1 = new ArrayList<DataPoint>();
DataPoint dp1 = new DataPoint(0, 1);
DataPoint dp2 = new DataPoint(2, 1);
DataPoint dp3 = new DataPoint(3, 5);
dataPoints1.add(dp1);
dataPoints1.add(dp2);
dataPoints1.add(dp3);
metriDps.put("m1", dataPoints1);
List<DataPoint> dataPoints2 = new ArrayList<DataPoint>();
DataPoint dp21 = new DataPoint(1, 2);
DataPoint dp22 = new DataPoint(2, 3);
DataPoint dp23 = new DataPoint(3, 5);
dataPoints2.add(dp21);
dataPoints2.add(dp22);
dataPoints2.add(dp23);
metriDps.put("m2", dataPoints2);
List<DataPoint> dataPoints3 = new ArrayList<DataPoint>();
DataPoint dp31 = new DataPoint(0, 2);
DataPoint dp32 = new DataPoint(2, 6);
DataPoint dp33 = new DataPoint(4, 5);
dataPoints3.add(dp31);
dataPoints3.add(dp32);
dataPoints3.add(dp33);
metriDps.put("m3", dataPoints3);
try {
FileWriter writer = new FileWriter("/home/lr/Desktop/csv1.csv");
metriDps.keySet().stream().forEach(key -> createMap(metriDps.get(key), key));
String value;
for (MetricMap metricMap : metricMaps.keySet()) {
String time = metricMap.getTime();
writer.append(time);
writer.append(',');
for (int i = 0; i < 3; i++) {
MetricMap map = new MetricMap();
map.setName(metricNames[i]);
map.setTime(time);
value = metricMaps.get(map);
if (value != null)
writer.append(metricMaps.get(map));
else
writer.append("");
if (i == 2)
writer.append('\n');
else
writer.append(',');
}
}
// generate whatever data you want
writer.flush();
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private static void createMap(List<DataPoint> list, String key) {
MetricMap map = null;
for (DataPoint dp : list) {
map = new MetricMap();
map.setName(key);
map.setTime(String.valueOf(dp.getTimestamp()));
metricMaps.put(map, String.valueOf(dp.getValue()));
}
}
}
非常感谢您的帮助。
【问题讨论】:
-
系列按时间排序?我会用它来“并行”迭代所有 3 个,随时随地构建输出。首先将所有内容存储在地图中会使用比必要更多的内存,这可能会限制可伸缩性(您可能可以用更小的时间片查询所有 3 个以减少您必须处理的批量大小)
-
是的,系列是按时间排序的。你能告诉我怎么做吗?
-
我拥有的代码正在将重复值写入 csv 文件。我很震惊。
-
@zapl 你能不能给这点点灯?
标签: java apache-spark kairosdb