Flink Configuration: Logging

Preface

The logging configuration described in this article is based on Flink 1.10.

Flink application containers create and write to three types of log files: .out, .log, and .err. Only the .err file is compressed and removed from the filesystem, while the .log and .out files are kept.

Each standalone JobManager, TaskManager, HistoryServer, and ZooKeeper daemon redirects stdout and stderr to a file with the .out suffix and writes its internal logging to a file with the .log suffix.
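
For orientation, the log directory of a standalone cluster typically looks like the following sketch (user, host, and index in the file names are illustrative):

# Typical contents of ${FLINK_HOME}/log (names illustrative)
flink-<user>-standalonesession-0-<host>.log    # JobManager internal log
flink-<user>-standalonesession-0-<host>.out    # JobManager stdout/stderr
flink-<user>-taskexecutor-0-<host>.log         # TaskManager internal log
flink-<user>-taskexecutor-0-<host>.out         # TaskManager stdout/stderr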

Configuring Log4j

Flink 1.10 uses Log4j as its default logging framework. The configuration files are as follows:

  • log4j-cli.properties: used by the Flink command-line client (e.g. flink run)
  • log4j-yarn-session.properties: used by the Flink command-line client when starting a YARN session (yarn-session.sh)
  • log4j.properties: JobManager/TaskManager logs (both standalone and YARN)

Default Configuration (File)

The contents of log4j.properties are as follows:

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n


# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
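
The root logger governs user code as well as Flink's own logging. If you only want more verbose output from your own job while keeping everything else at INFO, a minimal sketch is to declare a dedicated logger (com.example.myjob is a hypothetical package name):

# A sketch: raise verbosity only for your own job code
# (com.example.myjob is a hypothetical package name)
log4j.logger.com.example.myjob=DEBUG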

Rolling Configuration (by Size)

With the default configuration, JobManager and TaskManager logs are written to separate files, and each file keeps growing without bound.

For production environments, it is recommended to roll log files by size. The configuration is as follows:

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, R

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO


log4j.appender.R=org.apache.log4j.RollingFileAppender
# Log file name
log4j.appender.R.File=${log.file}
log4j.appender.R.MaxFileSize=256MB
log4j.appender.R.Append=true
log4j.appender.R.MaxBackupIndex=10
log4j.appender.R.layout=org.apache.log4j.PatternLayout
# Output pattern
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %t %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, R
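
With these values, the disk usage per log file is bounded. A quick calculation:

# Upper bound on disk usage per log file:
#   MaxFileSize * (MaxBackupIndex + 1) = 256MB * (10 + 1) ≈ 2.8GB
# Rolled files are named ${log.file}.1 through ${log.file}.10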

Periodic Rolling Log Files (by Time)

To roll log files by time instead of size, use DailyRollingFileAppender:

log4j.rootLogger=INFO,dailyFile

# Daily rolling file appender (dailyFile)
log4j.appender.dailyFile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.dailyFile.Threshold=INFO
log4j.appender.dailyFile.ImmediateFlush=true
log4j.appender.dailyFile.Append=true
log4j.appender.dailyFile.File=<log file path>
log4j.appender.dailyFile.DatePattern='.'yyyy-MM-dd
log4j.appender.dailyFile.layout=org.apache.log4j.PatternLayout
log4j.appender.dailyFile.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %t %-5p %-60c %x - %m%n
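
With DatePattern='.'yyyy-MM-dd, the active file keeps its configured name and the previous day's content is renamed with a date suffix at each midnight rollover. A sketch, assuming File=/opt/flink/log/flink.log:

/opt/flink/log/flink.log               # active file (current day)
/opt/flink/log/flink.log.2021-03-07    # rolled at midnight
/opt/flink/log/flink.log.2021-03-08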

Console

To additionally print logs to the console, add a ConsoleAppender and reference it from the root logger:

log4j.rootLogger=INFO,console,dailyFile
log4j.additivity.org.apache=true

# Console appender (console)
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=DEBUG
log4j.appender.console.ImmediateFlush=true
log4j.appender.console.Target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%p] %m%n

Kafka Configuration

Sometimes you need to send logs to Kafka for monitoring and alerting, or to collect them centrally into ELK for analysis. In that case you can use KafkaLog4jAppender to ship logs to Kafka. The configuration is as follows:

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, kafka


# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
# !!! The kafka appender must be listed on these loggers as well, otherwise logging can get stuck in the Kafka send !!!
log4j.logger.akka=INFO, kafka
log4j.logger.org.apache.kafka=INFO, kafka
log4j.logger.org.apache.hadoop=INFO, kafka
log4j.logger.org.apache.zookeeper=INFO, kafka


# Send logs to Kafka
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.brokerList=localhost:9092
log4j.appender.kafka.topic=flink_logs
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.requiredNumAcks=0
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=[frex] [%d{yyyy-MM-dd HH:mm:ss,SSS}] [%p] %c{1}:%L %x - %m%n
log4j.appender.kafka.level=INFO


# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, kafka

You also need to put the kafka-log4j-appender jar into ${FLINK_HOME}/lib.
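
The appender is published as a separate artifact; the sketch below shows the coordinates and target path (the version placeholder is an assumption and should match your Kafka client version):

# Maven coordinates (version placeholder is an assumption):
#   org.apache.kafka:kafka-log4j-appender:<kafka-version>
# Copy the jar into Flink's lib directory:
#   ${FLINK_HOME}/lib/kafka-log4j-appender-<kafka-version>.jar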

Flink 1.11.1

In Flink 1.11.1, JobManager and TaskManager logs are written to the same file. Note also that starting with Flink 1.11, Log4j 2 is the default logging framework, so the configuration below uses Log4j 2 properties syntax.

In both versions (1.10 and 1.11.1), we can configure rolling for Flink's logs to control the size of each log file.

Rolling Log Configuration (Log4j 2)

# Rolling log configuration
# This affects logging for both user code and Flink
rootLogger.level = DEBUG
rootLogger.appenderRef.rolling.ref = RollingFileAppender

# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO

# Log all infos in the given rolling file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
# Log file name
appender.rolling.fileName = ${sys:log.file}
# File naming pattern applied when a rollover occurs
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
# Output pattern
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Policies controlling when a rollover is triggered
appender.rolling.policies.type = Policies
# Size-based triggering policy
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
# Trigger a rollover when the log file exceeds the given size
appender.rolling.policies.size.size = 5MB
# Rollover (retention) strategy
appender.rolling.strategy.type = DefaultRolloverStrategy
# Maximum number of rolled files to keep (here ${sys:log.file}.1 ... ${sys:log.file}.10)
appender.rolling.strategy.max = 10

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
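
If you prefer time-based rollover with Log4j 2 instead, a sketch of the changes to the appender above (a date pattern in filePattern is required for a time-based policy; a daily interval is assumed):

# A sketch: daily rollover with Log4j 2
appender.rolling.filePattern = ${sys:log.file}.%d{yyyy-MM-dd}.%i
appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
appender.rolling.policies.time.interval = 1
appender.rolling.policies.time.modulate = true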

Error Handling

log4j:ERROR Could not find value for key log4j.appender.file

log4j:ERROR Could not find value for key log4j.appender.file
log4j:ERROR Could not instantiate appender named "file".

Problem Analysis

  • This usually means the configuration references an appender named file somewhere, e.g. log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file, while the root logger defines an appender under a different name, e.g. log4j.rootLogger=INFO, R instead of log4j.rootLogger=INFO, file. Log4j then cannot find a definition for the file appender; the fix is shown in the sketch below.
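
The fix is to make every appender reference match the name that is actually defined. For example, if the appender is defined as R:

log4j.rootLogger=INFO, R
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, R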

For more on the log.file variable, see http://www.javashuo.com/article/p-wxmyorug-es.html

