【发布时间】:2021-07-20 07:01:14
【问题描述】:
我正在使用 kafka 2.11,下面贴出我的 server.properties。我的 kafka 集群有 3 个节点。
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
# NOTE(review): the post describes a 3-node cluster, so each node's copy of this
# file needs a different value here.
broker.id=1
# Optional rack label for this broker; left disabled.
#broker.rack=1
############################# Socket Server Settings #############################
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
# NOTE(review): xx.xx.xx.xx looks like a redacted placeholder; clients must be able to
# reach the real address that is advertised here.
advertised.listeners=PLAINTEXT://xx.xx.xx.xx:9092
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server (102400 bytes = 100 KiB)
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server (102400 bytes = 100 KiB)
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
# (104857600 bytes = 100 MiB)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
# (these are the partition data files, not the broker's application logs)
log.dirs=/home/kafka_data/
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
# The number of fetcher threads used to replicate messages from each source broker.
num.replica.fetchers=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
# Minimum number of in-sync replicas required for the transaction state topic.
# NOTE(review): this equals the replication factor above, so the transaction log
# presumably cannot accept writes while any one replica is out of sync. Confirm
# that this is intended.
transaction.state.log.min.isr=3
############################# Log Flush Policy #############################
# Both flush settings below are commented out, so this file does not override
# the broker's flush behaviour.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# A log segment becomes eligible for deletion when EITHER the age limit OR the size
# limit below is exceeded, whichever is reached first.
# The minimum age of a log file to be eligible for deletion due to age
# (168 hours = 7 days)
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
# The limit applies to each partition separately; -1 disables size-based retention.
# NOTE(review): 1000000 bytes is only about 1 MB per partition, so size-based pruning
# can delete data long before the 7-day age limit is reached.
log.retention.bytes=1000000
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
# (1073741824 bytes = 1 GiB)
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies (300000 ms = 5 minutes)
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# Here the chroot is /kafka; it is written once, after the last host:port pair.
# NOTE(review): the three hosts look like redacted placeholders.
zookeeper.connect=xx.xx.xx.xx:2181,xx.xx.xx.xx:2181,xx.xx.xx.xx:2181/kafka
# Timeout in ms for connecting to zookeeper (18000 ms = 18 seconds)
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
# NOTE(review): 0 is the development/testing override described above. If this
# cluster is a production one, consider removing the override. TODO confirm.
group.initial.rebalance.delay.ms=0
###################### Topic configuration ##############################
# Allow topics to be deleted through the admin tools.
delete.topic.enable=true
# Broker-wide defaults for the partition count and replication factor of new topics.
num.partitions=3
default.replication.factor=3
# Disable automatic topic creation on the broker.
# The key is spelled auto.create.topics.enable (plural "topics"); the broker does not
# recognise the singular spelling, which would leave auto-creation at its default.
auto.create.topics.enable=false
# Enables automatic leader balancing.
auto.leader.rebalance.enable=true
# The frequency with which the partition rebalance check is triggered by the controller
leader.imbalance.check.interval.seconds=60
# Disabled leftover; the active offsets.topic.replication.factor value is set under
# "Internal Topic Settings".
#offsets.topic.replication.factor=1
############################## REPLICA SETTING #################################
# max wait time for each fetcher request issued by follower replicas
replica.fetch.wait.max.ms=300
# Minimum bytes expected for each fetch response; left disabled.
#replica.fetch.min.bytes=1
# If a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time, the leader will remove the follower from isr
replica.lag.time.max.ms=30000
# The socket receive buffer for network requests
replica.socket.receive.buffer.bytes=1000000
# The socket timeout for network requests.
replica.socket.timeout.ms=30000
# The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received
# before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.
request.timeout.ms=30000
# NOTE(review): metadata.max.age.ms is documented as a client (producer/consumer)
# setting; it presumably has no effect in a broker's server.properties. Confirm.
metadata.max.age.ms=30000
####################### PRODUCER CONFIG ##################################
# NOTE(review): server.properties is read by the broker. acks is a producer-side
# setting and presumably has no effect here; it belongs in the producer client's
# configuration. Confirm.
acks=all
#unclean.leader.election.enable=false
# The largest record batch size the broker will accept.
# NOTE(review): 1000000000 bytes (~1 GB) is larger than socket.request.max.bytes in
# this file (104857600), so a request that large would be rejected at the socket
# layer first. Verify the intended limit.
message.max.bytes=1000000000
# NOTE(review): fetch.message.max.bytes looks like a legacy consumer-side setting
# rather than a broker one. Confirm whether it is needed here.
fetch.message.max.bytes=1000000000
# The number of bytes of messages to attempt to fetch for each partition when replicating.
replica.fetch.max.bytes=1000000000
# Allows a replica that is not in the ISR to be elected leader as a last resort,
# trading possible data loss for availability.
unclean.leader.election.enable=true
# Producer-side options, left disabled.
#enable.idempotence = true
#message.send.max.retries= 10000000
#max.in.flight.requests.per.connection=1
我设置了log.retention.hours=168,即保留 7 天
我创建了一个新的消费者组,并加上 --from-beginning 参数从头开始消费数据,但只读到了 4 天的日志。
bin/kafka-consumer-groups.sh --describe --group consumer2 --bootstrap-server lc-helk.nic.in:9092
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
ise 2 1114176 1114176 0
ise 0 1113757 1113757 0
ise 1 1113627 1113627 0
在日志条目中,我看到最早的日志是 2021 年 7 月 17 日的,而它本应从 2021 年 7 月 13 日开始显示。
如果我想更改这项配置,该如何对已有的 kafka 主题进行修改?
【问题讨论】:
-
您设法解决了这个问题吗?您需要更多信息吗?
-
是的,这可以应用于现有主题,例如在 config/server.properties 中设置
log.retention.bytes=-1;不过要执行此操作,我们必须先停止所有 broker。
-
那么您介意将此问题标记为已解决吗?还是我误会了?
-
谢谢,这个方法有效。
标签: apache-kafka