在Flink SQL 开发过程中,遇到这样一个错误:Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

错误日志

1
2
3
4
5
6
7
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:114) ~[flink-dist_2.11-1.13-vvr-4.0.13-1-SNAPSHOT.jar:1.13-vvr-4.0.13-1-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93) ~[flink-dist_2.11-1.13-vvr-4.0.13-1-SNAPSHOT.jar:1.13-vvr-4.0.13-1-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) ~[flink-dist_2.11-1.13-vvr-4.0.13-1-SNAPSHOT.jar:1.13-vvr-4.0.13-1-SNAPSHOT]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.13-vvr-4.0.13-1-SNAPSHOT.jar:1.13-vvr-4.0.13-1-SNAPSHOT]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.13-vvr-4.0.13-1-SNAPSHOT.jar:1.13-vvr-4.0.13-1-SNAPSHOT]
at StreamExecCalc$1653.processElement(Unknown Source) ~[?:?]

问题排查

从异常描述来看是:无法将元素转发给下一个算子。

根据含义可以初步定位问题在数据源端,检查源表SQL字段类型、大概率是字段类型不对或者有NULL数据导致的。

  • (比如:数据源主键为INT,FlinkSQL中定义成了VARCHAR,就会报这个错)
  • (比如:FlinkSQL中定义了字段不能为NULL,但其实数据源端有NULL数据,也会报这个错)

解决方法

  1. 首先检查SQL中是否写错了数据源端的字段类型,保证两端的字段数据类型一致。
  2. 再对SQL中定义了NOT NULL的字段,在消费时需要判断NULL并赋个默认值。