【Question title】: FlinkQueryableState: configuration issues on a local cluster
【Posted】: 2019-01-24 00:49:09
【Question】:

I am running Flink from the IDE. Storing data in the queryable state works, but when I query it, an exception is thrown.

Exception:

Failure(akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@127.0.0.1:6123/), Path(/user/jobmanager)])

My code:

config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,"localhost")
config.setString(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,"6123")

// Retry on any failure except an AssertionError (mirrors QueryableStateITCase)
def recover(queryName: String,
            keyHash: Int,
            serializedKey: Array[Byte]): PartialFunction[Throwable, Future[Array[Byte]]] = {
  case failure: AssertionError => Futures.failed(failure)
  case _ =>
    // At startup some failures are expected
    // due to races. Make sure that they don't
    // fail this test.
    Patterns.after(retryDelay, TEST_ACTOR_SYSTEM.scheduler, TEST_ACTOR_SYSTEM.dispatcher,
      new Callable[Future[Array[Byte]]] {
        override def call(): Future[Array[Byte]] =
          getKvStateWithRetries(queryName, keyHash, serializedKey)
      })
}

@SuppressWarnings(Array("unchecked"))
private def getKvStateWithRetries(queryName: String,
                                  keyHash: Int,
                                  serializedKey: Array[Byte]): Future[Array[Byte]] = {
  val kvState = client.getKvState(jobID, queryName, keyHash, serializedKey)
  // recoverWith needs an ExecutionContext; reuse the actor system's dispatcher
  kvState.recoverWith(recover(queryName, keyHash, serializedKey))(TEST_ACTOR_SYSTEM.dispatcher)
}

def onSuccess = new OnSuccess[Array[Byte]]() {
  @throws(classOf[Throwable])
  override def onSuccess(result: Array[Byte]): Unit = {
    println("found record ")
    val value = KvStateRequestSerializer.deserializeValue(result, valueSerializer)
    println(value)
  }
}


override def invoke(query: QueryMetaData): Unit = {
  println("getting inside querystore" + query.record)
  val serializedResult = flinkQuery.getResult(query.record, queryName)
  serializedResult.onSuccess(onSuccess)
}
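The retry-on-failure idea behind `recover`/`getKvStateWithRetries` can be sketched with plain `java.util.concurrent` types. This is a minimal sketch, not Flink's or Akka's API. The `Supplier` is a hypothetical stand-in for `client.getKvState(...)`, and the names `RetrySketch`, `withRetries` and `maxAttempts` are illustrative. Unlike the code above, it caps the number of attempts, so a query against a job that never becomes available does not retry forever. Retries only cover startup races. They are unlikely to help if the client is looking for the JobManager in the wrong place.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class RetrySketch {

    // Runs `query` and retries it after `delayMs` on any failure except an
    // AssertionError, giving up after `maxAttempts` attempts in total.
    static <T> CompletableFuture<T> withRetries(Supplier<CompletableFuture<T>> query,
                                                long delayMs,
                                                int maxAttempts,
                                                ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(query, delayMs, maxAttempts, scheduler, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> query,
                                    long delayMs,
                                    int attemptsLeft,
                                    ScheduledExecutorService scheduler,
                                    CompletableFuture<T> result) {
        query.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
                return;
            }
            // Unwrap the CompletionException that async stages may add.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause() : error;
            if (cause instanceof AssertionError || attemptsLeft <= 1) {
                // Assertion errors are final, as in the question's recover();
                // also stop once the attempt budget is used up.
                result.completeExceptionally(cause);
            } else {
                // Failures at startup (e.g. job not running yet) are expected: try again later.
                scheduler.schedule(
                        () -> attempt(query, delayMs, attemptsLeft - 1, scheduler, result),
                        delayMs, TimeUnit.MILLISECONDS);
            }
        });
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        int[] calls = {0};
        // Hypothetical stand-in for client.getKvState(...): fails twice, then succeeds.
        Supplier<CompletableFuture<String>> fakeQuery = () -> {
            calls[0]++;
            CompletableFuture<String> f = new CompletableFuture<>();
            if (calls[0] < 3) {
                f.completeExceptionally(new IllegalStateException("job not running yet"));
            } else {
                f.complete("state value");
            }
            return f;
        };
        String value = withRetries(fakeQuery, 100, 5, scheduler).join();
        System.out.println(value + " after " + calls[0] + " attempts");
        scheduler.shutdown();
    }
}
```

The design mirrors the Scala version. Each failed attempt schedules the next one on a scheduler instead of blocking a thread. The result is delivered through a single `CompletableFuture` that the caller waits on.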

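I am not spawning a new mini cluster or cluster and submitting the job to it, as is done in https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java, because I want to do this in the same cluster and environment as the main application, which I run with env.execute. Is that step necessary?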

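According to the documentation, Flink runs on localhost:6123 by default. Is there a problem with the connection? Do I need to submit the job to a separate cluster?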
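A quick TCP connect shows whether anything is listening on the default JobManager RPC port (6123) while the job runs in the IDE. This is a minimal sketch using only `java.net`. `PortCheck` and `isListening` are hypothetical names, not part of Flink. A successful connect only means that some process accepts connections on that port. It cannot confirm that a JobManager actor is registered under `/user/jobmanager`, which is what the `ActorNotFound` error is about.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {

    // Returns true if a TCP connection to host:port can be opened within timeoutMs.
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // 6123 is Flink's default jobmanager.rpc.port.
        System.out.println("localhost:6123 accepting connections: "
                + isListening("localhost", 6123, 1000));
    }
}
```

Run it while `env.execute(...)` is still running in the IDE. A result of `false` means nothing is accepting connections at localhost:6123 at all.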

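【Comments】:

  • Also, is there a way to find out where the JobManager is running? I couldn't find an API for that.
  • How are you submitting your job? Can you share your job submission logs?
  • I am running my job from the IDE. I think that when the job runs from the IDE, the client cannot connect to the JobManager. It works in YARN/cluster mode.
  • Yes, that's right.

Tags: apache-flink flink-streaming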


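【Solution 1】:

After a lot of googling, I found a solution.

I was using LocalStreamEnvironment and getting the same error until I found the thread "RemoteEnv connect failed". The error described there is for a different setup (not local). However, the gist example for testing included in that thread creates a LocalFlinkMiniCluster with the parameter "useSingleActorSystem" set to false.

Looking at the implementation of LocalStreamEnvironment, the MiniCluster is created with "useSingleActorSystem" set to true.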

I simply created a class LocalQueryableStreamEnvironment that extends LocalStreamEnvironment and creates the mini cluster with "useSingleActorSystem" set to false. Now everything runs in the IDE.

My code now looks like this:

Configuration:

Configuration config = new Configuration();
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 6);
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
config.setInteger(JobManagerOptions.WEB_PORT, JobManagerOptions.WEB_PORT.defaultValue());
config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT,JobManagerOptions.PORT.defaultValue());
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); // important, see the note below

Note: QueryableState only works when LOCAL_NUMBER_TASK_MANAGER is set to a value greater than 1!

Instantiating and executing the environment:

LocalQueryableStreamEnvironment env = LocalQueryableStreamEnvironment.createLocalEnvironment(3, config);
...
env.addSource(anySource)
   .keyBy(anyAttribute)
   .flatMap(new UpdateMyStateToBeQueriedLaterMapper())
   .addSink(..); //etc
...
env.execute("JobNameHere");

And creating the client:

final Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT, JobManagerOptions.PORT.defaultValue());

HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils
    .createHighAvailabilityServices(
                   config, 
                   Executors.newSingleThreadScheduledExecutor(),
                   HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION
    );
return new QueryableStateClient(config,highAvailabilityServices);

For more information, see:

Queryable States in ApacheFlink - Implementation

Queryable State Client with 1.3.0-rc0

My dependencies:

compile group: 'org.apache.flink', name: 'flink-java', version: '1.3.1'
compile group: 'org.apache.flink', name: 'flink-jdbc', version: '1.3.1'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.3.1'
compile group: 'org.apache.flink', name: 'flink-clients_2.11', version: '1.3.1'
compile group: 'org.apache.flink', name: 'flink-cep_2.11', version: '1.3.1'
compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.11', version: '1.3.1'
compile 'org.apache.flink:flink-runtime-web_2.11:1.3.1'

【Comments】:
