From af46e54d731b01d075225e2672582a9b30f70c52 Mon Sep 17 00:00:00 2001 From: =?utf8?q?V=C3=A1s=C3=A1ry=20D=C3=A1niel?= Date: Mon, 26 Nov 2018 15:34:25 +0000 Subject: [PATCH] =?utf8?q?Feature=20#107=20Folyamat=20futtat=C3=A1s=20?= =?utf8?q?=C3=A9s=20monitoroz=C3=A1s=20kiaj=C3=A1nl=C3=A1sa=20WS-en?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit git-tfs-id: [http://tfs.userrendszerhaz.hu:8080/tfs/DefaultCollection]$/MediaCube;C31350 --- .../run-mediacube-server-bsh.launch | 2 +- .../jobtemplates/fake.xml | 2 +- .../user/jobengine/server/steps/FakeStep.java | 5 +- .../user/commons/nexio/api/MediaListener.java | 3 +- .../src/user/jobengine/server/JobEngine.java | 5 +- .../user/jobengine/zk/model/JobListModel.java | 9 +- .../META-INF/MANIFEST.MF | 1 + .../WEB-INF/web.xml | 10 ++ .../ws/mediacube/MediaCubeAPIWSServlet.java | 20 +++ .../ws/mediacube/MediaCubeAPIWSSocket.java | 135 ++++++++++++++++++ .../osgi/ws/nexio/NexioWSSocket.java | 71 +++++---- 11 files changed, 222 insertions(+), 41 deletions(-) create mode 100644 server/user.jobengine.osgi.services/src/user/jobengine/osgi/ws/mediacube/MediaCubeAPIWSServlet.java create mode 100644 server/user.jobengine.osgi.services/src/user/jobengine/osgi/ws/mediacube/MediaCubeAPIWSSocket.java diff --git a/server/-configuration/run-mediacube-server-bsh.launch b/server/-configuration/run-mediacube-server-bsh.launch index c4233291..bdc1613d 100644 --- a/server/-configuration/run-mediacube-server-bsh.launch +++ b/server/-configuration/run-mediacube-server-bsh.launch @@ -19,7 +19,7 @@ - + diff --git a/server/user.jobengine.executors/jobtemplates/fake.xml b/server/user.jobengine.executors/jobtemplates/fake.xml index 6f0dc280..c1e88519 100644 --- a/server/user.jobengine.executors/jobtemplates/fake.xml +++ b/server/user.jobengine.executors/jobtemplates/fake.xml @@ -1,5 +1,5 @@ - + diff --git a/server/user.jobengine.executors/src/user/jobengine/server/steps/FakeStep.java b/server/user.jobengine.executors/src/user/jobengine/server/steps/FakeStep.java index f93d3afc..fb3c72d1 100644 --- a/server/user.jobengine.executors/src/user/jobengine/server/steps/FakeStep.java +++ b/server/user.jobengine.executors/src/user/jobengine/server/steps/FakeStep.java @@ -3,7 +3,6 @@ package user.jobengine.server.steps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import user.commons.MediaCubeMarker; import user.jobengine.server.IJobEngine; import user.jobengine.server.IJobRuntime; @@ -16,7 +15,7 @@ public class FakeStep extends JobStep { public Object[] execute(long itemID, IJobEngine jobEngine, IJobRuntime jobRuntime) throws Exception { Object[] result = { itemID + 1 }; - ((MediaCubeMarker) jobRuntime.getMarker()).setTo("vasary@elgekko.net"); + //((MediaCubeMarker) jobRuntime.getMarker()).setTo("vasary@elgekko.net"); //logger.info(jobRuntime.getMarker(), "usefullink"); logger.info(jobRuntime.getMarker(), "Starting with {} #{}", itemID, jobRuntime.getId()); @@ -32,7 +31,7 @@ public class FakeStep extends JobStep { // if (i == 2) // throw new Exception("TESZT"); - //logger.info("Progress {}", jobRuntime.getProgress()); + logger.info("Progress {}", jobRuntime.getProgress()); } } catch (Exception e) { logger.error(jobRuntime.getMarker(), e.getMessage()); diff --git a/server/user.jobengine.osgi.commons/src/user/commons/nexio/api/MediaListener.java b/server/user.jobengine.osgi.commons/src/user/commons/nexio/api/MediaListener.java index 34326026..52cfbca3 100644 --- a/server/user.jobengine.osgi.commons/src/user/commons/nexio/api/MediaListener.java +++ b/server/user.jobengine.osgi.commons/src/user/commons/nexio/api/MediaListener.java @@ -215,7 +215,8 @@ public class MediaListener { } } } catch (Exception e) { - logger.catching(e); + //logger.catching(e); + logger.error(e.getMessage()); forceRestart(); } } diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java index 1a0e9657..fca18054 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java @@ -13,6 +13,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; @@ -170,7 +171,7 @@ public class JobEngine implements IJobEngine { private SchedulerService schedulerService = null; - private ArrayList jobChangedListenerList = new ArrayList(); + private List jobChangedListenerList = new CopyOnWriteArrayList<>(); /** * A MAM motor példányosítása. Ennek során létrejönnek a várakozási sorok, az ütemező szál, az üzenet kezelő szál. @@ -853,7 +854,7 @@ public class JobEngine implements IJobEngine { result = new JobRuntime(this, program); result.setPersister(itemManager); result.setTemplate(template); - result.setName(name); + result.setName(name == null ? program.getTemplate().getName() : name); result.setParameters(parameters); result.setService(program.getTemplate().isService()); submit(result); diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/zk/model/JobListModel.java b/server/user.jobengine.osgi.server/src/user/jobengine/zk/model/JobListModel.java index 19aaf9b7..e927663f 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/zk/model/JobListModel.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/zk/model/JobListModel.java @@ -174,12 +174,17 @@ public class JobListModel extends AsyncBaseModel implements IJobChangedListener @Command public void restartNexio() { + INexioAPI nexioAPI = ComponentBinder.getNexioAPI(); + NexioDispatcher dispatcher = nexioAPI.getDispatcher(); + if (dispatcher.isRestartInProgress()) { + Messagebox.show("A NEXIO szinkronizálás újraindítása folyamatban van."); + return; + } + EventListener confirmedEvent = evt -> { if (evt.getName().equals("onCancel")) return; try { - INexioAPI nexioAPI = ComponentBinder.getNexioAPI(); - NexioDispatcher dispatcher = nexioAPI.getDispatcher(); dispatcher.restart(); } catch (Exception e) { Messagebox.show(e.getMessage()); diff --git a/server/user.jobengine.osgi.services/META-INF/MANIFEST.MF b/server/user.jobengine.osgi.services/META-INF/MANIFEST.MF index 00d93990..4ccfcb54 100644 --- a/server/user.jobengine.osgi.services/META-INF/MANIFEST.MF +++ b/server/user.jobengine.osgi.services/META-INF/MANIFEST.MF @@ -38,5 +38,6 @@ Import-Package: com.fasterxml.jackson.annotation;version="2.4.5", user.jobengine.server Export-Package: user.jobengine.osgi.rest, user.jobengine.osgi.rest.octopus, + user.jobengine.osgi.ws.mediacube, user.jobengine.osgi.ws.nexio Require-Bundle: org.jboss.resteasy.jaxrs;bundle-version="3.0.11" diff --git a/server/user.jobengine.osgi.services/WEB-INF/web.xml b/server/user.jobengine.osgi.services/WEB-INF/web.xml index 8702cb52..d861e1a9 100644 --- a/server/user.jobengine.osgi.services/WEB-INF/web.xml +++ b/server/user.jobengine.osgi.services/WEB-INF/web.xml @@ -40,6 +40,16 @@ /nexio/* + + WS-MEDIACUBEAPI + user.jobengine.osgi.ws.mediacube.MediaCubeAPIWSServlet + 1 + + + WS-MEDIACUBEAPI + /wsapi/* + + diff --git a/server/user.jobengine.osgi.services/src/user/jobengine/osgi/ws/mediacube/MediaCubeAPIWSServlet.java b/server/user.jobengine.osgi.services/src/user/jobengine/osgi/ws/mediacube/MediaCubeAPIWSServlet.java new file mode 100644 index 00000000..4f9394a5 --- /dev/null +++ b/server/user.jobengine.osgi.services/src/user/jobengine/osgi/ws/mediacube/MediaCubeAPIWSServlet.java @@ -0,0 +1,20 @@ +package user.jobengine.osgi.ws.mediacube; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; + +@SuppressWarnings("serial") +public class MediaCubeAPIWSServlet extends WebSocketServlet { + private static final Logger logger = LogManager.getLogger(); + + public MediaCubeAPIWSServlet() { + logger.info("Created"); + } + + @Override + public void configure(WebSocketServletFactory factory) { + factory.register(MediaCubeAPIWSSocket.class); + } +} diff --git a/server/user.jobengine.osgi.services/src/user/jobengine/osgi/ws/mediacube/MediaCubeAPIWSSocket.java b/server/user.jobengine.osgi.services/src/user/jobengine/osgi/ws/mediacube/MediaCubeAPIWSSocket.java new file mode 100644 index 00000000..61ce989b --- /dev/null +++ b/server/user.jobengine.osgi.services/src/user/jobengine/osgi/ws/mediacube/MediaCubeAPIWSSocket.java @@ -0,0 +1,135 @@ +package user.jobengine.osgi.ws.mediacube; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.ibm.nosql.json.JSONUtil; +import com.ibm.nosql.json.api.BasicDBObject; + +import user.commons.JobStatus; +import user.jobengine.db.ItemManagerData.SignalType; +import user.jobengine.osgi.rest.ComponentBinder; +import user.jobengine.osgi.rest.ServiceObjectMapper; +import user.jobengine.server.IJobChangedListener; +import user.jobengine.server.IJobEngine; +import user.jobengine.server.IJobRuntime; + +public class MediaCubeAPIWSSocket extends WebSocketAdapter { + private static final Logger logger = LogManager.getLogger(); + public static final String DATA = "data"; + public static final String LIST = "list"; + public static final String STARTJOB = "startjob"; + public static final String ACTION = "action"; + public static final String ERROR = "error"; + private static final String NO_ACTION_SPECIFIED = "Missing action type."; + private static final String TEMPLATE = "template"; + private static final String PARAMETERS = "parameters"; + private static final String NO_TEMPLATE_SPECIFIED = "Missing template name."; + private static final String NO_PARAMETERS_SPECIFIED = "Missing job parameters."; + private static ObjectMapper mapper; + private static Map jobChangedListeners = new ConcurrentHashMap<>(); + + public MediaCubeAPIWSSocket() { + mapper = ServiceObjectMapper.createMapper(); + logger.info("EventSocket created"); + } + + public void asyncSendResponse(Object response) { + try { + String data = mapper.writeValueAsString(response); + logger.info("Sending " + data); + Session session = getSession(); + if (session != null) + session.getRemote().sendStringByFuture(data); + } catch (Exception e) { + logger.catching(e); + } + } + + @Override + public void onWebSocketClose(int statusCode, String reason) { + super.onWebSocketClose(statusCode, reason); + logger.info("Socket Closed: [{}]", reason); + } + + @Override + public void onWebSocketConnect(Session sess) { + super.onWebSocketConnect(sess); + logger.info("Socket Connected: " + sess); + } + + @Override + public void onWebSocketError(Throwable cause) { + super.onWebSocketError(cause); + logger.info("Error: [{}]", String.valueOf(cause.getMessage())); + } + + @Override + public void onWebSocketText(String message) { + super.onWebSocketText(message); + try { + + BasicDBObject data = (BasicDBObject) JSONUtil.jsonToDbObject(message); + if (!data.containsKey(ACTION)) + throw new Exception(NO_ACTION_SPECIFIED); + String action = data.getString(ACTION); + switch (action) { + case STARTJOB: + startJob(data, getSession()); + break; + default: + throw new Exception("Unhandled action : " + action); + } + } catch (Exception e) { + logger.catching(e); + asyncSendResponse(new BasicDBObject(ERROR, e.getMessage())); + } + } + + private void removeJobChangedEventListener(long id) { + IJobChangedListener jobChangedListener = jobChangedListeners.remove(id); + if (jobChangedListener == null) + return; + logger.info("Removing job {}", id); + IJobEngine jobengine = ComponentBinder.getJobengine(); + jobengine.removeJobChangedEventListener(jobChangedListener); + Session session = getSession(); + if (session != null) + session.close(); + } + + private void startJob(BasicDBObject data, Session session) throws Exception { + if (!data.containsKey(TEMPLATE)) + throw new Exception(NO_TEMPLATE_SPECIFIED); + String template = data.getString(TEMPLATE); + if (!data.containsKey(PARAMETERS)) + throw new Exception(NO_PARAMETERS_SPECIFIED); + BasicDBObject parameters = (BasicDBObject) data.get(PARAMETERS); + IJobEngine jobengine = ComponentBinder.getJobengine(); + IJobRuntime runtime = jobengine.submit(template, null, parameters); + IJobChangedListener jobChangedListener = event -> { + try { + IJobRuntime job = event.getJob(); + if (job.getId() != runtime.getId()) + return; + BasicDBObject response = new BasicDBObject("jobID", job.getId()).append("status", job.getStatus()).append("signal", event.getSignalType()) + .append("progress", job.getProgress()); + asyncSendResponse(response); + if (SignalType.DELETE.equals(event.getSignalType()) || JobStatus.SUSPENDED.equals(job.getStatus())) { + removeJobChangedEventListener(runtime.getId()); + } + } catch (Exception e) { + removeJobChangedEventListener(runtime.getId()); + logger.catching(e); + } + }; + jobengine.addJobChangedEventListener(jobChangedListener); + jobChangedListeners.put(runtime.getId(), jobChangedListener); + } +} diff --git a/server/user.jobengine.osgi.services/src/user/jobengine/osgi/ws/nexio/NexioWSSocket.java b/server/user.jobengine.osgi.services/src/user/jobengine/osgi/ws/nexio/NexioWSSocket.java index 590159f0..66894b94 100644 --- a/server/user.jobengine.osgi.services/src/user/jobengine/osgi/ws/nexio/NexioWSSocket.java +++ b/server/user.jobengine.osgi.services/src/user/jobengine/osgi/ws/nexio/NexioWSSocket.java @@ -1,5 +1,6 @@ package user.jobengine.osgi.ws.nexio; +import java.io.IOException; import java.util.List; import org.apache.logging.log4j.LogManager; @@ -22,14 +23,35 @@ public class NexioWSSocket extends WebSocketAdapter { public static final String DATA = "data"; public static final String LIST = "list"; public static final String ACTION = "action"; - private static final String NO_ACTION_SPECIFIED = "No action specified"; + public static final String ERROR = "error"; + private static final String NO_ACTION_SPECIFIED = "Missing action type."; private static ObjectMapper mapper; + public static void asyncSendResponse(Session session, Object response) throws JsonProcessingException { + String data = mapper.writeValueAsString(response); + session.getRemote().sendStringByFuture(data); + } + public NexioWSSocket() { mapper = ServiceObjectMapper.createMapper(); logger.info("EventSocket created"); } + private void listNexio(BasicDBObject response) throws Exception { + INexioAPI nexioAPI = ComponentBinder.getNexioAPI(); + if (nexioAPI == null) + throw new Exception("Nexio service is unreachable"); + List clips = nexioAPI.getItems(null, true); + response.append(DATA, clips); + } + + @Override + public void onWebSocketClose(int statusCode, String reason) { + super.onWebSocketClose(statusCode, reason); + //NexioAPIServlet.getInstance().removeSession(getSession()); + logger.info("Socket Closed: [{}]", reason); + } + @Override public void onWebSocketConnect(Session sess) { super.onWebSocketConnect(sess); @@ -37,52 +59,39 @@ public class NexioWSSocket extends WebSocketAdapter { logger.info("Socket Connected: " + sess); } + @Override + public void onWebSocketError(Throwable cause) { + super.onWebSocketError(cause); + logger.info("Error: [{}]", String.valueOf(cause.getMessage())); + } + @Override public void onWebSocketText(String message) { super.onWebSocketText(message); //logger.info("Received: " + message); try { - INexioAPI nexioAPI = ComponentBinder.getNexioAPI(); - if (nexioAPI == null) - throw new Exception("Nexio service is unreachable"); BasicDBObject data = (BasicDBObject) JSONUtil.jsonToDbObject(message); - if (!data.containsKey(ACTION)) { - getSession().getRemote().sendString(new BasicDBObject("error", NO_ACTION_SPECIFIED).toString()); + if (!data.containsKey(ACTION)) throw new Exception(NO_ACTION_SPECIFIED); - } - - //TODO check action etc. String action = data.getString(ACTION); BasicDBObject response = new BasicDBObject(ACTION, action); switch (action) { case LIST: - List clips = nexioAPI.getItems(null, true); - response.append(DATA, clips); - asyncSendResponse(getSession(), response); + listNexio(response); break; + default: + throw new Exception("Unhandled action : " + action); + } + asyncSendResponse(getSession(), response); } catch (Exception e) { - logger.error(e); + logger.catching(e); + try { + getSession().getRemote().sendString(new BasicDBObject(ERROR, e.getMessage()).toString()); + } catch (IOException e1) { + } } } - - public static void asyncSendResponse(Session session, Object response) throws JsonProcessingException { - String data = mapper.writeValueAsString(response); - session.getRemote().sendStringByFuture(data); - } - - @Override - public void onWebSocketClose(int statusCode, String reason) { - super.onWebSocketClose(statusCode, reason); - //NexioAPIServlet.getInstance().removeSession(getSession()); - logger.info("Socket Closed: [{}]", reason); - } - - @Override - public void onWebSocketError(Throwable cause) { - super.onWebSocketError(cause); - logger.info("Error: [{}]", String.valueOf(cause.getMessage())); - } } -- 2.54.0