Minimal MediaSamuraiAPI and test
authorVásáry Dániel <vasary@elgekko.net>
Mon, 29 Jan 2024 22:24:37 +0000 (23:24 +0100)
committerVásáry Dániel <vasary@elgekko.net>
Mon, 29 Jan 2024 22:24:37 +0000 (23:24 +0100)
server/-configuration/run-mediacube-server.launch
server/-dependencies/jobengine.target
server/-dependencies/pom.xml
server/-product/mediacube.product
server/user.jobengine.osgi.commons/META-INF/MANIFEST.MF
server/user.jobengine.osgi.commons/src/user/commons/MediaSamuraiAPI.java

index dc9e0b79c682c95c5e7d5006b843b9e3df9c594e..d8214b64e9a94eaa2813b260c3221c621a33c825 100644 (file)
@@ -38,6 +38,7 @@
         <setEntry value="com.ibm.nosql@default:default"/>\r
         <setEntry value="com.microsoft.sqlserver.sqljdbc@default:default"/>\r
         <setEntry value="com.sun.jna@default:default"/>\r
+        <setEntry value="commons-logging@default:default"/>\r
         <setEntry value="groovy@default:default"/>\r
         <setEntry value="javax.annotation-api@default:default"/>\r
         <setEntry value="javax.mail@default:default"/>\r
index 019d6763743b98540f0cb3a7130c32900afae2f3..a1905c20e4103da311bce5e6b5407c8962a877a8 100644 (file)
@@ -87,6 +87,7 @@
        <plugin id="org.objectweb.asm.commons"/>\r
        <plugin id="com.sun.jna.platform"/>\r
        <plugin id="org.apache.felix.fileinstall"/>\r
+       <plugin id="commons-logging"/>\r
 </includeBundles>\r
 <environment>\r
        <os>win32</os>\r
index 240ef05a3f145876dc8c659cf55e8dcea9aa1385..293e6158289c56eb216f1601a8d1c37b9a8f8d3e 100644 (file)
@@ -77,6 +77,9 @@
                                                <id>default-cli</id>\r
                                                <configuration>\r
                                                        <artifacts>\r
+<!--                                                           <artifact>-->\r
+<!--                                                                   <id>commons-logging:commons-logging:1.3.0</id>-->\r
+<!--                                                           </artifact>-->\r
                                                                <artifact>\r
                                                                        <id>org.apache.felix:org.apache.felix.fileinstall:3.7.4</id>\r
                                                                </artifact>\r
index 7f48d93d92162e95cd82f957dd59c21b1a6ddacc..af3db99d014dffc4ed3a47ad326633f807ca66dd 100644 (file)
@@ -37,6 +37,7 @@
       <plugin id="com.microsoft.sqlserver.sqljdbc"/>\r
       <plugin id="com.sun.jna"/>\r
       <plugin id="com.sun.jna.platform"/>\r
+      <plugin id="commons-logging"/>\r
       <plugin id="groovy"/>\r
       <plugin id="javax.annotation-api"/>\r
       <plugin id="javax.mail"/>\r
index 6640ed5c6afd3f7d578a23f501e8c9835ba01bb4..7c2dcf0d46d85b6ad21a7cd2ec927c053567c6c5 100644 (file)
@@ -13,7 +13,9 @@ Import-Package: com.fasterxml.jackson.annotation;version="2.4.5",
  com.fasterxml.jackson.jaxrs.json;version="2.4.5",
  com.sun.jna,
  junit.framework,
+ org.apache.commons.logging,
  org.apache.logging.log4j,
+ org.apache.logging.log4j.core,
  org.apache.logging.log4j.message,
  org.jboss.resteasy.client.jaxrs,
  org.jboss.resteasy.plugins.providers,
@@ -65,4 +67,5 @@ Require-Bundle: org.apache.commons.lang;bundle-version="2.4.0",
  org.apache.httpcomponents.httpclient;bundle-version="4.2.6",
  org.apache.httpcomponents.httpcore;bundle-version="4.2.5",
  org.apache.commons.io;bundle-version="2.2.0",
- com.ibm.db2.jcc;bundle-version="1.4.0"
+ com.ibm.db2.jcc;bundle-version="1.4.0",
+ org.jboss.resteasy.jaxrs-api;bundle-version="3.0.11"
index 9dc7acc7adae2cea0f8f74aaa0801d4869d60f08..c24dd8638b79461b1c26d9031c47896d7603ca44 100644 (file)
@@ -1,9 +1,6 @@
 package user.commons;
 
-import java.io.File;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -13,7 +10,6 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
 import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.lang.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder;
@@ -22,23 +18,21 @@ import org.jboss.resteasy.client.jaxrs.ResteasyWebTarget;
 import com.ibm.nosql.json.JSONUtil;
 import com.ibm.nosql.json.api.BasicDBObject;
 
-import user.commons.nosql.NoSQLUtils;
+public class MediaSamuraiAPI {
 
-public class MediaSamuraiAPI implements IFFAStransAPI {
+       private static final Logger log = LogManager.getLogger();
 
-       private static final Logger logger = LogManager.getLogger();
+       public static void main1(String[] args) throws Exception {
 
-       public static void main(String[] args) throws Exception {
                final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
 
                String inDir = "\\\\10.11.1.90\\data\\";
                String outDir = "\\\\10.11.1.100\\Promise\\TRANSCODER\\FFASTRANSCODER\\Out\\";
 
                String inputs[] = { "13-02009-0000-1.mov" };
-               for (int i = 0; i < inputs.length; i++) {
-                       final String inputi = inDir + inputs[i];
-                       final String outputi = outDir + FilenameUtils.removeExtension(inputs[i]) + ".MP4";
-                       // Task task = new Task(inDir + input, outDir + output);
+               for (String input2 : inputs) {
+                       final String inputi = inDir + input2;
+                       final String outputi = outDir + FilenameUtils.removeExtension(input2) + ".MP4";
 
                        Runnable task = new Runnable() {
                                private String input = inputi;
@@ -47,20 +41,24 @@ public class MediaSamuraiAPI implements IFFAStransAPI {
                                @Override
                                public void run() {
                                        try {
-                                               IFFAStransAPI api = new MediaSamuraiAPI("http://10.11.1.111:65445/api/json/v1/", p -> {
-                                                       // System.out.println(output + " progress: " + p);
+                                               log.info("Started");
+                                               MediaSamuraiAPI api = new MediaSamuraiAPI("http://localhost:8181/", p -> {
+                                                       System.out.println(output + " progress: " + p);
                                                });
-
-                                               api.submit("MP4", input);
-                                               api.monitor(1000);
-                                               Thread.sleep(2000);
-                                               if (!Files.exists(Paths.get(output)))
-                                                       throw new Exception("Missing " + output);
+                                               BasicDBObject job = new BasicDBObject();
+                                               job.put("name", inputi);
+                                               job.put("delay", 1);
+                                               String taskId = api.submit(job);
+
+                                               for (int i = 0; i < 10; i++) {
+                                                       Thread.sleep(3000);
+                                                       BasicDBObject status = api.getStatus(taskId);
+                                                       log.info("TaskId {} {}% {}", taskId, status.getInt("progress"), status.getString("status"));
+                                               }
                                        } catch (Exception e) {
                                                e.printStackTrace();
                                        }
                                }
-
                        };
 
                        executor.execute(task);
@@ -69,216 +67,84 @@ public class MediaSamuraiAPI implements IFFAStransAPI {
                executor.awaitTermination(1, TimeUnit.HOURS);
        }
 
+       public static void main(String[] args) throws Exception {
+               String inDir = "\\\\10.11.1.90\\data\\";
+               String outDir = "\\\\10.11.1.100\\Promise\\TRANSCODER\\FFASTRANSCODER\\Out\\";
+
+               String inputs[] = { "13-02009-0000-1.mov" };
+               for (String input2 : inputs) {
+                       final String inputi = inDir + input2;
+                       final String outputi = outDir + FilenameUtils.removeExtension(input2) + ".MP4";
+
+                       try {
+                               log.info("Started");
+                               MediaSamuraiAPI api = new MediaSamuraiAPI("http://localhost:8181/", p -> {
+                                       System.out.println(outputi + " progress: " + p);
+                               });
+                               BasicDBObject job = new BasicDBObject();
+                               job.put("name", inputi);
+                               job.put("delay", 1);
+                               String taskId = api.submit(job);
+                               while (true) {
+                                       Thread.sleep(3000);
+                                       BasicDBObject status = api.getStatus(taskId);
+                                       String taskStatus = status.getString("status");
+                                       log.info("TaskId {} {}% {}", taskId, status.getInt("progress"), taskStatus);
+                                       if ("ERROR".equals(taskStatus) || "COMPLETED".equals(taskStatus)) {
+                                               break;
+                                       }
+                               }
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                       }
+               }
+       }
+
        private ResteasyWebTarget webTarget;
        private IProgressChangedListener listener;
-       private String jobId;
-       private String inputFile;
-       private String workflowName;
-       private String inputFileName;
 
        public MediaSamuraiAPI(String apiAddress, IProgressChangedListener listener) {
                this.listener = listener;
-               webTarget = new ResteasyClientBuilder().build().target(apiAddress);
-       }
-
-       private void doSubmit(BasicDBObject jobToSubmit) throws Exception {
-               ResteasyWebTarget target = webTarget.path("jobs");
-               Response apiResponse = target.request().post(Entity.entity(jobToSubmit.toString(), MediaType.APPLICATION_JSON));
-               if (apiResponse.getStatus() != 202)
-                       throw new Exception("Can not submit, response status is: " + apiResponse.getStatus());
-               String json = apiResponse.readEntity(String.class);
-               // logger.info("Transoder response: {}", json);
-               if (StringUtils.isBlank(json))
-                       throw new Exception("Can not submit, response JSON is empty");
-               BasicDBObject resultObject = (BasicDBObject) JSONUtil.jsonToDbObject(json);
-               if (resultObject == null)
-                       throw new Exception("Can not submit, response object is null");
-               jobId = resultObject.getString("job_id");
-               logger.info("Job for {} is submitted, ID: {}", inputFile, jobId);
-       }
-
-       @Override
-       public BasicDBObject getHistory() {
-               ResteasyWebTarget target = webTarget.path("history");
-               BasicDBObject result = null;
                try {
-                       Response apiResponse = target.request().get();
-                       if (apiResponse.getStatus() != 200) {
-                               logger.info("{} | Invalid response {}", inputFileName, apiResponse.getStatus());
-                               return null;
-                       }
-                       String json = apiResponse.readEntity(String.class);
-                       BasicDBObject resultObject = (BasicDBObject) JSONUtil.jsonToDbObject(json);
-                       List<BasicDBObject> jobs = NoSQLUtils.asList(resultObject, "history");
-
-                       if (jobs == null || jobs.size() == 0) {
-                               logger.info("{} | No jobs in response", inputFileName);
-                               return null;
-                       }
 
-                       File f = new File(inputFile);
-
-                       for (BasicDBObject job : jobs) {
-                               if (job == null)
-                                       continue;
-
-                               String file = NoSQLUtils.asString(job, "file");
-                               if (file == null)
-                                       continue;
-
-                               String wf_name = NoSQLUtils.asString(job, "wf_name");
-                               if (wf_name == null || !workflowName.equals(wf_name))
-                                       continue;
-
-                               if (file.equals(f.getName())) {
-                                       // logger.info("Found success history {}", f.getName());
-                                       result = job;
-                                       break;
-                               }
-                               if (file.equals(inputFile)) {
-                                       // logger.info("Found error history {}", inputFile);
-                                       result = job;
-                                       break;
-                               }
-                       }
+                       webTarget = new ResteasyClientBuilder().build().target(apiAddress);
                } catch (Exception e) {
-                       logger.error(e.getClass() + " " + e.getMessage());
+                       log.info("Error", e);
                }
-
-               return result;
+               log.info("Created");
        }
 
-       @Override
-       public BasicDBObject getStatus(String jobID) {
-               ResteasyWebTarget target = webTarget.path("jobs");
+       public String submit(BasicDBObject jobToSubmit) throws Exception {
+               ResteasyWebTarget target = webTarget.path("submit");
+               Response apiResponse = target.request().post(Entity.entity(jobToSubmit.toString(), MediaType.APPLICATION_JSON));
+               if (apiResponse.getStatus() != 200) {
+                       throw new Exception("Submit error, response status is: " + apiResponse.getStatus());
+               }
+               String json = apiResponse.readEntity(String.class);
+               BasicDBObject result = (BasicDBObject) JSONUtil.jsonToDbObject(json);
+               String taskId = result.getString("task_id");
+               log.info("Job for {} is submitted, Task ID: {}", jobToSubmit, taskId);
+               return taskId;
+       }
 
-               BasicDBObject result = null;
+       public BasicDBObject getStatus(String taskId) {
+               ResteasyWebTarget target = webTarget.path("status").path(taskId);
+               BasicDBObject status = null;
                try {
                        Response apiResponse = target.request().get();
-                       // logger.info("Transoder response code: {}", apiResponse.getStatus());
-
                        if (apiResponse.getStatus() != 200) {
-                               logger.info("{} | Invalid response {}", inputFileName, apiResponse.getStatus());
+                               log.info("Task ID {} status: Invalid response {}", taskId, apiResponse.getStatus());
                                return null;
                        }
                        String json = apiResponse.readEntity(String.class);
-                       // logger.info("Transoder response: {}", json);
-                       // System.out.println(json);
-                       BasicDBObject resultObject = (BasicDBObject) JSONUtil.jsonToDbObject(json);
-                       List<BasicDBObject> jobs = NoSQLUtils.asList(resultObject, "jobs");
-
-                       if (jobs == null || jobs.size() == 0) {
-                               logger.info("{} | No jobs in response", inputFileName);
-                               return null;
+                       status = (BasicDBObject) JSONUtil.jsonToDbObject(json);
+                       if (Objects.nonNull(listener)) {
+                               listener.onProgressChanged(status.getInt("progress"));
                        }
-                       for (BasicDBObject job : jobs) {
-                               if (jobID.equals(job.getString("job_id"))) {
-                                       // logger.info("Found job {}", jobID);
-                                       result = job;
-                                       break;
-                               }
-                       }
-
                } catch (Exception e) {
-                       logger.error(e.getClass() + " " + e.getMessage());
-               }
-
-               return result;
-       }
-
-       private long getWorkflowId(String workflowName, List<BasicDBObject> workflows, long wfID) {
-               for (BasicDBObject workflow : workflows) {
-                       if (!workflowName.equals(workflow.getString("wf_name")))
-                               continue;
-                       wfID = workflow.getLong("wf_id");
-               }
-               return wfID;
-       }
-
-       @Override
-       public List<BasicDBObject> getWorkflows() {
-               ResteasyWebTarget target = webTarget.path("workflows");
-               Response apiResponse = target.request().get();
-               if (apiResponse.getStatus() != 200)
-                       return null;
-               String json = apiResponse.readEntity(String.class);
-               BasicDBObject resultObject = (BasicDBObject) JSONUtil.jsonToDbObject(json);
-               return NoSQLUtils.asList(resultObject, "workflows");
-       }
-
-       @Override
-       public void monitor(int pollIntervall) throws InterruptedException, Exception {
-               int progress = 0;
-
-               long started = System.currentTimeMillis();
-
-               BasicDBObject status = null;
-               boolean hasAnyResponse = false;
-               while (true) {
-                       long queryTime = System.currentTimeMillis();
-
-                       status = getStatus(jobId);
-                       if (status != null) {
-                               hasAnyResponse = true;
-                               List<BasicDBObject> splits = NoSQLUtils.asList(status, "splits");
-
-                               int current = 0;
-                               if (splits != null && splits.size() > 0) {
-                                       for (BasicDBObject split : splits) {
-                                               String prg = NoSQLUtils.asString(split, "progress");
-                                               current += (int) Float.parseFloat(prg);
-                                       }
-                                       current = current / splits.size();
-                               }
-                               if (current != progress) {
-                                       progress = current;
-                                       listener.onProgressChanged(progress);
-                                       logger.info("{} | {}%", inputFileName, progress);
-                               }
-
-                       } else {
-                               logger.info("{} status does not exist, checking history", inputFileName);
-                               listener.onProgressChanged(100);
-                               BasicDBObject history = getHistory();
-                               if (history != null) {
-                                       hasAnyResponse = true;
-                                       long state = NoSQLUtils.asLong(history, "state");
-                                       String jobEnd = history.getString("job_end");
-                                       if (history == null || state != 1) {
-                                               String error = NoSQLUtils.asString(history, "outcome");
-                                               if (error == null)
-                                                       error = "Not specified error occured";
-                                               throw new Exception("State: " + state + ", message: " + error);
-                                       } else {
-                                               logger.info("{} | completed", inputFileName);
-                                               break;
-                                       }
-
-                               }
-                       }
-
-                       // 5 percig nincs valasz
-                       if (!hasAnyResponse && (queryTime - started > 10 * 60 * 1000))
-                               throw new Exception("Transcoder timeout");
-                       Thread.sleep(pollIntervall);
+                       log.error(e.getClass() + " " + e.getMessage());
                }
+               return status;
        }
 
-       @Override
-       public void submit(String workflowName, String inputFile) throws Exception {
-               this.workflowName = workflowName;
-               this.inputFile = inputFile;
-               this.inputFileName = Paths.get(inputFile).getFileName().toString();
-               List<BasicDBObject> workflows = getWorkflows();
-               if (workflows == null)
-                       throw new Exception("No workflows");
-
-               long wfID = -1;
-               wfID = getWorkflowId(workflowName, workflows, wfID);
-               if (wfID < 0)
-                       throw new Exception("Workflow does not exist: " + workflowName);
-
-               BasicDBObject jobToSubmit = new BasicDBObject("wf_id", wfID).append("inputfile", inputFile);
-               doSubmit(jobToSubmit);
-       }
 }