--- /dev/null
+package user.jobengine.server.steps;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.Marker;
+import org.apache.logging.log4j.message.Message;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+
+import user.commons.FFAStransAPI;
+import user.commons.IFFAStransAPI;
+import user.commons.StoreUri;
+import user.commons.mediatool.Timecode;
+import user.commons.mediatool.Timecode.Type;
+import user.commons.remotestore.RemoteStoreProtocol;
+import user.jobengine.db.FileType;
+import user.jobengine.db.IItemManager;
+import user.jobengine.db.Media;
+import user.jobengine.db.Store;
+import user.jobengine.server.IJobEngine;
+import user.jobengine.server.IJobRuntime;
+import user.jobengine.server.steps.shared.EscortFiles;
+
+public class TranscodeMediaSamuraiStep extends JobStep {
+ private static final int POLL_INTERVALL = 3000;
+ private static final String MP4EXT = ".MP4";
+ private static final String MXFEXT = ".MXF";
+ private static final String LOWRES_FILETYPE = "Low-res";
+ private static final Logger logger = LogManager.getLogger("TranscodeFFAStranStep");
+ private IItemManager manager;
+ private Store store;
+ private FileType fileType;
+ private Media mediaCubeMedia;
+ private Marker marker;
+
+ @StepEntry
+ public Object[] execute(ArchiveItem archiveItem, Media mediaCubeMedia, String transcoderAddress,
+ String transcoderTemplateName, String globalHiresSourcePath, String localLowresTargetPath,
+ boolean deleteSource, IJobEngine jobEngine, IJobRuntime jobRuntime) throws Exception {
+
+ this.marker = jobRuntime.getSessionMarker();
+ this.manager = jobEngine.getItemManager();
+ this.store = check(manager.getCurrentLowresStore(), "lowres Store");
+ this.fileType = check(manager.getFileType(LOWRES_FILETYPE), "lowres FileType");
+ this.mediaCubeMedia = check(mediaCubeMedia, "mediaCubeMedia");
+ check(archiveItem, "archiveItem");
+ check(transcoderAddress, "transcoderAddress");
+ check(transcoderTemplateName, "transcoderTemplateName");
+ check(globalHiresSourcePath, "globalHiresSourcePath");
+ check(localLowresTargetPath, "localLowresTargetPath");
+
+ File sourceMediaFile = new File(archiveItem.getMediaFile());
+ logger.info("Transcoding {}", archiveItem.getMediaFile());
+ String sourceFileName = sourceMediaFile.getName();
+ Timecode timecode = new Timecode(mediaCubeMedia.getLength(), Type.PAL);
+
+ String details = String.format("%s (%s, %d bytes)", sourceFileName, timecode.toString(),
+ sourceMediaFile.length());
+
+ StoreUri storeUri = store.getTargetStoreUri(RemoteStoreProtocol.LOCAL);
+ if (storeUri == null)
+ throw new Exception("Can not detect proxy folder.");
+
+ String webPath = storeUri.toString(true);
+
+ Path targetPath = null;
+ try {
+ String targetFileName = FilenameUtils.removeExtension(sourceFileName) + MP4EXT;
+ targetPath = Paths.get(localLowresTargetPath, targetFileName);
+ if (!targetPath.toFile().exists()) {
+ // jobRuntime.setDescription(String.format("%s: %s",
+ // jobRuntime.getDescription(), details));
+ jobRuntime.setDescription(String.format("%s transzkódolása", details));
+ String sourceFile = Paths.get(globalHiresSourcePath, sourceFileName).toString();
+ IFFAStransAPI api = new FFAStransAPI(transcoderAddress, p -> {
+ if (p <= 100)
+ jobRuntime.incrementProgress(p);
+ });
+
+ api.submit(transcoderTemplateName, sourceFile);
+ api.monitor(POLL_INTERVALL);
+ }
+
+ // a sikeres transzkod utan nem mindig van ott egybol a fajl
+ long started = System.currentTimeMillis();
+ while (!targetPath.toFile().exists()) {
+ long current = System.currentTimeMillis();
+ // max 5 perc varakozas
+ if (current - started > 5 * 60 * 1000)
+ throw new Exception("Transcode job target file access timed out");
+ Thread.sleep(POLL_INTERVALL);
+ }
+
+ postprocess(targetPath, webPath);
+
+ } catch (Exception e) {
+ logger.catching(e);
+ Message m = new ParameterizedMessage("{} átkódolás hiba: {}", sourceFileName, e.getMessage());
+ logger.error(marker, m);
+ throw new Exception(m.getFormattedMessage());
+ } finally {
+ try {
+ if (deleteSource && sourceMediaFile != null && sourceMediaFile.exists())
+ sourceMediaFile.delete();
+ } catch (Exception e) {
+ logger.catching(e);
+ }
+ try {
+ if (deleteSource && targetPath != null && targetPath.toFile().exists())
+ Files.delete(targetPath);
+ } catch (Exception e) {
+ logger.catching(e);
+ }
+ }
+ return null;
+ }
+
+ private void postprocess(Path transcodedFilePath, String webPath) throws IOException {
+ Path lowresPath = null;
+ try {
+ String transcodedFileName = transcodedFilePath.getFileName().toString();
+ String targetPath = null;
+ if (transcodedFileName.indexOf(".") > 2) {
+ Path subdir = Paths.get(transcodedFileName.substring(0, 1), transcodedFileName.substring(1, 2),
+ transcodedFileName.substring(2, 3));
+ EscortFiles.ensureUNCFolder(webPath, subdir.toString());
+ targetPath = Paths.get(subdir.toString(), transcodedFileName).toString();
+ } else {
+ targetPath = transcodedFileName;
+ }
+ lowresPath = Paths.get(webPath, targetPath);
+ int version = 1;
+ while (lowresPath.toFile().exists()) {
+ String fileName = transcodedFileName + version + MP4EXT;
+ lowresPath = Paths.get(lowresPath.toString().replace(transcodedFileName, fileName));
+ targetPath = targetPath.replace(transcodedFileName, fileName);
+ transcodedFileName = fileName;
+ version++;
+ }
+
+ Files.move(transcodedFilePath, lowresPath);
+ manager.createMediaFile(targetPath, fileType, store, mediaCubeMedia).add();
+ } catch (IOException e) {
+ logger.catching(e);
+ logger.error(marker, "A(z) '{}' állomány mozgatása a '{}' helyre nem sikerült.", transcodedFilePath,
+ lowresPath);
+ throw e;
+ }
+ }
+
+}
--- /dev/null
+package user.commons;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder;
+import org.jboss.resteasy.client.jaxrs.ResteasyWebTarget;
+
+import com.ibm.nosql.json.JSONUtil;
+import com.ibm.nosql.json.api.BasicDBObject;
+
+import user.commons.nosql.NoSQLUtils;
+
+public class MediaSamuraiAPI implements IFFAStransAPI {
+
+ private static final Logger logger = LogManager.getLogger();
+
+ public static void main(String[] args) throws Exception {
+ final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
+
+ String inDir = "\\\\10.11.1.90\\data\\";
+ String outDir = "\\\\10.11.1.100\\Promise\\TRANSCODER\\FFASTRANSCODER\\Out\\";
+
+ String inputs[] = { "13-02009-0000-1.mov" };
+ for (int i = 0; i < inputs.length; i++) {
+ final String inputi = inDir + inputs[i];
+ final String outputi = outDir + FilenameUtils.removeExtension(inputs[i]) + ".MP4";
+ // Task task = new Task(inDir + input, outDir + output);
+
+ Runnable task = new Runnable() {
+ private String input = inputi;
+ private String output = outputi;
+
+ @Override
+ public void run() {
+ try {
+ IFFAStransAPI api = new MediaSamuraiAPI("http://10.11.1.111:65445/api/json/v1/", p -> {
+ // System.out.println(output + " progress: " + p);
+ });
+
+ api.submit("MP4", input);
+ api.monitor(1000);
+ Thread.sleep(2000);
+ if (!Files.exists(Paths.get(output)))
+ throw new Exception("Missing " + output);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ };
+
+ executor.execute(task);
+ }
+ executor.shutdown();
+ executor.awaitTermination(1, TimeUnit.HOURS);
+ }
+
+ private ResteasyWebTarget webTarget;
+ private IProgressChangedListener listener;
+ private String jobId;
+ private String inputFile;
+ private String workflowName;
+ private String inputFileName;
+
+ public MediaSamuraiAPI(String apiAddress, IProgressChangedListener listener) {
+ this.listener = listener;
+ webTarget = new ResteasyClientBuilder().build().target(apiAddress);
+ }
+
+ private void doSubmit(BasicDBObject jobToSubmit) throws Exception {
+ ResteasyWebTarget target = webTarget.path("jobs");
+ Response apiResponse = target.request().post(Entity.entity(jobToSubmit.toString(), MediaType.APPLICATION_JSON));
+ if (apiResponse.getStatus() != 202)
+ throw new Exception("Can not submit, response status is: " + apiResponse.getStatus());
+ String json = apiResponse.readEntity(String.class);
+ // logger.info("Transoder response: {}", json);
+ if (StringUtils.isBlank(json))
+ throw new Exception("Can not submit, response JSON is empty");
+ BasicDBObject resultObject = (BasicDBObject) JSONUtil.jsonToDbObject(json);
+ if (resultObject == null)
+ throw new Exception("Can not submit, response object is null");
+ jobId = resultObject.getString("job_id");
+ logger.info("Job for {} is submitted, ID: {}", inputFile, jobId);
+ }
+
+ @Override
+ public BasicDBObject getHistory() {
+ ResteasyWebTarget target = webTarget.path("history");
+ BasicDBObject result = null;
+ try {
+ Response apiResponse = target.request().get();
+ if (apiResponse.getStatus() != 200) {
+ logger.info("{} | Invalid response {}", inputFileName, apiResponse.getStatus());
+ return null;
+ }
+ String json = apiResponse.readEntity(String.class);
+ BasicDBObject resultObject = (BasicDBObject) JSONUtil.jsonToDbObject(json);
+ List<BasicDBObject> jobs = NoSQLUtils.asList(resultObject, "history");
+
+ if (jobs == null || jobs.size() == 0) {
+ logger.info("{} | No jobs in response", inputFileName);
+ return null;
+ }
+
+ File f = new File(inputFile);
+
+ for (BasicDBObject job : jobs) {
+ if (job == null)
+ continue;
+
+ String file = NoSQLUtils.asString(job, "file");
+ if (file == null)
+ continue;
+
+ String wf_name = NoSQLUtils.asString(job, "wf_name");
+ if (wf_name == null || !workflowName.equals(wf_name))
+ continue;
+
+ if (file.equals(f.getName())) {
+ // logger.info("Found success history {}", f.getName());
+ result = job;
+ break;
+ }
+ if (file.equals(inputFile)) {
+ // logger.info("Found error history {}", inputFile);
+ result = job;
+ break;
+ }
+ }
+ } catch (Exception e) {
+ logger.error(e.getClass() + " " + e.getMessage());
+ }
+
+ return result;
+ }
+
+ @Override
+ public BasicDBObject getStatus(String jobID) {
+ ResteasyWebTarget target = webTarget.path("jobs");
+
+ BasicDBObject result = null;
+ try {
+ Response apiResponse = target.request().get();
+ // logger.info("Transoder response code: {}", apiResponse.getStatus());
+
+ if (apiResponse.getStatus() != 200) {
+ logger.info("{} | Invalid response {}", inputFileName, apiResponse.getStatus());
+ return null;
+ }
+ String json = apiResponse.readEntity(String.class);
+ // logger.info("Transoder response: {}", json);
+ // System.out.println(json);
+ BasicDBObject resultObject = (BasicDBObject) JSONUtil.jsonToDbObject(json);
+ List<BasicDBObject> jobs = NoSQLUtils.asList(resultObject, "jobs");
+
+ if (jobs == null || jobs.size() == 0) {
+ logger.info("{} | No jobs in response", inputFileName);
+ return null;
+ }
+ for (BasicDBObject job : jobs) {
+ if (jobID.equals(job.getString("job_id"))) {
+ // logger.info("Found job {}", jobID);
+ result = job;
+ break;
+ }
+ }
+
+ } catch (Exception e) {
+ logger.error(e.getClass() + " " + e.getMessage());
+ }
+
+ return result;
+ }
+
+ private long getWorkflowId(String workflowName, List<BasicDBObject> workflows, long wfID) {
+ for (BasicDBObject workflow : workflows) {
+ if (!workflowName.equals(workflow.getString("wf_name")))
+ continue;
+ wfID = workflow.getLong("wf_id");
+ }
+ return wfID;
+ }
+
+ @Override
+ public List<BasicDBObject> getWorkflows() {
+ ResteasyWebTarget target = webTarget.path("workflows");
+ Response apiResponse = target.request().get();
+ if (apiResponse.getStatus() != 200)
+ return null;
+ String json = apiResponse.readEntity(String.class);
+ BasicDBObject resultObject = (BasicDBObject) JSONUtil.jsonToDbObject(json);
+ return NoSQLUtils.asList(resultObject, "workflows");
+ }
+
+ @Override
+ public void monitor(int pollIntervall) throws InterruptedException, Exception {
+ int progress = 0;
+
+ long started = System.currentTimeMillis();
+
+ BasicDBObject status = null;
+ boolean hasAnyResponse = false;
+ while (true) {
+ long queryTime = System.currentTimeMillis();
+
+ status = getStatus(jobId);
+ if (status != null) {
+ hasAnyResponse = true;
+ List<BasicDBObject> splits = NoSQLUtils.asList(status, "splits");
+
+ int current = 0;
+ if (splits != null && splits.size() > 0) {
+ for (BasicDBObject split : splits) {
+ String prg = NoSQLUtils.asString(split, "progress");
+ current += (int) Float.parseFloat(prg);
+ }
+ current = current / splits.size();
+ }
+ if (current != progress) {
+ progress = current;
+ listener.onProgressChanged(progress);
+ logger.info("{} | {}%", inputFileName, progress);
+ }
+
+ } else {
+ logger.info("{} status does not exist, checking history", inputFileName);
+ listener.onProgressChanged(100);
+ BasicDBObject history = getHistory();
+ if (history != null) {
+ hasAnyResponse = true;
+ long state = NoSQLUtils.asLong(history, "state");
+ String jobEnd = history.getString("job_end");
+ if (history == null || state != 1) {
+ String error = NoSQLUtils.asString(history, "outcome");
+ if (error == null)
+ error = "Not specified error occured";
+ throw new Exception("State: " + state + ", message: " + error);
+ } else {
+ logger.info("{} | completed", inputFileName);
+ break;
+ }
+
+ }
+ }
+
+ // 5 percig nincs valasz
+ if (!hasAnyResponse && (queryTime - started > 10 * 60 * 1000))
+ throw new Exception("Transcoder timeout");
+ Thread.sleep(pollIntervall);
+ }
+ }
+
+ @Override
+ public void submit(String workflowName, String inputFile) throws Exception {
+ this.workflowName = workflowName;
+ this.inputFile = inputFile;
+ this.inputFileName = Paths.get(inputFile).getFileName().toString();
+ List<BasicDBObject> workflows = getWorkflows();
+ if (workflows == null)
+ throw new Exception("No workflows");
+
+ long wfID = -1;
+ wfID = getWorkflowId(workflowName, workflows, wfID);
+ if (wfID < 0)
+ throw new Exception("Workflow does not exist: " + workflowName);
+
+ BasicDBObject jobToSubmit = new BasicDBObject("wf_id", wfID).append("inputfile", inputFile);
+ doSubmit(jobToSubmit);
+ }
+}