spark通过kafka-appender指定⽇志输出到kafka引发的死锁问题
在采⽤log4j的kafka-appender收集spark任务运⾏⽇志时,发现提交到yarn上的任务始终ACCEPTED状态,⽆法进⼊RUNNING状态,并且会重试两次后超时。期初认为是yarn资源不⾜导致,但在确认yarn资源
充裕的时候问题依旧,⽽且基本上能稳定复现。
起初是这么配置spark⽇志输出到kafka的:
sole=org.apache.log4j.ConsoleAppender
sole.
sole.layout=org.apache.log4j.PatternLayout
sole.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p %c{1}: [${log4j.pipelineId}] %m%n
# Kafka appender
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# Set Kafka topic and brokerList
log4j.pic=yarn_spark_log
log4j.appender.kafka.brokerList=localhost:9092
log4j.appender.kafkapressionType=none
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=10
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p %c{1}: [${log4j.pipelineId}] %m
这⾥⽤org.apache.kafka.log4jappender.KafkaLog4jAppender默认将所有⽇志都输出到kafka,这个appender已经被kafka官⽅维护,稳定性应该是可以保障的。
问题定位
发现问题后,尝试将输出到kafka的规则去掉,问题解除!于是把问题定位到跟⽇志输出到kafka有关。通过其他测试,证实⽬标kafka其实是正常的,这就⾮常奇怪了。
查看yarn的ResourceManager⽇志,发现有如下超时
2020-05-07 21:49:48,230 INFO org.apache.hadoop.yarn.util.AbstractLivelinessMonitor: Expired:appattempt_1578970174552_3204_000002 Timed out after 600 secs
2020-05-07 21:49:48,230 INFO org.apache.hadoop.app.attempt.RMAppAttemptImpl: Updating application attempt
appattempt_1578970174552_3204_000002 with final
state: FAILED, and exit status: -1000
2020-05-07 21:49:48,231 INFO org.apache.hadoop.app.attempt.RMAppAttemptImpl: appattempt_1578970174552_3204_000002 State change from
LAUNCHED to FINAL_SAV
ING on event = EXPIRE
表明,yarn本⾝是接收任务的,但是发现任务迟迟没有启动。在spark的场景下其实是指只有driver启动了,但是没有启动executor。
⽽查看driver⽇志,发现⽇志输出到⼀个地⽅就卡住了,不往下继续了。通过对⽐成功运⾏和卡住的情况发现,⽇志卡在这条上:
2020/05/07 19:37:10.324 INFO SecurityManager: Changing view acls to: yarn,root
2020/05/07 19:37:10.344 INFO Metadata: Cluster ID: 6iG6WHA2SoK7FfgGgWHt_A
卡住的情况下,只会打出SecurityManager这⾏,⽽⽆法打出Metadata这⾏。
猜想Metadata这⾏是kafka-client本⾝打出来的,因为整个上下⽂只有yarn, spark, kafka-client可能会打出这个⽇志。
在kafka-client 2.2.0版本中到这个⽇志是输出位置:
public synchronized void update(MetadataResponse metadataResponse, long now) {
...
String newClusterId = cache.cluster().clusterResource().clusterId();
if (!Objects.equals(previousClusterId, newClusterId)) {
log.info("Cluster ID: {}", newClusterId);
}
...
}
看到synchronized,⾼度怀疑死锁。于是考虑⽤jstack分析:
在yarn上运⾏spark任务的时候,driver进程叫ApplicationMaster,executor进程叫CoarseGrainedExecutorBackend。这⾥⾸先尝试再复现过程中到drvier最终在哪个节点上运⾏,然后快速
使⽤jstack -F <pid>打印堆栈
jstack果然不负众望,报告了死锁!这⾥我把结果贴的全⼀点
[root@node1 ~]# jstack 20136
20136: Unable to open socket file: target process not responding or HotSpot VM not loaded
The -F option can be used when the target process is not responding
[root@node1 ~]# jstack -F 20136
Attaching to process ID 20136,
Debugger attached successfully.
Server compiler detected.
JVM version is 25.231-b11
Deadlock Detection:
Found one Java-level deadlock:
=============================
"kafka-producer-network-thread | producer-1":
waiting to lock Monitor@0x00000000025fcc48 (Object@0x00000000ed680b60, a org/apache/kafka/log4jappender/KafkaLog4jAppender),
which is held by "main"
"main":
waiting to lock Monitor@0x00007fec9dbde038 (Object@0x00000000ee44de38, a org/apache/kafka/clients/Metadata),
which is held by "kafka-producer-network-thread | producer-1"
Found a total of 1 deadlock.
Thread 20157: (state = BLOCKED)
- org.apache.log4j.AppenderSkeleton.doAppend(org.apache.log4j.spi.LoggingEvent) @bci=0, line=231 (Interpreted frame)
- org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(org.apache.log4j.spi.LoggingEvent) @bci=41, line=66 (Interpreted frame)
- org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) @bci=26, line=206 (Interpreted frame)
- org.apache.log4j.Category.forcedLog(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, line=391 (Interpreted frame)
kafka最新版本- org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Interpreted frame)
-
org.slf4j.impl.Log4jLoggerAdapter.info(java.lang.String, java.lang.Object) @bci=34, line=324 (Interpreted frame)
- org.apache.kafka.clients.Metadata.update(org.quests.MetadataResponse, long) @bci=317, line=365 (Interpreted frame)
- org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(org.quests.RequestHeader, long, org.quests.MetadataResponse) @bci=184, line=1031 (Interpreted frame  - org.apache.kafka.clients.NetworkClient.handleCompletedReceives(java.util.List, long) @bci=215, line=822 (Interpreted frame)
- org.apache.kafka.clients.NetworkClient.poll(long, long) @bci=132, line=544 (Interpreted frame)
- org.apache.kafka.clients.producer.internals.Sender.run(long) @bci=227, line=311 (Interpreted frame)
- org.apache.kafka.clients.producer.internals.Sender.run() @bci=28, line=235 (Interpreted frame)
- java.lang.Thread.run() @bci=11, line=748 (Interpreted frame)
Thread 20150: (state = BLOCKED)
Thread 20149: (state = BLOCKED)
- java.lang.Object.wait(long) @bci=0 (Interpreted frame)
- ve(long) @bci=59, line=144 (Interpreted frame)
- ve() @bci=2, line=165 (Interpreted frame)
- f.Finalizer$FinalizerThread.run() @bci=36, line=216 (Interpreted frame)
Thread 20148: (state = BLOCKED)
- java.lang.Object.wait(long) @bci=0 (Interpreted frame)
- java.lang.Object.wait() @bci=2, line=502 (Interpreted frame)
- HandlePending(boolean) @bci=54, line=191 (Interpreted frame)
- f.Reference$ReferenceHandler.run() @bci=1, line=153 (Interpreted frame)
Thread 20137: (state = BLOCKED)
-
java.lang.Object.wait(long) @bci=0 (Interpreted frame)
- org.apache.kafka.clients.Metadata.awaitUpdate(int, long) @bci=63, line=261 (Interpreted frame)
- org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(java.lang.String, java.lang.Integer, long) @bci=160, line=983 (Interpreted frame)
- org.apache.kafka.clients.producer.KafkaProducer.doSend(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback) @bci=19, line=860 (Interpreted frame)
- org.apache.kafka.clients.producer.KafkaProducer.send(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback) @bci=12, line=840 (Interpreted frame)
- org.apache.kafka.clients.producer.KafkaProducer.send(org.apache.kafka.clients.producer.ProducerRecord) @bci=3, line=727 (Interpreted frame)
- org.apache.kafka.log4jappender.KafkaLog4jAppender.append(org.apache.log4j.spi.LoggingEvent) @
bci=69, line=283 (Interpreted frame)
- org.apache.log4j.AppenderSkeleton.doAppend(org.apache.log4j.spi.LoggingEvent) @bci=106, line=251 (Interpreted frame)
- org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(org.apache.log4j.spi.LoggingEvent) @bci=41, line=66 (Interpreted frame)
- org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) @bci=26, line=206 (Interpreted frame)
- org.apache.log4j.Category.forcedLog(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, line=391 (Interpreted frame)
- org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Interpreted frame)
- org.slf4j.impl.Log4jLoggerAdapter.info(java.lang.String) @bci=12, line=305 (Interpreted frame)
-
org.apache.spark.internal.Logging$class.logInfo(org.apache.spark.internal.Logging, scala.Function0) @bci=29, line=54 (Interpreted frame)
- org.apache.spark.SecurityManager.logInfo(scala.Function0) @bci=2, line=44 (Interpreted frame)
- org.apache.spark.SecurityManager.llection.immutable.Set, java.lang.String) @bci=36, line=139 (Interpreted frame)
- org.apache.spark.SecurityManager.<init>(org.apache.spark.SparkConf, scala.Option) @bci=158, line=81 (Interpreted frame)
- org.apache.spark.deploy.yarn.ApplicationMaster.<init>(org.apache.spark.deploy.yarn.ApplicationMasterArguments) @bci=85, line=70 (Interpreted frame)
- org.apache.spark.deploy.yarn.ApplicationMaster$.main(java.lang.String[]) @bci=25, line=802 (Interpreted frame)
- org.apache.spark.deploy.yarn.ApplicationMaster.main(java.lang.String[]) @bci=4 (Interpreted frame)
到这⾥,已经确定是死锁,导致driver⼀开始就运⾏停滞,那么当然⽆法提交executor执⾏。
具体的死锁稍后分析,先考虑如何解决。从感性认识看,似乎只要不让kafka-client的⽇志也输出到kafka即可。实验后,发现果然如此:如果只输出org.apache.spark的⽇志就可以正常执⾏。
根因分析
从stack的结果看,造成死锁的是如下两个线程:
kafka-client内部的⽹络线程spark
主⼊⼝线程
两个线程其实都是卡在打⽇志上了,观察堆栈可以发现,两个线程同时持有了同⼀个log对象。⽽这个log对象实际上是kafka-appender。⽽kafka-appender本质上持有kafka-client,及其内部的Metadata 对象。log4j的doAppend为了保证线程安全也⽤synchronized修饰了:
public
synchronized
void doAppend(LoggingEvent event) {
if(closed) {
<("Attempted to append to closed appender named ["+name+"].");
return;
}
if(!isAsSevereAsThreshold(event.level)) {
return;
}
Filter f = this.headFilter;
FILTER_LOOP:
while(f != null) {
switch(f.decide(event)) {
case Filter.DENY: return;
case Filter.ACCEPT: break FILTER_LOOP;
case Filter.NEUTRAL: f = f.next;
}
}
this.append(event);
}
于是事情开始了:
main线程尝试打⽇志,⾸先进⼊了synchronized的doAppend,即获取了kafka-appender的锁
kafka-appender内部需要调⽤kafka-client发送⽇志到kafka,最终调⽤到Thread 20137展⽰的,运⾏到Metadata.awaitUpdate(也是个synchronized⽅法),内部的wait会尝试获取metadata的锁。(详见github/)
但此时,kafka-producer-network-thread线程刚好进⼊了上⽂提到的打Cluster ID这个⽇志的这个阶段(update⽅法也是synchronized的),也就是说kafka-producer-network-thread线程获得了metadata对象的锁
kafka-producer-network-thread线程要打印⽇志同样执⾏synchronized的doAppend,即获取了kafka-appender的锁
上图main-thread持有了log对象锁,要求获取metadata对象锁;kafka-producer-network-thread持有了metadata对象锁,要求获取log对象锁于是造成了死锁。
总结
到此这篇关于spark通过kafka-appender指定⽇志输出到kafka引发的死锁的⽂章就介绍到这了,更多相关spark指定⽇志输出内容请搜索以前的⽂章或继续浏览下⾯的相关⽂章希望⼤家以后多多⽀持!

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。