最近 用户提交了一个问题 说他的jar包里明明包含相关的类型 但是在提交Flink作业的时候 却报出classnotfound的错误

查看之后发现 这里是flink的一个没有说的太明白的地方

用户的代码之所以报错 是因为在代码中引用了mapreduce相关的东西 

我们知道 flink会在生成jobGraph的时候就解析所有需要序列化的类型 这里就涉及需要解析mapReduce的类型 比如Text

但是用户明明打进去了呀 怎么还是找不到

这就涉及flink的类加载机制 flink对于自己的代码 采用默认的java的类加载机制 但是对于用户的代码 使用了自定义的FlinkClassLoader

好吧 这就是问题所在 因为在解析序列化类型的时候 flink会传入默认的类加载器 这个类加载器不包含用户代码 所以在寻找的时候 显然是找不到

知道了问题的症结所在 解决起来却不完美

方案1:将相关的依赖放入lib目录,即加入flink的类加载器

方案2:用户提交作业的时候,动态的将用户的类加入默认的类加载器

以上两个方法都可以解决问题,但缺点也是明显的:

方案1的缺点在于需要频繁的更新flink的lib目录,方案2的缺点在于打破了Flink的类加载机制,使得用户不能独立的使用不同版本的依赖。

至于最终的选择的方案,就需要根据平台具体的情况判断了,目前我们选择的是放入lib包,避免classpath热加载导致不可预知的问题。

不过也简单实验了下热加载的方案。

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;

/**
 * to add the jar to this jvm classpath dynamically, but no need to unload the class because jvm will decide when
 * to unload the class
 */
public class ClassloaderUtil {

    private static final Logger LOG = LoggerFactory.getLogger(ClassloaderUtil.class);
    private static Method addURL;
    private static URLClassLoader system;

    static {
        try {
            addURL = URLClassLoader.class.getDeclaredMethod("addURL",
                    new Class[]{URL.class});
            addURL.setAccessible(true);

            system = (URLClassLoader) ClassLoader.getSystemClassLoader();
        } catch (Exception ex) {
            LOG.error("Fail to load classloader staff.", ex);
        }
    }

    public static void addToClasspath(File file, List<URL> classpath) {
        addToClasspath(file);
        for (URL url : classpath) {
            addToClasspath(url);
        }
    }

    public static void addToClasspath(String file) {
        addToClasspath(new File(file));
    }

    public static void addToClasspath(File file) {
        try {
            addToClasspath(file.toURL());
        } catch (Exception ex) {
            LOG.error("Fail to dynamically add classpath.", ex);
        }
    }

    public static void addToClasspath(URL url) {
        try {
            addURL.invoke(system, new Object[]{url});
            LOG.info("Dynamically add classpath [{}]", url);
        } catch (Exception ex) {
            LOG.error("Fail to dynamically add classpath.", ex);
        }
    }
}

参考了https://blog.csdn.net/treeroot/article/details/631490

相关文章:

  • 2021-05-13
  • 2021-06-27
  • 2021-12-11
  • 2021-06-06
  • 2021-10-31
猜你喜欢
  • 2021-12-23
  • 2022-01-09
  • 2022-12-23
  • 2021-06-20
  • 2021-06-21
  • 2021-05-09
  • 2021-07-12
相关资源
相似解决方案