Hive on Spark Source Code Analysis (1): SparkTask
Hive on Spark Source Code Analysis (2): SparkSession and HiveSparkClient
Hive on Spark Source Code Analysis (3): SparkClient and SparkClientImpl (Part 1)
Hive on Spark Source Code Analysis (4): SparkClient and SparkClientImpl (Part 2)
Hive on Spark Source Code Analysis (5): RemoteDriver
Hive on Spark Source Code Analysis (6): RemoteSparkJobMonitor and JobHandle
The SparkClient interface defines the API of the remote Spark client:

// Submits a job for asynchronous execution and returns a JobHandle for monitoring it.
<T extends Serializable> JobHandle<T> submit(Job<T> job);

// Asks the remote context to run a job. This method bypasses the job queue,
// so it is recommended only for jobs that finish quickly.
// Returns a Future for monitoring the result.
<T extends Serializable> Future<T> run(Job<T> job);

/**
 * Stops the remote context.
 *
 * Any pending jobs will be cancelled, and the remote context will be torn down.
 */
void stop();

/**
 * Adds a jar file to the running remote context.
 *
 * Note that the URL should be reachable by the Spark driver process. If running the driver
 * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
 * on that node (and not on the client machine).
 *
 * @param uri The location of the jar file.
 * @return A future that can be used to monitor the operation.
 */
Future<?> addJar(URI uri);

/**
 * Adds a file to the running remote context.
 *
 * Note that the URL should be reachable by the Spark driver process. If running the driver
 * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
 * on that node (and not on the client machine).
 *
 * @param uri The location of the file.
 * @return A future that can be used to monitor the operation.
 */
Future<?> addFile(URI uri);

/**
 * Get the count of executors.
 */
Future<Integer> getExecutorCount();

/**
 * Get default parallelism. For standalone mode, this can be used to get total number of cores.
 */
Future<Integer> getDefaultParallelism();

/**
 * Check if remote context is still active.
 */
boolean isActive();

The comments in the code are clear enough, so no further explanation is needed here.
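To make the difference between submit and run more concrete, here is a minimal sketch that does not use any Hive classes. MiniClient, its single worker thread, and running the bypass job on the calling thread are all assumptions made for illustration; the real client sends both kinds of job to the remote context over RPC.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical stand-in for a client with a queued path and a bypass path. */
final class MiniClient implements AutoCloseable {
    // A single worker thread plays the role of the remote job queue.
    private final ExecutorService queue = Executors.newSingleThreadExecutor();

    /** Queued path: the job waits behind everything submitted earlier. */
    <T> Future<T> submit(Callable<T> job) {
        return queue.submit(job);
    }

    /** Bypass path: the job runs immediately on the calling thread, skipping the queue. */
    <T> Future<T> run(Callable<T> job) {
        CompletableFuture<T> result = new CompletableFuture<>();
        try {
            result.complete(job.call());
        } catch (Exception e) {
            result.completeExceptionally(e);
        }
        return result;
    }

    /** Rough analogue of stop(): pending jobs are cancelled and the worker is shut down. */
    @Override
    public void close() {
        queue.shutdownNow();
    }
}
```

In this sketch a long job handed to submit delays every job queued after it, while a job handed to run never waits, which is why the bypass path only suits short tasks.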
SparkClientImpl is the concrete implementation of the SparkClient interface. Let's look at it in detail.

First comes the constructor, which does quite a lot. It starts by assigning fields:

this.conf = conf;
this.hiveConf = hiveConf;
this.childIdGenerator = new AtomicInteger();
this.jobs = Maps.newConcurrentMap();

String clientId = UUID.randomUUID().toString();
// Generate a secret used to authenticate the RemoteDriver when it connects back
String secret = rpcServer.createSecret();
// startDriver launches the RemoteDriver in a new process and returns a thread that waits for its result
this.driverThread = startDriver(rpcServer, clientId, secret);
// Create the ClientProtocol used for RPC: sending messages (submitting jobs) and handling messages from the remote side
this.protocol = new ClientProtocol();

try {
  // The RPC server will take care of RPC client connection timeouts here.
  // Register the RPC client with the RPC server
  this.driverRpc = rpcServer.registerClient(clientId, secret, protocol).get();
} catch (Throwable e) {
  if (e.getCause() instanceof TimeoutException) {
    LOG.error("Timed out waiting for client to connect.\nPossible reasons include network " +
        "issues, errors in remote driver or the cluster has no available resources, etc." +
        "\nPlease check YARN or Spark driver's logs for further information.\nReason2 from SparkClientImpl", e);
  } else {
    LOG.error("Error while waiting for client to connect.", e);
  }
  // Interrupt the driverThread
  driverThread.interrupt();
  try {
    // Wait for the driverThread to die
    driverThread.join();
  } catch (InterruptedException ie) {
    // Give up.
    LOG.debug("Interrupted before driver thread was finished.");
  }
  throw Throwables.propagate(e);
}
Before moving on, let's look at how the RPC client registration is implemented. The work is actually done in another overload of registerClient in RpcServer. It first creates a promise that tracks the state of the registration:

@VisibleForTesting
Future<Rpc> registerClient(final String clientId, String secret,
    RpcDispatcher serverDispatcher, long clientTimeoutMs) {
  final Promise<Rpc> promise = group.next().newPromise();

  Runnable timeout = new Runnable() {
    @Override
    public void run() {
      promise.setFailure(new TimeoutException("Timed out waiting for client connection."));
    }
  };
  // Run timeout once, clientTimeoutMs milliseconds from now.
  // Given timeout's run method, this means: if the promise has not completed
  // when the timeout expires, promise.setFailure is called.
  ScheduledFuture<?> timeoutFuture = group.schedule(timeout,
      clientTimeoutMs,
      TimeUnit.MILLISECONDS);
  final ClientInfo client = new ClientInfo(clientId, promise, secret, serverDispatcher,
      timeoutFuture);
  // Check whether this client has already been registered
  if (pendingClients.putIfAbsent(clientId, client) != null) {
    throw new IllegalStateException(
        String.format("Client '%s' already registered.", clientId));
  }

Here clientTimeoutMs is set through the hive.spark.client.server.connect.timeout property in the configuration file.
  promise.addListener(new GenericFutureListener<Promise<Rpc>>() {
    @Override
    public void operationComplete(Promise<Rpc> p) {
      if (!p.isSuccess()) {
        pendingClients.remove(clientId);
      }
    }
  });
  return promise;
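The register-with-timeout pattern can be tried out without Netty. The sketch below swaps Netty's Promise and event loop for CompletableFuture and a ScheduledExecutorService; PendingRegistry, connected, and isPending are hypothetical names, and the channel is represented by a plain String. Unlike the method above, the sketch checks for a duplicate id before scheduling the timer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical registry mirroring the register-with-timeout pattern. */
final class PendingRegistry {
    private final ConcurrentMap<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "registry-timer");
        t.setDaemon(true); // the timer thread must not keep the JVM alive
        return t;
    });

    CompletableFuture<String> register(String clientId, long timeoutMs) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        // putIfAbsent returns the previous value, so non-null means a duplicate id.
        if (pending.putIfAbsent(clientId, promise) != null) {
            throw new IllegalStateException(String.format("Client '%s' already registered.", clientId));
        }
        // One-shot timer: fails the promise unless it has already completed.
        timer.schedule(
            () -> promise.completeExceptionally(
                new TimeoutException("Timed out waiting for client connection.")),
            timeoutMs, TimeUnit.MILLISECONDS);
        // Drop the entry when the promise fails, so the id does not linger.
        promise.whenComplete((value, error) -> {
            if (error != null) {
                pending.remove(clientId, promise);
            }
        });
        return promise;
    }

    /** Called when the client connects back; returns false if the id is unknown. */
    boolean connected(String clientId, String channel) {
        CompletableFuture<String> promise = pending.remove(clientId);
        return promise != null && promise.complete(channel);
    }

    boolean isPending(String clientId) {
        return pending.containsKey(clientId);
    }
}
```

A caller that blocks on the returned future sees the TimeoutException as the cause of an ExecutionException, which matches the e.getCause() instanceof TimeoutException check in the constructor.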
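Back in the constructor, once the registration has succeeded, a listener is attached to driverRpc and the client is marked as alive:

driverRpc.addListener(new Rpc.Listener() {
  @Override
  public void rpcClosed(Rpc rpc) {
    // If the SparkClient is still alive when the RPC channel closes, log a warning
    if (isAlive) {
      LOG.warn("Client RPC channel closed unexpectedly.");
      isAlive = false;
    }
  }
});
// Mark the client as alive once it has been instantiated
isAlive = true;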
Next, let's look at the implementation of startDriver. It first obtains the host and port of the rpcServer, which are later passed to the RemoteDriver:

private Thread startDriver(final RpcServer rpcServer, final String clientId, final String secret)
    throws IOException {
  Runnable runnable;
  final String serverAddress = rpcServer.getAddress();
  final String serverPort = String.valueOf(rpcServer.getPort());

It then checks whether spark.client.do_not_use.run_driver_in_process is set. If it is, the RemoteDriver is started directly in the current process. This mode is meant for testing only and should not be used in production.

    LOG.warn("!!!! Running remote driver in-process. !!!!");
    runnable = new Runnable() {
      @Override
      public void run() {
        List<String> args = Lists.newArrayList();
        args.add("--remote-host");
        args.add(serverAddress);
        args.add("--remote-port");
        args.add(serverPort);
        args.add("--client-id");
        args.add(clientId);
        args.add("--secret");
        args.add(secret);
        for (Map.Entry<String, String> e : conf.entrySet()) {
          args.add("--conf");
          args.add(String.format("%s=%s", e.getKey(), conf.get(e.getKey())));
        }
        try {
          RemoteDriver.main(args.toArray(new String[args.size()]));
        } catch (Exception e) {
          LOG.error("Error running driver.", e);
        }
      }
    };
  }

    String sparkHome = conf.get(SPARK_HOME_KEY);
    if (sparkHome == null) {
      sparkHome = System.getenv(SPARK_HOME_ENV);
    }
    if (sparkHome == null) {
      sparkHome = System.getProperty(SPARK_HOME_KEY);
    }
    String sparkLogDir = conf.get("hive.spark.log.dir");
    if (sparkLogDir == null) {
      if (sparkHome == null) {
        sparkLogDir = "./target/";
      } else {
        sparkLogDir = sparkHome + "/logs/";
      }
    }
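The Spark home lookup above is a "first defined value wins" chain: the client configuration, then an environment variable, then a JVM system property. The following sketch expresses that chain and the log directory fallback as two small functions. FirstDefined and its method names are hypothetical, and the suppliers stand in for the three lookups.

```java
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Stream;

/** Hypothetical helper for "first defined value wins" lookups. */
final class FirstDefined {
    /** Returns the first non-null value produced by the suppliers, or null if there is none. */
    @SafeVarargs
    static String of(Supplier<String>... sources) {
        return Stream.of(sources)
            .map(Supplier::get)        // evaluated lazily, one source at a time
            .filter(Objects::nonNull)
            .findFirst()
            .orElse(null);
    }

    /** Mirrors the log directory fallback: explicit setting, then sparkHome/logs/, then ./target/. */
    static String logDir(String configured, String sparkHome) {
        if (configured != null) {
            return configured;
        }
        return sparkHome == null ? "./target/" : sparkHome + "/logs/";
    }
}
```

Because the stream is evaluated lazily, sources after the first hit are never consulted, which matches the nested null checks in the original code.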
Create a file to hold all the configuration used by spark-submit:

    File properties = File.createTempFile("spark-submit.", ".properties");
    if (!properties.setReadable(false) || !properties.setReadable(true, true)) {
      throw new IOException("Cannot change permissions of job properties file.");
    }
    properties.deleteOnExit();
    // Holds the configuration
    Properties allProps = new Properties();
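The two setReadable calls first revoke read permission for everyone and then grant it back to the owner only, which matters because the file will later contain the client secret. As an alternative sketch (not what SparkClientImpl does), the same result can be requested at creation time with java.nio on a POSIX file system. PrivateTempFile is a hypothetical name, and the code throws UnsupportedOperationException on file systems without POSIX permissions.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

/** Hypothetical helper: temp file that only its owner can read and write. */
final class PrivateTempFile {
    static Path create(String prefix, String suffix) throws IOException {
        // "rw-------": owner read/write, no access for group or others
        Set<PosixFilePermission> ownerOnly = PosixFilePermissions.fromString("rw-------");
        FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(ownerOnly);
        Path file = Files.createTempFile(prefix, suffix, attr);
        file.toFile().deleteOnExit(); // same cleanup as properties.deleteOnExit()
        return file;
    }
}
```

Setting the permissions as part of file creation avoids the short window in which the file exists with wider permissions.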
First load the settings from spark-defaults.conf:

    try {
      URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf");
      if (sparkDefaultsUrl != null) {
        LOG.info("Loading spark defaults: " + sparkDefaultsUrl);
        allProps.load(new ByteArrayInputStream(Resources.toByteArray(sparkDefaultsUrl)));
      }
    } catch (Exception e) {
      String msg = "Exception trying to load spark-defaults.conf: " + e;
      throw new IOException(msg, e);
    }

    for (Map.Entry<String, String> e : conf.entrySet()) {
      allProps.put(e.getKey(), conf.get(e.getKey()));
    }
    allProps.put(SparkClientFactory.CONF_CLIENT_ID, clientId);
    allProps.put(SparkClientFactory.CONF_KEY_SECRET, secret);
    allProps.put(DRIVER_OPTS_KEY, driverJavaOpts);
    allProps.put(EXECUTOR_OPTS_KEY, executorJavaOpts);
    ... ... ...

    Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8);
    try {
      allProps.store(writer, "Spark Context configuration");
    } finally {
      writer.close();
    }
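The order of the put calls decides precedence: values from spark-defaults.conf are loaded first, and entries from conf written afterwards overwrite them. The sketch below shows that layering and the store/load round trip with plain java.util.Properties. PropsFile and its two methods are hypothetical names, and try-with-resources replaces the explicit finally block.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper showing layered properties and a UTF-8 round trip. */
final class PropsFile {
    /** Writes defaults first, then overrides, so overrides win on key collisions. */
    static void write(Path file, Map<String, String> defaults, Map<String, String> overrides)
            throws IOException {
        Properties all = new Properties();
        all.putAll(defaults);
        all.putAll(overrides);
        try (Writer w = Files.newBufferedWriter(file, StandardCharsets.UTF_8)) {
            all.store(w, "Spark Context configuration");
        }
    }

    /** Reads the file back into a Properties object. */
    static Properties read(Path file) throws IOException {
        Properties p = new Properties();
        try (Reader r = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            p.load(r);
        }
        return p;
    }
}
```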
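Because the RemoteDriver is started in a new child process, the next step is to decide how the configuration options reach that process. In local mode or yarn-client mode the options have to be passed explicitly on the command line; in yarn-cluster mode this is left to spark-submit.
First an argument list argv is created (it is eventually turned into the command that is executed on the command line), and different arguments are added depending on the situation: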
List<String> argv = Lists.newArrayList();
If Kerberos authentication is enabled, add the corresponding Kerberos arguments:

    if (hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION).equalsIgnoreCase("kerberos")) {
      argv.add("kinit");
      String principal = SecurityUtil.getServerPrincipal(hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL),
          "0.0.0.0");
      String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
      argv.add(principal);
      argv.add("-k");
      argv.add("-t");
      argv.add(keyTabFile + ";");
    }

Next the spark-submit command is added. If a Spark home is configured, spark_home/bin/spark-submit is added to argv directly; otherwise java_home/bin/java is added first:

    if (sparkHome != null) {
      argv.add(new File(sparkHome, "bin/spark-submit").getAbsolutePath());
    } else {
      LOG.info("No spark.home provided, calling SparkSubmit directly.");
      argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath());

Get spark.master:

    String master = conf.get("spark.master");
    Preconditions.checkArgument(master != null, "spark.master is not defined.");

If the master is one of local, mesos, yarn-client, or standalone, then spark.driver.memory, spark.driver.extraClassPath, spark.driver.extraLibPath, and so on have to be passed to the JVM:

    if (sparkHome != null) {
      argv.add(new File(sparkHome, "bin/spark-submit").getAbsolutePath());
    } else {
      LOG.info("No spark.home provided, calling SparkSubmit directly.");
      argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath());

      // Local or client mode (that is, every mode other than yarn-cluster)
      if (master.startsWith("local") || master.startsWith("mesos") || master.endsWith("-client")
          || master.startsWith("spark")) {
        String mem = conf.get("spark.driver.memory");
        if (mem != null) {
          argv.add("-Xms" + mem);
          argv.add("-Xmx" + mem);
        }

        // Configure the classpath
        String cp = conf.get("spark.driver.extraClassPath");
        if (cp != null) {
          argv.add("-classpath");
          argv.add(cp);
        }

        String libPath = conf.get("spark.driver.extraLibPath");
        if (libPath != null) {
          argv.add("-Djava.library.path=" + libPath);
        }

        String extra = conf.get(DRIVER_OPTS_KEY);
        if (extra != null) {
          for (String opt : extra.split("[ ]")) {
            if (!opt.trim().isEmpty()) {
              argv.add(opt.trim());
            }
          }
        }
      }
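The master check and the flag construction in the snippet above are easy to exercise in isolation. The sketch below copies the same string tests and the same split on single spaces into a hypothetical DriverJvmArgs class; the method names and parameters are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper isolating the master check and the JVM flag construction. */
final class DriverJvmArgs {
    /** True for local, mesos, *-client, and spark:// masters, using the same tests as above. */
    static boolean driverRunsLocally(String master) {
        return master.startsWith("local") || master.startsWith("mesos")
            || master.endsWith("-client") || master.startsWith("spark");
    }

    /** Builds the JVM flags; null arguments contribute nothing. */
    static List<String> build(String mem, String classPath, String extraOpts) {
        List<String> argv = new ArrayList<>();
        if (mem != null) {
            argv.add("-Xms" + mem);
            argv.add("-Xmx" + mem);
        }
        if (classPath != null) {
            argv.add("-classpath");
            argv.add(classPath);
        }
        if (extraOpts != null) {
            // Splitting on single spaces yields empty strings for repeated spaces; skip them.
            for (String opt : extraOpts.split("[ ]")) {
                if (!opt.trim().isEmpty()) {
                    argv.add(opt.trim());
                }
            }
        }
        return argv;
    }
}
```

One consequence of splitting on spaces is that a single option containing a quoted space is broken into several arguments, so extra driver options are best kept free of embedded spaces.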
Then the SparkSubmit main class is added (regardless of the master mode):

      argv.add("org.apache.spark.deploy.SparkSubmit");

    if (master.equals("yarn-cluster")) {
      String executorCores = conf.get("spark.executor.cores");
      if (executorCores != null) {
        argv.add("--executor-cores");
        argv.add(executorCores);
      }

      String executorMemory = conf.get("spark.executor.memory");
      if (executorMemory != null) {
        argv.add("--executor-memory");
        argv.add(executorMemory);
      }

      String numOfExecutors = conf.get("spark.executor.instances");
      if (numOfExecutors != null) {
        argv.add("--num-executors");
        argv.add(numOfExecutors);
      }
    }
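The yarn-cluster branch repeats one pattern three times: look up a configuration key and, only if it is set, append a flag followed by its value. A table-driven sketch of the same idea is shown below. OptionalFlags is a hypothetical class, and a plain Map stands in for the client configuration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: maps configuration keys to optional command-line flags. */
final class OptionalFlags {
    /** Appends "flag value" for each mapping whose key is set; unset keys add nothing. */
    static List<String> build(Map<String, String> conf, Map<String, String> keyToFlag) {
        List<String> argv = new ArrayList<>();
        for (Map.Entry<String, String> e : keyToFlag.entrySet()) {
            String value = conf.get(e.getKey());
            if (value != null) {
                argv.add(e.getValue());
                argv.add(value);
            }
        }
        return argv;
    }

    /** The three key-to-flag mappings from the snippet above, in the same order. */
    static Map<String, String> executorFlags() {
        Map<String, String> m = new LinkedHashMap<>(); // keeps insertion order
        m.put("spark.executor.cores", "--executor-cores");
        m.put("spark.executor.memory", "--executor-memory");
        m.put("spark.executor.instances", "--num-executors");
        return m;
    }
}
```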
If hive.server2.enable.doas is set to true, meaning Hive operations are executed as the user who issued the request, the user name has to be added to the arguments:

    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
      try {
        String currentUser = Utils.getUGI().getShortUserName();
        // do not do impersonation in CLI mode
        if (!currentUser.equals(System.getProperty("user.name"))) {
          LOG.info("Attempting impersonation of " + currentUser);
          argv.add("--proxy-user");
          argv.add(currentUser);
        }
      } catch (Exception e) {
        String msg = "Cannot obtain username: " + e;
        throw new IllegalStateException(msg, e);
      }
    }

Next come the path of the configuration file created earlier, RemoteDriver as the main class, the jar that contains the current class, and the remote host and port:

    argv.add("--properties-file");
    argv.add(properties.getAbsolutePath());
    argv.add("--class");
    argv.add(RemoteDriver.class.getName());

    String jar = "spark-internal";
    if (SparkContext.jarOfClass(this.getClass()).isDefined()) {
      jar = SparkContext.jarOfClass(this.getClass()).get();
    }
    argv.add(jar);

    argv.add("--remote-host");
    argv.add(serverAddress);
    argv.add("--remote-port");
    argv.add(serverPort);

Finally, the configuration entries that start with hive.spark are added to the argument list in the form "--conf key=value", so that they are passed on to the RemoteDriver:

    for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) {
      String value = RpcConfiguration.getValue(hiveConf, hiveSparkConfKey);
      argv.add("--conf");
      argv.add(String.format("%s=%s", hiveSparkConfKey, value));
    }

    String cmd = Joiner.on(" ").join(argv);
    LOG.info("Running client driver with argv: {}", cmd);
    ProcessBuilder pb = new ProcessBuilder("sh", "-c", cmd);
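Joining argv into one string and handing it to sh -c means that a shell, not Java, splits the command. That is presumably why the Kerberos branch appends ";" to the keytab path: after the join, the shell sees a kinit command followed by a separate spark-submit command. The sketch below reproduces the join with String.join, which behaves much like Joiner.on(" ").join for non-null strings. ShellCommand is a hypothetical class, and the principal and paths in the usage note are made up.

```java
import java.util.List;

/** Hypothetical helper showing how argv becomes a single shell command. */
final class ShellCommand {
    /** Joins the arguments with single spaces. */
    static String join(List<String> argv) {
        return String.join(" ", argv);
    }

    /** Wraps the joined string so that sh parses it, including any ';' separators. */
    static ProcessBuilder viaShell(List<String> argv) {
        return new ProcessBuilder("sh", "-c", join(argv));
    }
}
```

For example, joining kinit, hive/host@REALM, -k, -t, /etc/hive.keytab;, /opt/spark/bin/spark-submit yields a string in which the shell runs kinit first and spark-submit second. It also means that any value containing spaces or shell metacharacters is interpreted by the shell.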
Start the process:

    // Hide the Hive configuration from Spark so that the two do not interfere with each other
    pb.environment().remove("HIVE_HOME");
    pb.environment().remove("HIVE_CONF_DIR");

    if (isTesting != null) {
      pb.environment().put("SPARK_TESTING", isTesting);
    }

    final Process child = pb.start();
    int childId = childIdGenerator.incrementAndGet();
    final List<String> childErrorLog = new ArrayList<String>();
    // Redirect the output of the new process
    redirect("stdout-redir-" + childId, new Redirector(child.getInputStream()));
    redirect("stderr-redir-" + childId, new Redirector(child.getErrorStream(), childErrorLog));

    runnable = new Runnable() {
      @Override
      public void run() {
        try {
          // Block until the process exits
          int exitCode = child.waitFor();
          if (exitCode != 0) {
            StringBuilder errStr = new StringBuilder();
            for (String s : childErrorLog) {
              errStr.append(s);
              errStr.append('\n');
            }

            rpcServer.cancelClient(clientId,
                "Child process exited before connecting back with error log " + errStr.toString());
            LOG.warn("Child process exited with code {}", exitCode);
          }
        } catch (InterruptedException ie) {
          LOG.warn("Waiting thread interrupted, killing child process.");
          Thread.interrupted();
          child.destroy();
        } catch (Exception e) {
          LOG.warn("Exception while waiting for child process.", e);
        }
      }
    };
  }

  Thread thread = new Thread(runnable);
  thread.setDaemon(true);
  thread.setName("Driver");
  thread.start();
  return thread;
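The child's stderr is collected so that its contents can be attached to the error message when the process exits with a non-zero code. The sketch below shows one way to drain a stream on a daemon thread while keeping the lines. LineCollector is a hypothetical class loosely modelled on the role Redirector plays above, not a copy of it, and it works on any InputStream so it can be tried without starting a process.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical line collector that drains a stream on its own thread. */
final class LineCollector implements Runnable {
    private final InputStream in;
    // Thread-safe list: written by the reader thread, read by the caller.
    private final List<String> lines = new CopyOnWriteArrayList<>();

    LineCollector(InputStream in) {
        this.in = in;
    }

    @Override
    public void run() {
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line); // keep each line so it can be reported later
            }
        } catch (IOException e) {
            lines.add("<stream error: " + e.getMessage() + ">");
        }
    }

    /** Starts a daemon thread that drains the stream; the caller may join it. */
    Thread start(String name) {
        Thread t = new Thread(this, name);
        t.setDaemon(true);
        t.start();
        return t;
    }

    /** Returns everything collected so far, one line per row. */
    String joined() {
        return String.join("\n", lines);
    }
}
```

With a real child process the collector would be constructed from child.getErrorStream(), and joined() would be read after waitFor() reports a non-zero exit code. Draining both output streams also keeps the child from blocking on a full pipe buffer.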
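This concludes the process of starting the driver in SparkClientImpl.
The next part returns to the job submission flow and analyzes the job submission methods in SparkClientImpl, together with the inner class ClientProtocol, which is closely tied to job submission.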