Passing and Parsing Flink Startup Arguments

Date: 2023-07-02

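Launch command:

    flink run -d -c class.main -p 3 xxx.jar xxx.properties

Here -d submits the job in detached mode, -c names the entry class, and -p sets the default parallelism. Everything after the jar file is handed to the program's main method, so xxx.properties arrives as args[0].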

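    public static void main(String[] args) throws Exception {
        // Parse the arguments: args[0], if present, is the path of the properties file.
        org.apache.commons.configuration2.Configuration conf = ConfigInitialization.initConfig(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism set here in code takes precedence over the -p value given on the command line.
        env.setParallelism(conf.getInt(MsmqConfigurationConsts.Flink_PARALLELISM));

        // Register global job parameters so that operators can read them at runtime.
        env.getConfig().setGlobalJobParameters(StaticFunctionCreator.clickhouseCommonConfig(conf));

        // Define sources, transformations, and sinks here before calling execute().
        env.execute("flink test");
    }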

    import org.apache.commons.configuration2.Configuration;
    import org.apache.commons.configuration2.PropertiesConfiguration;
    import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
    import org.apache.commons.configuration2.builder.fluent.Parameters;
    import org.apache.commons.configuration2.ex.ConfigurationException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ConfigInitialization {

        private static final Logger logger = LoggerFactory.getLogger(ConfigInitialization.class);

        // Utility class: not meant to be instantiated.
        private ConfigInitialization() {}

        public static Configuration initConfig(String[] args) {
            // Fall back to the development configuration when no path is passed in.
            String filePath = "develop_config.properties";
            if (args.length > 0) {
                logger.info("The specified input parameter is {}", args[0]);
                filePath = args[0];
            }

            FileBasedConfigurationBuilder<PropertiesConfiguration> builder =
                    new FileBasedConfigurationBuilder<PropertiesConfiguration>(PropertiesConfiguration.class)
                            .configure(new Parameters().properties().setFileName(filePath));

            Configuration config = null;
            try {
                config = builder.getConfiguration();
            } catch (ConfigurationException e) {
                // Without a configuration the job cannot start, so log and abort.
                logger.error("Could not load configuration file|path:{}", filePath, e);
                System.exit(-1);
            }
            return config;
        }
    }
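The core of ConfigInitialization is a simple rule: use args[0] as the file path when it is given, otherwise fall back to a default. The sketch below reproduces only that rule with the JDK's java.util.Properties, so it can be tried without Flink or Commons Configuration on the classpath. The class name ArgsConfigSketch, its method names, and the key flink.parallelism are hypothetical and chosen for illustration; the original does not show which key MsmqConfigurationConsts.Flink_PARALLELISM refers to.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ArgsConfigSketch {

    // Same default file name as in ConfigInitialization.
    public static final String DEFAULT_PATH = "develop_config.properties";

    /** Returns args[0] when an argument was passed, otherwise the default file name. */
    public static String resolvePath(String[] args) {
        return args.length > 0 ? args[0] : DEFAULT_PATH;
    }

    /** Parses properties-format text into a plain String-to-String map. */
    public static Map<String, String> parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            result.put(name, props.getProperty(name));
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Read the resolved file and print the parsed key/value pairs.
        String path = resolvePath(args);
        String text = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
        System.out.println(parse(text));
    }
}
```

A flat string map like this is a convenient intermediate form, since it is easy to log and to convert into whatever parameter object the job registers globally. How StaticFunctionCreator.clickhouseCommonConfig actually performs that conversion is not shown in the original.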
