package user.commons;
-import java.io.File;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
+import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.core.Response;
import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder;
import com.ibm.nosql.json.JSONUtil;
import com.ibm.nosql.json.api.BasicDBObject;
-import user.commons.nosql.NoSQLUtils;
+public class MediaSamuraiAPI {
-public class MediaSamuraiAPI implements IFFAStransAPI {
+ private static final Logger log = LogManager.getLogger();
- private static final Logger logger = LogManager.getLogger();
+ public static void main1(String[] args) throws Exception {
- public static void main(String[] args) throws Exception {
final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
String inDir = "\\\\10.11.1.90\\data\\";
String outDir = "\\\\10.11.1.100\\Promise\\TRANSCODER\\FFASTRANSCODER\\Out\\";
String inputs[] = { "13-02009-0000-1.mov" };
- for (int i = 0; i < inputs.length; i++) {
- final String inputi = inDir + inputs[i];
- final String outputi = outDir + FilenameUtils.removeExtension(inputs[i]) + ".MP4";
- // Task task = new Task(inDir + input, outDir + output);
+ for (String input2 : inputs) {
+ final String inputi = inDir + input2;
+ final String outputi = outDir + FilenameUtils.removeExtension(input2) + ".MP4";
Runnable task = new Runnable() {
private String input = inputi;
@Override
public void run() {
try {
- IFFAStransAPI api = new MediaSamuraiAPI("http://10.11.1.111:65445/api/json/v1/", p -> {
- // System.out.println(output + " progress: " + p);
+ log.info("Started");
+ MediaSamuraiAPI api = new MediaSamuraiAPI("http://localhost:8181/", p -> {
+ System.out.println(output + " progress: " + p);
});
-
- api.submit("MP4", input);
- api.monitor(1000);
- Thread.sleep(2000);
- if (!Files.exists(Paths.get(output)))
- throw new Exception("Missing " + output);
+ BasicDBObject job = new BasicDBObject();
+ job.put("name", inputi);
+ job.put("delay", 1);
+ String taskId = api.submit(job);
+
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(3000);
+ BasicDBObject status = api.getStatus(taskId);
+ log.info("TaskId {} {}% {}", taskId, status.getInt("progress"), status.getString("status"));
+ }
} catch (Exception e) {
e.printStackTrace();
}
}
-
};
executor.execute(task);
executor.awaitTermination(1, TimeUnit.HOURS);
}
+ public static void main(String[] args) throws Exception {
+ String inDir = "\\\\10.11.1.90\\data\\";
+ String outDir = "\\\\10.11.1.100\\Promise\\TRANSCODER\\FFASTRANSCODER\\Out\\";
+
+ String inputs[] = { "13-02009-0000-1.mov" };
+ for (String input2 : inputs) {
+ final String inputi = inDir + input2;
+ final String outputi = outDir + FilenameUtils.removeExtension(input2) + ".MP4";
+
+ try {
+ log.info("Started");
+ MediaSamuraiAPI api = new MediaSamuraiAPI("http://localhost:8181/", p -> {
+ System.out.println(outputi + " progress: " + p);
+ });
+ BasicDBObject job = new BasicDBObject();
+ job.put("name", inputi);
+ job.put("delay", 1);
+ String taskId = api.submit(job);
+ while (true) {
+ Thread.sleep(3000);
+ BasicDBObject status = api.getStatus(taskId);
+ String taskStatus = status.getString("status");
+ log.info("TaskId {} {}% {}", taskId, status.getInt("progress"), taskStatus);
+ if ("ERROR".equals(taskStatus) || "COMPLETED".equals(taskStatus)) {
+ break;
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
private ResteasyWebTarget webTarget;
private IProgressChangedListener listener;
- private String jobId;
- private String inputFile;
- private String workflowName;
- private String inputFileName;
public MediaSamuraiAPI(String apiAddress, IProgressChangedListener listener) {
this.listener = listener;
- webTarget = new ResteasyClientBuilder().build().target(apiAddress);
- }
-
- private void doSubmit(BasicDBObject jobToSubmit) throws Exception {
- ResteasyWebTarget target = webTarget.path("jobs");
- Response apiResponse = target.request().post(Entity.entity(jobToSubmit.toString(), MediaType.APPLICATION_JSON));
- if (apiResponse.getStatus() != 202)
- throw new Exception("Can not submit, response status is: " + apiResponse.getStatus());
- String json = apiResponse.readEntity(String.class);
- // logger.info("Transoder response: {}", json);
- if (StringUtils.isBlank(json))
- throw new Exception("Can not submit, response JSON is empty");
- BasicDBObject resultObject = (BasicDBObject) JSONUtil.jsonToDbObject(json);
- if (resultObject == null)
- throw new Exception("Can not submit, response object is null");
- jobId = resultObject.getString("job_id");
- logger.info("Job for {} is submitted, ID: {}", inputFile, jobId);
- }
-
- @Override
- public BasicDBObject getHistory() {
- ResteasyWebTarget target = webTarget.path("history");
- BasicDBObject result = null;
try {
- Response apiResponse = target.request().get();
- if (apiResponse.getStatus() != 200) {
- logger.info("{} | Invalid response {}", inputFileName, apiResponse.getStatus());
- return null;
- }
- String json = apiResponse.readEntity(String.class);
- BasicDBObject resultObject = (BasicDBObject) JSONUtil.jsonToDbObject(json);
- List<BasicDBObject> jobs = NoSQLUtils.asList(resultObject, "history");
-
- if (jobs == null || jobs.size() == 0) {
- logger.info("{} | No jobs in response", inputFileName);
- return null;
- }
- File f = new File(inputFile);
-
- for (BasicDBObject job : jobs) {
- if (job == null)
- continue;
-
- String file = NoSQLUtils.asString(job, "file");
- if (file == null)
- continue;
-
- String wf_name = NoSQLUtils.asString(job, "wf_name");
- if (wf_name == null || !workflowName.equals(wf_name))
- continue;
-
- if (file.equals(f.getName())) {
- // logger.info("Found success history {}", f.getName());
- result = job;
- break;
- }
- if (file.equals(inputFile)) {
- // logger.info("Found error history {}", inputFile);
- result = job;
- break;
- }
- }
+ webTarget = new ResteasyClientBuilder().build().target(apiAddress);
} catch (Exception e) {
- logger.error(e.getClass() + " " + e.getMessage());
+ log.info("Error", e);
}
-
- return result;
+ log.info("Created");
}
- @Override
- public BasicDBObject getStatus(String jobID) {
- ResteasyWebTarget target = webTarget.path("jobs");
+ public String submit(BasicDBObject jobToSubmit) throws Exception {
+ ResteasyWebTarget target = webTarget.path("submit");
+ Response apiResponse = target.request().post(Entity.entity(jobToSubmit.toString(), MediaType.APPLICATION_JSON));
+ if (apiResponse.getStatus() != 200) {
+ throw new Exception("Submit error, response status is: " + apiResponse.getStatus());
+ }
+ String json = apiResponse.readEntity(String.class);
+ BasicDBObject result = (BasicDBObject) JSONUtil.jsonToDbObject(json);
+ String taskId = result.getString("task_id");
+ log.info("Job for {} is submitted, Task ID: {}", jobToSubmit, taskId);
+ return taskId;
+ }
- BasicDBObject result = null;
+ public BasicDBObject getStatus(String taskId) {
+ ResteasyWebTarget target = webTarget.path("status").path(taskId);
+ BasicDBObject status = null;
try {
Response apiResponse = target.request().get();
- // logger.info("Transoder response code: {}", apiResponse.getStatus());
-
if (apiResponse.getStatus() != 200) {
- logger.info("{} | Invalid response {}", inputFileName, apiResponse.getStatus());
+ log.info("Task ID {} status: Invalid response {}", taskId, apiResponse.getStatus());
return null;
}
String json = apiResponse.readEntity(String.class);
- // logger.info("Transoder response: {}", json);
- // System.out.println(json);
- BasicDBObject resultObject = (BasicDBObject) JSONUtil.jsonToDbObject(json);
- List<BasicDBObject> jobs = NoSQLUtils.asList(resultObject, "jobs");
-
- if (jobs == null || jobs.size() == 0) {
- logger.info("{} | No jobs in response", inputFileName);
- return null;
+ status = (BasicDBObject) JSONUtil.jsonToDbObject(json);
+ if (Objects.nonNull(listener)) {
+ listener.onProgressChanged(status.getInt("progress"));
}
- for (BasicDBObject job : jobs) {
- if (jobID.equals(job.getString("job_id"))) {
- // logger.info("Found job {}", jobID);
- result = job;
- break;
- }
- }
-
} catch (Exception e) {
- logger.error(e.getClass() + " " + e.getMessage());
- }
-
- return result;
- }
-
- private long getWorkflowId(String workflowName, List<BasicDBObject> workflows, long wfID) {
- for (BasicDBObject workflow : workflows) {
- if (!workflowName.equals(workflow.getString("wf_name")))
- continue;
- wfID = workflow.getLong("wf_id");
- }
- return wfID;
- }
-
- @Override
- public List<BasicDBObject> getWorkflows() {
- ResteasyWebTarget target = webTarget.path("workflows");
- Response apiResponse = target.request().get();
- if (apiResponse.getStatus() != 200)
- return null;
- String json = apiResponse.readEntity(String.class);
- BasicDBObject resultObject = (BasicDBObject) JSONUtil.jsonToDbObject(json);
- return NoSQLUtils.asList(resultObject, "workflows");
- }
-
- @Override
- public void monitor(int pollIntervall) throws InterruptedException, Exception {
- int progress = 0;
-
- long started = System.currentTimeMillis();
-
- BasicDBObject status = null;
- boolean hasAnyResponse = false;
- while (true) {
- long queryTime = System.currentTimeMillis();
-
- status = getStatus(jobId);
- if (status != null) {
- hasAnyResponse = true;
- List<BasicDBObject> splits = NoSQLUtils.asList(status, "splits");
-
- int current = 0;
- if (splits != null && splits.size() > 0) {
- for (BasicDBObject split : splits) {
- String prg = NoSQLUtils.asString(split, "progress");
- current += (int) Float.parseFloat(prg);
- }
- current = current / splits.size();
- }
- if (current != progress) {
- progress = current;
- listener.onProgressChanged(progress);
- logger.info("{} | {}%", inputFileName, progress);
- }
-
- } else {
- logger.info("{} status does not exist, checking history", inputFileName);
- listener.onProgressChanged(100);
- BasicDBObject history = getHistory();
- if (history != null) {
- hasAnyResponse = true;
- long state = NoSQLUtils.asLong(history, "state");
- String jobEnd = history.getString("job_end");
- if (history == null || state != 1) {
- String error = NoSQLUtils.asString(history, "outcome");
- if (error == null)
- error = "Not specified error occured";
- throw new Exception("State: " + state + ", message: " + error);
- } else {
- logger.info("{} | completed", inputFileName);
- break;
- }
-
- }
- }
-
- // 5 percig nincs valasz
- if (!hasAnyResponse && (queryTime - started > 10 * 60 * 1000))
- throw new Exception("Transcoder timeout");
- Thread.sleep(pollIntervall);
+ log.error(e.getClass() + " " + e.getMessage());
}
+ return status;
}
- @Override
- public void submit(String workflowName, String inputFile) throws Exception {
- this.workflowName = workflowName;
- this.inputFile = inputFile;
- this.inputFileName = Paths.get(inputFile).getFileName().toString();
- List<BasicDBObject> workflows = getWorkflows();
- if (workflows == null)
- throw new Exception("No workflows");
-
- long wfID = -1;
- wfID = getWorkflowId(workflowName, workflows, wfID);
- if (wfID < 0)
- throw new Exception("Workflow does not exist: " + workflowName);
-
- BasicDBObject jobToSubmit = new BasicDBObject("wf_id", wfID).append("inputfile", inputFile);
- doSubmit(jobToSubmit);
- }
}