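Flink provides the ParameterTool class for reading parameters from different sources.
I. Methods
1. The fromArgs method
Reads the parameters passed on the command line. Arguments are given as key value pairs, and every key must start with -- or -, for example: --key1 value1 --key2 value2 -key3 value3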
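The key/value convention described above can be illustrated without a Flink dependency. The following is a simplified plain-Java imitation of that rule, not Flink's actual implementation. The class name ArgsSketch and its methods are hypothetical, and the sketch ignores edge cases such as negative numbers used as values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified imitation of the "--key value" convention; this is NOT Flink's code.
class ArgsSketch {

    // Parses arguments of the form: --key1 value1 -key2 value2
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        int i = 0;
        while (i < args.length) {
            String key = stripPrefix(args[i]);
            i++;
            if (i < args.length && !args[i].startsWith("-")) {
                // The next token is a value for the current key.
                params.put(key, args[i]);
                i++;
            } else {
                // Key without a value; storing an empty string is an assumption of this sketch.
                params.put(key, "");
            }
        }
        return params;
    }

    // Removes the leading "--" or "-" and rejects tokens that are not keys.
    static String stripPrefix(String arg) {
        if (arg.startsWith("--")) {
            return arg.substring(2);
        }
        if (arg.startsWith("-")) {
            return arg.substring(1);
        }
        throw new IllegalArgumentException("Key must start with '-' or '--': " + arg);
    }

    public static void main(String[] args) {
        Map<String, String> params = parse(new String[] {
            "--key1", "value1", "--key2", "value2", "-key3", "value3"});
        System.out.println(params); // prints {key1=value1, key2=value2, key3=value3}
    }
}
```

With Flink on the classpath, ParameterTool.fromArgs(args) followed by get("key1") plays the role of parse here.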
2. The fromPropertiesFile method
Reads parameters from a properties file.
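The file is expected to be in the standard Java properties format, one key=value entry per line. The sketch below uses only java.util.Properties to show that format; the class name PropertiesFileSketch, the file contents and the keys host and port are made up for illustration. With Flink available, ParameterTool.fromPropertiesFile(path) would be given the same kind of file.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Properties;

class PropertiesFileSketch {

    // Writes a small sample file (made-up keys) to a temp location and returns its path.
    static Path writeSample() {
        try {
            Path file = Files.createTempFile("job", ".properties");
            Files.write(file, Arrays.asList("host=localhost", "port=7777"), StandardCharsets.UTF_8);
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Loads key=value pairs from a properties file.
    static Properties load(Path file) {
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(writeSample());
        System.out.println(props.getProperty("host"));                   // prints localhost
        System.out.println(Integer.parseInt(props.getProperty("port"))); // prints 7777
    }
}
```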
3. The fromSystemProperties method
Reads JVM system properties, which are passed as -Dkey1=value1 -Dkey2=value2.
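System properties are the JVM-wide key/value pairs that java.lang.System exposes. A minimal plain-Java sketch of reading them follows; the class name SystemPropertiesSketch and the helper read are hypothetical, and the property is set programmatically only so that the sketch runs stand-alone. With Flink available, ParameterTool.fromSystemProperties() gives access to the same values.

```java
class SystemPropertiesSketch {

    // Reads a JVM system property, falling back to a default when it is not set.
    static String read(String key, String defaultValue) {
        return System.getProperty(key, defaultValue);
    }

    public static void main(String[] args) {
        // Normally supplied as -Dkey1=value1 on the java command line;
        // set in code here so the example does not depend on launch flags.
        System.setProperty("key1", "value1");
        System.out.println(read("key1", "missing")); // prints value1
        System.out.println(read("no.such.key", "missing")); // prints missing
    }
}
```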
II. Examples
1. Submitting a jar from the command line with custom parameters
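import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

public class Demo {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ParameterTool parameters = ParameterTool.fromArgs(args);
        // Parameter name: path. getRequired fails fast if it is missing,
        // instead of passing null on to fromElements.
        String localPath = parameters.getRequired("path");
        DataSet<String> toFilter = env.fromElements(localPath);
        // Split the comma-separated value and emit each part as its own record.
        FlatMapOperator<String, String> flatMap = toFilter.flatMap(new RichFlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] split = value.split(",");
                for (String s : split) {
                    out.collect(s);
                }
            }
        });
        // In the DataSet API, print() also triggers execution of the job.
        flatMap.print();
    }
}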
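import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class Upjar {
    public static void main(String[] args) throws IOException {
        List<String> arg = new ArrayList<>();
        arg.add("sh");
        arg.add("-c");
        // The custom argument (-path ...) comes last, after the jar path.
        arg.add("./flink run -p 2 -C file:///home/czFlinkDemo/target/czFlinkDemo-1.0-SNAPSHOT.jar -c com.cz.dataset.wordCount.Demo /home/czFlinkDemo/target/czFlinkDemo-1.0-SNAPSHOT.jar -path a,gh,jk,ui,o\n");
        ProcessBuilder pb = new ProcessBuilder(arg);
        // Set the working directory to Flink's bin directory.
        pb.directory(new File("/home/flink/flink-1.10.0/bin"));
        // redirectErrorStream defaults to false: the child's standard output and standard error
        // go to two separate streams, readable via Process.getInputStream() and Process.getErrorStream().
        // When set to true, standard error is merged into standard output. The merged data is read
        // from Process.getInputStream(), and Process.getErrorStream() is immediately at end of file.
        pb.redirectErrorStream(true);
        // Redirect standard output (appending) to the log file.
        File log = new File("/mnt/htst/log/htst.log");
        pb.redirectOutput(ProcessBuilder.Redirect.appendTo(log));
        Process p = pb.start();
        // Sanity checks; they only run when assertions are enabled (java -ea).
        assert pb.redirectInput() == ProcessBuilder.Redirect.PIPE;
        assert pb.redirectOutput().file() == log;
        // Output goes to the log file, so the process's input stream is already at end of file.
        assert p.getInputStream().read() == -1;
    }
}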
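Note: put the custom arguments at the end of the command line. Everything after the jar path is handed to the program's main method, while options placed before the jar path are interpreted by flink run itself.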
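One way to keep this ordering right is to assemble the command from three parts: the flink run options, the jar path, and then the program arguments. The sketch below is only an illustration; the class FlinkCommandSketch and the method build are hypothetical names, and the option values and paths are the ones used in the example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FlinkCommandSketch {

    // Builds: ./flink run <flink options> <jar path> <program arguments>
    static List<String> build(List<String> flinkOptions, String jarPath, List<String> programArgs) {
        List<String> command = new ArrayList<>();
        command.add("./flink");
        command.add("run");
        command.addAll(flinkOptions);  // interpreted by flink run
        command.add(jarPath);
        command.addAll(programArgs);   // passed to the job's main method
        return command;
    }

    public static void main(String[] args) {
        List<String> command = build(
            Arrays.asList("-p", "2", "-c", "com.cz.dataset.wordCount.Demo"),
            "/home/czFlinkDemo/target/czFlinkDemo-1.0-SNAPSHOT.jar",
            Arrays.asList("-path", "a,gh,jk,ui,o"));
        System.out.println(String.join(" ", command));
    }
}
```

The resulting list can be passed directly to new ProcessBuilder(command), which avoids quoting issues that can arise when the whole command is a single string given to sh -c.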
2. Custom parameters when running locally
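import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Extract the host name and port from the arguments.
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.getRequired("host");
        int port = parameterTool.getInt("port");
        // Read the text stream from the socket.
        DataStreamSource<String> stringDataStreamSource = env.socketTextStream(host, port);
        // Transform: split each line into words and emit (word, 1) pairs.
        SingleOutputStreamOperator<Tuple2<String, Long>> streamOperator = stringDataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
                String[] words = s.split(" ");
                for (String word : words) {
                    collector.collect(Tuple2.of(word, 1L));
                }
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        // Group by the word (field f0).
        KeyedStream<Tuple2<String, Long>, String> keyBy = streamOperator.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
            @Override
            public String getKey(Tuple2<String, Long> stringLongTuple2) throws Exception {
                return stringLongTuple2.f0;
            }
        });
        // Sum the counts (field at position 1) per word.
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = keyBy.sum(1);
        sum.print();
        env.execute();
    }
}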
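Define the parameters: pass --host and --port as program arguments, for example in the IDE's run configuration.
Socket stream: a text server must already be listening on that host and port when the job starts, because socketTextStream connects to it as a client.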
When reposting, please keep the source: https://bianchenghao.cn/30026.html