From 8092ba5947e617937363618dfc3ca3edd843ab92 Mon Sep 17 00:00:00 2001 From: "vasary.daniel" Date: Thu, 26 Nov 2020 22:29:11 +0000 Subject: [PATCH] git-tfs-id: [http://tfs.userrendszerhaz.hu:8080/tfs/DefaultCollection]$/MediaCube;C32065 --- client/Maestro/RestoreMedia.cs | 12 ++-- client/MediaCubeClient/MediaCubeStrings.cs | 3 + client/MediaCubeClient/MediaCubeWSApi.cs | 66 +++++++++++++++---- .../commons/log4j2/appender/SmtpManager.java | 13 ++-- .../server/steps/CancelableStep.java | 54 +++++++++++---- .../resources/i3-label_hu.properties | 2 +- .../src/user/jobengine/server/JobEngine.java | 32 ++++----- .../jobengine/server/JobStepExecutor.java | 9 ++- 8 files changed, 136 insertions(+), 55 deletions(-) diff --git a/client/Maestro/RestoreMedia.cs b/client/Maestro/RestoreMedia.cs index 18115a13..af3ccfac 100644 --- a/client/Maestro/RestoreMedia.cs +++ b/client/Maestro/RestoreMedia.cs @@ -35,13 +35,19 @@ namespace Maestro { return; } messageBus.Subscribe(m => { + if (!(m.Data is JObject)) { + throw new Exception("Unexpected data type: " + m.Data.GetType()); + } + BeginInvoke((Action)(() => { if (m.Finished) { if (m.Content?.Length > 0) systemBus.Send(m); Close(); - } else - progressBar.Value = m.Data.As("progress"); + } else { + var innerData = m.Data as JObject; + progressBar.Value = innerData.As("progress"); + } })); }); api = new MediaCubeWSApi(mediaCubeMetadata.WSServer, messageBus); @@ -58,8 +64,6 @@ namespace Maestro { } private void RestoreMedia_FormClosing(object sender, FormClosingEventArgs e) { - if (api != null) - api.Close(); } } } diff --git a/client/MediaCubeClient/MediaCubeStrings.cs b/client/MediaCubeClient/MediaCubeStrings.cs index 84d28223..c888ba2f 100644 --- a/client/MediaCubeClient/MediaCubeStrings.cs +++ b/client/MediaCubeClient/MediaCubeStrings.cs @@ -4,9 +4,12 @@ public static readonly string NAME = "name"; public static readonly string LIST = "list"; public static readonly string STARTJOB = "startjob"; + public static readonly string QUERYMEDIANAMES = "querymedianames"; public static readonly string ERROR = "error"; public static readonly string STATUS = "status"; public static readonly string TEMPLATE = "template"; public static readonly string PARAMETERS = "parameters"; + public static readonly string QUERY = "query"; + public static readonly string VERSION = "version"; } } diff --git a/client/MediaCubeClient/MediaCubeWSApi.cs b/client/MediaCubeClient/MediaCubeWSApi.cs index 7e023a13..32f8abad 100644 --- a/client/MediaCubeClient/MediaCubeWSApi.cs +++ b/client/MediaCubeClient/MediaCubeWSApi.cs @@ -5,12 +5,14 @@ using Newtonsoft.Json; using Newtonsoft.Json.Linq; using NLog; using System; +using System.Collections.Generic; +using System.Threading; using WebSocketSharp; namespace MediaCubeClient { public class MediaCubeWSMessage : MaestroMessage { public bool Finished { get; } - public JObject Data { get; } + public JContainer Data { get; } public MediaCubeWSMessage() : base(null) { Finished = true; @@ -20,7 +22,7 @@ namespace MediaCubeClient { Finished = true; } - public MediaCubeWSMessage(JObject data) : base(null) { + public MediaCubeWSMessage(JContainer data) : base(null) { Data = data; } @@ -30,13 +32,12 @@ namespace MediaCubeClient { private const string DATEFORMAT = "yyyy'-'MM'-'dd'T'HH':'mm':'ssK"; private static NLog.Logger logger = LogManager.GetCurrentClassLogger(); private JsonSerializerSettings serializerSettings; + private readonly Connection connection; private IMessageBus messageBus; private WebSocket ws; public MediaCubeWSApi(Connection connection, IMessageBus messageBus) { - ws = new WebSocket(connection.Address.ToString()) { - WaitTime = TimeSpan.FromMilliseconds(connection.Timeout) - }; + this.connection = connection; this.messageBus = messageBus; serializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Objects, @@ -61,6 +62,11 @@ namespace MediaCubeClient { } */ public void SubmitJob(string template, string name, JObject parameters) { + + ws = new WebSocket(connection.Address.ToString()) { + WaitTime = TimeSpan.FromMilliseconds(connection.Timeout) + }; + ws.SslConfiguration.ServerCertificateValidationCallback = (s, c, ch, e) => { return true; }; @@ -73,7 +79,7 @@ namespace MediaCubeClient { } if ("FINISHED".Equals(jo.As(MediaCubeStrings.STATUS))) { ws.Close(); - messageBus?.Send(new MediaCubeWSMessage()); + messageBus?.Send(new MediaCubeWSMessage()); return; } messageBus?.Send(new MediaCubeWSMessage(jo)); @@ -85,11 +91,6 @@ namespace MediaCubeClient { try { ws.Connect(); - } catch { - ws.Close(); - messageBus?.Send(new MediaCubeWSMessage("Sikertelen kapcsolódás a MediaCube szolgáltatáshoz.")); - } - try { JObject data = new JObject(); data.Add(MediaCubeStrings.NAME, JToken.FromObject(name)); data.Add(MediaCubeStrings.ACTION, JToken.FromObject(MediaCubeStrings.STARTJOB)); @@ -97,16 +98,53 @@ namespace MediaCubeClient { data.Add(MediaCubeStrings.PARAMETERS, JToken.FromObject(parameters)); ws.Send(data.Serialize()); } catch (Exception e) { - ws.Close(); - messageBus?.Send(new MediaCubeWSMessage("Sikertelen MediaCube folyamatindítás.")); + messageBus?.Send(new MediaCubeWSMessage("Hiba a MediaCube szolgáltatás használata során. Rendszerüzenet: " + e.Message)); } } + public List QueryMediaNames(string query, string version) { + List result = null; + + ws = new WebSocket(connection.Address.ToString()) { + WaitTime = TimeSpan.FromMilliseconds(connection.Timeout) + }; + + ws.SslConfiguration.ServerCertificateValidationCallback = (s, c, ch, e) => { + return true; + }; + + var barrier = new Barrier(2); + + ws.OnMessage += (s, e) => { + ws.Close(); + JArray jo = JArray.Parse(e.Data); + result = jo.ToObject>(); + messageBus?.Send(new MediaCubeWSMessage(jo)); + barrier.SignalAndWait(); + }; + ws.OnError += (s, e) => { + ws.Close(); + messageBus?.Send(new MediaCubeWSMessage("Hiba a MediaCube szolgáltatás használata során. Rendszerüzenet: " + e.Message)); + }; + + try { + ws.Connect(); + JObject data = new JObject(); + data.Add(MediaCubeStrings.ACTION, JToken.FromObject(MediaCubeStrings.QUERYMEDIANAMES)); + data.Add(MediaCubeStrings.QUERY, JToken.FromObject(query)); + data.Add(MediaCubeStrings.VERSION, JToken.FromObject(version)); + ws.Send(data.Serialize()); + barrier.SignalAndWait(); + } catch (Exception e) { + messageBus?.Send(new MediaCubeWSMessage($"Sikertelen MediaCube folyamatindítás. A rendszer üzenete: {e.Message}")); + } + return result; + } + public void Close() { if (ws != null) ws.Close(); } - } } diff --git a/server/user.commons.log4j2/src/user/commons/log4j2/appender/SmtpManager.java b/server/user.commons.log4j2/src/user/commons/log4j2/appender/SmtpManager.java index ac471ac3..b6558d13 100644 --- a/server/user.commons.log4j2/src/user/commons/log4j2/appender/SmtpManager.java +++ b/server/user.commons.log4j2/src/user/commons/log4j2/appender/SmtpManager.java @@ -157,9 +157,13 @@ public class SmtpManager extends AbstractManager { private static Map> sessionEvents = new ConcurrentHashMap<>(); private static MimeMessage createMimeMessage(final FactoryData data, final Session session, final LogEvent appendEvent) throws MessagingException { - return new MimeMessageBuilder(session).setFrom(data.from).setReplyTo(data.replyto).setRecipients(Message.RecipientType.TO, data.to) - .setRecipients(Message.RecipientType.CC, data.cc).setRecipients(Message.RecipientType.BCC, data.bcc) - .setSubject(data.subject.toSerializable(appendEvent)).build(); + return new MimeMessageBuilder(session).setFrom(data.from) + .setReplyTo(data.replyto) + .setRecipients(Message.RecipientType.TO, data.to) + .setRecipients(Message.RecipientType.CC, data.cc) + .setRecipients(Message.RecipientType.BCC, data.bcc) + .setSubject(data.subject.toSerializable(appendEvent)) + .build(); } public static SmtpManager getSmtpManager(final Configuration config, final String to, final String cc, final String bcc, final String from, @@ -414,7 +418,8 @@ public class SmtpManager extends AbstractManager { private void undoSessionEvents(final MediaCubeMarker mcm) { String sessionID = mcm.getSessionID(); - sessionEvents.remove(sessionID); + if (sessionID != null && sessionEvents.containsKey(sessionID)) + sessionEvents.remove(sessionID); } protected void writeBuffer(List events, final Layout layout, final OutputStream out) throws IOException { diff --git a/server/user.jobengine.executors/src/user/jobengine/server/steps/CancelableStep.java b/server/user.jobengine.executors/src/user/jobengine/server/steps/CancelableStep.java index 0f21e2ef..16b412b5 100644 --- a/server/user.jobengine.executors/src/user/jobengine/server/steps/CancelableStep.java +++ b/server/user.jobengine.executors/src/user/jobengine/server/steps/CancelableStep.java @@ -1,15 +1,17 @@ package user.jobengine.server.steps; +import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import org.apache.commons.net.ftp.FTPClient; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.MarkerManager; -import user.commons.MediaCubeMarker; import user.commons.RemoteFile; import user.commons.StoreUri; import user.commons.mediatool.MediaInfo; @@ -24,23 +26,48 @@ public class CancelableStep extends JobStep { private static final Logger logger = LogManager.getLogger(); int count = 10; + void archiveTest() { + String fn = null; + try { + List files = Files.list(Paths.get("/mnt/PROMISE/ARCHIVE")).collect(Collectors.toList()); + for (Path f : files) { + String filename = f.getFileName().toString().toLowerCase(); + if (filename.endsWith(".mov") || filename.endsWith(".mxf")) { + MediaInfo mi = new MediaInfo(f); + mi.process(); + logger.info(MarkerManager.getMarker("MOVTEST"), "{} : hossz {}, audio {}", fn, mi.getFrames(), mi.getAudioStreams()); + } + } + } catch (Exception e) { + logger.info(MarkerManager.getMarker("MOVTEST"), "{} : feldolgozása sikertelen. {}", fn, e.getMessage()); + } + + } + @StepEntry public Object[] execute(int z) throws Exception { try { + archiveTest(); // retrieveTest(); // ftpTest(); - ((MediaCubeMarker) getJobRuntime().getFinishMarker()).setTo("hering.tamas70@gmail.com"); - - for (int i = 0; i < count; i++) { - if (getJobRuntime().isWaitingCancel()) - break; - Thread.sleep(100); - int progress = (i + 1) * 100 / count; - setProgress(progress); - logger.info(getSessionMarker(), "Current {}", i); - } - - logger.info(getSessionMarker(), "Finished"); + // ((MediaCubeMarker) getJobRuntime().getFinishMarker()).setTo("hering.tamas70@gmail.com"); + // + // for (int i = 0; i < count; i++) { + // if (getJobRuntime().isWaitingCancel()) + // break; + // Thread.sleep(100); + // int progress = (i + 1) * 100 / count; + // setProgress(progress); + // logger.info(getSessionMarker(), "Current {}", i); + // + // if (i == 1) { + // logger.info(new MediaCubeUndoMarker(getSessionMarker().getSessionID()), "Nincs feldolgozandó hiány."); + // cancel(); + // + // } + // } + // + // logger.info(getSessionMarker(), "Finished"); } catch (Exception e) { e.printStackTrace(); throw e; @@ -114,5 +141,4 @@ public class CancelableStep extends JobStep { } } - } diff --git a/server/user.jobengine.osgi.server/resources/i3-label_hu.properties b/server/user.jobengine.osgi.server/resources/i3-label_hu.properties index e478d064..fd583a09 100644 --- a/server/user.jobengine.osgi.server/resources/i3-label_hu.properties +++ b/server/user.jobengine.osgi.server/resources/i3-label_hu.properties @@ -1,4 +1,4 @@ -version=2.6.5.1 +version=2.6.5.2 footer=2016-2020 © Copyright User Rendszerház Kft. login_info=Információ diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java index fdb40335..3d687d27 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java @@ -334,8 +334,10 @@ public class JobEngine implements IJobEngine { if (!jobRuntime.isService() && jobRuntime.getParentJobId() == 0) { if (JobStatus.FINISHED.equals(jobRuntime.getStatus())) logger.info(jobRuntime.getFinishMarker(), "A '{}' folyamat futása sikeresen véget ért.", jobRuntime.getName()); - else + if (JobStatus.SUSPENDED.equals(jobRuntime.getStatus())) logger.error(jobRuntime.getFinishMarker(), "A '{}' folyamat futása megszakadt.", jobRuntime.getName()); + if (JobStatus.CANCELED.equals(jobRuntime.getStatus())) + logger.info(jobRuntime.getFinishMarker(), "A '{}' folyamat futása megszakítás miatt véget ért.", jobRuntime.getName()); } } @@ -584,8 +586,8 @@ public class JobEngine implements IJobEngine { for (Job job : runningJobs) { Job runningJob = getJob(job.getId()); if (runningJob != null && runningJob.getStatus() != JobStatus.SUSPENDED) - throw new JobEngineException( - String.format("Can not submit job. Job with %s.%s already running", template.getFileName(), template.getName())); + throw new JobEngineException(String.format("Can not submit job. Job with %s.%s already running", template.getFileName(), + template.getName())); } } } @@ -849,6 +851,18 @@ public class JobEngine implements IJobEngine { runQueue.remove(jobRuntime); } + @Override + public void removeGarbage() { + List removeId = new ArrayList<>(); + for (Long id : submittedJobs.keySet()) { + IJobRuntime runtime = submittedJobs.get(id); + if (runtime != null && (JobStatus.SUSPENDED.equals(runtime.getStatus()) || JobStatus.CANCELED.equals(runtime.getStatus()))) + removeId.add(id); + } + for (Long id : removeId) + submittedJobs.remove(id); + } + @Override public void removeJob(long id) { if (submittedJobs.containsKey(id)) { @@ -877,18 +891,6 @@ public class JobEngine implements IJobEngine { } - @Override - public void removeGarbage() { - List removeId = new ArrayList<>(); - for (Long id : submittedJobs.keySet()) { - IJobRuntime runtime = submittedJobs.get(id); - if (runtime != null && (JobStatus.SUSPENDED.equals(runtime.getStatus()) || JobStatus.CANCELED.equals(runtime.getStatus()))) - removeId.add(id); - } - for (Long id : removeId) - submittedJobs.remove(id); - } - @Override public ClusteredJob requestJob(String className) throws Exception { if (!executors.containsKey(className)) diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobStepExecutor.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobStepExecutor.java index 7f5e385c..8f32f175 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobStepExecutor.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobStepExecutor.java @@ -130,9 +130,12 @@ public class JobStepExecutor implements IJobStepExecutor { Object[] outputs = step.run(jobEngine, jobRuntime, inputs); //TODO itt lekezelni a remote notification-t - if (!jobRuntime.isWaitingCancel()) { - jobEngine.sendMessage(new JobStepCompletedMessage(jobRuntime.getId(), outputs)); - } + + //TODO ha ez van, akkor a WAITING_CANCEL allpotban marad, ha nincs, akkor meg neha visszakerul a run queue-ba + // if (!jobRuntime.isWaitingCancel()) { + // jobEngine.sendMessage(new JobStepCompletedMessage(jobRuntime.getId(), outputs)); + // } + jobEngine.sendMessage(new JobStepCompletedMessage(jobRuntime.getId(), outputs)); } public void shutdown() { -- 2.54.0