From 3cafa9fd6d741abd4ecc9c33d4eb0129cd00e335 Mon Sep 17 00:00:00 2001 From: =?utf8?q?V=C3=A1s=C3=A1ry=20D=C3=A1niel?= Date: Mon, 29 Jan 2024 23:24:37 +0100 Subject: [PATCH] Minimal MediaSamuraiAPI and test --- .../run-mediacube-server.launch | 1 + server/-dependencies/jobengine.target | 1 + server/-dependencies/pom.xml | 3 + server/-product/mediacube.product | 1 + .../META-INF/MANIFEST.MF | 5 +- .../src/user/commons/MediaSamuraiAPI.java | 288 +++++------------- 6 files changed, 87 insertions(+), 212 deletions(-) diff --git a/server/-configuration/run-mediacube-server.launch b/server/-configuration/run-mediacube-server.launch index dc9e0b79..d8214b64 100644 --- a/server/-configuration/run-mediacube-server.launch +++ b/server/-configuration/run-mediacube-server.launch @@ -38,6 +38,7 @@ + diff --git a/server/-dependencies/jobengine.target b/server/-dependencies/jobengine.target index 019d6763..a1905c20 100644 --- a/server/-dependencies/jobengine.target +++ b/server/-dependencies/jobengine.target @@ -87,6 +87,7 @@ + win32 diff --git a/server/-dependencies/pom.xml b/server/-dependencies/pom.xml index 240ef05a..293e6158 100644 --- a/server/-dependencies/pom.xml +++ b/server/-dependencies/pom.xml @@ -77,6 +77,9 @@ default-cli + + + org.apache.felix:org.apache.felix.fileinstall:3.7.4 diff --git a/server/-product/mediacube.product b/server/-product/mediacube.product index 7f48d93d..af3db99d 100644 --- a/server/-product/mediacube.product +++ b/server/-product/mediacube.product @@ -37,6 +37,7 @@ + diff --git a/server/user.jobengine.osgi.commons/META-INF/MANIFEST.MF b/server/user.jobengine.osgi.commons/META-INF/MANIFEST.MF index 6640ed5c..7c2dcf0d 100644 --- a/server/user.jobengine.osgi.commons/META-INF/MANIFEST.MF +++ b/server/user.jobengine.osgi.commons/META-INF/MANIFEST.MF @@ -13,7 +13,9 @@ Import-Package: com.fasterxml.jackson.annotation;version="2.4.5", com.fasterxml.jackson.jaxrs.json;version="2.4.5", com.sun.jna, junit.framework, + org.apache.commons.logging, org.apache.logging.log4j, + org.apache.logging.log4j.core, org.apache.logging.log4j.message, org.jboss.resteasy.client.jaxrs, org.jboss.resteasy.plugins.providers, @@ -65,4 +67,5 @@ Require-Bundle: org.apache.commons.lang;bundle-version="2.4.0", org.apache.httpcomponents.httpclient;bundle-version="4.2.6", org.apache.httpcomponents.httpcore;bundle-version="4.2.5", org.apache.commons.io;bundle-version="2.2.0", - com.ibm.db2.jcc;bundle-version="1.4.0" + com.ibm.db2.jcc;bundle-version="1.4.0", + org.jboss.resteasy.jaxrs-api;bundle-version="3.0.11" diff --git a/server/user.jobengine.osgi.commons/src/user/commons/MediaSamuraiAPI.java b/server/user.jobengine.osgi.commons/src/user/commons/MediaSamuraiAPI.java index 9dc7acc7..c24dd863 100644 --- a/server/user.jobengine.osgi.commons/src/user/commons/MediaSamuraiAPI.java +++ b/server/user.jobengine.osgi.commons/src/user/commons/MediaSamuraiAPI.java @@ -1,9 +1,6 @@ package user.commons; -import java.io.File; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.List; +import java.util.Objects; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -13,7 +10,6 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.commons.io.FilenameUtils; -import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder; @@ -22,23 +18,21 @@ import org.jboss.resteasy.client.jaxrs.ResteasyWebTarget; import com.ibm.nosql.json.JSONUtil; import com.ibm.nosql.json.api.BasicDBObject; -import user.commons.nosql.NoSQLUtils; +public class MediaSamuraiAPI { -public class MediaSamuraiAPI implements IFFAStransAPI { + private static final Logger log = LogManager.getLogger(); - private static final Logger logger = LogManager.getLogger(); + public static void main1(String[] args) throws Exception { - public static void main(String[] args) throws Exception { final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); String inDir = "\\\\10.11.1.90\\data\\"; String outDir = "\\\\10.11.1.100\\Promise\\TRANSCODER\\FFASTRANSCODER\\Out\\"; String inputs[] = { "13-02009-0000-1.mov" }; - for (int i = 0; i < inputs.length; i++) { - final String inputi = inDir + inputs[i]; - final String outputi = outDir + FilenameUtils.removeExtension(inputs[i]) + ".MP4"; - // Task task = new Task(inDir + input, outDir + output); + for (String input2 : inputs) { + final String inputi = inDir + input2; + final String outputi = outDir + FilenameUtils.removeExtension(input2) + ".MP4"; Runnable task = new Runnable() { private String input = inputi; @@ -47,20 +41,24 @@ public class MediaSamuraiAPI implements IFFAStransAPI { @Override public void run() { try { - IFFAStransAPI api = new MediaSamuraiAPI("http://10.11.1.111:65445/api/json/v1/", p -> { - // System.out.println(output + " progress: " + p); + log.info("Started"); + MediaSamuraiAPI api = new MediaSamuraiAPI("http://localhost:8181/", p -> { + System.out.println(output + " progress: " + p); }); - - api.submit("MP4", input); - api.monitor(1000); - Thread.sleep(2000); - if (!Files.exists(Paths.get(output))) - throw new Exception("Missing " + output); + BasicDBObject job = new BasicDBObject(); + job.put("name", inputi); + job.put("delay", 1); + String taskId = api.submit(job); + + for (int i = 0; i < 10; i++) { + Thread.sleep(3000); + BasicDBObject status = api.getStatus(taskId); + log.info("TaskId {} {}% {}", taskId, status.getInt("progress"), status.getString("status")); + } } catch (Exception e) { e.printStackTrace(); } } - }; executor.execute(task); @@ -69,216 +67,84 @@ public class MediaSamuraiAPI implements IFFAStransAPI { executor.awaitTermination(1, TimeUnit.HOURS); } + public static void main(String[] args) throws Exception { + String inDir = "\\\\10.11.1.90\\data\\"; + String outDir = "\\\\10.11.1.100\\Promise\\TRANSCODER\\FFASTRANSCODER\\Out\\"; + + String inputs[] = { "13-02009-0000-1.mov" }; + for (String input2 : inputs) { + final String inputi = inDir + input2; + final String outputi = outDir + FilenameUtils.removeExtension(input2) + ".MP4"; + + try { + log.info("Started"); + MediaSamuraiAPI api = new MediaSamuraiAPI("http://localhost:8181/", p -> { + System.out.println(outputi + " progress: " + p); + }); + BasicDBObject job = new BasicDBObject(); + job.put("name", inputi); + job.put("delay", 1); + String taskId = api.submit(job); + while (true) { + Thread.sleep(3000); + BasicDBObject status = api.getStatus(taskId); + String taskStatus = status.getString("status"); + log.info("TaskId {} {}% {}", taskId, status.getInt("progress"), taskStatus); + if ("ERROR".equals(taskStatus) || "COMPLETED".equals(taskStatus)) { + break; + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + private ResteasyWebTarget webTarget; private IProgressChangedListener listener; - private String jobId; - private String inputFile; - private String workflowName; - private String inputFileName; public MediaSamuraiAPI(String apiAddress, IProgressChangedListener listener) { this.listener = listener; - webTarget = new ResteasyClientBuilder().build().target(apiAddress); - } - - private void doSubmit(BasicDBObject jobToSubmit) throws Exception { - ResteasyWebTarget target = webTarget.path("jobs"); - Response apiResponse = target.request().post(Entity.entity(jobToSubmit.toString(), MediaType.APPLICATION_JSON)); - if (apiResponse.getStatus() != 202) - throw new Exception("Can not submit, response status is: " + apiResponse.getStatus()); - String json = apiResponse.readEntity(String.class); - // logger.info("Transoder response: {}", json); - if (StringUtils.isBlank(json)) - throw new Exception("Can not submit, response JSON is empty"); - BasicDBObject resultObject = (BasicDBObject) JSONUtil.jsonToDbObject(json); - if (resultObject == null) - throw new Exception("Can not submit, response object is null"); - jobId = resultObject.getString("job_id"); - logger.info("Job for {} is submitted, ID: {}", inputFile, jobId); - } - - @Override - public BasicDBObject getHistory() { - ResteasyWebTarget target = webTarget.path("history"); - BasicDBObject result = null; try { - Response apiResponse = target.request().get(); - if (apiResponse.getStatus() != 200) { - logger.info("{} | Invalid response {}", inputFileName, apiResponse.getStatus()); - return null; - } - String json = apiResponse.readEntity(String.class); - BasicDBObject resultObject = (BasicDBObject) JSONUtil.jsonToDbObject(json); - List jobs = NoSQLUtils.asList(resultObject, "history"); - - if (jobs == null || jobs.size() == 0) { - logger.info("{} | No jobs in response", inputFileName); - return null; - } - File f = new File(inputFile); - - for (BasicDBObject job : jobs) { - if (job == null) - continue; - - String file = NoSQLUtils.asString(job, "file"); - if (file == null) - continue; - - String wf_name = NoSQLUtils.asString(job, "wf_name"); - if (wf_name == null || !workflowName.equals(wf_name)) - continue; - - if (file.equals(f.getName())) { - // logger.info("Found success history {}", f.getName()); - result = job; - break; - } - if (file.equals(inputFile)) { - // logger.info("Found error history {}", inputFile); - result = job; - break; - } - } + webTarget = new ResteasyClientBuilder().build().target(apiAddress); } catch (Exception e) { - logger.error(e.getClass() + " " + e.getMessage()); + log.info("Error", e); } - - return result; + log.info("Created"); } - @Override - public BasicDBObject getStatus(String jobID) { - ResteasyWebTarget target = webTarget.path("jobs"); + public String submit(BasicDBObject jobToSubmit) throws Exception { + ResteasyWebTarget target = webTarget.path("submit"); + Response apiResponse = target.request().post(Entity.entity(jobToSubmit.toString(), MediaType.APPLICATION_JSON)); + if (apiResponse.getStatus() != 200) { + throw new Exception("Submit error, response status is: " + apiResponse.getStatus()); + } + String json = apiResponse.readEntity(String.class); + BasicDBObject result = (BasicDBObject) JSONUtil.jsonToDbObject(json); + String taskId = result.getString("task_id"); + log.info("Job for {} is submitted, Task ID: {}", jobToSubmit, taskId); + return taskId; + } - BasicDBObject result = null; + public BasicDBObject getStatus(String taskId) { + ResteasyWebTarget target = webTarget.path("status").path(taskId); + BasicDBObject status = null; try { Response apiResponse = target.request().get(); - // logger.info("Transoder response code: {}", apiResponse.getStatus()); - if (apiResponse.getStatus() != 200) { - logger.info("{} | Invalid response {}", inputFileName, apiResponse.getStatus()); + log.info("Task ID {} status: Invalid response {}", taskId, apiResponse.getStatus()); return null; } String json = apiResponse.readEntity(String.class); - // logger.info("Transoder response: {}", json); - // System.out.println(json); - BasicDBObject resultObject = (BasicDBObject) JSONUtil.jsonToDbObject(json); - List jobs = NoSQLUtils.asList(resultObject, "jobs"); - - if (jobs == null || jobs.size() == 0) { - logger.info("{} | No jobs in response", inputFileName); - return null; + status = (BasicDBObject) JSONUtil.jsonToDbObject(json); + if (Objects.nonNull(listener)) { + listener.onProgressChanged(status.getInt("progress")); } - for (BasicDBObject job : jobs) { - if (jobID.equals(job.getString("job_id"))) { - // logger.info("Found job {}", jobID); - result = job; - break; - } - } - } catch (Exception e) { - logger.error(e.getClass() + " " + e.getMessage()); - } - - return result; - } - - private long getWorkflowId(String workflowName, List workflows, long wfID) { - for (BasicDBObject workflow : workflows) { - if (!workflowName.equals(workflow.getString("wf_name"))) - continue; - wfID = workflow.getLong("wf_id"); - } - return wfID; - } - - @Override - public List getWorkflows() { - ResteasyWebTarget target = webTarget.path("workflows"); - Response apiResponse = target.request().get(); - if (apiResponse.getStatus() != 200) - return null; - String json = apiResponse.readEntity(String.class); - BasicDBObject resultObject = (BasicDBObject) JSONUtil.jsonToDbObject(json); - return NoSQLUtils.asList(resultObject, "workflows"); - } - - @Override - public void monitor(int pollIntervall) throws InterruptedException, Exception { - int progress = 0; - - long started = System.currentTimeMillis(); - - BasicDBObject status = null; - boolean hasAnyResponse = false; - while (true) { - long queryTime = System.currentTimeMillis(); - - status = getStatus(jobId); - if (status != null) { - hasAnyResponse = true; - List splits = NoSQLUtils.asList(status, "splits"); - - int current = 0; - if (splits != null && splits.size() > 0) { - for (BasicDBObject split : splits) { - String prg = NoSQLUtils.asString(split, "progress"); - current += (int) Float.parseFloat(prg); - } - current = current / splits.size(); - } - if (current != progress) { - progress = current; - listener.onProgressChanged(progress); - logger.info("{} | {}%", inputFileName, progress); - } - - } else { - logger.info("{} status does not exist, checking history", inputFileName); - listener.onProgressChanged(100); - BasicDBObject history = getHistory(); - if (history != null) { - hasAnyResponse = true; - long state = NoSQLUtils.asLong(history, "state"); - String jobEnd = history.getString("job_end"); - if (history == null || state != 1) { - String error = NoSQLUtils.asString(history, "outcome"); - if (error == null) - error = "Not specified error occured"; - throw new Exception("State: " + state + ", message: " + error); - } else { - logger.info("{} | completed", inputFileName); - break; - } - - } - } - - // 5 percig nincs valasz - if (!hasAnyResponse && (queryTime - started > 10 * 60 * 1000)) - throw new Exception("Transcoder timeout"); - Thread.sleep(pollIntervall); + log.error(e.getClass() + " " + e.getMessage()); } + return status; } - @Override - public void submit(String workflowName, String inputFile) throws Exception { - this.workflowName = workflowName; - this.inputFile = inputFile; - this.inputFileName = Paths.get(inputFile).getFileName().toString(); - List workflows = getWorkflows(); - if (workflows == null) - throw new Exception("No workflows"); - - long wfID = -1; - wfID = getWorkflowId(workflowName, workflows, wfID); - if (wfID < 0) - throw new Exception("Workflow does not exist: " + workflowName); - - BasicDBObject jobToSubmit = new BasicDBObject("wf_id", wfID).append("inputfile", inputFile); - doSubmit(jobToSubmit); - } } -- 2.54.0