From 44dc14b01db370722f2ceff42f05335a65099657 Mon Sep 17 00:00:00 2001 From: =?utf8?q?V=C3=A1s=C3=A1ry=20D=C3=A1niel?= Date: Wed, 16 Oct 2019 11:07:52 +0000 Subject: [PATCH] FFAStrans API monitor javitasa git-tfs-id: [http://tfs.userrendszerhaz.hu:8080/tfs/DefaultCollection]$/MediaCube;C31569 --- .../src/user/commons/FFAStransAPI.java | 90 +++++++++++-------- .../src/user/commons/IFFAStransAPI.java | 2 +- 2 files changed, 56 insertions(+), 36 deletions(-) diff --git a/server/user.jobengine.osgi.commons/src/user/commons/FFAStransAPI.java b/server/user.jobengine.osgi.commons/src/user/commons/FFAStransAPI.java index bdcd23e8..4159c030 100644 --- a/server/user.jobengine.osgi.commons/src/user/commons/FFAStransAPI.java +++ b/server/user.jobengine.osgi.commons/src/user/commons/FFAStransAPI.java @@ -1,5 +1,6 @@ package user.commons; +import java.io.File; import java.util.List; import javax.ws.rs.client.Entity; @@ -28,7 +29,7 @@ public class FFAStransAPI implements IFFAStransAPI { System.out.println("Progress: " + p); }); - String inputFile = "\\\\10.11.1.100\\Promise\\ARCHIVE\\6.MXF"; + String inputFile = "\\\\10.11.1.100\\Promise\\ARCHIVE\\20.MXF"; api.submit("MP4", inputFile); try { api.monitor(1000); @@ -53,16 +54,17 @@ public class FFAStransAPI implements IFFAStransAPI { private ResteasyWebTarget webTarget; private IProgressChangedListener listener; private String jobId; - private BasicDBObject lastJobToSubmit; + private String inputFile; + private String workflowName; public FFAStransAPI(String apiAddress, IProgressChangedListener listener) { this.listener = listener; webTarget = new ResteasyClientBuilder().build().target(apiAddress); } - private void doSubmit() throws Exception { + private void doSubmit(BasicDBObject jobToSubmit) throws Exception { ResteasyWebTarget target = webTarget.path("jobs"); - Response apiResponse = target.request().post(Entity.entity(lastJobToSubmit.toString(), MediaType.APPLICATION_JSON)); + Response apiResponse = target.request().post(Entity.entity(jobToSubmit.toString(), MediaType.APPLICATION_JSON)); if (apiResponse.getStatus() != 202) throw new Exception("Can not submit, response status is: " + apiResponse.getStatus()); String json = apiResponse.readEntity(String.class); @@ -76,7 +78,7 @@ public class FFAStransAPI implements IFFAStransAPI { } @Override - public BasicDBObject getHistory(String jobStart) { + public BasicDBObject getHistory() { ResteasyWebTarget target = webTarget.path("history"); BasicDBObject result = null; try { @@ -86,12 +88,28 @@ public class FFAStransAPI implements IFFAStransAPI { String json = apiResponse.readEntity(String.class); BasicDBObject resultObject = (BasicDBObject) JSONUtil.jsonToDbObject(json); List jobs = NoSQLUtils.asList(resultObject, "history"); + + File f = new File(inputFile); + for (BasicDBObject job : jobs) { if (job == null) continue; - String job_start = NoSQLUtils.asString(job, "job_start"); - if (jobStart.equals(job_start)) { + String file = NoSQLUtils.asString(job, "file"); + if (file == null) + continue; + + String wf_name = NoSQLUtils.asString(job, "wf_name"); + if (wf_name == null || !workflowName.equals(wf_name)) + continue; + + if (file.equals(f.getName())) { + logger.info("Found success history"); + result = job; + break; + } + if (file.equals(inputFile)) { + logger.info("Found error history"); result = job; break; } @@ -156,48 +174,48 @@ public class FFAStransAPI implements IFFAStransAPI { @Override public void monitor(int pollIntervall) throws InterruptedException, Exception { - String jobStart = null; int progress = 0; while (true) { Thread.sleep(pollIntervall); + //Status: {"job_id":"20180226-162821-217-A7E91DC625BD","job_start":"2018/02/26 16:28:21","file":"\\\\PROXY-TRANSCODER-01\\MAM-Proxy_input\\20180201-0700_hirado_TEST-_CS.MXF","wf_name":"MAM_proxy","splits":[{"steps":"4 / 5","processor":"Folder","status":"Waiting for next processor resources...","node":"PROXY-TRANSCODE","progress":"78.5"}]} BasicDBObject status = getStatus(jobId); if (status != null) { - //System.out.println("Status: " + status.toPrettyString(null)); - if (jobStart == null) - jobStart = status.getString("job_start"); - + logger.info("Status: " + status.toPrettyString(null)); List splits = NoSQLUtils.asList(status, "splits"); + + int current = 0; if (splits != null && splits.size() > 0) { - String processor = NoSQLUtils.asString(splits.get(0), "processor"); - if (StringUtils.isNotBlank(processor) && "Generate text file".equals(processor)) - continue; - String prg = NoSQLUtils.asString(splits.get(0), "progress"); - int current = (int) Float.parseFloat(prg); - //System.out.println(String.format("%s %s %s", current, progress, current != progress)); - if (current != progress) { - progress = current; - listener.onProgressChanged(progress); + // String processor = NoSQLUtils.asString(splits.get(0), "processor"); + // if (StringUtils.isNotBlank(processor) && "Generate text file".equals(processor)) + // continue; + for (BasicDBObject split : splits) { + String prg = NoSQLUtils.asString(split, "progress"); + current += (int) Float.parseFloat(prg); } + current = current / splits.size(); + //System.out.println(String.format("%s %s %s", current, progress, current != progress)); + } + if (current != progress) { + progress = current; + listener.onProgressChanged(progress); } - //Status: {"job_id":"20180226-162821-217-A7E91DC625BD","job_start":"2018/02/26 16:28:21","file":"\\\\PROXY-TRANSCODER-01\\MAM-Proxy_input\\20180201-0700_hirado_TEST-_CS.MXF","wf_name":"MAM_proxy","splits":[{"steps":"4 / 5","processor":"Folder","status":"Waiting for next processor resources...","node":"PROXY-TRANSCODE","progress":"78.5"}]} } else { - //System.out.println("Progress: " + 100); listener.onProgressChanged(100); - BasicDBObject history = getHistory(jobStart); - - if (history == null && lastJobToSubmit != null) { - //plusz 1 proba - doSubmit(); - lastJobToSubmit = null; - monitor(pollIntervall); - return; - } + BasicDBObject history = getHistory(); + + // if (history == null && lastJobToSubmit != null) { + // //plusz 1 proba + // doSubmit(); + // lastJobToSubmit = null; + // monitor(pollIntervall); + // return; + // } //System.out.println("History: " + history.toPrettyString(null)); if (history == null || NoSQLUtils.asLong(history, "state") != 1) { String error = NoSQLUtils.asString(history, "outcome"); - throw new Exception("Transcode error: " + error); + throw new Exception("Transcode error. " + error); } else { //System.out.println("Transcode completed"); break; @@ -211,6 +229,8 @@ public class FFAStransAPI implements IFFAStransAPI { @Override public void submit(String workflowName, String inputFile) throws Exception { + this.workflowName = workflowName; + this.inputFile = inputFile; List workflows = getWorkflows(); if (workflows == null) throw new Exception("No workflows"); @@ -220,7 +240,7 @@ public class FFAStransAPI implements IFFAStransAPI { if (wfID < 0) throw new Exception("Workflow not exists: " + workflowName); - lastJobToSubmit = new BasicDBObject("wf_id", wfID).append("inputfile", inputFile); - doSubmit(); + BasicDBObject jobToSubmit = new BasicDBObject("wf_id", wfID).append("inputfile", inputFile); + doSubmit(jobToSubmit); } } diff --git a/server/user.jobengine.osgi.commons/src/user/commons/IFFAStransAPI.java b/server/user.jobengine.osgi.commons/src/user/commons/IFFAStransAPI.java index d14ba3e6..9747b424 100644 --- a/server/user.jobengine.osgi.commons/src/user/commons/IFFAStransAPI.java +++ b/server/user.jobengine.osgi.commons/src/user/commons/IFFAStransAPI.java @@ -6,7 +6,7 @@ import com.ibm.nosql.json.api.BasicDBObject; public interface IFFAStransAPI { - BasicDBObject getHistory(String jobStart); + BasicDBObject getHistory(); BasicDBObject getStatus(String jobID); -- 2.54.0