kafka/config/ 目录下面有 3 个配置文件
- producer.properties:生产端的配置文件
- consumer.properties:消费端的配置文件
- server.properties:服务端的配置文件
server.properties:服务端的配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
|
broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/export/servers/logs/kafka
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.roll.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=true
log.cleanup.policy=delete
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
zookeeper.connection.timeout.ms=6000
log.flush.interval.messages=10000
log.flush.interval.ms=3000
delete.topic.enable=true
host.name=kafka01
advertised.host.name=192.168.239.128
|
数据删除
kafka将topic分成不同的partitions,每个partition的日志分成不同的segments,最后以segment为单位将陈旧的日志从文件系统删除。
假设kafka的在server.properity文件中设置的日志目录为tmp/kafka-logs,对于名为test_perf的topic。假设两个partitions,那么我们可以在tmp/kafka-logs目录下看到目录VST_TOPIC-0,VST_TOPIC-1。也就是说kafka使用目录表示topic 分区。
VST_TOPIC-0目录下下,可以看到后缀名为.log和.index的文件,如下
1 2 3
| [root@kafka kafka-logs]# ls test_perf-0/ 00000000000003417135.index.deleted 00000000000003518540.index 00000000000003619945.index 00000000000003417135.log.deleted 00000000000003518540.log 00000000000003619945.log
|
如果所有待删除的陈旧日志都清理了,那么是看不到后缀名为.deleted的文件的。
0x01 基于时间的删除策略
在server.properity文件中设置如下:
1 2 3 4 5
| log.retention.hours=168 //7d log.retention.check.interval.ms=300000 //5min log.segment.bytes=1073741824 //1G log.cleaner.delete.retention.ms=86400000 // 1d log.cleaner.backoff.ms=15000 //15s
|
每个segment的大小为1GB,每5分钟检查一次是否有segment已经查过了7d,如果有将其标记为deleted。标记为deleted的segment默认会保留1天,清理线程会每隔15秒检查一次,是否有标记为deleted的segment的保留时间超过一天了,如果有将其从文件系统删除。
大家注意,kafka清理时是不管该segment中的消息是否被消费过,它清理的依据为是否超过了指定的保留时间,仅此而已。
0x02 基于文件大小的删除策略
在server.properity文件中设置:
你可以同时指定log.retention.bytes和log.retention.hours来混合指定保留规则。一旦日志的大小超过了log.retention.bytes就清除老的segment,一旦某个segment的保留时间超过了规定的值同样将其清除。
log.retention.bytes和log.retention.hours任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖。
在kafka 0.9.0之后 log.cleaner.enable 默认是true。支持的清理策略(log.cleanup.policy)有2种:delete和compact,默认是delete。
producer.properties:生产端的配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
|
metadata.broker.list=kafka01:9092,kafka02:9092,kafka03:9092
compression.codec=none
serializer.class=kafka.serializer.DefaultEncoder
request.required.acks=0
request.timeout.ms=10000
producer.type=sync
queue.buffering.max.ms = 5000
queue.buffering.max.messages=20000
batch.num.messages=500
queue.enqueue.timeout.ms=-1
message.send.max.retries=3
topic.metadata.refresh.interval.ms=60000
|
consumer.properties:消费端的配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
|
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
zookeeper.session.timeout.ms=5000
zookeeper.connection.timeout.ms=10000
zookeeper.sync.time.ms=2000
group.id=xxxxx
auto.commit.enable=true
auto.commit.interval.ms=1000
conusmer.id=xxx
client.id=xxxx
queued.max.message.chunks=50
rebalance.max.retries=5
fetch.min.bytes=6553600
fetch.wait.max.ms=5000 socket.receive.buffer.bytes=655360
auto.offset.reset=smallest
derializer.class=kafka.serializer.DefaultDecoder
|