From: vasary.daniel Date: Fri, 11 Dec 2020 15:28:26 +0000 (+0000) Subject: git-tfs-id: [http://tfs.userrendszerhaz.hu:8080/tfs/DefaultCollection]$/MediaCube... X-Git-Url: http://git.useribm.hu/?a=commitdiff_plain;h=7f2fc9a67b7477eecb21b55ff4c0e475498bf247;p=mediacube.git git-tfs-id: [tfs.userrendszerhaz.hu:8080/tfs/DefaultCollection]$/MediaCube;C32103 --- diff --git a/server/-configuration/scheduledjobs.json b/server/-configuration/scheduledjobs.json index d429ad34..ce5b7c17 100644 --- a/server/-configuration/scheduledjobs.json +++ b/server/-configuration/scheduledjobs.json @@ -10,6 +10,9 @@ }, { "template": "test-fork-cancelable.xml", + "active": false, + "executeimmediate": true, + "cronexpression": "0/5 * * * * ? *", "parameters": [ {"name": "itemID", "value": 1, "type": "java.lang.Long"} ] }, { diff --git a/server/user.jobengine.executors/config/config.xml b/server/user.jobengine.executors/config/config.xml index 2b3595c0..9744a772 100644 --- a/server/user.jobengine.executors/config/config.xml +++ b/server/user.jobengine.executors/config/config.xml @@ -1,52 +1,53 @@ - - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/server/user.jobengine.executors/src/user/jobengine/server/steps/CancelableStep.java b/server/user.jobengine.executors/src/user/jobengine/server/steps/CancelableStep.java index bcf29f2c..14cfbd6f 100644 --- a/server/user.jobengine.executors/src/user/jobengine/server/steps/CancelableStep.java +++ b/server/user.jobengine.executors/src/user/jobengine/server/steps/CancelableStep.java @@ -17,7 +17,7 @@ public class CancelableStep extends JobStep { @StepEntry public Object[] execute(int param) throws Exception { try { - getJobRuntime().setRelated("TESZT1"); + getJobRuntime().setRelated("TESZT" + param); // ftpTest(); diff --git a/server/user.jobengine.executors/src/user/jobengine/server/steps/TestForkCancelableStep.java b/server/user.jobengine.executors/src/user/jobengine/server/steps/TestForkCancelableStep.java index 91d00a8d..94c2ee44 100644 --- a/server/user.jobengine.executors/src/user/jobengine/server/steps/TestForkCancelableStep.java +++ b/server/user.jobengine.executors/src/user/jobengine/server/steps/TestForkCancelableStep.java @@ -14,15 +14,17 @@ public class TestForkCancelableStep extends JobStep { // private static final String CHILD_TEMPLATE = "fake-concurrent.xml"; private static final String CHILD_TEMPLATE = "cancelable.xml"; private static final Logger logger = LogManager.getLogger(); - int count = 5; + int count = 1000; @StepEntry public Object[] execute(IJobEngine jobEngine, IJobRuntime jobRuntime) throws Exception { - //jobRuntime.forkPrepare(); + jobRuntime.forkPrepare(); for (int i = 0; i < count; i++) { + if (getJobRuntime().isWaitingCancel()) + break; //jobEngine.submit(jobRuntime, null, CHILD_TEMPLATE, CHILD_TITLE, ListUtils.asMap("itemID", i)); // IJobRuntime runtime = jobEngine.submit(jobRuntime, null, CHILD_TEMPLATE, "JOB " + i, ListUtils.asMap("param", i)); - IJobRuntime runtime = getEngine().submit(null, e -> { + IJobRuntime runtime = getEngine().submit(getJobRuntime(), e -> { if (e.getStatus().equals(JobStatus.CANCELED) || e.getStatus().equals(JobStatus.SUSPENDED)) logger.info("Cleanup occured because status is {} {}", e.getStatus(), ((IJob) e.getSource()).getId()); }, CHILD_TEMPLATE, "JOB " + i, 0, IJobEngine.DEFAULT_OWNER, ListUtils.asMap("param", i)); @@ -30,7 +32,7 @@ public class TestForkCancelableStep extends JobStep { runtime.setRelated("TEST" + runtime.getId()); } - // jobRuntime.forkWaitComplete(); + jobRuntime.forkWaitComplete(); logger.info("Done"); return null; } diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobEngine.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobEngine.java index eb9e8f6e..3f775fab 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobEngine.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobEngine.java @@ -16,6 +16,8 @@ public interface IJobEngine { void addJobChangedEventListener(IJobChangedListener listener); + void addManagedJobChangedListener(IJobChangedListener listener); + void addStepExecutor(IJobStepExecutor executor); void addToExecutorQueue(IJobRuntime jobRuntime); @@ -84,6 +86,8 @@ public interface IJobEngine { void jobCleanup(IJobRuntime jobRuntime); + void keepAliveJobChangedListener(IJobChangedListener listener); + void keepAliveWorker(String remoteAddr); void loadExecutors(); @@ -112,16 +116,18 @@ public interface IJobEngine { void removeFromRunQueue(IJobRuntime jobRuntime); + void removeGarbage(); + void removeJob(long id); void removeJobChangedEventListener(IJobChangedListener listener); void removeSpanwChild(IJobRuntime jobRuntime); - void removeGarbage(); - ClusteredJob requestJob(String className) throws Exception; + void restartGracefully() throws Exception; + void sendMessage(IJobMessage jobMessage); void setAllExecutionDisabled(boolean isDisabled); diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobStepExecutor.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobStepExecutor.java index 5ee69913..7ad2abb1 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobStepExecutor.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobStepExecutor.java @@ -44,7 +44,7 @@ public interface IJobStepExecutor { /** * A v�grehajt� elind�t�sa. */ - void startup(IJobEngine jobEngine) throws Exception; + void startup(IJobEngine jobEngine); ClusteredJob steelJob() throws InterruptedException; diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java index 14d6d51a..0a05285e 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java @@ -64,6 +64,35 @@ import user.tsm.client.TSMException; */ public class JobEngine implements IJobEngine { + private class JobChangedListenerChecker extends Thread { + private volatile boolean shutdown = false; + + @Override + public void run() { + try { + barrier.await(); + } catch (Exception e) { + } + + while (!shutdown) { + try { + Thread.sleep(1000); + removeJobChangedListenerGarbage(); + } catch (InterruptedException e) { + shutdown = true; + } + } + } + + void shutDown() { + shutdown = true; + try { + join(); + } catch (InterruptedException e) { + } + } + } + private class MessageDispatcher extends Thread { private volatile boolean shutdown = false; @@ -183,8 +212,9 @@ public class JobEngine implements IJobEngine { private final Map submittedJobs; private final Map programs; private final Map executors; - private VM worker; + private VM vm; private MessageDispatcher dispatcher; + private JobChangedListenerChecker jobChangedListenerChecker; private IUserMessageQueues userMessageQueues; private final CyclicBarrier barrier; @@ -198,6 +228,7 @@ public class JobEngine implements IJobEngine { private Map remoteWorkers; private String masterServerAddress = System.getProperty("jobengine.master.server", ""); private final JobEngineRemote remoteEngine; + private ConcurrentHashMap keepAliveJobChangedListeners = new ConcurrentHashMap<>(); /** * A MAM motor példányosítása. Ennek során létrejönnek a várakozási sorok, az ütemező szál, az üzenet kezelő szál. @@ -214,7 +245,7 @@ public class JobEngine implements IJobEngine { executors = new LinkedHashMap(); submittedJobs = createJobs(); - barrier = new CyclicBarrier(3); + barrier = new CyclicBarrier(4); nextJobId = new AtomicLong(0); programs = new HashMap(); statusMachine = createStatusMachine(); @@ -240,6 +271,15 @@ public class JobEngine implements IJobEngine { } } + @Override + public void addManagedJobChangedListener(IJobChangedListener listener) { + if (listener != null) { + jobChangedListenerList.add(listener); + keepAliveJobChangedListeners.put(listener, System.currentTimeMillis()); + logger.info("Managed listener added"); + } + } + @Override public void addStepExecutor(IJobStepExecutor executor) { //Class stepClass = executor.getStepClass(); @@ -611,6 +651,12 @@ public class JobEngine implements IJobEngine { statusMachine.processAction(JobAction.FINISH, jobRuntime); } + @Override + public void keepAliveJobChangedListener(IJobChangedListener listener) { + if (keepAliveJobChangedListeners != null) + keepAliveJobChangedListeners.put(listener, System.currentTimeMillis()); + } + @Deprecated @Override public void keepAliveWorker(String remoteAddr) { @@ -876,10 +922,24 @@ public class JobEngine implements IJobEngine { public void removeJobChangedEventListener(IJobChangedListener listener) { if (listener != null) { jobChangedListenerList.remove(listener); - logger.info("Listeners left {}", jobChangedListenerList.size()); } } + synchronized protected void removeJobChangedListenerGarbage() { + List toBeRemoved = new ArrayList<>(); + + for (IJobChangedListener listener : keepAliveJobChangedListeners.keySet()) { + long lastMod = keepAliveJobChangedListeners.get(listener); + if (System.currentTimeMillis() - lastMod > 5 * 1000) + toBeRemoved.add(listener); + } + toBeRemoved.forEach(r -> { + logger.info("Removing listener"); + removeJobChangedEventListener(r); + keepAliveJobChangedListeners.remove(r); + }); + } + @Override public void removeSpanwChild(IJobRuntime jobRuntime) { IJobRuntime parent = getJobById(jobRuntime.getParentJobId()); @@ -903,6 +963,29 @@ public class JobEngine implements IJobEngine { return job; } + @Override + public void restartGracefully() throws Exception { + isRunning = false; + if (schedulerService != null) + schedulerService.shutdown(); + + vm.shutDown(); + shutdownExecutors(); + + logger.info("JobEngine gracefully stopped"); + + loadPrograms(); + loadExecutors(); + + vm.start(); + + startupExecutors(); + + schedulerService = new SchedulerService(this); + schedulerService.startup(); + isRunning = true; + } + @Override public void sendMessage(IJobMessage jobMessage) { messageQueue.add(jobMessage); @@ -929,9 +1012,10 @@ public class JobEngine implements IJobEngine { if (schedulerService != null) schedulerService.shutdown(); - worker.shutDown(); + vm.shutDown(); shutdownExecutors(); dispatcher.shutDown(); + jobChangedListenerChecker.shutDown(); try { TSMClient.CleanUpMultithread(); @@ -997,15 +1081,16 @@ public class JobEngine implements IJobEngine { loadPrograms(); loadExecutors(); - worker = new VM(); + vm = new VM(); dispatcher = new MessageDispatcher(); userMessageQueues = new UserMessageQueues(); + jobChangedListenerChecker = new JobChangedListenerChecker(); - worker.start(); + vm.start(); dispatcher.start(); + jobChangedListenerChecker.start(); - for (IJobStepExecutor executor : executors.values()) - executor.startup(this); + startupExecutors(); barrier.await(); schedulerService = new SchedulerService(this); @@ -1020,6 +1105,11 @@ public class JobEngine implements IJobEngine { } + private void startupExecutors() { + for (IJobStepExecutor executor : executors.values()) + executor.startup(this); + } + @Override public void storeJob(IJobRuntime runtime) { submittedJobs.put(runtime.getId(), runtime); diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobStepExecutor.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobStepExecutor.java index 6f26e22f..0ded9362 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobStepExecutor.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobStepExecutor.java @@ -122,11 +122,12 @@ public class JobStepExecutor implements IJobStepExecutor { } private void runStepObject(IJobRuntime jobRuntime, Object[] inputs) throws Throwable { + jobRuntime.setStatus(JobStatus.EXECUTING); + jobRuntime.NotifyUpdate(); + IJobStep step = createStepObject(); if (step == null) throw new Exception("Step object is null"); - jobRuntime.setStatus(JobStatus.EXECUTING); - jobRuntime.NotifyUpdate(); logger.debug("{} executing", jobRuntime); jobRuntime.incrementPriority(); Object[] outputs = step.run(jobEngine, jobRuntime, inputs); @@ -319,9 +320,7 @@ public class JobStepExecutor implements IJobStepExecutor { } @Override - public void startup(IJobEngine jobEngine) throws Exception { - if (jobEngine == null) - throw new NullPointerException("jobEngine"); + public void startup(IJobEngine jobEngine) { this.jobEngine = jobEngine; if (workers != null) { for (Worker w : workers) diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/scheduler/SchedulerService.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/scheduler/SchedulerService.java index 1144f4f4..859ce080 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/scheduler/SchedulerService.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/scheduler/SchedulerService.java @@ -53,7 +53,7 @@ public class SchedulerService { private IJobEngine jobEngine = null; private BasicDBList scheduleJobs; - public SchedulerService(IJobEngine jobEngine) throws Exception { + public SchedulerService(IJobEngine jobEngine) { // http://www.quartz-scheduler.org/documentation/quartz-2.2.x/configuration/ // https://docs.oracle.com/cd/E12058_01/doc/doc.1014/e12030/cron_expressions.htm this.jobEngine = jobEngine; diff --git a/server/user.mediacube.gui/WEB-INF/zk.xml b/server/user.mediacube.gui/WEB-INF/zk.xml index 4594df9d..f921ea3a 100644 --- a/server/user.mediacube.gui/WEB-INF/zk.xml +++ b/server/user.mediacube.gui/WEB-INF/zk.xml @@ -19,7 +19,11 @@ https://www.zkoss.org/wiki/ZK_Configuration_Reference/zk.xml - + + + 2 + + /resources/i3-label_hu.properties @@ -39,9 +43,9 @@ https://www.zkoss.org/wiki/ZK_Configuration_Reference/zk.xml user.jobengine.zk.util.DesktopCleanupListener - - - + + user.jobengine.zk.util.SessionCleanupListener + diff --git a/server/user.mediacube.gui/pages/joblist.zul b/server/user.mediacube.gui/pages/joblist.zul index 9d03d956..480bbcb4 100644 --- a/server/user.mediacube.gui/pages/joblist.zul +++ b/server/user.mediacube.gui/pages/joblist.zul @@ -1,6 +1,7 @@ +