【Posted】: 2019-03-29 15:27:05
【Question】:
Here is my code:
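import org.apache.http.client.config.RequestConfig
import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
def saveToOpenTSDB(rows: Iterator[String], url: String): Unit = {
  val requestConfig: RequestConfig = RequestConfig.custom().setSocketTimeout(2000).setConnectTimeout(2000).setConnectionRequestTimeout(2000).build()
  val httpClient: CloseableHttpClient = HttpClients.createDefault() // CommonDeploy.scala:21 in the trace; requestConfig is not passed to createDefault()
}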
Error stack trace:
2018-10-25 12:40:39,323 INFO org.apache.flink.client.cli.CliFrontend ---------------------------------------------------------------------------------------------
2018-10-25 12:40:39,324 INFO org.apache.flink.client.cli.CliFrontend - Starting Command Line Client (Version: 1.6.1, Rev:23e2636, Date:14.09.2018 @ 19:56:46 UTC)
2018-10-25 12:40:39,324 INFO org.apache.flink.client.cli.CliFrontend - OS current user: root
2018-10-25 12:40:39,676 INFO org.apache.flink.client.cli.CliFrontend - Current Hadoop/Kerberos user: root
2018-10-25 12:40:39,676 INFO org.apache.flink.client.cli.CliFrontend - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.172-b11
2018-10-25 12:40:39,676 INFO org.apache.flink.client.cli.CliFrontend - Maximum heap size: 7136 MiBytes
2018-10-25 12:40:39,676 INFO org.apache.flink.client.cli.CliFrontend - JAVA_HOME: /usr/java/jdk1.8.0_172
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend - Hadoop version: 2.6.5
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend - JVM Options:
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend -     -Dlog.file=/root/flink-1.6.1/log/flink-root-client-cuiyk-cdn-test-10.log
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend -     -Dlog4j.configuration=file:/root/flink-1.6.1/conf/log4j-cli.properties
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend -     -Dlogback.configurationFile=file:/root/flink-1.6.1/conf/logback.xml
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend - Program Arguments:
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend -     run
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend -     -c
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend -     dataclean.FlinkDataCleanDemo
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend -     --parallelism
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend -     4
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend -     --jobmanager
2018-10-25 12:40:39,679 INFO org.apache.flink.client.cli.CliFrontend -     cuiyk-cdn-test-4:42115
2018-10-25 12:40:39,679 INFO org.apache.flink.client.cli.CliFrontend -     ./flinkkafka2tsdb.jar
2018-10-25 12:40:39,679 INFO org.apache.flink.client.cli.CliFrontend - Classpath: /root/flink-1.6.1/lib/flink-python_2.11-1.6.1.jar:/root/flink-1.6.1/lib/flink-shaded-hadoop2-uber-1.6.1.jar:/root/flink-1.6.1/lib/log4j-1.2.17.jar:/root/flink-1.6.1/lib/slf4j-log4j12-1.7.7.jar:/root/flink-1.6.1/lib/flink-dist_2.11-1.6.1.jar::/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/lib/hadoop/etc/hadoop:
2018-10-25 12:40:39,679 INFO org.apache.flink.client.cli.CliFrontend ---------------------------------------------------------------------------------------------
2018-10-25 12:40:39,682 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: classloader.resolve-order, parent-first
2018-10-25 12:40:39,682 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2018-10-25 12:40:39,682 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2018-10-25 12:40:39,682 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2018-10-25 12:40:39,682 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2018-10-25 12:40:39,683 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2018-10-25 12:40:39,683 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2018-10-25 12:40:39,683 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 8081
2018-10-25 12:40:39,696 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-root.
2018-10-25 12:40:39,914 INFO org.apache.flink.runtime.security.modules.HadoopModule - Hadoop user set to root (auth:SIMPLE)
2018-10-25 12:40:39,936 INFO org.apache.flink.client.cli.CliFrontend - Running 'run' command.
2018-10-25 12:40:39,940 INFO org.apache.flink.client.cli.CliFrontend - Building program from JAR file
2018-10-25 12:40:40,314 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
2018-10-25 12:40:40,316 INFO org.apache.flink.client.cli.CliFrontend - Starting execution of program
2018-10-25 12:40:40,316 INFO org.apache.flink.client.program.rest.RestClusterClient - Starting program in interactive mode (detached: false)
2018-10-25 12:40:40,346 WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Ignoring configured key DeSerializer (key.deserializer)
2018-10-25 12:40:40,346 WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Ignoring configured value DeSerializer (value.deserializer)
2018-10-25 12:40:40,532 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: classloader.resolve-order, parent-first
2018-10-25 12:40:40,532 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2018-10-25 12:40:40,532 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2018-10-25 12:40:40,532 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2018-10-25 12:40:40,532 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2018-10-25 12:40:40,532 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2018-10-25 12:40:40,532 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2018-10-25 12:40:40,533 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 8081
2018-10-25 12:40:40,540 INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 1b45b5780e0e7067fae20be1db8e7c2f (detached: false).
2018-10-25 12:41:38,177 INFO org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.
2018-10-25 12:41:38,179 INFO org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete.
2018-10-25 12:41:38,180 ERROR org.apache.flink.client.cli.CliFrontend - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 1b45b5780e0e7067fae20be1db8e7c2f)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
    at dataclean.FlinkDataCleanDemo$.main(FlinkDataCleanDemo.scala:162)
    at dataclean.FlinkDataCleanDemo.main(FlinkDataCleanDemo.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.http.conn.ssl.SSLConnectionSocketFactory
    at org.apache.http.impl.client.HttpClientBuilder.build(HttpClientBuilder.java:912)
    at org.apache.http.impl.client.HttpClients.createDefault(HttpClients.java:58)
    at dataclean.CommonDeploy$.saveToOpenTSDB(CommonDeploy.scala:21)
    at dataclean.FlinkDataCleanDemo$$anonfun$main$1.apply(FlinkDataCleanDemo.scala:152)
    at dataclean.FlinkDataCleanDemo$$anonfun$main$1.apply(FlinkDataCleanDemo.scala:150)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$4.map(DataStream.scala:607)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:663)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:663)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:73)
    at scala.collection.mutable.MutableList.foreach(MutableList.scala:30)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:663)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
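The key line in the trace is `Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.http.conn.ssl.SSLConnectionSocketFactory`. On the JVM this particular message usually means the class *was* found, but its static initializer had already failed once. That first failure surfaces as an `ExceptionInInitializerError` carrying the real cause, possibly in an earlier task attempt or TaskManager log. Every later use then gets only the `NoClassDefFoundError`. The sketch below reproduces that sequence with a hypothetical object `InitFailureDemo.BrokenInit` (not part of the question's code) whose initializer always throws. It assumes the code is compiled as an ordinary source file; some REPL wrappers encode objects differently and can change what the second access reports.

```scala
object InitFailureDemo {
  // Hypothetical stand-in for a class whose static initialization fails,
  // e.g. because something it needs at init time is missing or incompatible.
  object BrokenInit {
    // Simulated initializer failure: parsing always throws NumberFormatException.
    val value: Int = Integer.parseInt("not a number")
  }

  // Touches BrokenInit and reports which error the JVM raised.
  def describeAccess(): String =
    try s"ok: ${BrokenInit.value}"
    catch {
      case e: ExceptionInInitializerError =>
        s"ExceptionInInitializerError, cause: ${e.getCause}"
      case e: NoClassDefFoundError =>
        s"NoClassDefFoundError: ${e.getMessage}"
    }

  def main(args: Array[String]): Unit = {
    println(describeAccess()) // first access: the initializer runs and throws
    println(describeAccess()) // later access: "Could not initialize class ..."
  }
}
```

When debugging the real job, this suggests searching the logs for the first `ExceptionInInitializerError` mentioning `SSLConnectionSocketFactory`, since the repeated `NoClassDefFoundError` hides the original cause.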
// Related dependencies:
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.4</version>
</dependency>
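One possible cause of such an initializer failure (an assumption here, not something the question confirms) is that more than one version of the `org.apache.http` classes is visible at runtime. For example, one copy may be bundled in the job jar and another may sit in a jar under Flink's `lib/` directory, which the log shows is resolved `parent-first`. Classes from different httpclient/httpcore versions then get mixed. The hypothetical, stdlib-only helper `WhichJar` below reports which location a class was actually loaded from and how many copies of its `.class` file a class loader can see. It uses `Class.forName(name, false, loader)`, so the class's static initializer is not run.

```scala
object WhichJar {
  // Where the JVM loaded `cls` from (usually a jar or directory URL). Bootstrap
  // classes such as java.lang.String have no CodeSource, so they give None.
  def locationOf(cls: Class[_]): Option[String] =
    Option(cls.getProtectionDomain.getCodeSource)
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)

  // Resolves the class by name WITHOUT running its static initializer
  // (initialize = false), so it also works for a class whose <clinit> fails.
  def locationOf(className: String, loader: ClassLoader): Either[String, Option[String]] =
    try Right(locationOf(Class.forName(className, false, loader)))
    catch {
      case e: ClassNotFoundException => Left(s"not found: ${e.getMessage}")
      case e: LinkageError           => Left(s"cannot link: $e")
    }

  // Every copy of the .class file visible through `loader`; more than one
  // entry usually means several jars ship the same class.
  def copiesOnClasspath(className: String, loader: ClassLoader): List[String] = {
    val urls = loader.getResources(className.replace('.', '/') + ".class")
    val found = scala.collection.mutable.ListBuffer.empty[String]
    while (urls.hasMoreElements) found += urls.nextElement().toString
    found.toList
  }

  def main(args: Array[String]): Unit = {
    val loader = getClass.getClassLoader
    val name =
      if (args.nonEmpty) args(0)
      else "org.apache.http.conn.ssl.SSLConnectionSocketFactory"
    println(s"$name loaded from: ${locationOf(name, loader)}")
    copiesOnClasspath(name, loader).foreach(u => println(s"  copy: $u"))
  }
}
```

Called from inside the job, for instance just before the `HttpClients.createDefault()` call, `WhichJar.copiesOnClasspath("org.apache.http.conn.ssl.SSLConnectionSocketFactory", getClass.getClassLoader)` would list every copy the user-code class loader can see. More than one entry, or a location other than the job jar, would point toward a version conflict. Note that `commons-httpclient` 3.x lives in the separate `org.apache.commons.httpclient` package, so it does not itself supply these classes.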
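【Discussion】:
-
Thanks a lot. That's exactly what I meant!
-
I did solve the problem this way.
-
@GhostCat Oh, you're right, thanks for your commitment!
-
You can finally delete the comments that are no longer needed...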
Tags: apache-kafka apache-flink opentsdb