From a8b113452086dab4c706c33019ee1e5b654aa95e Mon Sep 17 00:00:00 2001 From: =?utf8?q?V=C3=A1s=C3=A1ry=20D=C3=A1niel?= Date: Wed, 6 May 2020 15:26:09 +0000 Subject: [PATCH] git-tfs-id: [http://tfs.userrendszerhaz.hu:8080/tfs/DefaultCollection]$/MediaCube;C31811 --- .../run-mediacube-server-user-worker.launch | 30 ++++++ server/-configuration/scheduledjobs.json | 2 +- .../config/config.xml | 84 ++++++++-------- .../server/steps/WorkerKeepAliveStep.java | 8 +- .../META-INF/MANIFEST.MF | 3 + .../server/ExecutorConfigParser.java | 7 +- .../src/user/jobengine/server/IJobEngine.java | 4 + .../jobengine/server/IJobStepExecutor.java | 6 +- .../src/user/jobengine/server/JobEngine.java | 42 +++++++- .../jobengine/server/JobStepExecutor.java | 71 +++++++++----- .../server/actions/StatusMachine.java | 2 +- .../server/scheduler/SchedulerService.java | 6 +- .../user/jobengine/server/steps/JobStep.java | 6 +- .../user/jobengine/server/IT/JobengineIT.java | 96 ++++++++++--------- .../user/jobengine/server/JobIntegration.java | 18 ++-- .../osgi/mediacube/ClusterService.java | 6 +- 16 files changed, 250 insertions(+), 141 deletions(-) create mode 100644 server/-configuration/run-mediacube-server-user-worker.launch diff --git a/server/-configuration/run-mediacube-server-user-worker.launch b/server/-configuration/run-mediacube-server-user-worker.launch new file mode 100644 index 00000000..5f5185c3 --- /dev/null +++ b/server/-configuration/run-mediacube-server-user-worker.launch @@ -0,0 +1,30 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/server/-configuration/scheduledjobs.json b/server/-configuration/scheduledjobs.json index 0f881970..4693367e 100644 --- a/server/-configuration/scheduledjobs.json +++ b/server/-configuration/scheduledjobs.json @@ -1,6 +1,6 @@ {"joblist":[ { - "active": true, + "active": false, "executeimmediate": false, "cronexpression": "0/10 * * * * ? *", "template": "worker-keepalive.xml", diff --git a/server/user.jobengine.executors/config/config.xml b/server/user.jobengine.executors/config/config.xml index 9e925cb0..5062fc97 100644 --- a/server/user.jobengine.executors/config/config.xml +++ b/server/user.jobengine.executors/config/config.xml @@ -1,45 +1,45 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/server/user.jobengine.executors/src/user/jobengine/server/steps/WorkerKeepAliveStep.java b/server/user.jobengine.executors/src/user/jobengine/server/steps/WorkerKeepAliveStep.java index d181ac3e..87de9666 100644 --- a/server/user.jobengine.executors/src/user/jobengine/server/steps/WorkerKeepAliveStep.java +++ b/server/user.jobengine.executors/src/user/jobengine/server/steps/WorkerKeepAliveStep.java @@ -14,7 +14,11 @@ public class WorkerKeepAliveStep extends JobStep { @StepEntry public Object[] execute(String masterAddress) throws Exception { - logger.info("Keep alive response: {}", ping(masterAddress)); + int ping = ping(masterAddress); + if (ping == 404) + logger.error("Keep alive response: {}", ping); + else + logger.debug("Keep alive response: {}", ping); return null; } @@ -30,7 +34,7 @@ public class WorkerKeepAliveStep extends JobStep { } catch (Exception e) { logger.error(e.getMessage()); } finally { - logger.info("Keep alive response: {}", response.getStatus()); + logger.debug("Keep alive response: {}", response.getStatus()); if (response != null) response.close(); } diff --git a/server/user.jobengine.osgi.server/META-INF/MANIFEST.MF b/server/user.jobengine.osgi.server/META-INF/MANIFEST.MF index b51a2ed5..cd748171 100644 --- a/server/user.jobengine.osgi.server/META-INF/MANIFEST.MF +++ b/server/user.jobengine.osgi.server/META-INF/MANIFEST.MF @@ -6,11 +6,14 @@ Bundle-Version: 1.0.0 Service-Component: OSGI-INF/component.xml, OSGI-INF/componentBinder.xml Import-Package: javax.servlet;version="3.1.0", javax.servlet.http;version="3.1.0", + javax.ws.rs.client, + javax.ws.rs.core, org.apache.commons.io.output;version="2.2.0", org.apache.logging.log4j;version="2.8.2", org.apache.logging.log4j.message;version="2.8.2", org.eclipse.core.runtime.adaptor, org.eclipse.osgi.framework.console;version="1.1.0", + org.jboss.resteasy.client.jaxrs, org.osgi.framework;version="1.5.0", org.osgi.framework.wiring;version="1.2.0", org.osgi.util.tracker;version="1.4.0", diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/ExecutorConfigParser.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/ExecutorConfigParser.java index 32707860..921c79d6 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/ExecutorConfigParser.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/ExecutorConfigParser.java @@ -10,10 +10,10 @@ import org.apache.logging.log4j.Logger; public class ExecutorConfigParser { private static final Logger logger = LogManager.getLogger(); - private final InputStream stream; - private static String EXECUTORS = "executors"; + private static String EXECUTOR = "executors/executor"; + private final InputStream stream; public ExecutorConfigParser(InputStream stream) { this.stream = stream; @@ -28,9 +28,10 @@ public class ExecutorConfigParser { digester.setClassLoader(this.getClass().getClassLoader()); digester.addObjectCreate(EXECUTORS, ArrayList.class); digester.addObjectCreate(EXECUTOR, JobStepExecutor.class); - digester.addCallMethod(EXECUTOR, "create", 2, new Class[] { String.class, int.class }); + digester.addCallMethod(EXECUTOR, "create", 3, new Class[] { String.class, int.class, boolean.class }); digester.addCallParam(EXECUTOR, 0, "className"); digester.addCallParam(EXECUTOR, 1, "maxConcurrent"); + digester.addCallParam(EXECUTOR, 2, "isRemote"); digester.addSetNext(EXECUTOR, "add"); result = (List) (List) digester.parse(stream); } catch (Exception e) { diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobEngine.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobEngine.java index b9dd419d..ca139059 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobEngine.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobEngine.java @@ -59,6 +59,8 @@ public interface IJobEngine { Map getPrograms(); + IJobRuntime getRemoteJob(String className); + ScheduledJob getScheduledJob(String template); SchedulerService getScheduler(); @@ -71,6 +73,8 @@ public interface IJobEngine { boolean isScheduledExecutionDisabled(); + boolean isWorker(); + void keepAliveWorker(String remoteAddr); void loadExecutors(); diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobStepExecutor.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobStepExecutor.java index 991accaa..2ebe3314 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobStepExecutor.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobStepExecutor.java @@ -8,11 +8,15 @@ import user.jobengine.server.steps.IJobStep; * Folyamat l�p�s v�grehajt� interface. */ public interface IJobStepExecutor { + static final String PROCESSING_LOCALLY = "Processing locally"; + static final String PROCESSING_REMOTLY = "Processing remotly"; + static final String WAIT_REMOTE_PROCESSOR = "Waiting for remote processor"; + void changePriority(IJobRuntime runtime); boolean containsRuntime(IJobRuntime runtime); - void create(String name, int maxConcurrent) throws JobEngineException; + void create(String name, int maxConcurrent, boolean isRemote) throws JobEngineException; int getMaxConcurrent(); diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java index 412d8342..e4c26b1c 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java @@ -21,8 +21,13 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import javax.ws.rs.core.Response; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.jboss.resteasy.client.jaxrs.ResteasyClient; +import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder; +import org.jboss.resteasy.client.jaxrs.ResteasyWebTarget; import com.ibm.nosql.json.api.BasicDBObject; @@ -51,7 +56,6 @@ import user.jobengine.server.messages.UserReplyMessage; import user.jobengine.server.scheduler.ScheduledJob; import user.jobengine.server.scheduler.SchedulerService; import user.jobengine.server.steps.IJobStep; -import user.jobengine.zk.util.SessionUtil; import user.tsm.client.TSMClient; import user.tsm.client.TSMException; @@ -114,7 +118,7 @@ public class JobEngine implements IJobEngine { while (!shutdown) { try { - IJobRuntime jobRuntime = runQueue.poll(SessionUtil.getMediaCubeConfig().getJobQueuePollInterval(), TimeUnit.MILLISECONDS); + IJobRuntime jobRuntime = runQueue.poll(10, TimeUnit.MILLISECONDS); if (jobRuntime != null) { while (jobRuntime.hasNextInstruction() && jobRuntime.isRunable()) { ir = jobRuntime.getNextInstruction(); @@ -180,6 +184,7 @@ public class JobEngine implements IJobEngine { private List jobChangedListenerList = new CopyOnWriteArrayList<>(); private Map remoteWorkers; + private String masterServerAddress = System.getProperty("jobengine.master.server", "false"); /** * A MAM motor példányosítása. Ennek során létrejönnek a várakozási sorok, az ütemező szál, az üzenet kezelő szál. @@ -435,6 +440,27 @@ public class JobEngine implements IJobEngine { return programs; } + @Override + public IJobRuntime getRemoteJob(String className) { + IJobRuntime result = null; + ResteasyClient client = new ResteasyClientBuilder().build(); + ResteasyWebTarget target = client.target(masterServerAddress).path("/services/rest/cluster/getjob").queryParam("className", className); + Response response = null; + try { + response = target.request().get(); + if (response.getEntity() != null) { + + } + } catch (Exception e) { + logger.error(e.getMessage()); + } finally { + logger.debug("Keep alive response: {}", response.getStatus()); + if (response != null) + response.close(); + } + return result; + } + @Override public ScheduledJob getScheduledJob(String template) { List jobs = NoSQLUtils.asList(schedulerService.getScheduleJobs()); @@ -491,6 +517,11 @@ public class JobEngine implements IJobEngine { return isScheduledExecutionDisabled; } + @Override + public boolean isWorker() { + return Boolean.parseBoolean(masterServerAddress); + } + private void jobCleanup(IJobRuntime jobRuntime) { statusMachine.processAction(JobAction.FINISH, jobRuntime); } @@ -576,6 +607,8 @@ public class JobEngine implements IJobEngine { store.setFileFilter("*.xml"); String templateRoot = store.toString(true); logger.info("Template root is: " + templateRoot); + File f = new File(templateRoot); + System.out.println(f.getAbsolutePath()); List files = store.getRemoteFiles(); for (RemoteFile file : files) { @@ -699,7 +732,7 @@ public class JobEngine implements IJobEngine { if (submittedJobs.containsKey(id)) { IJobRuntime jobRuntime = submittedJobs.remove(id); fireJobChangedEvent(new JobChangedEvent(jobRuntime, SignalType.DELETE)); - logger.info("--- {} removed from VM", jobRuntime); + logger.debug("--- {} removed from VM", jobRuntime); closeSessionLog(jobRuntime); } @@ -844,7 +877,7 @@ public class JobEngine implements IJobEngine { runtime.add(); addToRunQueue(runtime); submittedJobs.put(runtime.getId(), runtime); - logger.info("+++ {} added to VM ", runtime); + logger.debug("+++ {} added to VM ", runtime); } @Override @@ -948,5 +981,4 @@ public class JobEngine implements IJobEngine { logger.error("Couldn't shutdown jobEngine", e); } } - } diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobStepExecutor.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobStepExecutor.java index 2c21dcff..14531c7e 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobStepExecutor.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobStepExecutor.java @@ -36,18 +36,45 @@ public class JobStepExecutor implements IJobStepExecutor { IJobStep step = null; while (true) { try { - jobRuntime = queue.poll(SessionUtil.getMediaCubeConfig().getJobQueuePollInterval(), TimeUnit.MILLISECONDS); - if (jobRuntime == null && shutdown) { - logger.trace("Shutting down"); - break; + if (jobEngine.isWorker()) { + jobRuntime = jobEngine.getRemoteJob(className); + jobRuntime.setDescription(PROCESSING_REMOTLY); + } else { + jobRuntime = queue.poll(SessionUtil.getMediaCubeConfig().getJobQueuePollInterval(), TimeUnit.MILLISECONDS); + if (jobRuntime == null && shutdown) { + logger.trace("Shutting down"); + break; + } + if (jobRuntime == null) + continue; + if (shutdown) { + logger.trace("{} skipping by shutdown", jobRuntime); + jobEngine.sendMessage(new JobStepSkippedMessage(jobRuntime.getId())); + continue; + } + + long submitted = jobRuntime.getSubmitted().getTime(); + long current = System.currentTimeMillis(); + boolean timeout = current - submitted > 3000; + if (isRemote) { + if (timeout) { + logger.info("Remote JobStep timed out, processing locally."); + } else { + //logger.info("JobStep is remote, waiting for remote processor"); + if (!WAIT_REMOTE_PROCESSOR.equals(jobRuntime.getDescription())) + jobRuntime.setDescription(WAIT_REMOTE_PROCESSOR); + queue.put(jobRuntime); + //skip local processor + continue; + } + } + jobRuntime.setDescription(PROCESSING_LOCALLY); } + if (jobRuntime == null) continue; - if (shutdown) { - logger.trace("{} skipping by shutdown", jobRuntime); - jobEngine.sendMessage(new JobStepSkippedMessage(jobRuntime.getId())); - continue; - } + + //processing locally Object[] inputs = jobEngine.getInputsFromStack(jobRuntime); Object[] outputs = null; jobRuntime.setStatus(JobStatus.EXECUTING); @@ -55,12 +82,8 @@ public class JobStepExecutor implements IJobStepExecutor { step = createStepObject(); if (step == null) throw new Exception("Step object is null"); - logger.info("{} executing", jobRuntime); + logger.debug("{} executing", jobRuntime); jobRuntime.IncrementPriority(); - - // logger.info("Executor thread name is {} id is {}", Thread.currentThread().getName(), - // Thread.currentThread().getId()); - outputs = step.run(jobEngine, jobRuntime, inputs); jobEngine.sendMessage(new JobStepCompletedMessage(jobRuntime.getId(), outputs)); } catch (Throwable e) { @@ -99,16 +122,21 @@ public class JobStepExecutor implements IJobStepExecutor { private Class stepClass; private int maxConcurrent; private String className; + private boolean isRemote; public JobStepExecutor() { } public JobStepExecutor(Class clazz, int maxConcurrent) throws Exception { - this(clazz.getName(), maxConcurrent); + this(clazz.getName(), maxConcurrent, false); } public JobStepExecutor(String className, int maxConcurrent) throws Exception { - create(className, maxConcurrent); + this(className, maxConcurrent, false); + } + + public JobStepExecutor(String className, int maxConcurrent, boolean isRemote) throws Exception { + create(className, maxConcurrent, false); } @Override @@ -131,25 +159,22 @@ public class JobStepExecutor implements IJobStepExecutor { @Override @SuppressWarnings("unchecked") - public void create(String className, int maxConcurrent) throws JobEngineException { + public void create(String className, int maxConcurrent, boolean isRemote) throws JobEngineException { this.className = className; + this.isRemote = isRemote; logger = LogManager.getLogger(getClass().getSimpleName() + ":" + className); logger.debug("Creating executor {}, instances {}", className, maxConcurrent); if (StringUtils.isEmpty(className)) throw new JobEngineException("Step class name can't be null."); - // throw new - // JobEngineException("Illegal concurrent instance count. Must be greater then 0."); - //DynamicClassLoader loader = new DynamicClassLoader(getClass().getClassLoader()); + try { - //ClassLoader parentClassLoader = getParentClassLoader(); URLClassLoader loader = URLClassLoader.newInstance(DynamicClassLocator.makeURLs(), getClass().getClassLoader()); stepClass = (Class) loader.loadClass(className); } catch (ClassNotFoundException e) { logger.catching(e); throw new JobEngineException("System can't load JobStep implementation: " + className); } - // priorityQueue = new PriorityBlockingQueue(); - //queue = new LinkedBlockingQueue(); + queue = new PriorityBlockingQueue(); this.maxConcurrent = maxConcurrent; if (maxConcurrent > 0) { diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/actions/StatusMachine.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/actions/StatusMachine.java index 698e7af3..a030bcdd 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/actions/StatusMachine.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/actions/StatusMachine.java @@ -30,7 +30,7 @@ public class StatusMachine implements IStatusMachine { StatusMachineAction machineAction = new StatusMachineAction(jobAction, jobRuntime.getStatus()); if (actions.containsKey(machineAction)) { IJobStatusAction action = actions.get(machineAction); - logger.info("{} status change {} -> {}", jobRuntime, jobRuntime.getStatus(), jobAction); + logger.debug("{} status change {} -> {}", jobRuntime, jobRuntime.getStatus(), jobAction); action.processAction(jobEngine, jobRuntime); } diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/scheduler/SchedulerService.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/scheduler/SchedulerService.java index 180db6af..e752c0cc 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/scheduler/SchedulerService.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/scheduler/SchedulerService.java @@ -166,8 +166,10 @@ public class SchedulerService { */ private void loadStartupJobsFromConfig() throws Exception { String configPath = System.getProperty("jobengine.jobscheduling.config"); - if (configPath == null || "".equals(configPath)) - throw new Exception("Missing system property: 'jobengine.jobscheduling.config'"); + if (configPath == null || "".equals(configPath)) { + logger.error("Missing system property: 'jobengine.jobscheduling.config'"); + return; + } File schedulingConfigFile = new File(configPath); if (schedulingConfigFile.exists()) { String jsonConfig = new String(Files.readAllBytes(Paths.get(schedulingConfigFile.getAbsolutePath()))); diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/steps/JobStep.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/steps/JobStep.java index 42f8953b..6959e845 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/steps/JobStep.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/steps/JobStep.java @@ -65,7 +65,7 @@ public class JobStep implements IJobStep { private Object[] executeMethod(Method method, Object[] extendedInputs) throws Exception { Object[] result; try { - logger.info("Executing {}", method); + logger.debug("Executing {}", method); result = (Object[]) method.invoke(this, extendedInputs); } catch (Exception e) { if (e instanceof IllegalArgumentException) { @@ -98,7 +98,7 @@ public class JobStep implements IJobStep { /*** * A runtime nevevel egyezo rendszerszintu gyerek. A szulo rendszer szintu MEDIACUBE cimkes. - * + * * @return */ protected Marker getMarker() { @@ -131,7 +131,7 @@ public class JobStep implements IJobStep { /*** * JobRuntime session marker - * + * * @return */ protected MediaCubeMarker getSessionMarker() { diff --git a/server/user.jobengine.osgi.server/test/user/jobengine/server/IT/JobengineIT.java b/server/user.jobengine.osgi.server/test/user/jobengine/server/IT/JobengineIT.java index 0698b022..faa3ed0d 100644 --- a/server/user.jobengine.osgi.server/test/user/jobengine/server/IT/JobengineIT.java +++ b/server/user.jobengine.osgi.server/test/user/jobengine/server/IT/JobengineIT.java @@ -31,7 +31,7 @@ public class JobengineIT { //Kornyezeti valtozok betoltese Properties properties = new Properties(); URL srcLocation = JobengineIT.class.getProtectionDomain().getCodeSource().getLocation(); - URL location = new URL(srcLocation, "../../-configuration/mediacube-dev.properties"); + URL location = new URL(srcLocation, "../../-configuration/mediacube-dev-user.properties"); properties.load(new FileInputStream(location.toURI().getPath().toString())); System.getProperties().putAll(properties); @@ -41,42 +41,21 @@ public class JobengineIT { } /*** - * Teszt folyamat futtatasa - * @throws Exception - */ - @Test - public void fake() throws Exception { - final ThreadSynchronizer sync = new ThreadSynchronizer(); - final IJobEngine jobEngine = new JobEngine(); - jobEngine.bindItemManagerService(manager); - Map parameters = new HashMap<>(); - //parameters.put("itemID", 100); - IJobRuntime runtime = jobEngine.submit("fake.xml", "Fake", parameters); - runtime.addEventListener(new IJobStatusChangedListener() { - @Override - public void statusChanged(JobStatusChangedEvent event) { - if (JobStatus.FINISHED.equals(event.getStatus())) - sync.suspend(); - - } - }); - sync.waitSuspend(); - sync.resume(); - jobEngine.shutdown(); - } - - /*** - * NEXIO adatok szinkronizalo folyamat futtatasa + * Archivalo folyamat futtatasa + * * @throws Exception */ @Test - public void sync_nexio() throws Exception { + public void archive_media() throws Exception { final ThreadSynchronizer sync = new ThreadSynchronizer(); IJobEngine jobEngine = new JobEngine(); - jobEngine.bindItemManagerService(manager); + jobEngine.setItemManager(manager); + jobEngine.loadPrograms(); + jobEngine.loadExecutors(); + jobEngine.startup(); Map parameters = new HashMap<>(); - parameters.put("itemID", 100); - IJobRuntime runtime = jobEngine.submit("sync-nexio.xml", "Synchronize NEXIO", parameters); + parameters.put("mediaFolder", "/tmp"); + IJobRuntime runtime = jobEngine.submit("archivemedia.xml", "Fake", parameters); runtime.addEventListener(new IJobStatusChangedListener() { @Override public void statusChanged(JobStatusChangedEvent event) { @@ -91,18 +70,19 @@ public class JobengineIT { } /*** - * Ket teszt folyamat futtatasa egyszerre + * Teszt folyamat futtatasa + * * @throws Exception */ - @Test(expected = JobEngineException.class) - public void fake_overlap() throws Exception { + @Test + public void fake() throws Exception { final ThreadSynchronizer sync = new ThreadSynchronizer(); - IJobEngine jobEngine = new JobEngine(); + final IJobEngine jobEngine = new JobEngine(); + jobEngine.startup(); jobEngine.bindItemManagerService(manager); Map parameters = new HashMap<>(); - parameters.put("itemID", 100); + //parameters.put("itemID", 100); IJobRuntime runtime = jobEngine.submit("fake.xml", "Fake", parameters); - jobEngine.submit("fake.xml", "Fake", parameters); runtime.addEventListener(new IJobStatusChangedListener() { @Override public void statusChanged(JobStatusChangedEvent event) { @@ -118,6 +98,7 @@ public class JobengineIT { /*** * Ket teszt folyamat futtatasa egymas utan + * * @throws Exception */ @Test @@ -156,20 +137,45 @@ public class JobengineIT { } /*** - * Archivalo folyamat futtatasa + * Ket teszt folyamat futtatasa egyszerre + * + * @throws Exception + */ + @Test(expected = JobEngineException.class) + public void fake_overlap() throws Exception { + final ThreadSynchronizer sync = new ThreadSynchronizer(); + IJobEngine jobEngine = new JobEngine(); + jobEngine.bindItemManagerService(manager); + Map parameters = new HashMap<>(); + parameters.put("itemID", 100); + IJobRuntime runtime = jobEngine.submit("fake.xml", "Fake", parameters); + jobEngine.submit("fake.xml", "Fake", parameters); + runtime.addEventListener(new IJobStatusChangedListener() { + @Override + public void statusChanged(JobStatusChangedEvent event) { + if (JobStatus.FINISHED.equals(event.getStatus())) + sync.suspend(); + + } + }); + sync.waitSuspend(); + sync.resume(); + jobEngine.shutdown(); + } + + /*** + * NEXIO adatok szinkronizalo folyamat futtatasa + * * @throws Exception */ @Test - public void archive_media() throws Exception { + public void sync_nexio() throws Exception { final ThreadSynchronizer sync = new ThreadSynchronizer(); IJobEngine jobEngine = new JobEngine(); - jobEngine.setItemManager(manager); - jobEngine.loadPrograms(); - jobEngine.loadExecutors(); - jobEngine.startup(); + jobEngine.bindItemManagerService(manager); Map parameters = new HashMap<>(); - parameters.put("mediaFolder", "/tmp"); - IJobRuntime runtime = jobEngine.submit("archivemedia.xml", "Fake", parameters); + parameters.put("itemID", 100); + IJobRuntime runtime = jobEngine.submit("sync-nexio.xml", "Synchronize NEXIO", parameters); runtime.addEventListener(new IJobStatusChangedListener() { @Override public void statusChanged(JobStatusChangedEvent event) { diff --git a/server/user.jobengine.osgi.server/test/user/jobengine/server/JobIntegration.java b/server/user.jobengine.osgi.server/test/user/jobengine/server/JobIntegration.java index c218a7ed..34c84260 100644 --- a/server/user.jobengine.osgi.server/test/user/jobengine/server/JobIntegration.java +++ b/server/user.jobengine.osgi.server/test/user/jobengine/server/JobIntegration.java @@ -116,6 +116,15 @@ public class JobIntegration { // program.addInstruction(new AssignVariableInstruction()); } + public void sendReply(JobStatusChangedEvent event) { + if (event.getStatus() == JobStatus.WAIT_USERMESSAGE) { + IJobRuntime jobRuntime = (IJobRuntime) event.getSource(); + UserReplyMessage message = new UserReplyMessage(jobRuntime.getId()); + message.setReply(userReply); + jobEngine.sendMessage(message); + } + } + @Before public void setup() throws Exception { jobEngine = new JobEngine() { @@ -141,13 +150,4 @@ public class JobIntegration { synchronizer.resume(); jobEngine.shutdown(); } - - public void sendReply(JobStatusChangedEvent event) { - if (event.getStatus() == JobStatus.WAIT_USERMESSAGE) { - IJobRuntime jobRuntime = (IJobRuntime) event.getSource(); - UserReplyMessage message = new UserReplyMessage(jobRuntime.getId()); - message.setReply(userReply); - jobEngine.sendMessage(message); - } - } } diff --git a/server/user.jobengine.osgi.services/src/user/jobengine/osgi/mediacube/ClusterService.java b/server/user.jobengine.osgi.services/src/user/jobengine/osgi/mediacube/ClusterService.java index 3eb430b6..bf252521 100644 --- a/server/user.jobengine.osgi.services/src/user/jobengine/osgi/mediacube/ClusterService.java +++ b/server/user.jobengine.osgi.services/src/user/jobengine/osgi/mediacube/ClusterService.java @@ -10,7 +10,6 @@ import javax.ws.rs.core.Response; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import user.jobengine.db.IItemManager; import user.jobengine.osgi.rest.ComponentBinder; import user.jobengine.server.IJobEngine; @@ -18,11 +17,10 @@ import user.jobengine.server.IJobEngine; public class ClusterService { private static final Logger logger = LogManager.getLogger(); - private IItemManager itemManager = ComponentBinder.getItemManager(); private IJobEngine jobEngine = ComponentBinder.getJobengine(); public ClusterService() { - logger.info("Created"); + logger.debug("Created"); } @GET @@ -30,7 +28,7 @@ public class ClusterService { public Response keepAlive(@Context HttpServletRequest request, @QueryParam("responseRoot") String responseRoot) { Response result = null; try { - logger.info("Keepalive {}, {}, {}", request.getRemoteAddr(), request.getRemoteHost(), request.getRemotePort()); + logger.debug("Keepalive {}, {}, {}", request.getRemoteAddr(), request.getRemoteHost(), request.getRemotePort()); jobEngine.keepAliveWorker(request.getRemoteAddr()); result = Response.ok().build(); -- 2.54.0