Flink Supplementary Notes

2020/10/25 · flink · 2,660 characters, roughly 8 minutes to read

Flink's Class Loading Strategy

Loading a class roughly goes through three phases: loading, linking (verification, preparation, resolution), and initialization.

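When we talk about "loading" a class, we usually mean the first phase: a class loader (ClassLoader) uses the class's fully qualified name to obtain the binary bytecode stream that defines it, and builds the class definition from that stream.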
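The difference between loading and initialization can be observed with plain JDK calls. The following is a minimal sketch, not Flink code; `LoadingDemo`, `Lazy`, and the `initialized` flag are hypothetical names made up for illustration. `ClassLoader.loadClass` only loads the class by its binary name, while `Class.forName(name, true, loader)` also runs its static initializer.

```java
/** Hypothetical demo class, not part of Flink. */
final class LoadingDemo {

    /** Flipped by Lazy's static initializer, i.e. when the initialization phase runs. */
    static volatile boolean initialized = false;

    /** A nested class whose static initializer has a visible side effect. */
    static final class Lazy {
        static {
            LoadingDemo.initialized = true;
        }
    }

    /** Binary name of the nested class, built at runtime so it works in any package. */
    private static final String LAZY_NAME = LoadingDemo.class.getName() + "$Lazy";

    /** Loads the class by name without initializing it. */
    static Class<?> loadOnly() {
        try {
            return LoadingDemo.class.getClassLoader().loadClass(LAZY_NAME);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Loads the class by name and forces initialization (second argument is true). */
    static Class<?> loadAndInit() {
        try {
            return Class.forName(LAZY_NAME, true, LoadingDemo.class.getClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Class<?> loaded = loadOnly();
        System.out.println(loaded.getName() + " initialized after loadClass: " + initialized);
        loadAndInit();
        System.out.println("initialized after Class.forName: " + initialized);
    }
}
```

On a fresh JVM the first line is expected to report `false` and the second `true`, because the static block runs only once initialization is requested.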

flink-conf.yaml provides the parameter classloader.resolve-order to control the class loading strategy. The possible values are child-first (the default) and parent-first.
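As a configuration fragment, switching the resolve order might look like the sketch below. The package prefix `com.example.shared.` is a hypothetical placeholder. `classloader.parent-first-patterns.additional` is a separate Flink option that forces the listed prefixes through the parent loader even in child-first mode.

```yaml
# flink-conf.yaml
# Resolve user-code classes through the parent (application) classloader first.
classloader.resolve-order: parent-first

# Alternatively, keep the default child-first order and force only selected
# prefixes through the parent. The prefix below is a hypothetical example.
# classloader.resolve-order: child-first
# classloader.parent-first-patterns.additional: com.example.shared.
```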

The parent-first class loader is simply an empty subclass of FlinkUserClassLoader, so it keeps Java's default parent-first delegation.

ChildFirstClassLoader

static final class ChildFirstClassLoader extends URLClassLoader {

		/**
		 * The classes that should always go through the parent ClassLoader. This is relevant
		 * for Flink classes, for example, to avoid loading Flink classes that cross the
		 * user-code/system-code barrier in the user-code ClassLoader.
		 */
		private final String[] alwaysParentFirstPatterns;

		public ChildFirstClassLoader(URL[] urls, ClassLoader parent, String[] alwaysParentFirstPatterns) {
			super(urls, parent);
			this.alwaysParentFirstPatterns = alwaysParentFirstPatterns;
		}

		@Override
		protected synchronized Class<?> loadClass(
			String name, boolean resolve) throws ClassNotFoundException {

			// First, check if the class has already been loaded
			Class<?> c = findLoadedClass(name);

			if (c == null) {
				// check whether the class should go parent-first
				for (String alwaysParentFirstPattern : alwaysParentFirstPatterns) {
					if (name.startsWith(alwaysParentFirstPattern)) {
						return super.loadClass(name, resolve);
					}
				}

				try {
					// check the URLs
					c = findClass(name);
				} catch (ClassNotFoundException e) {
					// let URLClassLoader do it, which will eventually call the parent
					c = super.loadClass(name, resolve);
				}
			}

			if (resolve) {
				resolveClass(c);
			}

			return c;
		}

		@Override
		public URL getResource(String name) {
			// first, try and find it via the URLClassloader
			URL urlClassLoaderResource = findResource(name);

			if (urlClassLoaderResource != null) {
				return urlClassLoaderResource;
			}

			// delegate to super
			return super.getResource(name);
		}

		@Override
		public Enumeration<URL> getResources(String name) throws IOException {
			// first get resources from URLClassloader
			Enumeration<URL> urlClassLoaderResources = findResources(name);

			final List<URL> result = new ArrayList<>();

			while (urlClassLoaderResources.hasMoreElements()) {
				result.add(urlClassLoaderResources.nextElement());
			}

			// get parent urls
			Enumeration<URL> parentResources = getParent().getResources(name);

			while (parentResources.hasMoreElements()) {
				result.add(parentResources.nextElement());
			}

			return new Enumeration<URL>() {
				Iterator<URL> iter = result.iterator();

				public boolean hasMoreElements() {
					return iter.hasNext();
				}

				public URL nextElement() {
					return iter.next();
				}
			};
		}
	}
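To see which branch of `loadClass` a given class name takes, the same lookup order can be reproduced in a small tracing loader. This is a sketch for illustration only, not the Flink class: `TracingChildFirstLoader`, its `trace` list, and `demo()` are hypothetical names, and the loader is created with an empty URL array, so every child lookup misses and falls back to the parent.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo loader: child-first lookup order plus a trace of the path taken. */
final class TracingChildFirstLoader extends URLClassLoader {

    private final String[] alwaysParentFirstPatterns;

    /** Records which branch handled each requested class name. */
    final List<String> trace = new ArrayList<>();

    TracingChildFirstLoader(URL[] urls, ClassLoader parent, String[] alwaysParentFirstPatterns) {
        super(urls, parent);
        this.alwaysParentFirstPatterns = alwaysParentFirstPatterns;
    }

    @Override
    protected synchronized Class<?> loadClass(String name, boolean resolve)
            throws ClassNotFoundException {
        Class<?> c = findLoadedClass(name);
        if (c == null) {
            // Prefix match: these classes always go to the parent.
            for (String pattern : alwaysParentFirstPatterns) {
                if (name.startsWith(pattern)) {
                    trace.add("parent-first: " + name);
                    return super.loadClass(name, resolve);
                }
            }
            try {
                // Child first: look in this loader's own URLs.
                c = findClass(name);
                trace.add("child: " + name);
            } catch (ClassNotFoundException e) {
                // Not in the child's URLs, fall back to normal parent delegation.
                trace.add("child-miss, parent: " + name);
                c = super.loadClass(name, resolve);
            }
        }
        if (resolve) {
            resolveClass(c);
        }
        return c;
    }

    /** Loads two JDK classes through a loader with no URLs and returns the trace. */
    static List<String> demo() {
        String[] patterns = {"java.lang."};
        try (TracingChildFirstLoader loader = new TracingChildFirstLoader(
                new URL[0], ClassLoader.getSystemClassLoader(), patterns)) {
            loader.loadClass("java.lang.String");    // matches the parent-first pattern
            loader.loadClass("java.util.ArrayList"); // tried in the child, then the parent
            return new ArrayList<>(loader.trace);
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With real connector jars in the URL array, classes found there would take the `child:` branch instead, which is what allows a job to ship its own version of a library.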

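When the same connector has to be offered in several versions (HBase, for example), class conflicts are unavoidable. Conflicting connectors are therefore loaded separately, each with its own classloader: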

  1. Package the connector into a fixed directory in the image;

  2. At runtime, based on the connector configuration, create a child-first classloader from the jars in the specified directory and use it to load the sink or source function.
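The two steps above could be sketched roughly as follows. This is an assumption-laden illustration rather than the author's implementation: `ConnectorLoaders`, `jarUrls`, and `forConnector` are hypothetical names, and a plain `URLClassLoader` (which delegates parent-first) stands in for a child-first loader such as the `ChildFirstClassLoader` shown earlier.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical helper, not part of Flink. */
final class ConnectorLoaders {

    /** Collects the URLs of all *.jar files directly inside connectorDir, sorted by URL. */
    static URL[] jarUrls(Path connectorDir) {
        List<URL> urls = new ArrayList<>();
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(connectorDir, "*.jar")) {
            for (Path jar : jars) {
                urls.add(jar.toUri().toURL());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Directory listing order is unspecified, so sort for a stable classpath.
        urls.sort(Comparator.comparing(URL::toString));
        return urls.toArray(new URL[0]);
    }

    /**
     * Creates a dedicated loader for one connector directory.
     * NOTE: URLClassLoader is parent-first; substitute a child-first loader
     * here to actually isolate conflicting connector versions.
     */
    static URLClassLoader forConnector(Path connectorDir, ClassLoader parent) {
        return new URLClassLoader(jarUrls(connectorDir), parent);
    }
}
```

The sink or source function class would then be looked up by name through the returned loader, so each connector version resolves its dependencies from its own directory.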

Alternatively, the connector jar path is determined dynamically at runtime and the jars are not built into the image:

  1. This reduces the image size;

  2. It makes extension easier.
