【问题标题】:OpenTracing with Kafka Streams - How to?使用 Kafka Streams 进行 OpenTracing - 如何?
【发布时间】:2018-10-18 13:43:47
【问题描述】:

我正在尝试将 Jaeger 跟踪集成到 K-Streams 中。我正计划将跟踪添加到我最重要的几个管道中,并且想知道将 traceid 从一个管道传递到另一个管道的好方法是什么?

到目前为止,这是我所拥有的 - 在流处理管道开始时,我启动了一个服务器跨度并将 traceid 保存到状态存储中。稍后,在转换管道中,我访问 statestore 并从 transform() 方法捕获跟踪。这是在流处理中处理跟踪的好方法吗?

input
  .mapValues(toSomethingThatMyAppUnderstands)      
  .mapValues(this::startStreamTrace)
  .filter((k, v) -> v.isPresent())            
  .mapValues(Optional::get)                   
  .mapValues(doSomethingHereWith)       
  .flatMapValues(doSomethingElse)       
  .filter((k, v) -> isInterestingEvent(v))    
  .transform(() -> new TransformerWithTracing<SomeObjectA, SomeObjectB>(IN_MEM_STORE_NAME, someFunction), IN_MEM_STORE_NAME)
  .flatMapValues(c -> c)
  .to(outTopic, Produced.with(Serdes.String(), new EventSerde()));



public class TransformerWithTracing<V, VR> implements Transformer<String, V, KeyValue<String, VR>> {

  final Function valueAction;
  final String storeId;
  private KeyValueStore<String, String> traceIdStore;

  public TransformerWithTracing(String storeId, Function valueAction) {
    this.storeId = storeId;
    this.valueAction = valueAction;
  }

  @Override
  public void init(ProcessorContext context) {
   // KeyValueStore store = ((KeyValueStore<String, String>) context.getStateStore(storeId));
    InMemoryKeyValueStore inMemoryKeyValueStore = (InMemoryKeyValueStore) store;
    this.traceIdStore = store;
  }

  @Override
  public KeyValue<String, VR> transform(String key, V value) {
    System.out.println(traceIdStore.get(key));

    // BuildTraceHeader 
    try(Scope scope = serviceTracer.startServerSpan(traceHeader, "Converting to Enterprise Event")) {
      return KeyValue.pair(key, (VR) valueAction.apply(value));
    }
  }

  @Override
  public KeyValue<String, VR> punctuate(long timestamp) {
    return null;
  }

  @Override
  public void close() {
//    if (streamId != null)   traceIdStore.delete(streamId);
  }

}

【问题讨论】:

    标签: java apache-kafka apache-kafka-streams jaeger


    【解决方案1】:

    @jeqo 在这个 zipkin/brave repo 中有类似的想法。

    https://github.com/jeqo/brave/tree/kafka-streams-processor/instrumentation/kafka-streams

    opentracing-contrib repo 中似乎也有一些可用的东西,但它似乎只在跟踪生产者/消费者级别。

    https://github.com/opentracing-contrib/java-kafka-client/tree/master/opentracing-kafka-streams

    • 伦尼

    【讨论】:

    • 谢谢!这就是我最终选择的路线——带商店的变压器供应商。
    猜你喜欢
    • 2018-04-19
    • 1970-01-01
    • 1970-01-01
    • 2017-08-30
    • 1970-01-01
    • 2019-03-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多