From e2b23e51a010f4d7f42f113af36fc7071a9b1f37 Mon Sep 17 00:00:00 2001 From: =?utf8?q?V=C3=A1s=C3=A1ry=20D=C3=A1niel?= Date: Fri, 22 May 2020 15:39:16 +0000 Subject: [PATCH] git-tfs-id: [http://tfs.userrendszerhaz.hu:8080/tfs/DefaultCollection]$/MediaCube;C31853 --- .../mediacube-dev-user.properties | 6 +- .../executors/tests/MediaBaseTest.java | 43 ++- .../config/config-worker.xml | 1 + .../jobtemplates/fake-spawn.xml | 45 +++ .../jobtemplates/fake.xml | 18 +- .../server/steps/CancelableStep.java | 2 +- .../steps/CleanupMountedLocationStep.java | 8 +- .../jobengine/server/steps/FakeSpawnStep.java | 41 +++ .../user/jobengine/server/steps/FakeStep.java | 15 +- .../server/steps/HSMMigrateStep.java | 4 + .../server/steps/NEXIOCheckerStep.java | 6 +- .../src/user/commons/IJob.java | 2 +- .../src/user/commons/Job.java | 19 +- .../user/commons/cluster/ClusteredJob.java | 82 ++++++ .../src/user/commons/mediatool/MediaInfo.java | 2 +- .../user/jobengine/db/ItemManagerData.java | 2 +- server/user.jobengine.osgi.server/.classpath | 1 + .../log/mediacube-err.log | 0 .../log/mediacube.log | 0 .../resources/i3-label_hu.properties | 2 +- .../jobengine/server/ClusteredJobRuntime.java | 10 + .../src/user/jobengine/server/IJobEngine.java | 16 +- .../user/jobengine/server/IJobRuntime.java | 26 ++ .../jobengine/server/IJobStepExecutor.java | 13 +- .../src/user/jobengine/server/JobEngine.java | 277 ++++++++++++------ .../jobengine/server/JobEngineRemote.java | 72 +++++ .../src/user/jobengine/server/JobRuntime.java | 211 ++++++++++--- .../jobengine/server/JobStepExecutor.java | 224 +++++++------- .../server/actions/StatusMachine.java | 2 +- .../user/jobengine/zk/model/JobListModel.java | 10 +- .../zk/model/RetrieveBatchSelectorModel.java | 3 + .../user/jobengine/server/IT/JobengineIT.java | 78 ++++- .../server/IT/ProrityChangeTests.java | 187 ++++++++++++ .../jobengine/server/PriorityEntryTest.java | 39 ++- .../osgi/mediacube/ClusterService.java | 25 +- .../osgi/mediacube/ClusteredJob.java | 23 -- 36 files changed, 1174 insertions(+), 341 deletions(-) create mode 100644 server/user.jobengine.executors/jobtemplates/fake-spawn.xml create mode 100644 server/user.jobengine.executors/src/user/jobengine/server/steps/FakeSpawnStep.java create mode 100644 server/user.jobengine.osgi.commons/src/user/commons/cluster/ClusteredJob.java create mode 100644 server/user.jobengine.osgi.server/log/mediacube-err.log create mode 100644 server/user.jobengine.osgi.server/log/mediacube.log create mode 100644 server/user.jobengine.osgi.server/src/user/jobengine/server/ClusteredJobRuntime.java create mode 100644 server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngineRemote.java create mode 100644 server/user.jobengine.osgi.server/test/user/jobengine/server/IT/ProrityChangeTests.java delete mode 100644 server/user.jobengine.osgi.services/src/user/jobengine/osgi/mediacube/ClusteredJob.java diff --git a/server/-configuration/mediacube-dev-user.properties b/server/-configuration/mediacube-dev-user.properties index 4879ae5f..6f427b53 100644 --- a/server/-configuration/mediacube-dev-user.properties +++ b/server/-configuration/mediacube-dev-user.properties @@ -1,11 +1,13 @@ +org.slf4j.simpleLogger.defaultLogLevel=debug jobengine.jobsteps.root=../user.jobengine.executors/bin jobengine.jobsteps.config=../user.jobengine.executors/config/config-worker.xml jobengine.jobtemplates.root=../user.jobengine.executors/jobtemplates +jobengine.jobsteps.groovy.root=../user.jobengine.executors/src/user/jobengine/server/steps jetty.home=../-configuration/jetty jetty.etc.config.urls=etc/user-jetty.xml,etc/user-jetty-ssl.xml,etc/user-jetty-ssl-context.xml,etc/user-jetty-http.xml,etc/user-jetty-https.xml -log4j.configurationFile=../-configuration/log4j2.xml +log4j.configurationFile=../-configuration/log4j2-test.xml jobengine.db.url=jdbc:db2://10.228.198.1:50000/mediaarc:retrieveMessagesFromServerOnGetMessage=true; jobengine.db.user=db2admin @@ -14,5 +16,5 @@ jobengine.nosql.db.url=jdbc:db2://10.228.198.1:50000/mccache:retrieveMessagesFro jobengine.nosql.db.user=db2admin jobengine.nosql.db.password=password -jobengine.master.server=http://localhost:8888 +#jobengine.master.server=http://localhost:8888 javax.ws.rs.ext.RuntimeDelegate=org.jboss.resteasy.spi.ResteasyProviderFactory \ No newline at end of file diff --git a/server/hu.user.mediacube.executors.tests/src/hu/user/mediacube/executors/tests/MediaBaseTest.java b/server/hu.user.mediacube.executors.tests/src/hu/user/mediacube/executors/tests/MediaBaseTest.java index 5d5debb0..8bf6b149 100644 --- a/server/hu.user.mediacube.executors.tests/src/hu/user/mediacube/executors/tests/MediaBaseTest.java +++ b/server/hu.user.mediacube.executors.tests/src/hu/user/mediacube/executors/tests/MediaBaseTest.java @@ -1,5 +1,9 @@ package hu.user.mediacube.executors.tests; +import java.text.SimpleDateFormat; +import java.time.Duration; +import java.time.Instant; +import java.util.Iterator; import java.util.List; import org.apache.commons.io.FilenameUtils; @@ -9,6 +13,9 @@ import org.junit.Test; import user.commons.RemoteFile; import user.commons.StoreUri; +import user.commons.nexio.api.Clip; +import user.commons.nexio.api.Controller; +import user.commons.nexio.api.Mediabase; import user.commons.remotestore.FtpDirectoryLister; import user.commons.remotestore.IDirectoryLister; import user.commons.remotestore.RemoteStoreProtocol; @@ -16,18 +23,22 @@ import user.commons.remotestore.RemoteStoreProtocol; public class MediaBaseTest { @Test - public void listMediaBase() throws Exception { + public void listMediaBaseFTP() throws Exception { + Instant start = Instant.now(); StoreUri nexioUri = new StoreUri(); nexioUri.setProtocol(RemoteStoreProtocol.FTP); nexioUri.setUri("10.10.1.55"); nexioUri.setPortNumber(2098); nexioUri.setUserName("ftp"); nexioUri.setPassword("ftp"); + int i = 0; try { FTPClient ftp = ((FtpDirectoryLister) nexioUri.getLister()).connect(); IDirectoryLister lister = nexioUri.getLister(); List list = lister.list(); for (RemoteFile rf : list) { + if (i > 9) + break; if (rf.getIsFolder()) continue; String baseName = FilenameUtils.getBaseName(rf.getName()); @@ -40,6 +51,8 @@ public class MediaBaseTest { } catch (Exception ie) { System.err.println(ie.getMessage()); } + i++; + } } catch (Exception e) { @@ -47,7 +60,35 @@ public class MediaBaseTest { } finally { nexioUri.cleanUp(); } + Instant end = Instant.now(); + System.out.println(Duration.between(start, end)); } + @Test + public void listMediaBaseNEXIO() throws Exception { + Instant start = Instant.now(); + + Controller controller = new Controller("10.10.1.55"); + + controller.connect(); + Mediabase mediabase = controller.getMediabase(); + int i = 100; + try { + SimpleDateFormat df = new SimpleDateFormat("yyy-MM-dd HH:mm:ss"); + Iterator clips = mediabase.getClips(); + while (clips.hasNext() && i > 0) { + Clip clip = clips.next(); + System.out.println(clip.getId() + " " + clip.getXid().get() + " " + df.format(clip.getModifiedTimestamp().getTime())); + i--; + } + } catch (Exception e) { + System.err.println(e.getMessage()); + } finally { + controller.disconnect(); + } + Instant end = Instant.now(); + System.out.println(Duration.between(start, end)); + + } } diff --git a/server/user.jobengine.executors/config/config-worker.xml b/server/user.jobengine.executors/config/config-worker.xml index 5764b63e..21490264 100644 --- a/server/user.jobengine.executors/config/config-worker.xml +++ b/server/user.jobengine.executors/config/config-worker.xml @@ -1,4 +1,5 @@ + \ No newline at end of file diff --git a/server/user.jobengine.executors/jobtemplates/fake-spawn.xml b/server/user.jobengine.executors/jobtemplates/fake-spawn.xml new file mode 100644 index 00000000..bec5abeb --- /dev/null +++ b/server/user.jobengine.executors/jobtemplates/fake-spawn.xml @@ -0,0 +1,45 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/server/user.jobengine.executors/jobtemplates/fake.xml b/server/user.jobengine.executors/jobtemplates/fake.xml index bec5abeb..3b2404bf 100644 --- a/server/user.jobengine.executors/jobtemplates/fake.xml +++ b/server/user.jobengine.executors/jobtemplates/fake.xml @@ -4,23 +4,17 @@ - - - - + - - - @@ -31,15 +25,5 @@ - - - - - - - - - - \ No newline at end of file diff --git a/server/user.jobengine.executors/src/user/jobengine/server/steps/CancelableStep.java b/server/user.jobengine.executors/src/user/jobengine/server/steps/CancelableStep.java index 6a6c8ff3..79e0a57b 100644 --- a/server/user.jobengine.executors/src/user/jobengine/server/steps/CancelableStep.java +++ b/server/user.jobengine.executors/src/user/jobengine/server/steps/CancelableStep.java @@ -7,7 +7,7 @@ public class CancelableStep extends JobStep { public Object[] execute() throws Exception { try { for (int i = 0; i < count; i++) { - if (getJobRuntime().isWaitingCancel() || getJobRuntime().isWaitingSuspend()) + if (getJobRuntime().isWaitingCancel()) break; Thread.sleep(1000); setProgress((i + 1) * count); diff --git a/server/user.jobengine.executors/src/user/jobengine/server/steps/CleanupMountedLocationStep.java b/server/user.jobengine.executors/src/user/jobengine/server/steps/CleanupMountedLocationStep.java index c754867f..d6559574 100644 --- a/server/user.jobengine.executors/src/user/jobengine/server/steps/CleanupMountedLocationStep.java +++ b/server/user.jobengine.executors/src/user/jobengine/server/steps/CleanupMountedLocationStep.java @@ -177,6 +177,11 @@ public class CleanupMountedLocationStep extends JobStep implements FileVisitor

killDateFiles = getKillDateFiles(filePath); if (killDateFiles == null || killDateFiles.size() == 0) { diff --git a/server/user.jobengine.executors/src/user/jobengine/server/steps/FakeSpawnStep.java b/server/user.jobengine.executors/src/user/jobengine/server/steps/FakeSpawnStep.java new file mode 100644 index 00000000..28f0e13e --- /dev/null +++ b/server/user.jobengine.executors/src/user/jobengine/server/steps/FakeSpawnStep.java @@ -0,0 +1,41 @@ +package user.jobengine.server.steps; + +import java.util.Arrays; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class FakeSpawnStep extends JobStep { + private static final Logger logger = LogManager.getLogger(); + private int count = 10; + + @StepEntry + public Object[] execute(long itemID, Iterable iter) throws Exception { + + if (getJobRuntime().getSpawnOrder() == 0) { + count = 5; + } + logger.info(getMarker(), "Starting params: {}, {}, spawnOrder {}", itemID, iter, getJobRuntime().getSpawnOrder()); + + try { + int step = 100 / count; + for (int i = 0; i < count; i++) { + if (!canContinue()) + break; + setProgress((i + 1) * step); + for (int j = 0; j < 100; j++) { + Thread.sleep(10); + } + logger.info("Progress {}", getJobRuntime().getProgress()); + } + + } catch (Exception e) { + logger.error(getMarker(), e.getMessage()); + throw e; + } + Object[] result = Arrays.asList(10, 20).toArray(); + logger.info("Returning {}, {}", result[0], result[1]); + return result; + } + +} diff --git a/server/user.jobengine.executors/src/user/jobengine/server/steps/FakeStep.java b/server/user.jobengine.executors/src/user/jobengine/server/steps/FakeStep.java index 799e5abf..aa9d1540 100644 --- a/server/user.jobengine.executors/src/user/jobengine/server/steps/FakeStep.java +++ b/server/user.jobengine.executors/src/user/jobengine/server/steps/FakeStep.java @@ -10,12 +10,9 @@ public class FakeStep extends JobStep { private int count = 10; @StepEntry - public Object[] execute(long itemID, Iterable iter) throws Exception { + public Object[] execute(long itemID) throws Exception { - if (getJobRuntime().getSpawnOrder() == 0) { - count = 5; - } - logger.info(getMarker(), "Starting params: {}, {}, spawnOrder {}", itemID, iter, getJobRuntime().getSpawnOrder()); + logger.info(getMarker(), "Starting params: {}"); try { int step = 100 / count; @@ -23,10 +20,10 @@ public class FakeStep extends JobStep { if (!canContinue()) break; setProgress((i + 1) * step); - for (int j = 0; j < 100; j++) { - Thread.sleep(10); + for (int j = 0; j < 10; j++) { + Thread.sleep(1); } - logger.info("Progress {}", getJobRuntime().getProgress()); + //logger.info("{} Progress {}, p{}", getJobRuntime().getId(), getJobRuntime().getProgress(), getJobRuntime().getPriority()); } } catch (Exception e) { @@ -34,7 +31,7 @@ public class FakeStep extends JobStep { throw e; } Object[] result = Arrays.asList(10, 20).toArray(); - logger.info("Returning {}, {}", result[0], result[1]); + //logger.info("Returning {}, {}", result[0], result[1]); return result; } diff --git a/server/user.jobengine.executors/src/user/jobengine/server/steps/HSMMigrateStep.java b/server/user.jobengine.executors/src/user/jobengine/server/steps/HSMMigrateStep.java index 7bcd3176..b5df0ee4 100644 --- a/server/user.jobengine.executors/src/user/jobengine/server/steps/HSMMigrateStep.java +++ b/server/user.jobengine.executors/src/user/jobengine/server/steps/HSMMigrateStep.java @@ -357,6 +357,10 @@ public class HSMMigrateStep extends JobStep { if (targetLength > sourceLength) { throw new Exception("Hiba! A fájl túl nagy lett."); } + + if (getJobRuntime().isWaitingCancel()) { + break; + } } targetLength = targetFile.length(); diff --git a/server/user.jobengine.executors/src/user/jobengine/server/steps/NEXIOCheckerStep.java b/server/user.jobengine.executors/src/user/jobengine/server/steps/NEXIOCheckerStep.java index ae15789f..fc1a8e07 100644 --- a/server/user.jobengine.executors/src/user/jobengine/server/steps/NEXIOCheckerStep.java +++ b/server/user.jobengine.executors/src/user/jobengine/server/steps/NEXIOCheckerStep.java @@ -43,12 +43,14 @@ public class NEXIOCheckerStep extends JobStep { controller = new Controller(storeUri.getRootPath(), storeUri.getPortNumber()); controller.connect(); Mediabase mediabase = controller.getMediabase(); - //SimpleDateFormat df = new SimpleDateFormat("yyyy.MM.dd HH:mm:ss"); Iterator clips = mediabase.getClips(); int count = limit; while (clips.hasNext()) { + + if (getJobRuntime().isWaitingCancel()) + break; + Clip clip = clips.next(); - //String id = clip.getId().get(); String title = clip.getXid().get(); Timestamp modified = Timestamp.from(clip.getModifiedTimestamp().toInstant()); Timestamp created = Timestamp.from(clip.getRecordDateTimestamp().toInstant()); diff --git a/server/user.jobengine.osgi.commons/src/user/commons/IJob.java b/server/user.jobengine.osgi.commons/src/user/commons/IJob.java index 5539265f..5a9dbfb8 100644 --- a/server/user.jobengine.osgi.commons/src/user/commons/IJob.java +++ b/server/user.jobengine.osgi.commons/src/user/commons/IJob.java @@ -27,7 +27,7 @@ public interface IJob extends IEntityBase { String getTemplate(); - void IncrementPriority(); + void incrementPriority(); void NotifyUpdate(); diff --git a/server/user.jobengine.osgi.commons/src/user/commons/Job.java b/server/user.jobengine.osgi.commons/src/user/commons/Job.java index 87eeb59b..1bd10cd2 100644 --- a/server/user.jobengine.osgi.commons/src/user/commons/Job.java +++ b/server/user.jobengine.osgi.commons/src/user/commons/Job.java @@ -41,12 +41,13 @@ public class Job extends Syncable implements IJob, Comparable { * 1. szempont prioritas 2. szempont azonos prioritasnal a rogzites datuma */ @Override - public int compareTo(IJob job0) { - int ret = (this.priority - job0.getPriority()); - if ((ret == 0) && (this.submitted != null) && (job0.getSubmitted() != null)) { - ret = (int) (job0.getSubmitted().getTime() - this.submitted.getTime()); - } - return ret; + public int compareTo(IJob job) { + int res = 0; + if (getPriority() == job.getPriority()) + res = getSubmitted().getTime() < job.getSubmitted().getTime() ? -1 : 1; + else + res = (getPriority() > job.getPriority() ? -1 : 1); + return res; } @Override @@ -108,7 +109,7 @@ public class Job extends Syncable implements IJob, Comparable { } @Override - public void IncrementPriority() { + public void incrementPriority() { priority++; } @@ -147,7 +148,9 @@ public class Job extends Syncable implements IJob, Comparable { @Override public void setPriority(int priority) { - this.priority = priority; + if (this.priority != priority) { + this.priority = priority; + } } @Override diff --git a/server/user.jobengine.osgi.commons/src/user/commons/cluster/ClusteredJob.java b/server/user.jobengine.osgi.commons/src/user/commons/cluster/ClusteredJob.java new file mode 100644 index 00000000..ca9e6818 --- /dev/null +++ b/server/user.jobengine.osgi.commons/src/user/commons/cluster/ClusteredJob.java @@ -0,0 +1,82 @@ +package user.commons.cluster; + +import java.sql.Timestamp; + +import user.commons.JobStatus; + +public class ClusteredJob { + + private long id; + private String name; + private String description; + private String template; + private Object[] inputs; + private Timestamp submitted; + private JobStatus status; + private int progress; + + public String getDescription() { + return description; + } + + public long getId() { + return id; + } + + public Object[] getInputs() { + return inputs; + } + + public String getName() { + return name; + } + + public int getProgress() { + return progress; + } + + public JobStatus getStatus() { + return status; + } + + public Timestamp getSubmitted() { + return submitted; + } + + public String getTemplate() { + return template; + } + + public void setDescription(String description) { + this.description = description; + } + + public void setId(long id) { + this.id = id; + } + + public void setInputs(Object[] inputs) { + this.inputs = inputs; + } + + public void setName(String name) { + this.name = name; + } + + public void setProgress(int progress) { + this.progress = progress; + } + + public void setStatus(JobStatus status) { + this.status = status; + } + + public void setSubmitted(Timestamp submitted) { + this.submitted = submitted; + } + + public void setTemplate(String template) { + this.template = template; + } + +} diff --git a/server/user.jobengine.osgi.commons/src/user/commons/mediatool/MediaInfo.java b/server/user.jobengine.osgi.commons/src/user/commons/mediatool/MediaInfo.java index 1710acba..f1044180 100644 --- a/server/user.jobengine.osgi.commons/src/user/commons/mediatool/MediaInfo.java +++ b/server/user.jobengine.osgi.commons/src/user/commons/mediatool/MediaInfo.java @@ -44,7 +44,7 @@ public class MediaInfo { if (decoder != null && decoder.getCodecType() == MediaDescriptor.Type.MEDIA_VIDEO) { videoStreamId = i; - frames = stream.getDuration(); + frames = stream.getNumFrames(); break; } } diff --git a/server/user.jobengine.osgi.db/src/user/jobengine/db/ItemManagerData.java b/server/user.jobengine.osgi.db/src/user/jobengine/db/ItemManagerData.java index 9c5d0b98..71f0bd6b 100644 --- a/server/user.jobengine.osgi.db/src/user/jobengine/db/ItemManagerData.java +++ b/server/user.jobengine.osgi.db/src/user/jobengine/db/ItemManagerData.java @@ -77,7 +77,7 @@ public class ItemManagerData { } public enum SignalType { - CREATE(0), UPDATE(1), DELETE(2); + CREATE(0), UPDATE(1), DELETE(2), EXECUTE(3); private final long value; diff --git a/server/user.jobengine.osgi.server/.classpath b/server/user.jobengine.osgi.server/.classpath index 21dfa925..132eee5b 100644 --- a/server/user.jobengine.osgi.server/.classpath +++ b/server/user.jobengine.osgi.server/.classpath @@ -46,5 +46,6 @@ + diff --git a/server/user.jobengine.osgi.server/log/mediacube-err.log b/server/user.jobengine.osgi.server/log/mediacube-err.log new file mode 100644 index 00000000..e69de29b diff --git a/server/user.jobengine.osgi.server/log/mediacube.log b/server/user.jobengine.osgi.server/log/mediacube.log new file mode 100644 index 00000000..e69de29b diff --git a/server/user.jobengine.osgi.server/resources/i3-label_hu.properties b/server/user.jobengine.osgi.server/resources/i3-label_hu.properties index eb54b61d..1fb0a2d2 100644 --- a/server/user.jobengine.osgi.server/resources/i3-label_hu.properties +++ b/server/user.jobengine.osgi.server/resources/i3-label_hu.properties @@ -1,4 +1,4 @@ -version=2.5.2 +version=2.6.0 footer=2016-2020 © Copyright User Rendszerház Kft. login_info=Információ diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/ClusteredJobRuntime.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/ClusteredJobRuntime.java new file mode 100644 index 00000000..d3fce5df --- /dev/null +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/ClusteredJobRuntime.java @@ -0,0 +1,10 @@ +package user.jobengine.server; + +import user.commons.cluster.ClusteredJob; + +public class ClusteredJobRuntime extends JobRuntime { + + public ClusteredJobRuntime(ClusteredJob job, IJobEngine jobEngine, IJobStatusChangedListener listener) { + super(job, jobEngine, listener); + } +} diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobEngine.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobEngine.java index 752bba97..8c9a722a 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobEngine.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobEngine.java @@ -3,6 +3,7 @@ package user.jobengine.server; import java.util.Map; import user.commons.Job; +import user.commons.cluster.ClusteredJob; import user.jobengine.db.IItemManager; import user.jobengine.server.messagequeue.IUserMessageQueues; import user.jobengine.server.messages.IJobMessage; @@ -10,6 +11,7 @@ import user.jobengine.server.scheduler.ScheduledJob; import user.jobengine.server.scheduler.SchedulerService; public interface IJobEngine { + static final int QUEUE_POLL_INTERVAL_MS = 1; void addJobChangedEventListener(IJobChangedListener listener); @@ -19,6 +21,8 @@ public interface IJobEngine { void addToRunQueue(IJobRuntime jobRuntime); + void applyPriorityChange(IJobRuntime jobRuntime); + void bindItemManagerService(IItemManager service); boolean deleteProgram(String fileName); @@ -41,6 +45,8 @@ public interface IJobEngine { void executeSendMessageToUserInstruction(IJobRuntime jobRuntime); + void executeWaitForSpawnJobsInstruction(IJobRuntime jobRuntime); + void fireJobChangedEvent(JobChangedEvent event); Map getExecutors(); @@ -53,15 +59,13 @@ public interface IJobEngine { IJobRuntime getJobById(long jobId); - IJobRuntime getJobForRemote(String className) throws Exception; - Map getJobs(); IProgram getProgram(String name); Map getPrograms(); - IJobRuntime getRemoteJob(String className); + JobEngineRemote getRemoteEngine(); ScheduledJob getScheduledJob(String template); @@ -109,9 +113,11 @@ public interface IJobEngine { void removeJobChangedEventListener(IJobChangedListener listener); + void removeSpanwChild(IJobRuntime jobRuntime); + void removeSuspended(); - void rePrioritization(IJobRuntime jobRuntime); + ClusteredJob requestJob(String className) throws Exception; void sendMessage(IJobMessage jobMessage); @@ -125,6 +131,8 @@ public interface IJobEngine { void startup(); + void storeJob(IJobRuntime runtime); + IJobRuntime submit(IJobRuntime parent, IJobStatusChangedListener statusListener, String template, String name, Map parameters) throws JobEngineException; diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobRuntime.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobRuntime.java index 5f2f6c93..c5d57844 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobRuntime.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobRuntime.java @@ -9,21 +9,29 @@ import org.apache.logging.log4j.Marker; import user.commons.IEntityPersister; import user.commons.IJob; import user.commons.JobStatus; +import user.jobengine.server.instructions.CallJobStepInstruction; import user.jobengine.server.instructions.IInstruction; public interface IJobRuntime extends IJob { + void addChild(JobRuntime c); void addEventListener(IJobStatusChangedListener listener); + void addSpawnChild(IJobRuntime runtime); + void addVariable(String name, Class type); void arrangeStack(); void cancelForkPrepare() throws InterruptedException; + boolean canContinueExecution(); + void checkStackParameter() throws RuntimeException, IllegalArgumentException; + IJobRuntime createCopy(); + void decrementInstructionPointer(); public void done(); @@ -32,12 +40,18 @@ public interface IJobRuntime extends IJob { void forkWaitComplete() throws InterruptedException; + CallJobStepInstruction getCurrentCallJobStepInstruction(); + IInstruction getCurrentInstruction(); + String getCurrentStep(); + Marker getFinishMarker(); int getIp(); + IJobEngine getJobEngine(); + Marker getMarker(); IInstruction getNextInstruction() throws NoSuchElementException; @@ -52,12 +66,16 @@ public interface IJobRuntime extends IJob { JobStatus getSavedStatus(); + int getSpawnOrder(); + Stack getStack(); Object getVariable(String name); Map getVariables(); + int getWeight(); + boolean hasNextInstruction(); void incrementProgress(int progress); @@ -66,6 +84,8 @@ public interface IJobRuntime extends IJob { boolean isService(); + boolean isWaitFinish(); + boolean isWaitingCancel(); boolean isWaitingExecutor(); @@ -78,6 +98,8 @@ public interface IJobRuntime extends IJob { void removeEventListener(IJobStatusChangedListener listener); + void removeSpanwChild(long id); + void reset(); void restoreStack(); @@ -86,11 +108,15 @@ public interface IJobRuntime extends IJob { void saveStatus(); + void setCurrentStep(String currentStep); + @Override void setParameters(Map parameters); void setService(boolean isService); + void setSpawnOrder(int spawnOrder); + void setVariable(String name, Object value); void swapStack(); diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobStepExecutor.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobStepExecutor.java index ac4a6c58..5ee69913 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobStepExecutor.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/IJobStepExecutor.java @@ -2,10 +2,11 @@ package user.jobengine.server; import java.util.concurrent.PriorityBlockingQueue; +import user.commons.cluster.ClusteredJob; import user.jobengine.server.steps.IJobStep; /** - * Folyamat l�p�s v�grehajt� interface. + * Folyamat lepes vegrehajto interface. */ public interface IJobStepExecutor { static final String PROCESSING_LOCALLY = "Processing locally"; @@ -23,12 +24,14 @@ public interface IJobStepExecutor { PriorityBlockingQueue getQueue(); /** - * V�grehajt� l�p�s implement�ci�j�nak lek�rdez�se. + * Vegrehajte lepes implementaciojanak lekerdezese. * - * @return L�p�s implement�ci�. + * @return Lepes implementacio. */ Class getStepClass(); + String getStepUnitName(); + boolean isRemoteEnabled(); void revoke(IJobRuntime jobRuntime); @@ -43,7 +46,7 @@ public interface IJobStepExecutor { */ void startup(IJobEngine jobEngine) throws Exception; - IJobRuntime steelJob() throws InterruptedException; + ClusteredJob steelJob() throws InterruptedException; /** * Folyamat elhelyez�se a v�grehajt� v�rakoz�si sor�ba. @@ -51,7 +54,7 @@ public interface IJobStepExecutor { * @param job * Folyamat. */ - void submit(IJobRuntime job); + void submit(IJobRuntime... job); void waitShutdown(); diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java index 9eb30de7..6519b314 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java @@ -23,20 +23,18 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import javax.ws.rs.core.Response; - +import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.jboss.resteasy.client.jaxrs.ResteasyClient; -import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder; -import org.jboss.resteasy.client.jaxrs.ResteasyWebTarget; +import com.ibm.nosql.json.api.BasicDBList; import com.ibm.nosql.json.api.BasicDBObject; import user.commons.Job; import user.commons.JobStatus; import user.commons.RemoteFile; import user.commons.StoreUri; +import user.commons.cluster.ClusteredJob; import user.commons.nosql.NoSQLUtils; import user.commons.remotestore.DirectoryUtils; import user.commons.remotestore.RemoteStoreProtocol; @@ -48,6 +46,7 @@ import user.jobengine.server.actions.StatusMachine; import user.jobengine.server.ast.Encoder; import user.jobengine.server.ast.JobTemplate; import user.jobengine.server.ast.Parser; +import user.jobengine.server.instructions.CallJobStepInstruction; import user.jobengine.server.instructions.IInstruction; import user.jobengine.server.messagequeue.IUserMessage; import user.jobengine.server.messagequeue.IUserMessageQueues; @@ -57,7 +56,6 @@ import user.jobengine.server.messages.JobStepCompletedMessage; import user.jobengine.server.messages.UserReplyMessage; import user.jobengine.server.scheduler.ScheduledJob; import user.jobengine.server.scheduler.SchedulerService; -import user.jobengine.server.steps.IJobStep; import user.tsm.client.TSMClient; import user.tsm.client.TSMException; @@ -78,20 +76,20 @@ public class JobEngine implements IJobEngine { while (!shutdown) { try { - IJobMessage message = messageQueue.poll(50, TimeUnit.MILLISECONDS); - if (message != null) { + IJobMessage message = messageQueue.poll(QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); + if (message != null) message.process(JobEngine.this); - } + } catch (InterruptedException e) { shutdown = true; } } + //a leallitas utan az osszes fuggo uzenet vegrehajtasa while (!messageQueue.isEmpty()) { try { - IJobMessage message = messageQueue.poll(50, TimeUnit.MILLISECONDS); - if (message != null) { + IJobMessage message = messageQueue.poll(QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); + if (message != null) message.process(JobEngine.this); - } } catch (InterruptedException e) { shutdown = true; } @@ -120,14 +118,26 @@ public class JobEngine implements IJobEngine { while (!shutdown) { try { - IJobRuntime jobRuntime = runQueue.poll(10, TimeUnit.MILLISECONDS); + Thread.sleep(QUEUE_POLL_INTERVAL_MS); + //IJobRuntime jobRuntime = runQueue.poll(QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); + IJobRuntime jobRuntime = runQueue.poll(); if (jobRuntime != null) { - while (jobRuntime.hasNextInstruction() && jobRuntime.isRunable()) { + logger.debug("Processing {}", jobRuntime.getId()); + //varakozo esetben vegrehajtjuk a kovetkezo utasitast + if (jobRuntime.hasNextInstruction() && jobRuntime.isWaitFinish()) { ir = jobRuntime.getNextInstruction(); ir.execute(JobEngine.this, jobRuntime); + } else { + //normal esetben elfutunk a kovetkezo job step-ig, vagy vegig + while (jobRuntime.hasNextInstruction() && jobRuntime.isRunable()) { + ir = jobRuntime.getNextInstruction(); + ir.execute(JobEngine.this, jobRuntime); + } } + if (!jobRuntime.hasNextInstruction() && jobRuntime.isRunable()) jobCleanup(jobRuntime); + } } catch (Exception e) { logger.error("Critical VM error!", e); @@ -187,6 +197,7 @@ public class JobEngine implements IJobEngine { private List jobChangedListenerList = new CopyOnWriteArrayList<>(); private Map remoteWorkers; private String masterServerAddress = System.getProperty("jobengine.master.server", ""); + private final JobEngineRemote remoteEngine; /** * A MAM motor példányosítása. Ennek során létrejönnek a várakozási sorok, az ütemező szál, az üzenet kezelő szál. @@ -213,6 +224,10 @@ public class JobEngine implements IJobEngine { remoteWorkers = new ConcurrentHashMap<>(); //logger.info("JobEngine created"); + if (isWorker()) + remoteEngine = createRemoteEngine(); + else + remoteEngine = null; } public void activate() { @@ -227,13 +242,13 @@ public class JobEngine implements IJobEngine { @Override public void addStepExecutor(IJobStepExecutor executor) { - Class stepClass = executor.getStepClass(); - String canonicalName = stepClass.getCanonicalName(); - if (!executors.containsKey(canonicalName)) { - executors.put(canonicalName, executor); - logger.debug("Executor registered: " + stepClass); + //Class stepClass = executor.getStepClass(); + String unitName = executor.getStepUnitName(); + if (!executors.containsKey(unitName)) { + logger.info("Executor registered {}", unitName); + executors.put(unitName, executor); } else - logger.debug("Executor already registered: " + stepClass); + logger.debug("Executor already registered {}", unitName); } @@ -244,10 +259,15 @@ public class JobEngine implements IJobEngine { if (typeName == null) throw new Exception(jobRuntime.toString() + " illegal execution state detected: executor name is null."); String executorName = String.valueOf(typeName); - if (!executors.containsKey(executorName)) { + if (!executors.containsKey(executorName)) throw new Exception(jobRuntime.toString() + " executor is unavailable: " + executorName); - } - executors.get(executorName).submit(jobRuntime); + + //a lista mindig letezik, de sima futtataskor csak 1 elemu, az aktualis van benne + //ha van parhuzamos futtatas, akkor letrejonnek benne a step szellemek is, es meghivodik a fork + List jobs = spawnJobs(jobRuntime, executorName); + executors.get(executorName).submit(jobs.toArray(new IJobRuntime[] {})); + jobs.forEach(r -> fireJobChangedEvent(new JobChangedEvent(r, SignalType.EXECUTE))); + } catch (Exception e) { logger.catching(e); suspendWaitExecutorJob(e, jobRuntime); @@ -266,6 +286,41 @@ public class JobEngine implements IJobEngine { } } + /** + * Job prioritasa megvaltozott. Job main queue-ban es a JobStepExecutorok soraiban is ujra kell rendezni a jobokat. Az ujrarendezes a put-nal is megy + * automatikusan, mivel eltero prioritasu jobot dobhatnak be vagy default prioritasut a korabban atrendezett listaba. + * + * @param jobRuntime + */ + @Override + public void applyPriorityChange(IJobRuntime jobRuntime) { + logger.info("rePrioritization start for {}", jobRuntime.getId()); + // synchronized(this.runQueue){ + + //job main queue reorder + + if (this.runQueue.contains(jobRuntime)) { + logger.info("runQueue"); + this.runQueue.remove(jobRuntime); + try { + this.runQueue.put(jobRuntime); + } catch (InterruptedException e) { + } + } + + //JobStepExecutor reorder + if (this.executors != null) { + for (IJobStepExecutor exec : executors.values()) { + if (exec.containsRuntime(jobRuntime)) { + logger.info("executor"); + exec.changePriority(jobRuntime); + } + } + } + + // } logger.info("rePrioritization end"); + } + @Override public synchronized void bindItemManagerService(IItemManager service) { setItemManager(service); @@ -288,6 +343,10 @@ public class JobEngine implements IJobEngine { return new ConcurrentHashMap(); } + protected JobEngineRemote createRemoteEngine() { + return new JobEngineRemote(masterServerAddress); + } + protected IStatusMachine createStatusMachine() { return new StatusMachine(this); } @@ -319,9 +378,20 @@ public class JobEngine implements IJobEngine { public void executeAssignVariableInstruction(IJobRuntime jobRuntime) { Object value = jobRuntime.popFromStack(); String name = (String) jobRuntime.popFromStack(); + + //a gyerek job-ok visszateresi erteket a spawn sorszammal kiegeszitve mentjuk + //TODO el kell tudni erni a kovetkezo lepesekbol ezeket az ertekeket + if (jobRuntime.getParentJobId() > 0) { + IJobRuntime parentRuntime = getJobById(jobRuntime.getParentJobId()); + parentRuntime.setVariable(name + jobRuntime.getSpawnOrder(), value); + } + jobRuntime.setVariable(name, value); } + /*** + * Fuggetlen (beagyazott) alfolyamat letrehozasa + */ @Override public void executeCallConcurrentJobStepInstruction(IJobRuntime jobRuntime, IProgram subProgram) { JobRuntime c = new JobRuntime(this, jobRuntime, subProgram); @@ -379,6 +449,21 @@ public class JobEngine implements IJobEngine { getUserMessageQueues().addMessage(jobRuntime, catalogName, messageNumber, true, inputs); } + @Override + public void executeWaitForSpawnJobsInstruction(IJobRuntime jobRuntime) { + //logger.info("Processing {} {}", jobRuntime.getId(), jobRuntime.canContinueExecution()); + + if (jobRuntime.canContinueExecution()) { + jobRuntime.setStatus(JobStatus.RUNABLE); + } else { + if (!JobStatus.WAIT_FINISH.equals(jobRuntime.getStatus())) { + jobRuntime.setStatus(JobStatus.WAIT_FINISH); + } + jobRuntime.decrementInstructionPointer(); + } + addToRunQueue(jobRuntime); + } + @Override public void fireJobChangedEvent(JobChangedEvent event) { for (IJobChangedListener listener : jobChangedListenerList) { @@ -423,17 +508,6 @@ public class JobEngine implements IJobEngine { return submittedJobs.get(jobId); } - @Override - public IJobRuntime getJobForRemote(String className) throws Exception { - if (!executors.containsKey(className)) - throw new Exception("Unregistered executor request: " + className); - - IJobStepExecutor executor = executors.get(className); - if (!executor.isRemoteEnabled()) - throw new Exception("Job is not registered for remote workers: " + className); - return executor.steelJob(); - } - @Override public Map getJobs() { return submittedJobs; @@ -454,23 +528,8 @@ public class JobEngine implements IJobEngine { } @Override - public IJobRuntime getRemoteJob(String className) { - IJobRuntime result = null; - ResteasyClient client = new ResteasyClientBuilder().build(); - ResteasyWebTarget target = client.target(masterServerAddress).path("/services/rest/cluster/getjob").queryParam("className", className); - Response response = null; - try { - response = target.request().get(); - if (response.getEntity() instanceof IJobRuntime) - result = (IJobRuntime) response.getEntity(); - } catch (Exception e) { - logger.error(e.getMessage()); - } finally { - logger.debug("Response status: {}", response.getStatus()); - if (response != null) - response.close(); - } - return result; + public JobEngineRemote getRemoteEngine() { + return remoteEngine; } @Override @@ -535,9 +594,11 @@ public class JobEngine implements IJobEngine { } private void jobCleanup(IJobRuntime jobRuntime) { + logger.info("Cleanup {}", jobRuntime.getId()); statusMachine.processAction(JobAction.FINISH, jobRuntime); } + @Deprecated @Override public void keepAliveWorker(String remoteAddr) { remoteWorkers.put(remoteAddr, LocalDate.now()); @@ -585,12 +646,13 @@ public class JobEngine implements IJobEngine { throw new Exception("File not exists: " + filePath); logger.info("Loading template: " + filePath); stream = new FileInputStream(filePath); + Parser parser = new Parser(stream); - Encoder encoder = new Encoder(); JobTemplate template = parser.parse(); template.validate(); template.setFileName(fileName); + Encoder encoder = new Encoder(); IProgram program = (IProgram) encoder.visitJobTemplate(template, null); if (programs.containsKey(fileName)) @@ -633,6 +695,7 @@ public class JobEngine implements IJobEngine { try { String filePath = templateRoot + name; logger.info("Loading template: " + name); + // System.out.println(name); stream = new FileInputStream(filePath); Parser parser = new Parser(stream); Encoder encoder = new Encoder(); @@ -670,11 +733,25 @@ public class JobEngine implements IJobEngine { public void processJobStepCompletedMessage(IJobMessage message) { // TODO cancel nem megy, valszeg itt van gubasz IJobRuntime jobRuntime = getJobById(message.getJobId()); + + if (jobRuntime.getParentJobId() > 0) + removeSpanwChild(jobRuntime); + + JobStepCompletedMessage m = (JobStepCompletedMessage) message; + //kesz vagyunk, jelezni + if (isWorker()) { + statusMachine.processAction(JobAction.DONE, jobRuntime); + return; + } + + //a cancel hamarabb megjott? + //ha remote akkot tuti if (jobRuntime == null) { - //a cancel hamarabb megjott? + } - JobStepCompletedMessage m = (JobStepCompletedMessage) message; - putOutputsToStack(jobRuntime, m.getOutputs()); + Object[] outputs = m.getOutputs(); + putOutputsToStack(jobRuntime, outputs); + statusMachine.processAction(JobAction.DONE, jobRuntime); } @@ -761,6 +838,16 @@ public class JobEngine implements IJobEngine { } } + @Override + public void removeSpanwChild(IJobRuntime jobRuntime) { + IJobRuntime parent = getJobById(jobRuntime.getParentJobId()); + if (parent == null) + return; + + parent.removeSpanwChild(jobRuntime.getId()); + + } + @Override public void removeSuspended() { List removeId = new ArrayList<>(); @@ -772,37 +859,17 @@ public class JobEngine implements IJobEngine { submittedJobs.remove(id); } - /** - * Job prioritasa megvaltozott. Job main queue-ban es a JobStepExecutorok soraiban is ujra kell rendezni a jobokat. Az ujrarendezes a put-nal is megy - * automatikusan, mivel eltero prioritasu jobot dobhatnak be vagy default prioritasut a korabban atrendezett listaba. - * - * @param jobRuntime - */ @Override - public void rePrioritization(IJobRuntime jobRuntime) { - // synchronized(this.runQueue){ - - //1. JobStepExecutor reorder - if (this.executors != null) { - for (IJobStepExecutor exec : executors.values()) { - if (exec.containsRuntime(jobRuntime)) { - exec.changePriority(jobRuntime); - } - } - } + public ClusteredJob requestJob(String className) throws Exception { + if (!executors.containsKey(className)) + throw new Exception("Unregistered executor request: " + className); - //2. job main queue reorder - if (this.runQueue.contains(jobRuntime)) { - this.runQueue.remove(jobRuntime); - try { - this.runQueue.put(jobRuntime); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } + IJobStepExecutor executor = executors.get(className); + if (!executor.isRemoteEnabled()) + throw new Exception("Job is not registered for remote workers: " + className); - // } + ClusteredJob job = executor.steelJob(); + return job; } @Override @@ -847,16 +914,50 @@ public class JobEngine implements IJobEngine { if (executors == null) return; for (IJobStepExecutor executor : executors.values()) { - logger.trace("Notify executor {}", executor.getStepClass()); + logger.trace("Notify executor {}", executor.getStepUnitName()); executor.shutdown(); } for (IJobStepExecutor executor : executors.values()) { - logger.info("Stopping executor {}", executor.getStepClass()); + logger.info("Stopping executor {}", executor.getStepUnitName()); executor.waitShutdown(); } } + private List spawnJobs(IJobRuntime jobRuntime, String executorName) throws InterruptedException { + List result = new ArrayList<>(); + + CallJobStepInstruction currentInstruction = jobRuntime.getCurrentCallJobStepInstruction(); + if (currentInstruction != null) { + String forEach = currentInstruction.getForEach(); + if (StringUtils.isNotBlank(forEach)) { + Object parameter = jobRuntime.getParameter(forEach); + if (parameter == null) + parameter = jobRuntime.getVariable(forEach); + + //a sima array helyett ezt jobb hasznalni + if (parameter != null && parameter instanceof BasicDBList) { + + BasicDBList iter = (BasicDBList) parameter; + for (int i = 1; i < iter.size(); i++) { + IJobRuntime jobRuntimeCopy = new JobRuntime(jobRuntime); + jobRuntimeCopy.setSpawnOrder(i); + jobRuntimeCopy.add(); + + jobRuntime.addSpawnChild(jobRuntimeCopy); + + storeJob(jobRuntimeCopy); + result.add(jobRuntimeCopy); + } + } + } + + } + + result.add(jobRuntime); + return result; + } + @Override public void startup() { try { @@ -888,6 +989,12 @@ public class JobEngine implements IJobEngine { } + @Override + public void storeJob(IJobRuntime runtime) { + submittedJobs.put(runtime.getId(), runtime); + logger.debug("+++ {} stored in VM ", runtime); + } + private void submit(IJobRuntime runtime) { runtime.setSubmitted(new Timestamp(System.currentTimeMillis())); runtime.add(); diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngineRemote.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngineRemote.java new file mode 100644 index 00000000..76ee4458 --- /dev/null +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngineRemote.java @@ -0,0 +1,72 @@ +package user.jobengine.server; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder; + +import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider; + +import user.commons.cluster.ClusteredJob; +import user.commons.rest.ServiceObjectMapper; + +public class JobEngineRemote { + private static final Logger logger = LogManager.getLogger(); + private final Client client; + private final WebTarget root; + + public JobEngineRemote(String masterServerAddress) { + //config + //https://www.programcreek.com/java-api-examples/?class=org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder&method=register + //trace + //https://docs.jboss.org/resteasy/docs/4.0.0.Final/userguide/html/Tracing_Feature.html + + JacksonJaxbJsonProvider jaxbProvider = new JacksonJaxbJsonProvider(ServiceObjectMapper.getMapper(), JacksonJaxbJsonProvider.DEFAULT_ANNOTATIONS); + ResteasyClientBuilder builder = new ResteasyClientBuilder(); + builder.register(jaxbProvider); + client = builder.build(); + root = client.target(masterServerAddress); + } + + public ClusteredJob getRemoteJob(String className) { + ClusteredJob result = null; + WebTarget target = root.path("/services/rest/cluster/getjob").queryParam("className", className); + Response response = null; + try { + response = target.request().get(); + if (Status.OK.getStatusCode() != response.getStatus() && response.getEntity() instanceof ClusteredJob) + result = (ClusteredJob) response.getEntity(); + } catch (Exception e) { + logger.error(e.getMessage()); + } finally { + logger.debug("Response status: {}", response.getStatus()); + if (response != null) + response.close(); + } + return result; + } + + public void reportJobStatus(ClusteredJob job) { + WebTarget target = root.path("/services/rest/cluster/notifyjob"); + Response response = null; + try { + //response = target.request().post(Entity.entity(mapper.writeValueAsString(job), MediaType.APPLICATION_JSON)); + response = target.request().post(Entity.entity(job, MediaType.APPLICATION_JSON)); + if (Status.OK.getStatusCode() != response.getStatus()) + throw new Exception("Unexpected reponse occured"); + } catch (Exception e) { + logger.error(e.getMessage()); + } finally { + logger.debug("Response status: {}", response.getStatus()); + if (response != null) + response.close(); + } + } + +} diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobRuntime.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobRuntime.java index 33a4b99b..7af28c3a 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobRuntime.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobRuntime.java @@ -28,6 +28,7 @@ import user.commons.Job; import user.commons.JobStatus; import user.commons.MediaCubeFinishMarker; import user.commons.MediaCubeMarker; +import user.commons.cluster.ClusteredJob; import user.jobengine.db.ItemManagerData.SignalType; import user.jobengine.db.JobParameters; import user.jobengine.server.instructions.CallJobStepInstruction; @@ -42,34 +43,51 @@ public class JobRuntime extends Job implements IJobRuntime { private Stack stack = null; private Stack savedStack = null; private int ip; - private final EventListenerList listeners; + private final EventListenerList listeners = new EventListenerList(); private final Map variables; private Map parameters; private JobStatus savedStatus; private final IJobEngine jobEngine; private double currentProgress; - private int runtimeWeight; - private List children; + private int weight; private MediaCubeMarker sessionMarker; private MediaCubeMarker finishMarker; private boolean isService; private Semaphore forkSempahore; - private IJobChangedListener jobChangedListener; + private IJobChangedListener spawnJobListener; + private String currentStep; + //Fuggetlen (beagyazott) alfolyamatok + private List children; + //Parhuzamosan futtatot komplett job-ok, amiket bevarunk, ha a megfelelo step hivja + a forEach parhuzamositas private List childrenIDs; + private int spawnOrder; public JobRuntime() { - this.listeners = new EventListenerList(); variables = null; jobEngine = null; program = null; status = JobStatus.RUNABLE; } + public JobRuntime(ClusteredJob job, IJobEngine jobEngine, IJobStatusChangedListener listener) { + this.jobEngine = jobEngine; + this.persister = jobEngine.getItemManager(); + variables = null; + program = null; + name = job.getName(); + description = job.getDescription(); + submitted = job.getSubmitted(); + template = job.getTemplate(); + status = JobStatus.RUNABLE; + //CREATE notifikacio miatt + setId(job.getId()); + addEventListener(listener); + } + public JobRuntime(IJob job) { variables = null; jobEngine = null; program = null; - listeners = null; id = job.getId(); description = job.getDescription(); name = job.getName(); @@ -85,7 +103,6 @@ public class JobRuntime extends Job implements IJobRuntime { public JobRuntime(IJobEngine jobEngine, IJobRuntime runtime, IProgram program) { this.program = program; this.jobEngine = jobEngine; - this.listeners = new EventListenerList(); this.submitted = new Timestamp(System.currentTimeMillis()); this.stack = new Stack(); this.status = JobStatus.RUNABLE; @@ -99,7 +116,6 @@ public class JobRuntime extends Job implements IJobRuntime { if (program == null) throw new NullPointerException("program"); this.jobEngine = jobEngine; - this.listeners = new EventListenerList(); this.program = program; this.stack = new Stack(); this.variables = new HashMap(); @@ -111,7 +127,6 @@ public class JobRuntime extends Job implements IJobRuntime { throw new NullPointerException("program"); this.jobEngine = jobEngine; this.ip = 0; - this.listeners = new EventListenerList(); this.program = program; this.stack = new Stack(); this.variables = new HashMap(); @@ -120,6 +135,30 @@ public class JobRuntime extends Job implements IJobRuntime { fromJob(job); } + /*** + * Parhuzamosan es blokkoltan futtatando lepesek letrehozasa + * + * @param runtime + */ + @SuppressWarnings("unchecked") + public JobRuntime(IJobRuntime runtime) { + this.program = new Program(runtime.getProgram()); + this.jobEngine = runtime.getJobEngine(); + this.submitted = runtime.getSubmitted(); + this.stack = (Stack) runtime.getStack().clone(); + this.status = runtime.getStatus(); + this.ip = runtime.getIp(); + this.variables = new HashMap<>(runtime.getVariables()); + this.parameters = new HashMap<>(runtime.getParameters()); + this.persister = runtime.getPersister(); + this.template = runtime.getTemplate(); + this.weight = runtime.getWeight(); + this.name = runtime.getName(); + this.owner = runtime.getOwner(); + program.removeAfter(getIp()); + //logger.info("Program {}", program); + } + @Override public void add() { IJob job = toJob(); @@ -145,6 +184,14 @@ public class JobRuntime extends Job implements IJobRuntime { listeners.add(IJobStatusChangedListener.class, listener); } + @Override + public void addSpawnChild(IJobRuntime runtime) { + if (childrenIDs == null) + childrenIDs = Collections.synchronizedList(new ArrayList<>()); + childrenIDs.add(runtime.getId()); + runtime.setParentJobId(id); + } + @Override public void addVariable(String name, Class type) { if (variables.containsKey(name)) @@ -171,6 +218,11 @@ public class JobRuntime extends Job implements IJobRuntime { forkSempahore.release(); } + @Override + public boolean canContinueExecution() { + return childrenIDs == null || childrenIDs.size() == 0; + } + @Override public void checkStackParameter() throws RuntimeException, IllegalArgumentException { Class requiredType = (Class) popFromStack(); @@ -189,6 +241,11 @@ public class JobRuntime extends Job implements IJobRuntime { } + @Override + public IJobRuntime createCopy() { + return null; + } + @Override public void decrementInstructionPointer() { if (this.ip == 0) @@ -207,11 +264,11 @@ public class JobRuntime extends Job implements IJobRuntime { public boolean forkPrepare() throws InterruptedException { boolean result = false; forkSempahore = new Semaphore(1); - if (jobChangedListener == null) { - logger.info("Preparing fork"); + if (spawnJobListener == null) { + logger.info("Preparing spawn"); childrenIDs = Collections.synchronizedList(new ArrayList<>()); - jobChangedListener = event -> { + spawnJobListener = event -> { IJobRuntime child = event.getJob(); if (event.getSignalType().equals(SignalType.CREATE)) { if (child.getParentJobId() == getId()) { @@ -231,7 +288,7 @@ public class JobRuntime extends Job implements IJobRuntime { if (childrenIDs.size() == 0) forkSempahore.release(); } - if (!jobEngine.isRunning()) + if (!getJobEngine().isRunning()) forkSempahore.release(); //A gyerek(ek) el sem indultak, pl. nem letezik a template @@ -240,7 +297,7 @@ public class JobRuntime extends Job implements IJobRuntime { }; logger.info("Adding job changed listener"); - jobEngine.addJobChangedEventListener(jobChangedListener); + getJobEngine().addJobChangedEventListener(spawnJobListener); result = true; forkSempahore.acquire(); } else { @@ -252,13 +309,16 @@ public class JobRuntime extends Job implements IJobRuntime { @Override public void forkWaitComplete() throws InterruptedException { - logger.info("Waiting for semaphore" + forkSempahore); + //atlagos mukodes + if (forkSempahore == null) + return; + logger.info("Waiting for semaphore {}", forkSempahore); forkSempahore.acquire(); logger.info("Removing job changed listener"); - if (jobEngine.isRunning()) { + if (getJobEngine().isRunning()) { logger.info("Removing job changed listener"); - jobEngine.removeJobChangedEventListener(jobChangedListener); - jobChangedListener = null; + getJobEngine().removeJobChangedEventListener(spawnJobListener); + spawnJobListener = null; childrenIDs = null; } else { logger.info("Instruction pointer repositioned"); @@ -282,9 +342,40 @@ public class JobRuntime extends Job implements IJobRuntime { parametersFromByteArray(); } + @Override + public CallJobStepInstruction getCurrentCallJobStepInstruction() { + CallJobStepInstruction result = null; + + int i = getIp(); + if (i < program.getInstructionsCount()) { + while (true) { + IInstruction instruction = program.get(i); + if (instruction instanceof CallJobStepInstruction) { + result = (CallJobStepInstruction) instruction; + break; + } + + i--; + if (i < 0) + break; + } + + } + + return result; + } + @Override public IInstruction getCurrentInstruction() { - return program.get(getIp()); + IInstruction result = null; + if (getIp() < program.getInstructionsCount()) + result = program.get(getIp()); + return result; + } + + @Override + public String getCurrentStep() { + return currentStep; } @Override @@ -299,6 +390,11 @@ public class JobRuntime extends Job implements IJobRuntime { return ip; } + @Override + public IJobEngine getJobEngine() { + return jobEngine; + } + /*** * Log session marker. A teljes folyamat osszes naplobejegyzese osszegyujtheto a segitsegevel. MediaCubeMarker tipusu, folyamatonkent uj peldany jon letre. */ @@ -356,6 +452,11 @@ public class JobRuntime extends Job implements IJobRuntime { return savedStatus; } + @Override + public int getSpawnOrder() { + return spawnOrder; + } + @Override public Stack getStack() { return stack; @@ -381,23 +482,35 @@ public class JobRuntime extends Job implements IJobRuntime { return variables; } + @Override + public int getWeight() { + return weight; + } + @Override public boolean hasNextInstruction() { boolean result = false; - if (program.getInstructionsCount() > 0) + if (program != null && program.getInstructionsCount() > 0) result = (this.ip == (program.getInstructionsCount())) ? false : true; return result; } @Override public void incrementProgress(int progress) { + //remote ghost + if (program == null) { + setProgress(progress); + NotifyUpdate(); + return; + } + List instructions = program.getInstructions(); IInstruction currentInstruction = program.get(ip - 1); - if (runtimeWeight == 0) { + if (getWeight() == 0) { for (IInstruction instruction : instructions) if (instruction.getClass().equals(CallJobStepInstruction.class)) { int weight = instruction.getWeight(); - runtimeWeight += weight; + this.weight = getWeight() + weight; } } currentProgress = 0; @@ -406,9 +519,9 @@ public class JobRuntime extends Job implements IJobRuntime { if (instruction.getClass().equals(CallJobStepInstruction.class)) { if (instruction == currentInstruction) break; - currentProgress += (double) instruction.getWeight() * 100 / runtimeWeight; + currentProgress += (double) instruction.getWeight() * 100 / getWeight(); } - double currentDelta = (double) currentWeight * progress / runtimeWeight; + double currentDelta = (double) currentWeight * progress / getWeight(); currentProgress = Math.ceil(currentProgress + currentDelta); if (currentProgress - getProgress() > 4 || currentProgress == 100) { @@ -423,7 +536,7 @@ public class JobRuntime extends Job implements IJobRuntime { @Override public boolean isRunable() { - return (status == JobStatus.RUNABLE) ? true : false; + return JobStatus.RUNABLE.equals(status); } private boolean isRuntimeAssignable(Class fromType, Class toType) { @@ -438,6 +551,11 @@ public class JobRuntime extends Job implements IJobRuntime { return isService; } + @Override + public boolean isWaitFinish() { + return JobStatus.WAIT_FINISH.equals(status); + } + @Override public boolean isWaitingCancel() { return getStatus() == JobStatus.WAIT_CANCEL; @@ -518,6 +636,11 @@ public class JobRuntime extends Job implements IJobRuntime { listeners.remove(IJobStatusChangedListener.class, listener); } + @Override + public void removeSpanwChild(long id) { + childrenIDs.remove(id); + } + @Override public void reset() { currentProgress = 0; @@ -545,11 +668,28 @@ public class JobRuntime extends Job implements IJobRuntime { savedStatus = status; } + @Override + public void setCurrentStep(String currentStep) { + this.currentStep = currentStep; + } + @Override public void setDescription(String description) { super.setDescription(description); } + /* + private final EventListenerList listeners = new EventListenerList(); + private List children; + private MediaCubeMarker sessionMarker; + private MediaCubeMarker finishMarker; + private boolean isService; + private Semaphore forkSempahore; + private IJobChangedListener jobChangedListener; + private List childrenIDs; + + * */ + @Override public void setId(long id) { super.setId(id); @@ -573,6 +713,11 @@ public class JobRuntime extends Job implements IJobRuntime { this.isService = isService; } + @Override + public void setSpawnOrder(int spawnOrder) { + this.spawnOrder = spawnOrder; + } + @Override public void setStatus(JobStatus status) { if (this.status != status) { @@ -585,23 +730,12 @@ public class JobRuntime extends Job implements IJobRuntime { @Override public void setVariable(String name, Object value) { - /* - //castnal elszall - Class type = null; - try { - type = (Class) getVariable(name); - } catch (ClassCastException e) { - throw new IllegalStateException("multiple set"); - } - if (value != null && !type.equals(value.getClass())) - throw new IllegalArgumentException("name " + name + " value " + value); - */ variables.put(name, value); } private void signal(SignalType signalType) { - if (jobEngine != null) - jobEngine.fireJobChangedEvent(new JobChangedEvent(this, signalType)); + if (getJobEngine() != null) + getJobEngine().fireJobChangedEvent(new JobChangedEvent(this, signalType)); } @Override @@ -633,4 +767,5 @@ public class JobRuntime extends Job implements IJobRuntime { job.setParentJobId(getParentJobId()); return job; } + } diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobStepExecutor.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobStepExecutor.java index b992c3d3..56275397 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobStepExecutor.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobStepExecutor.java @@ -1,6 +1,7 @@ package user.jobengine.server; import java.net.URLClassLoader; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -10,19 +11,18 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.osgi.framework.Bundle; -import org.osgi.framework.BundleContext; -import org.osgi.framework.FrameworkUtil; -import org.osgi.framework.wiring.BundleWiring; +import groovy.lang.GroovyClassLoader; import user.commons.JobStatus; +import user.commons.cluster.ClusteredJob; import user.jobengine.server.messages.JobStepCompletedMessage; import user.jobengine.server.messages.JobStepSkippedMessage; import user.jobengine.server.steps.IJobStep; public class JobStepExecutor implements IJobStepExecutor { + private class Worker extends Thread { - private static final int WAIT_FOR_REMOTE = 30000; + private static final int WAIT_FOR_REMOTE = 3000; private volatile boolean shutdown = false; @Override @@ -36,64 +36,70 @@ public class JobStepExecutor implements IJobStepExecutor { IJobStep step = null; while (true) { try { - if (jobEngine.isWorker()) { - //a worker is csak azokat akarja vegrehajtani - if (isRemote) { - Object o = jobEngine.getRemoteJob(className); + Thread.sleep(IJobEngine.QUEUE_POLL_INTERVAL_MS); - if (o == null) { - Thread.sleep(1000); - continue; - } - jobRuntime = (IJobRuntime) o; + if (jobEngine.isWorker() && isRemote) { + //a worker is csak azokat akarja vegrehajtani + ClusteredJob job = jobEngine.getRemoteEngine().getRemoteJob(getStepUnitName()); - jobRuntime.setDescription(PROCESSING_REMOTLY); - } - } else { - //SessionUtil.getMediaCubeConfig().getJobQueuePollInterval() - jobRuntime = queue.poll(10, TimeUnit.MILLISECONDS); - if (jobRuntime == null && shutdown) { - logger.trace("Shutting down"); - break; - } - if (jobRuntime == null) - continue; + //TODO remote-ba jelezni, hogy nem sikerult if (shutdown) { logger.trace("{} skipping by shutdown", jobRuntime); jobEngine.sendMessage(new JobStepSkippedMessage(jobRuntime.getId())); - continue; + break; } + if (job != null) { + //TODO set job accepted = PROCESSING_REMOTLY + if error then feedback? + //jobRuntime.setDescription(PROCESSING_REMOTLY); + jobRuntime = new ClusteredJobRuntime(job, jobEngine, e -> { + IJobRuntime runtime = (IJobRuntime) e.getSource(); + job.setStatus(runtime.getStatus()); + job.setProgress(runtime.getProgress()); + jobEngine.getRemoteEngine().reportJobStatus(job); + }); + jobEngine.storeJob(jobRuntime); + + Object[] inputs = job.getInputs(); + runStepObject(jobRuntime, inputs); + } + } + + jobRuntime = queue.poll(); + + if (shutdown) { + logger.trace("Shutting down"); + break; + } + + if (jobRuntime != null) { long submitted = jobRuntime.getSubmitted().getTime(); long current = System.currentTimeMillis(); boolean timeout = current - submitted > WAIT_FOR_REMOTE; + + //ha remote, de nem jelentkezik senki, akkor helyi vegrehajtas if (isRemote) { if (timeout) { logger.info("Remote JobStep timed out, processing locally."); } else { - //logger.info("JobStep is remote, waiting for remote processor"); - if (!WAIT_REMOTE_PROCESSOR.equals(jobRuntime.getDescription())) - jobRuntime.setDescription(WAIT_REMOTE_PROCESSOR); + // if (!WAIT_REMOTE_PROCESSOR.equals(jobRuntime.getDescription())) + // jobRuntime.setDescription(WAIT_REMOTE_PROCESSOR); queue.put(jobRuntime); - //skip local processor continue; } } - jobRuntime.setDescription(PROCESSING_LOCALLY); + + logger.info("Executing locally {}", jobRuntime.getId()); + //jobRuntime.setDescription(PROCESSING_LOCALLY); + Object[] inputs = jobEngine.getInputsFromStack(jobRuntime); + runStepObject(jobRuntime, inputs); + } + + if (shutdown) { + logger.trace("Shutting down"); + break; } - //processing locally - Object[] inputs = jobEngine.getInputsFromStack(jobRuntime); - Object[] outputs = null; - jobRuntime.setStatus(JobStatus.EXECUTING); - jobRuntime.NotifyUpdate(); - step = createStepObject(); - if (step == null) - throw new Exception("Step object is null"); - logger.debug("{} executing", jobRuntime); - jobRuntime.IncrementPriority(); - outputs = step.run(jobEngine, jobRuntime, inputs); - jobEngine.sendMessage(new JobStepCompletedMessage(jobRuntime.getId(), outputs)); } catch (Throwable e) { logger.error("Error in {}", jobRuntime); Throwable t = e.getCause() == null ? e : e.getCause(); @@ -109,6 +115,20 @@ public class JobStepExecutor implements IJobStepExecutor { } } + private void runStepObject(IJobRuntime jobRuntime, Object[] inputs) throws Throwable { + IJobStep step = createStepObject(); + if (step == null) + throw new Exception("Step object is null"); + jobRuntime.setStatus(JobStatus.EXECUTING); + jobRuntime.NotifyUpdate(); + logger.debug("{} executing", jobRuntime); + jobRuntime.incrementPriority(); + Object[] outputs = step.run(jobEngine, jobRuntime, inputs); + + //TODO itt lekezelni a remote notification-t + jobEngine.sendMessage(new JobStepCompletedMessage(jobRuntime.getId(), outputs)); + } + public void shutdown() { this.shutdown = true; } @@ -129,7 +149,7 @@ public class JobStepExecutor implements IJobStepExecutor { private CountDownLatch barrier; private Class stepClass; private int maxConcurrent; - private String className; + private String stepUnitName; private boolean isRemote; public JobStepExecutor() { @@ -149,10 +169,9 @@ public class JobStepExecutor implements IJobStepExecutor { @Override public void changePriority(IJobRuntime runtime) { - if (queue != null && (runtime != null)) { - if (queue.remove(runtime)) { + if (queue != null && runtime != null) { + if (queue.remove(runtime)) queue.put(runtime); - } } } @@ -167,20 +186,23 @@ public class JobStepExecutor implements IJobStepExecutor { @Override @SuppressWarnings("unchecked") - public void create(String className, int maxConcurrent, boolean isRemote) throws JobEngineException { - this.className = className; + public void create(String stepUnitName, int maxConcurrent, boolean isRemote) throws JobEngineException { + this.stepUnitName = stepUnitName; this.isRemote = isRemote; - logger = LogManager.getLogger(getClass().getSimpleName() + ":" + className); - logger.debug("Creating executor {}, instances {}", className, maxConcurrent); - if (StringUtils.isEmpty(className)) - throw new JobEngineException("Step class name can't be null."); - - try { - URLClassLoader loader = URLClassLoader.newInstance(DynamicClassLocator.makeURLs(), getClass().getClassLoader()); - stepClass = (Class) loader.loadClass(className); - } catch (ClassNotFoundException e) { - logger.catching(e); - throw new JobEngineException("System can't load JobStep implementation: " + className); + logger = LogManager.getLogger(getClass().getSimpleName() + ":" + stepUnitName); + logger.debug("Creating executor {}, instances {}", stepUnitName, maxConcurrent); + if (StringUtils.isEmpty(stepUnitName)) + throw new JobEngineException("Step unit name can't be null."); + + if (!isGroovyStep()) { + //a groovy-nak nem kell + try { + URLClassLoader loader = URLClassLoader.newInstance(DynamicClassLocator.makeURLs(), getClass().getClassLoader()); + stepClass = (Class) loader.loadClass(stepUnitName); + } catch (ClassNotFoundException e1) { + logger.catching(e1); + throw new JobEngineException("System can't load JobStep implementation: " + stepUnitName); + } } queue = new PriorityBlockingQueue(); @@ -193,8 +215,16 @@ public class JobStepExecutor implements IJobStepExecutor { } } - protected IJobStep createStepObject() throws InstantiationException, IllegalAccessException { - return stepClass.newInstance(); + protected IJobStep createStepObject() throws Exception { + IJobStep result = null; + + if (isGroovyStep()) { + GroovyClassLoader gcl = new GroovyClassLoader(); + Class myClass = gcl.parseClass(Paths.get(System.getProperty("jobengine.jobsteps.groovy.root", ""), stepUnitName).toFile()); + result = (IJobStep) myClass.newInstance(); + } else + result = stepClass.newInstance(); + return result; } @Override @@ -202,47 +232,25 @@ public class JobStepExecutor implements IJobStepExecutor { return maxConcurrent; } - private ClassLoader getParentClassLoader() { - ClassLoader parentClassLoader = getClass().getClassLoader(); - Bundle bundle = FrameworkUtil.getBundle(getClass()); - if (bundle != null) { - BundleContext bundleContext = bundle.getBundleContext(); - if (bundleContext != null) { - BundleWiring bundleWiring = bundle.adapt(BundleWiring.class); - parentClassLoader = bundleWiring.getClassLoader(); - } - } - return parentClassLoader; - } - @Override public PriorityBlockingQueue getQueue() { return this.queue; } - // @Override - // public void synchronize() { - // if (priorityQueue.size() > 0 && workers.size() > queue.size()) { - // IJobRuntime jobRuntime = priorityQueue.poll(); - // try { - // queue.put(jobRuntime); - // } catch (InterruptedException e) { - // e.printStackTrace(); - // } - // } - // } - - @SuppressWarnings("unchecked") @Override public Class getStepClass() { - //TODO miért hozunk létre mindíg újat - // if (stepClass != null) { - // DynamicClassLoader loader = new DynamicClassLoader(getClass().getClassLoader()); - // stepClass = (Class) loader.loadClass(stepClass.getCanonicalName()); - // } return stepClass; } + @Override + public String getStepUnitName() { + return stepUnitName; + } + + private boolean isGroovyStep() { + return stepUnitName.toLowerCase().endsWith(".java"); + } + @Override public boolean isRemoteEnabled() { return isRemote; @@ -250,9 +258,7 @@ public class JobStepExecutor implements IJobStepExecutor { @Override public void revoke(IJobRuntime jobRuntime) { - //synchronized(queue){ queue.remove(jobRuntime); - //} } @Override @@ -280,15 +286,30 @@ public class JobStepExecutor implements IJobStepExecutor { } @Override - public IJobRuntime steelJob() throws InterruptedException { - return queue.poll(5, TimeUnit.MILLISECONDS); + public ClusteredJob steelJob() throws InterruptedException { + ClusteredJob result = null; + IJobRuntime jobRuntime = queue.poll(IJobEngine.QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); + + if (jobRuntime != null) { + result = new ClusteredJob(); + result.setId(jobRuntime.getId()); + result.setName(jobRuntime.getName()); + result.setDescription(jobRuntime.getDescription()); + result.setTemplate(jobRuntime.getTemplate()); + Object[] inputs = jobEngine.getInputsFromStack(jobRuntime); + result.setInputs(inputs); + } + + return result; } @Override - public void submit(IJobRuntime jobRuntime) { - queue.put(jobRuntime); - jobRuntime.setDescription(stepClass.getSimpleName()); - //logger.info("Executor got ! {}", jobRuntime); + public void submit(IJobRuntime... jobRuntime) { + for (IJobRuntime r : jobRuntime) { + logger.info("Adding job {} to executor queue", r.getId()); + r.setCurrentStep(stepUnitName); + queue.put(r); + } } @Override @@ -296,5 +317,4 @@ public class JobStepExecutor implements IJobStepExecutor { for (Worker w : workers) w.waitShutdown(); } - } diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/actions/StatusMachine.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/actions/StatusMachine.java index f1757260..213a0d11 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/actions/StatusMachine.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/actions/StatusMachine.java @@ -30,7 +30,7 @@ public class StatusMachine implements IStatusMachine { StatusMachineAction machineAction = new StatusMachineAction(jobAction, jobRuntime.getStatus()); if (actions.containsKey(machineAction)) { IJobStatusAction action = actions.get(machineAction); - logger.info("{} status change {} -> {}", jobRuntime, jobRuntime.getStatus(), jobAction); + logger.debug("{} changes {} -> {}", jobRuntime.getId(), jobRuntime.getStatus(), jobAction); action.processAction(jobEngine, jobRuntime); } else { logger.warn("No status processor registered for {} -> {}", jobAction, jobRuntime.getStatus()); diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/zk/model/JobListModel.java b/server/user.jobengine.osgi.server/src/user/jobengine/zk/model/JobListModel.java index c3f0b647..e55498b0 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/zk/model/JobListModel.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/zk/model/JobListModel.java @@ -40,8 +40,8 @@ public class JobListModel extends AsyncBaseModel implements IJobChangedListener @Command public void cancelJobs() { if (this.jobList.getSelection() != null) { - for (IJobRuntime job : this.jobList.getSelection()) { - jobEngine.sendMessage(new CancelRequest(job.getId())); + for (IJobRuntime jobRuntime : this.jobList.getSelection()) { + jobEngine.sendMessage(new CancelRequest(jobRuntime.getId())); } } } @@ -49,9 +49,9 @@ public class JobListModel extends AsyncBaseModel implements IJobChangedListener @Command public void changeJobsPriority() { if (this.jobList.getSelection() != null) { - for (IJobRuntime job : this.jobList.getSelection()) { - job.setPriority(newPriority); - jobEngine.rePrioritization(job); + for (IJobRuntime jobRuntime : this.jobList.getSelection()) { + jobRuntime.setPriority(newPriority); + jobEngine.applyPriorityChange(jobRuntime); } } } diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/zk/model/RetrieveBatchSelectorModel.java b/server/user.jobengine.osgi.server/src/user/jobengine/zk/model/RetrieveBatchSelectorModel.java index e634fc71..b44bf702 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/zk/model/RetrieveBatchSelectorModel.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/zk/model/RetrieveBatchSelectorModel.java @@ -99,6 +99,9 @@ public class RetrieveBatchSelectorModel extends BaseModel { IJobEngine jobEngine = JobEngine.getInstance(); ScheduledJob scheduledJob = jobEngine.getScheduledJob(JOBTEMPLATE); + if (scheduledJob == null) + throw new Exception("A sablon nem található: " + JOBTEMPLATE); + Map parameters = scheduledJob.getJobParameters(); parameters.put(HOUSEID, houseId); parameters.put(RECIPIENT, email); diff --git a/server/user.jobengine.osgi.server/test/user/jobengine/server/IT/JobengineIT.java b/server/user.jobengine.osgi.server/test/user/jobengine/server/IT/JobengineIT.java index b3b59398..4564a71a 100644 --- a/server/user.jobengine.osgi.server/test/user/jobengine/server/IT/JobengineIT.java +++ b/server/user.jobengine.osgi.server/test/user/jobengine/server/IT/JobengineIT.java @@ -7,14 +7,17 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URISyntaxException; import java.net.URL; +import java.sql.Timestamp; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.Stack; import org.junit.BeforeClass; import org.junit.Test; import user.commons.JobStatus; +import user.commons.cluster.ClusteredJob; import user.jobengine.db.IItemManager; import user.jobengine.db.ItemManager; import user.jobengine.db.ItemManagerData.SignalType; @@ -23,6 +26,7 @@ import user.jobengine.server.IJobRuntime; import user.jobengine.server.IJobStatusChangedListener; import user.jobengine.server.JobEngine; import user.jobengine.server.JobEngineException; +import user.jobengine.server.JobEngineRemote; import user.jobengine.server.JobStatusChangedEvent; import user.jobengine.server.ThreadSynchronizer; @@ -174,24 +178,58 @@ public class JobengineIT { * @throws Exception */ @Test - public void remote() throws Exception { + public void remote_worker() throws Exception { + //ez barmi lehet + System.setProperty("jobengine.master.server", "http://localhost:8888"); final ThreadSynchronizer sync = new ThreadSynchronizer(); - final IJobEngine jobEngine = new JobEngine(); - jobEngine.startup(); - jobEngine.bindItemManagerService(manager); - /* - Map parameters = new HashMap<>(); - parameters.put("itemID", 100); - IJobRuntime runtime = jobEngine.submit(null, e -> { - if (JobStatus.FINISHED.equals(e.getStatus()) || JobStatus.SUSPENDED.equals(e.getStatus())) - sync.suspend(); - }, "fake.xml", "Fake", parameters); - jobEngine.addJobChangedEventListener(e -> { - if (e.getSignalType().equals(SignalType.UPDATE)) { - System.out.println(e.getJob().getProgress()); + + ClusteredJob job = new ClusteredJob(); + job.setId(1); + job.setName("Teszt"); + job.setSubmitted(new Timestamp(System.currentTimeMillis())); + job.setTemplate("teszt.xml"); + //itemId + job.setInputs(new Object[] { 100 }); + + final Stack jobs = new Stack<>(); + jobs.push(job); + final IJobEngine jobEngine = new JobEngine() { + + @Override + protected JobEngineRemote createRemoteEngine() { + return new JobEngineRemote("") { + @Override + public ClusteredJob getRemoteJob(String className) { + ClusteredJob job = null; + if (!jobs.isEmpty()) + job = jobs.pop(); + return job; + } + + @Override + public void reportJobStatus(ClusteredJob job) { + System.out.println("Report:" + job.getStatus()); } - }); - */ + }; + } + + }; + jobEngine.bindItemManagerService(manager); + jobEngine.addJobChangedEventListener(e -> { + // if (e.getSignalType().equals(SignalType.UPDATE)) { + // System.out.println(e.getJob().getProgress()); + // } + + if (SignalType.CREATE.equals(e.getSignalType())) + System.out.println(e.getSignalType()); + if (SignalType.UPDATE.equals(e.getSignalType())) { + JobStatus status = e.getJob().getStatus(); + System.out.println(status); + if (JobStatus.FINISHED.equals(status) || JobStatus.SUSPENDED.equals(status)) + sync.suspend(); + } + }); + jobEngine.startup(); sync.waitSuspend(); sync.resume(); jobEngine.shutdown(); @@ -199,6 +237,14 @@ public class JobengineIT { //assertEquals(JobStatus.FINISHED, runtime.getStatus()); } + @Test + public void reportJobStatus() throws Exception { + JobEngineRemote sut = new JobEngineRemote("http://localhost:8888"); + ClusteredJob job = new ClusteredJob(); + job.setId(1); + sut.reportJobStatus(job); + } + /*** * NEXIO adatok szinkronizalo folyamat futtatasa * diff --git a/server/user.jobengine.osgi.server/test/user/jobengine/server/IT/ProrityChangeTests.java b/server/user.jobengine.osgi.server/test/user/jobengine/server/IT/ProrityChangeTests.java new file mode 100644 index 00000000..bd2b99c4 --- /dev/null +++ b/server/user.jobengine.osgi.server/test/user/jobengine/server/IT/ProrityChangeTests.java @@ -0,0 +1,187 @@ +package user.jobengine.server.IT; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.FileInputStream; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import user.commons.JobStatus; +import user.jobengine.db.IItemManager; +import user.jobengine.db.ItemManager; +import user.jobengine.db.ItemManagerData.SignalType; +import user.jobengine.server.IJobEngine; +import user.jobengine.server.IJobRuntime; +import user.jobengine.server.JobEngine; + +public class ProrityChangeTests { + private static IItemManager manager; + private static IJobEngine jobEngine; + private static Map jobParams = new HashMap<>(); + private static int JOB_COUNT = 10; + + @BeforeClass + public static void initialize() throws Exception { + //Kornyezeti valtozok betoltese + Properties properties = new Properties(); + URL srcLocation = ProrityChangeTests.class.getProtectionDomain().getCodeSource().getLocation(); + URL location = new URL(srcLocation, "../../-configuration/mediacube-dev-user.properties"); + properties.load(new FileInputStream(location.toURI().getPath().toString())); + System.getProperties().putAll(properties); + + manager = new ItemManager(); + manager.connect(); + + jobEngine = new JobEngine(); + jobEngine.startup(); + jobEngine.bindItemManagerService(manager); + + jobParams.put("itemID", 100); + } + + @AfterClass + public static void terminate() throws Exception { + jobEngine.shutdown(); + manager.disconnect(); + } + + @Test + public void testAfterExecutorSubmitCompleted() throws Exception { + CountDownLatch startLatch = new CountDownLatch(JOB_COUNT); + CountDownLatch finishLatch = new CountDownLatch(JOB_COUNT); + List runtimes = new ArrayList<>(); + List results = new ArrayList<>(); + + jobEngine.addJobChangedEventListener(e -> { + if (e.getSignalType().equals(SignalType.EXECUTE)) + startLatch.countDown(); + }); + + for (int i = 0; i < JOB_COUNT; i++) { + IJobRuntime jobRuntime = jobEngine.submit(null, e -> { + if (JobStatus.FINISHED.equals(e.getStatus()) || JobStatus.SUSPENDED.equals(e.getStatus())) { + results.add((IJobRuntime) e.getSource()); + finishLatch.countDown(); + } + }, "fake.xml", "Fake", jobParams); + runtimes.add(jobRuntime); + } + + startLatch.await(); + + IJobRuntime highPriorityJob = runtimes.get(runtimes.size() - 1); + highPriorityJob.incrementPriority(); + jobEngine.applyPriorityChange(highPriorityJob); + + finishLatch.await(); + + for (int i = 0; i < JOB_COUNT; i++) + assertEquals(JobStatus.FINISHED, runtimes.get(i).getStatus()); + + assertFalse(runtimes.indexOf(highPriorityJob) == results.indexOf(highPriorityJob)); + + } + + @Test + public void testAfterSubmitCompleted() throws Exception { + CountDownLatch startLatch = new CountDownLatch(JOB_COUNT); + CountDownLatch finishLatch = new CountDownLatch(JOB_COUNT); + List runtimes = new ArrayList<>(); + List results = new ArrayList<>(); + + jobEngine.addJobChangedEventListener(e -> { + if (e.getSignalType().equals(SignalType.CREATE)) + startLatch.countDown(); + }); + + for (int i = 0; i < JOB_COUNT; i++) { + IJobRuntime jobRuntime = jobEngine.submit(null, e -> { + if (JobStatus.FINISHED.equals(e.getStatus()) || JobStatus.SUSPENDED.equals(e.getStatus())) { + results.add((IJobRuntime) e.getSource()); + finishLatch.countDown(); + } + }, "fake.xml", "Fake", jobParams); + runtimes.add(jobRuntime); + } + + startLatch.await(); + + IJobRuntime highPriorityJob = runtimes.get(runtimes.size() - 1); + highPriorityJob.incrementPriority(); + jobEngine.applyPriorityChange(highPriorityJob); + + finishLatch.await(); + + for (int i = 0; i < JOB_COUNT; i++) + assertEquals(JobStatus.FINISHED, runtimes.get(i).getStatus()); + + assertFalse(runtimes.indexOf(highPriorityJob) == results.indexOf(highPriorityJob)); + + } + + @Test + public void testUnderSubmit() throws Exception { + CountDownLatch finishLatch = new CountDownLatch(JOB_COUNT); + List runtimes = new ArrayList<>(); + List results = new ArrayList<>(); + + for (int i = 0; i < JOB_COUNT; i++) { + IJobRuntime jobRuntime = jobEngine.submit(null, e -> { + if (JobStatus.FINISHED.equals(e.getStatus()) || JobStatus.SUSPENDED.equals(e.getStatus())) { + results.add((IJobRuntime) e.getSource()); + finishLatch.countDown(); + } + }, "fake.xml", "Fake", jobParams); + runtimes.add(jobRuntime); + } + + IJobRuntime highPriorityJob = runtimes.get(runtimes.size() - 1); + highPriorityJob.incrementPriority(); + jobEngine.applyPriorityChange(highPriorityJob); + + finishLatch.await(); + + for (int i = 0; i < JOB_COUNT; i++) + assertEquals(JobStatus.FINISHED, runtimes.get(i).getStatus()); + + assertFalse(runtimes.indexOf(highPriorityJob) == results.indexOf(highPriorityJob)); + } + + @Test + public void testUnderSubmitEqualPriority() throws Exception { + CountDownLatch finishLatch = new CountDownLatch(JOB_COUNT); + List runtimes = new ArrayList<>(); + List results = new ArrayList<>(); + + for (int i = 0; i < JOB_COUNT; i++) { + IJobRuntime jobRuntime = jobEngine.submit(null, e -> { + if (JobStatus.FINISHED.equals(e.getStatus()) || JobStatus.SUSPENDED.equals(e.getStatus())) { + results.add((IJobRuntime) e.getSource()); + finishLatch.countDown(); + } + }, "fake.xml", "Fake", jobParams); + runtimes.add(jobRuntime); + } + + IJobRuntime highPriorityJob = runtimes.get(runtimes.size() - 1); + + finishLatch.await(); + + for (int i = 0; i < JOB_COUNT; i++) + assertEquals(JobStatus.FINISHED, runtimes.get(i).getStatus()); + + assertTrue(runtimes.indexOf(highPriorityJob) == results.indexOf(highPriorityJob)); + } + +} diff --git a/server/user.jobengine.osgi.server/test/user/jobengine/server/PriorityEntryTest.java b/server/user.jobengine.osgi.server/test/user/jobengine/server/PriorityEntryTest.java index f6f24d46..e4b67fa0 100644 --- a/server/user.jobengine.osgi.server/test/user/jobengine/server/PriorityEntryTest.java +++ b/server/user.jobengine.osgi.server/test/user/jobengine/server/PriorityEntryTest.java @@ -12,13 +12,6 @@ import org.junit.Test; import user.commons.IJob; public class PriorityEntryTest { - // private PriorityEntry firstEntry; - // private PriorityEntry secondEntry; - private JobRuntime firstEntry; - private JobRuntime secondEntry; - private JobRuntime thirdEntry; - private BlockingQueue queue; - private class JobComparator implements Comparator { @Override @@ -32,6 +25,14 @@ public class PriorityEntryTest { } + // private PriorityEntry firstEntry; + // private PriorityEntry secondEntry; + private JobRuntime firstEntry; + private JobRuntime secondEntry; + private JobRuntime thirdEntry; + + private BlockingQueue queue; + @Before public void setup() { firstEntry = new JobRuntime(); @@ -58,18 +59,36 @@ public class PriorityEntryTest { } @Test - public void testPriorityEntry_IncrementPriority() throws Exception { + public void testPriorityEntry_incrementPriority() throws Exception { + // Fixture + queue.add(firstEntry); + queue.add(secondEntry); + queue.add(thirdEntry); + + queue.remove(secondEntry); + secondEntry.incrementPriority(); + queue.add(secondEntry); + + // Exercise + IJobRuntime current = queue.poll(); + + // Verify + assertEquals(secondEntry, current); + } + + @Test + public void testPriorityEntry_incrementPriority_async_poll() throws Exception { // Fixture queue.add(firstEntry); queue.add(secondEntry); queue.add(thirdEntry); + secondEntry.incrementPriority(); queue.remove(secondEntry); - secondEntry.IncrementPriority(); queue.add(secondEntry); // Exercise - IJobRuntime current = queue.take(); + IJobRuntime current = queue.poll(); // Verify assertEquals(secondEntry, current); diff --git a/server/user.jobengine.osgi.services/src/user/jobengine/osgi/mediacube/ClusterService.java b/server/user.jobengine.osgi.services/src/user/jobengine/osgi/mediacube/ClusterService.java index 3ba06f8e..673fd67a 100644 --- a/server/user.jobengine.osgi.services/src/user/jobengine/osgi/mediacube/ClusterService.java +++ b/server/user.jobengine.osgi.services/src/user/jobengine/osgi/mediacube/ClusterService.java @@ -2,6 +2,7 @@ package user.jobengine.osgi.mediacube; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.GET; +import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; @@ -12,6 +13,7 @@ import javax.ws.rs.core.Response; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import user.commons.cluster.ClusteredJob; import user.jobengine.osgi.rest.ComponentBinder; import user.jobengine.server.IJobEngine; @@ -31,11 +33,11 @@ public class ClusterService { public Response getJob(@QueryParam("className") String className) { Response result = null; try { - //IJobRuntime job = jobEngine.getJobForRemote(className); - ClusteredJob j = new ClusteredJob(); - j.setId(100); - j.setName("Jobname"); - result = Response.ok().entity(j).build(); + ClusteredJob job = jobEngine.requestJob(className); + if (job == null) + result = Response.noContent().build(); + else + result = Response.ok().entity(job).build(); } catch (Exception e) { result = Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build(); } @@ -56,4 +58,17 @@ public class ClusterService { } return result; } + + @POST + @Path("/notifyjob") + @Produces({ MediaType.APPLICATION_JSON }) + public Response notifyJob(ClusteredJob job) { + Response result = null; + try { + result = Response.ok().build(); + } catch (Exception e) { + result = Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build(); + } + return result; + } } diff --git a/server/user.jobengine.osgi.services/src/user/jobengine/osgi/mediacube/ClusteredJob.java b/server/user.jobengine.osgi.services/src/user/jobengine/osgi/mediacube/ClusteredJob.java deleted file mode 100644 index 85f20671..00000000 --- a/server/user.jobengine.osgi.services/src/user/jobengine/osgi/mediacube/ClusteredJob.java +++ /dev/null @@ -1,23 +0,0 @@ -package user.jobengine.osgi.mediacube; - -public class ClusteredJob { - private long id; - private String name; - - public long getId() { - return id; - } - - public String getName() { - return name; - } - - public void setId(long id) { - this.id = id; - } - - public void setName(String name) { - this.name = name; - } - -} -- 2.54.0