【问题标题】:How to send tomcat access logs to kafka如何将tomcat访问日志发送到kafka
【发布时间】:2018-06-07 19:29:51
【问题描述】:

我想将 Tomcat 访问日志发送到 kafka 主题。我已阅读 tomcat 日志记录文档,发现 tomcat 使用 apache juli。

我想删除默认日志并将所有访问日志发送到 kafka。

我在 server.xml 中找到了

 <Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
               prefix="localhost_access_log" suffix=".txt"
               pattern="%h %l %u %t &quot;%r&quot; %s %b" />

现在我需要更改此设置,但如何更改?

【问题讨论】:

    标签: java tomcat spring-boot tomcat8


    【解决方案1】:

    你可以翻看tomcat的源码,你会发现key存在于AccessLogValve.java中:

    对于 Tomcat 8:

    public void log(CharArrayWriter message) {
        this.rotate();
        if (this.checkExists) {
            synchronized(this) {
                if (this.currentLogFile != null && !this.currentLogFile.exists()) {
                    try {
                        this.close(false);
                    } catch (Throwable var8) {
                        ExceptionUtils.handleThrowable(var8);
                        log.info(sm.getString("accessLogValve.closeFail"), var8);
                    }
    
                    this.dateStamp = this.fileDateFormatter.format(new Date(System.currentTimeMillis()));
                    this.open();
                }
            }
        }
    
        try {
            synchronized(this) {
                if (this.writer != null) {
                    message.writeTo(this.writer);
                    this.writer.println("");
                    if (!this.buffered) {
                        this.writer.flush();
                    }
                }
            }
        } catch (IOException var7) {
            log.warn(sm.getString("accessLogValve.writeFail", new Object[]{message.toString()}), var7);
        }
    
    }
    

    你应该记录一下,然后你就会知道如何配置。

    那么我们开始吧,你应该创建一个类扩展 ValveBase 实现 AccessLog,如:

    public class LeKafkaAccesslogValve extends ValveBase implements AccessLog {
        private String topic;
        private String bootstrapServers;
    
        //  If set to zero then the producer will not wait for any acknowledgment from the server at all.
        private String acks;
    
        private String producerSize ;
    
        private String properties;
    
        private List<Producer<byte[], byte[]>> producerList;
        private AtomicInteger producerIndex = new AtomicInteger(0);
        private int timeoutMillis;
        private boolean enabled = true; 
    
        private String pattern;
        private AccessLogElement accessLogElement;
        private String localeName;
        private Locale locale = Locale.getDefault();
    
    
        @Override
        public void log(Request request, Response response, long l) {
            if (producerList != null && getEnabled() && getState().isAvailable() && null != this.accessLogElement) {
                try {
                    getNextProducer().send(new ProducerRecord<byte[], byte[]>(topic, this.accessLogElement.buildLog(request,response,time,this).getBytes(StandardCharsets.UTF_8))).get(timeoutMillis, TimeUnit.MILLISECONDS);
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
                    log.error('accesslog in kafka exception', e);
                }
            }
        }
    
        @Override
        public void setRequestAttributesEnabled(boolean b) {
            //some other code if you would like
        }
    
        @Override
        public boolean getRequestAttributesEnabled() {
            //some other code if you would like
            return false;
        }
    
        @Override
        public void invoke(Request request, Response response) throws IOException, ServletException {
            //some other code if you would like
        }
    }
    

    然后你应该在 server.xml 中添加你自己的配置,比如:

    <Valve className='com.xxx.lekafkavalve.LeKafkaAccesslogValve'         enabled='true'  topic='info' pattern='%{yyyy-MM-dd     HH:mm:ss}t||info||AccessValve||Tomcat||%A||%a||%r||%s||%D' bootstrapServers='kafkaaddress' producerSize='5' properties='acks=0||producer.size=3'/>
    

    【讨论】:

    • 谢谢张,让我试试这个。
    【解决方案2】:

    嗯。另外,你可以将日志框架切换到log4j2以获得更高的效率,这样向kafka发送消息不会导致主要的速度下降

    【讨论】:

      猜你喜欢
      • 2014-08-08
      • 2020-06-02
      • 1970-01-01
      • 2021-11-26
      • 2019-02-28
      • 1970-01-01
      • 2018-01-14
      • 1970-01-01
      • 2016-08-15
      相关资源
      最近更新 更多