flink java提交任务,根据jobname获取jobid,rest获取任务的状态

flink java提交任务,根据jobname获取jobid,rest获取任务的状态1.传入jobname:publicclassReduceDemo{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String>words=env.socketT

1.传入jobname:
在这里插入图片描述

public class ReduceDemo { 
   

    public static void main(String[] args) throws Exception { 
   
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> words = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(w -> Tuple2.of(w, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
        KeyedStream<Tuple2<String, Integer>, Tuple> keyBy = wordAndOne.keyBy(0);
        /*sum*/
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = keyBy.reduce(new ReduceFunction<Tuple2<String, Integer>>() { 
   
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)  { 
   
                String key = value1.f0;
                Integer count1 = value1.f1;
                Integer count2 = value2.f1;
                int count = count1 + count2;
                return Tuple2.of(key, count);
            }
        });
        reduce.print();
        env.execute("cz");
    }
}

2.上传任务:

public class Upjar { 
   
    public static void main(String[] args) throws IOException { 
   
        List<String> arg = new ArrayList<>();
        arg.add("sh");
        arg.add("-c");
        arg.add("./flink run -p 2 -C file://绝对路径 -C file://绝对路径 -C file://绝对路径 -c 主类路径 主类所在jar包的绝对路径\n");
        ProcessBuilder pb = new ProcessBuilder(arg);
        //设置工作目录
        pb.directory(new File("/home/flink/flink-1.10.0/bin"));
        //redirectErrorStream 属性默认值为false,意思是子进程的标准输出和错误输出被发送给两个独立的流,可通过 Process.getInputStream() 和 Process.getErrorStream() 方法来访问
        //值设置为 true,标准错误将与标准输出合并。合并的数据可从 Process.getInputStream() 返回的流读取,而从 Process.getErrorStream() 返回的流读取将直接到达文件尾
        pb.redirectErrorStream(true);
        File log = new File("/home/flink/flink-1.10.0/logs");
        pb.redirectOutput(ProcessBuilder.Redirect.appendTo(log));
        Process p = pb.start();
        assert pb.redirectInput() == ProcessBuilder.Redirect.PIPE;
        //重定向标准输出到日志
        assert pb.redirectOutput().file() == log;
        assert p.getInputStream().read() == -1;

    }
}

3.获取jobid
在这里插入图片描述
通过rest访问根据jobname获取jobid,可使用 apache的httpcomponents,根据返回值获取jobid

public class HttpComponentsForRest {
    static String jobName = "97c5216de4ef4b15b16de3a3ab53fd78";
    static String urlPrefix = "http://192.168.1.127:8081";

    public static void main(String[] args) throws IOException {
        String jobID = getJobID(urlPrefix, jobName);
//        getJobState(urlPrefix, jobID);
//        stop(urlPrefix, jobID);


        try {
            while (true) {
                getJobState(urlPrefix, jobID);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            System.out.println("pipeline stop");
        }


    }

    /*若任务的状态为running,则调用api结束任务*/
    public static void stop(String urlPrefix, String jid) throws IOException {
        CloseableHttpResponse stopJob = send(new HttpPatch(urlPrefix + "/jobs/" + jid));
        StatusLine statusLine = stopJob.getStatusLine();
        int statusCode = statusLine.getStatusCode();
        /*202代表执行成功,关闭pipeline*/
        if (statusCode != 202) {
            throw new RuntimeException("当前任务未能正确关闭");
        }

    }

    /*判断当前任务的状态   "state": "RUNNING" */
    public static void getJobState(String urlPrefix, String jid) throws IOException {
        CloseableHttpResponse getJobState = send(new HttpGet(urlPrefix + "/jobs/" + jid));
        String getJobStateRes = transform(getJobState);
        JSONObject jsonObject = JSON.parseObject(getJobStateRes);
        String state = jsonObject.getString("state");
        if (!"RUNNING".equals(state)) {
            throw new RuntimeException("当前任务处于停止或故障状态");
        }
    }

    /*根据jobname获取jobid*/
    public static String getJobID(String urlPrefix, String jobName) throws IOException {
        CloseableHttpResponse getJobID = send(new HttpGet(urlPrefix + "/jobs/overview"));
        String getJobIDRes = transform(getJobID);
        JSONObject res = JSON.parseObject(getJobIDRes);
        JSONArray jobs = res.getJSONArray("jobs");
        String jid = null;
        for (int i = 0; i < jobs.size(); i++) {
            JSONObject jsonObject = (JSONObject) jobs.get(i);
            String name = (String) jsonObject.get("name");
            if (name.equals(jobName)) {
                jid = (String) jsonObject.get("jid");
                break;
            }
        }
        if (jid == null || jid.equals("")) {
            throw new RuntimeException("pipeline在flink中没有对应的任务");
        }
        return jid;
    }

    public static CloseableHttpResponse send(HttpRequestBase httpRequestBase) throws IOException {
        CloseableHttpClient httpclient = HttpClients.createDefault();
        CloseableHttpResponse response = httpclient.execute(httpRequestBase);
        return response;
    }

    public static String transform(CloseableHttpResponse response) throws IOException {
        HttpEntity entity = response.getEntity();
        String result = EntityUtils.toString(entity);
        return result;
    }


}

关于flink中任务通过rest访问控制状态,可参考官方文档

今天的文章flink java提交任务,根据jobname获取jobid,rest获取任务的状态分享到此就结束了,感谢您的阅读,如果确实帮到您,您可以动动手指转发给其他人。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/29947.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注