git-tfs-id: [http://tfs.userrendszerhaz.hu:8080/tfs/DefaultCollection]$/MediaCube...
authorVásáry Dániel <daniel.vasary@userrendszerhaz.hu>
Fri, 22 May 2020 15:39:16 +0000 (15:39 +0000)
committerVásáry Dániel <daniel.vasary@userrendszerhaz.hu>
Fri, 22 May 2020 15:39:16 +0000 (15:39 +0000)
36 files changed:
server/-configuration/mediacube-dev-user.properties
server/hu.user.mediacube.executors.tests/src/hu/user/mediacube/executors/tests/MediaBaseTest.java
server/user.jobengine.executors/config/config-worker.xml
server/user.jobengine.executors/jobtemplates/fake-spawn.xml [new file with mode: 0644]
server/user.jobengine.executors/jobtemplates/fake.xml
server/user.jobengine.executors/src/user/jobengine/server/steps/CancelableStep.java
server/user.jobengine.executors/src/user/jobengine/server/steps/CleanupMountedLocationStep.java
server/user.jobengine.executors/src/user/jobengine/server/steps/FakeSpawnStep.java [new file with mode: 0644]
server/user.jobengine.executors/src/user/jobengine/server/steps/FakeStep.java
server/user.jobengine.executors/src/user/jobengine/server/steps/HSMMigrateStep.java
server/user.jobengine.executors/src/user/jobengine/server/steps/NEXIOCheckerStep.java
server/user.jobengine.osgi.commons/src/user/commons/IJob.java
server/user.jobengine.osgi.commons/src/user/commons/Job.java
server/user.jobengine.osgi.commons/src/user/commons/cluster/ClusteredJob.java [new file with mode: 0644]
server/user.jobengine.osgi.commons/src/user/commons/mediatool/MediaInfo.java
server/user.jobengine.osgi.db/src/user/jobengine/db/ItemManagerData.java
server/user.jobengine.osgi.server/.classpath
server/user.jobengine.osgi.server/log/mediacube-err.log [new file with mode: 0644]
server/user.jobengine.osgi.server/log/mediacube.log [new file with mode: 0644]
server/user.jobengine.osgi.server/resources/i3-label_hu.properties
server/user.jobengine.osgi.server/src/user/jobengine/server/ClusteredJobRuntime.java [new file with mode: 0644]
server/user.jobengine.osgi.server/src/user/jobengine/server/IJobEngine.java
server/user.jobengine.osgi.server/src/user/jobengine/server/IJobRuntime.java
server/user.jobengine.osgi.server/src/user/jobengine/server/IJobStepExecutor.java
server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java
server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngineRemote.java [new file with mode: 0644]
server/user.jobengine.osgi.server/src/user/jobengine/server/JobRuntime.java
server/user.jobengine.osgi.server/src/user/jobengine/server/JobStepExecutor.java
server/user.jobengine.osgi.server/src/user/jobengine/server/actions/StatusMachine.java
server/user.jobengine.osgi.server/src/user/jobengine/zk/model/JobListModel.java
server/user.jobengine.osgi.server/src/user/jobengine/zk/model/RetrieveBatchSelectorModel.java
server/user.jobengine.osgi.server/test/user/jobengine/server/IT/JobengineIT.java
server/user.jobengine.osgi.server/test/user/jobengine/server/IT/ProrityChangeTests.java [new file with mode: 0644]
server/user.jobengine.osgi.server/test/user/jobengine/server/PriorityEntryTest.java
server/user.jobengine.osgi.services/src/user/jobengine/osgi/mediacube/ClusterService.java
server/user.jobengine.osgi.services/src/user/jobengine/osgi/mediacube/ClusteredJob.java [deleted file]

index 4879ae5f92b1ecac0d06cdb4bb0e79bca898d788..6f427b536a668d280e9296227c1e8de400074024 100644 (file)
@@ -1,11 +1,13 @@
+org.slf4j.simpleLogger.defaultLogLevel=debug\r
 jobengine.jobsteps.root=../user.jobengine.executors/bin\r
 jobengine.jobsteps.config=../user.jobengine.executors/config/config-worker.xml\r
 jobengine.jobtemplates.root=../user.jobengine.executors/jobtemplates\r
+jobengine.jobsteps.groovy.root=../user.jobengine.executors/src/user/jobengine/server/steps\r
 \r
 jetty.home=../-configuration/jetty\r
 jetty.etc.config.urls=etc/user-jetty.xml,etc/user-jetty-ssl.xml,etc/user-jetty-ssl-context.xml,etc/user-jetty-http.xml,etc/user-jetty-https.xml\r
 \r
-log4j.configurationFile=../-configuration/log4j2.xml\r
+log4j.configurationFile=../-configuration/log4j2-test.xml\r
 \r
 jobengine.db.url=jdbc:db2://10.228.198.1:50000/mediaarc:retrieveMessagesFromServerOnGetMessage=true;\r
 jobengine.db.user=db2admin\r
@@ -14,5 +16,5 @@ jobengine.nosql.db.url=jdbc:db2://10.228.198.1:50000/mccache:retrieveMessagesFro
 jobengine.nosql.db.user=db2admin\r
 jobengine.nosql.db.password=password\r
 \r
-jobengine.master.server=http://localhost:8888\r
+#jobengine.master.server=http://localhost:8888\r
 javax.ws.rs.ext.RuntimeDelegate=org.jboss.resteasy.spi.ResteasyProviderFactory
\ No newline at end of file
index 5d5debb0ccb5215c09baebc3cd123bb7b78b1984..8bf6b14973548bc58a8cfa10254d75693a6765f2 100644 (file)
@@ -1,5 +1,9 @@
 package hu.user.mediacube.executors.tests;\r
 \r
+import java.text.SimpleDateFormat;\r
+import java.time.Duration;\r
+import java.time.Instant;\r
+import java.util.Iterator;\r
 import java.util.List;\r
 \r
 import org.apache.commons.io.FilenameUtils;\r
@@ -9,6 +13,9 @@ import org.junit.Test;
 \r
 import user.commons.RemoteFile;\r
 import user.commons.StoreUri;\r
+import user.commons.nexio.api.Clip;\r
+import user.commons.nexio.api.Controller;\r
+import user.commons.nexio.api.Mediabase;\r
 import user.commons.remotestore.FtpDirectoryLister;\r
 import user.commons.remotestore.IDirectoryLister;\r
 import user.commons.remotestore.RemoteStoreProtocol;\r
@@ -16,18 +23,22 @@ import user.commons.remotestore.RemoteStoreProtocol;
 public class MediaBaseTest {\r
 \r
        @Test\r
-       public void listMediaBase() throws Exception {\r
+       public void listMediaBaseFTP() throws Exception {\r
+               Instant start = Instant.now();\r
                StoreUri nexioUri = new StoreUri();\r
                nexioUri.setProtocol(RemoteStoreProtocol.FTP);\r
                nexioUri.setUri("10.10.1.55");\r
                nexioUri.setPortNumber(2098);\r
                nexioUri.setUserName("ftp");\r
                nexioUri.setPassword("ftp");\r
+               int i = 0;\r
                try {\r
                        FTPClient ftp = ((FtpDirectoryLister) nexioUri.getLister()).connect();\r
                        IDirectoryLister lister = nexioUri.getLister();\r
                        List<RemoteFile> list = lister.list();\r
                        for (RemoteFile rf : list) {\r
+                               if (i > 9)\r
+                                       break;\r
                                if (rf.getIsFolder())\r
                                        continue;\r
                                String baseName = FilenameUtils.getBaseName(rf.getName());\r
@@ -40,6 +51,8 @@ public class MediaBaseTest {
                                } catch (Exception ie) {\r
                                        System.err.println(ie.getMessage());\r
                                }\r
+                               i++;\r
+\r
                        }\r
 \r
                } catch (Exception e) {\r
@@ -47,7 +60,35 @@ public class MediaBaseTest {
                } finally {\r
                        nexioUri.cleanUp();\r
                }\r
+               Instant end = Instant.now();\r
+               System.out.println(Duration.between(start, end));\r
 \r
        }\r
 \r
+       @Test\r
+       public void listMediaBaseNEXIO() throws Exception {\r
+               Instant start = Instant.now();\r
+\r
+               Controller controller = new Controller("10.10.1.55");\r
+\r
+               controller.connect();\r
+               Mediabase mediabase = controller.getMediabase();\r
+               int i = 100;\r
+               try {\r
+                       SimpleDateFormat df = new SimpleDateFormat("yyy-MM-dd HH:mm:ss");\r
+                       Iterator<Clip> clips = mediabase.getClips();\r
+                       while (clips.hasNext() && i > 0) {\r
+                               Clip clip = clips.next();\r
+                               System.out.println(clip.getId() + " " + clip.getXid().get() + " " + df.format(clip.getModifiedTimestamp().getTime()));\r
+                               i--;\r
+                       }\r
+               } catch (Exception e) {\r
+                       System.err.println(e.getMessage());\r
+               } finally {\r
+                       controller.disconnect();\r
+               }\r
+               Instant end = Instant.now();\r
+               System.out.println(Duration.between(start, end));\r
+\r
+       }\r
 }\r
index 5764b63eb97c406a273e5b2a5d06d245d2ef932c..21490264f457fd76858401e98b39f4560a75c7ce 100644 (file)
@@ -1,4 +1,5 @@
 <?xml version="1.0" encoding="UTF-8"?>\r
 <executors>\r
        <executor className="user.jobengine.server.steps.CancelableStep" maxConcurrent="1" isRemote="true" />\r
+       <executor className="FakeStep.java" maxConcurrent="1" />\r
 </executors>
\ No newline at end of file
diff --git a/server/user.jobengine.executors/jobtemplates/fake-spawn.xml b/server/user.jobengine.executors/jobtemplates/fake-spawn.xml
new file mode 100644 (file)
index 0000000..bec5abe
--- /dev/null
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>\r
+<jobtemplate multiInstance="true" name="Fake">\r
+<declarations>\r
+       <parameters>\r
+               <parameter name="itemID" type="java.lang.Long"/>\r
+       </parameters>\r
+       <parameters>\r
+               <parameter name="iter" type="java.lang.Iterable"/>\r
+       </parameters>\r
+       <variables>\r
+               <variable name="resultID" type="java.lang.Long"/>\r
+               <variable name="resultID1" type="java.lang.Long"/>\r
+       </variables>\r
+</declarations>\r
+<commands>\r
+       <calljobstep type="FakeStep.java" weight="5" forEach="iter">\r
+               <inputs>\r
+                       <input>\r
+                               <parameter name="itemID" />\r
+                       </input>\r
+                       <input>\r
+                               <parameter name="iter" />\r
+                       </input>\r
+               </inputs>\r
+               <outputs>\r
+                       <output>\r
+                               <variable name="resultID" />\r
+                       </output>\r
+                       <output>\r
+                               <variable name="resultID1" />\r
+                       </output>\r
+               </outputs>\r
+       </calljobstep>\r
+       <calljobstep type="MergeStep.java" weight="1">\r
+               <inputs>\r
+                       <input>\r
+                               <variable name="resultID" />\r
+                       </input>\r
+                       <input>\r
+                               <variable name="resultID1" />\r
+                       </input>\r
+               </inputs>\r
+       </calljobstep>\r
+</commands>\r
+</jobtemplate>
\ No newline at end of file
index bec5abeb841492ee9d91ce1bc684150478693950..3b2404bf5026033ef825c22e889793f549cc9653 100644 (file)
@@ -4,23 +4,17 @@
        <parameters>\r
                <parameter name="itemID" type="java.lang.Long"/>\r
        </parameters>\r
-       <parameters>\r
-               <parameter name="iter" type="java.lang.Iterable"/>\r
-       </parameters>\r
        <variables>\r
                <variable name="resultID" type="java.lang.Long"/>\r
                <variable name="resultID1" type="java.lang.Long"/>\r
        </variables>\r
 </declarations>\r
 <commands>\r
-       <calljobstep type="FakeStep.java" weight="5" forEach="iter">\r
+       <calljobstep type="FakeStep.java" weight="1">\r
                <inputs>\r
                        <input>\r
                                <parameter name="itemID" />\r
                        </input>\r
-                       <input>\r
-                               <parameter name="iter" />\r
-                       </input>\r
                </inputs>\r
                <outputs>\r
                        <output>\r
                        </output>\r
                </outputs>\r
        </calljobstep>\r
-       <calljobstep type="MergeStep.java" weight="1">\r
-               <inputs>\r
-                       <input>\r
-                               <variable name="resultID" />\r
-                       </input>\r
-                       <input>\r
-                               <variable name="resultID1" />\r
-                       </input>\r
-               </inputs>\r
-       </calljobstep>\r
 </commands>\r
 </jobtemplate>
\ No newline at end of file
index 6a6c8ff377942c46596dbc032e1e79405ce28c50..79e0a57b5313629103ddd69c28ca917a0616be69 100644 (file)
@@ -7,7 +7,7 @@ public class CancelableStep extends JobStep {
        public Object[] execute() throws Exception {\r
                try {\r
                        for (int i = 0; i < count; i++) {\r
-                               if (getJobRuntime().isWaitingCancel() || getJobRuntime().isWaitingSuspend())\r
+                               if (getJobRuntime().isWaitingCancel())\r
                                        break;\r
                                Thread.sleep(1000);\r
                                setProgress((i + 1) * count);\r
index c754867fab32e551d148938bfedd469b55eefeb0..d6559574ea694bb509de75bf982edcde614b4d77 100644 (file)
@@ -177,6 +177,11 @@ public class CleanupMountedLocationStep extends JobStep implements FileVisitor<P
 \r
        @Override\r
        public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {\r
+\r
+               //A .-al kezdodo mappakat kihagyjuk\r
+               if (dir.getFileName().toString().startsWith("."))\r
+                       return FileVisitResult.SKIP_SUBTREE;\r
+\r
                return FileVisitResult.CONTINUE;\r
        }\r
 \r
@@ -185,9 +190,6 @@ public class CleanupMountedLocationStep extends JobStep implements FileVisitor<P
 \r
                setProgress(currentCount[0] * 100 / allCount[0]);\r
 \r
-               if (STATUSFOLDER.equals(filePath.getParent().getFileName()))\r
-                       logger.info("Skipping {}", filePath);\r
-\r
                logger.info("Checking {}", filePath);\r
                List<Path> killDateFiles = getKillDateFiles(filePath);\r
                if (killDateFiles == null || killDateFiles.size() == 0) {\r
diff --git a/server/user.jobengine.executors/src/user/jobengine/server/steps/FakeSpawnStep.java b/server/user.jobengine.executors/src/user/jobengine/server/steps/FakeSpawnStep.java
new file mode 100644 (file)
index 0000000..28f0e13
--- /dev/null
@@ -0,0 +1,41 @@
+package user.jobengine.server.steps;\r
+\r
+import java.util.Arrays;\r
+\r
+import org.apache.logging.log4j.LogManager;\r
+import org.apache.logging.log4j.Logger;\r
+\r
+public class FakeSpawnStep extends JobStep {\r
+       private static final Logger logger = LogManager.getLogger();\r
+       private int count = 10;\r
+\r
+       @StepEntry\r
+       public Object[] execute(long itemID, Iterable<?> iter) throws Exception {\r
+\r
+               if (getJobRuntime().getSpawnOrder() == 0) {\r
+                       count = 5;\r
+               }\r
+               logger.info(getMarker(), "Starting params: {}, {}, spawnOrder {}", itemID, iter, getJobRuntime().getSpawnOrder());\r
+\r
+               try {\r
+                       int step = 100 / count;\r
+                       for (int i = 0; i < count; i++) {\r
+                               if (!canContinue())\r
+                                       break;\r
+                               setProgress((i + 1) * step);\r
+                               for (int j = 0; j < 100; j++) {\r
+                                       Thread.sleep(10);\r
+                               }\r
+                               logger.info("Progress {}", getJobRuntime().getProgress());\r
+                       }\r
+\r
+               } catch (Exception e) {\r
+                       logger.error(getMarker(), e.getMessage());\r
+                       throw e;\r
+               }\r
+               Object[] result = Arrays.asList(10, 20).toArray();\r
+               logger.info("Returning {}, {}", result[0], result[1]);\r
+               return result;\r
+       }\r
+\r
+}\r
index 799e5abf2b518f2ce074af368e59df2e45065885..aa9d1540cfdb941afc6e433fd95cafee804760d9 100644 (file)
@@ -10,12 +10,9 @@ public class FakeStep extends JobStep {
        private int count = 10;\r
 \r
        @StepEntry\r
-       public Object[] execute(long itemID, Iterable<?> iter) throws Exception {\r
+       public Object[] execute(long itemID) throws Exception {\r
 \r
-               if (getJobRuntime().getSpawnOrder() == 0) {\r
-                       count = 5;\r
-               }\r
-               logger.info(getMarker(), "Starting params: {}, {}, spawnOrder {}", itemID, iter, getJobRuntime().getSpawnOrder());\r
+               logger.info(getMarker(), "Starting params: {}");\r
 \r
                try {\r
                        int step = 100 / count;\r
@@ -23,10 +20,10 @@ public class FakeStep extends JobStep {
                                if (!canContinue())\r
                                        break;\r
                                setProgress((i + 1) * step);\r
-                               for (int j = 0; j < 100; j++) {\r
-                                       Thread.sleep(10);\r
+                               for (int j = 0; j < 10; j++) {\r
+                                       Thread.sleep(1);\r
                                }\r
-                               logger.info("Progress {}", getJobRuntime().getProgress());\r
+                               //logger.info("{} Progress {}, p{}", getJobRuntime().getId(), getJobRuntime().getProgress(), getJobRuntime().getPriority());\r
                        }\r
 \r
                } catch (Exception e) {\r
@@ -34,7 +31,7 @@ public class FakeStep extends JobStep {
                        throw e;\r
                }\r
                Object[] result = Arrays.asList(10, 20).toArray();\r
-               logger.info("Returning {}, {}", result[0], result[1]);\r
+               //logger.info("Returning {}, {}", result[0], result[1]);\r
                return result;\r
        }\r
 \r
index 7bcd3176ad3dab68f84d1c56d445381d306e8135..b5df0ee4c9d516e404d5f4bca34cbe18c516a565 100644 (file)
@@ -357,6 +357,10 @@ public class HSMMigrateStep extends JobStep {
                                if (targetLength > sourceLength) {\r
                                        throw new Exception("Hiba! A fájl túl nagy lett.");\r
                                }\r
+\r
+                               if (getJobRuntime().isWaitingCancel()) {\r
+                                       break;\r
+                               }\r
                        }\r
 \r
                        targetLength = targetFile.length();\r
index ae15789fd5314b6c0ead23ca1924824cf3162212..fc1a8e0783f76562b059d5159b5ed5d838811049 100644 (file)
@@ -43,12 +43,14 @@ public class NEXIOCheckerStep extends JobStep {
                        controller = new Controller(storeUri.getRootPath(), storeUri.getPortNumber());\r
                        controller.connect();\r
                        Mediabase mediabase = controller.getMediabase();\r
-                       //SimpleDateFormat df = new SimpleDateFormat("yyyy.MM.dd HH:mm:ss");\r
                        Iterator<Clip> clips = mediabase.getClips();\r
                        int count = limit;\r
                        while (clips.hasNext()) {\r
+\r
+                               if (getJobRuntime().isWaitingCancel())\r
+                                       break;\r
+\r
                                Clip clip = clips.next();\r
-                               //String id = clip.getId().get();\r
                                String title = clip.getXid().get();\r
                                Timestamp modified = Timestamp.from(clip.getModifiedTimestamp().toInstant());\r
                                Timestamp created = Timestamp.from(clip.getRecordDateTimestamp().toInstant());\r
index 5539265f69e57a108a8bec7b1f6fbf5f09684680..5a9dbfb84e9a58ae61d59972720b56e0169ac0c7 100644 (file)
@@ -27,7 +27,7 @@ public interface IJob extends IEntityBase {
 \r
        String getTemplate();\r
 \r
-       void IncrementPriority();\r
+       void incrementPriority();\r
 \r
        void NotifyUpdate();\r
 \r
index 87eeb59bde27b5e02322777b35be883bfd8c0c92..1bd10cd28d3c8bb018447fc3225acd46eee2b845 100644 (file)
@@ -41,12 +41,13 @@ public class Job extends Syncable implements IJob, Comparable<IJob> {
         * 1. szempont prioritas 2. szempont azonos prioritasnal a rogzites datuma\r
         */\r
        @Override\r
-       public int compareTo(IJob job0) {\r
-               int ret = (this.priority - job0.getPriority());\r
-               if ((ret == 0) && (this.submitted != null) && (job0.getSubmitted() != null)) {\r
-                       ret = (int) (job0.getSubmitted().getTime() - this.submitted.getTime());\r
-               }\r
-               return ret;\r
+       public int compareTo(IJob job) {\r
+               int res = 0;\r
+               if (getPriority() == job.getPriority())\r
+                       res = getSubmitted().getTime() < job.getSubmitted().getTime() ? -1 : 1;\r
+               else\r
+                       res = (getPriority() > job.getPriority() ? -1 : 1);\r
+               return res;\r
        }\r
 \r
        @Override\r
@@ -108,7 +109,7 @@ public class Job extends Syncable implements IJob, Comparable<IJob> {
        }\r
 \r
        @Override\r
-       public void IncrementPriority() {\r
+       public void incrementPriority() {\r
                priority++;\r
        }\r
 \r
@@ -147,7 +148,9 @@ public class Job extends Syncable implements IJob, Comparable<IJob> {
 \r
        @Override\r
        public void setPriority(int priority) {\r
-               this.priority = priority;\r
+               if (this.priority != priority) {\r
+                       this.priority = priority;\r
+               }\r
        }\r
 \r
        @Override\r
diff --git a/server/user.jobengine.osgi.commons/src/user/commons/cluster/ClusteredJob.java b/server/user.jobengine.osgi.commons/src/user/commons/cluster/ClusteredJob.java
new file mode 100644 (file)
index 0000000..ca9e681
--- /dev/null
@@ -0,0 +1,82 @@
+package user.commons.cluster;\r
+\r
+import java.sql.Timestamp;\r
+\r
+import user.commons.JobStatus;\r
+\r
+public class ClusteredJob {\r
+\r
+       private long id;\r
+       private String name;\r
+       private String description;\r
+       private String template;\r
+       private Object[] inputs;\r
+       private Timestamp submitted;\r
+       private JobStatus status;\r
+       private int progress;\r
+\r
+       public String getDescription() {\r
+               return description;\r
+       }\r
+\r
+       public long getId() {\r
+               return id;\r
+       }\r
+\r
+       public Object[] getInputs() {\r
+               return inputs;\r
+       }\r
+\r
+       public String getName() {\r
+               return name;\r
+       }\r
+\r
+       public int getProgress() {\r
+               return progress;\r
+       }\r
+\r
+       public JobStatus getStatus() {\r
+               return status;\r
+       }\r
+\r
+       public Timestamp getSubmitted() {\r
+               return submitted;\r
+       }\r
+\r
+       public String getTemplate() {\r
+               return template;\r
+       }\r
+\r
+       public void setDescription(String description) {\r
+               this.description = description;\r
+       }\r
+\r
+       public void setId(long id) {\r
+               this.id = id;\r
+       }\r
+\r
+       public void setInputs(Object[] inputs) {\r
+               this.inputs = inputs;\r
+       }\r
+\r
+       public void setName(String name) {\r
+               this.name = name;\r
+       }\r
+\r
+       public void setProgress(int progress) {\r
+               this.progress = progress;\r
+       }\r
+\r
+       public void setStatus(JobStatus status) {\r
+               this.status = status;\r
+       }\r
+\r
+       public void setSubmitted(Timestamp submitted) {\r
+               this.submitted = submitted;\r
+       }\r
+\r
+       public void setTemplate(String template) {\r
+               this.template = template;\r
+       }\r
+\r
+}\r
index 1710acba8747d2c031c9e7c4618e049efc071f4a..f1044180ee976c755373bff00a4179462cff2585 100644 (file)
@@ -44,7 +44,7 @@ public class MediaInfo {
 \r
                                if (decoder != null && decoder.getCodecType() == MediaDescriptor.Type.MEDIA_VIDEO) {\r
                                        videoStreamId = i;\r
-                                       frames = stream.getDuration();\r
+                                       frames = stream.getNumFrames();\r
                                        break;\r
                                }\r
                        }\r
index 9c5d0b98abdc01fce6d3a472bf04f9721b7e0e6a..71f0bd6b95ab6c8b5814bb2b344d9b7b750a9a4f 100644 (file)
@@ -77,7 +77,7 @@ public class ItemManagerData {
        }\r
 \r
        public enum SignalType {\r
-               CREATE(0), UPDATE(1), DELETE(2);\r
+               CREATE(0), UPDATE(1), DELETE(2), EXECUTE(3);\r
 \r
                private final long value;\r
 \r
index 21dfa92540ac3b16c3538026d0a9a38ffbf326cf..132eee5be2db4d7059decb9326584ca28f5bcff4 100644 (file)
@@ -46,5 +46,6 @@
        <classpathentry exported="true" kind="lib" path="WEB-INF/lib/zul.jar"/>\r
        <classpathentry exported="true" kind="lib" path="WEB-INF/lib/zuti.jar"/>\r
        <classpathentry exported="true" kind="lib" path="WEB-INF/lib/zweb.jar"/>\r
+       <classpathentry kind="lib" path="/-dependencies/target/repository/plugins/org.apache.logging.log4j.core_2.8.2.jar"/>\r
        <classpathentry kind="output" path="bin"/>\r
 </classpath>\r
diff --git a/server/user.jobengine.osgi.server/log/mediacube-err.log b/server/user.jobengine.osgi.server/log/mediacube-err.log
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/server/user.jobengine.osgi.server/log/mediacube.log b/server/user.jobengine.osgi.server/log/mediacube.log
new file mode 100644 (file)
index 0000000..e69de29
index eb54b61de8e0fb7940450d0dd3276b5428ef3d54..1fb0a2d2446746f88ee62261388fab7330b532f9 100644 (file)
@@ -1,4 +1,4 @@
-version=2.5.2\r
+version=2.6.0\r
 footer=2016-2020 © Copyright User Rendszerház Kft.\r
 \r
 login_info=Információ\r
diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/ClusteredJobRuntime.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/ClusteredJobRuntime.java
new file mode 100644 (file)
index 0000000..d3fce5d
--- /dev/null
@@ -0,0 +1,10 @@
+package user.jobengine.server;\r
+\r
+import user.commons.cluster.ClusteredJob;\r
+\r
+public class ClusteredJobRuntime extends JobRuntime {\r
+\r
+       public ClusteredJobRuntime(ClusteredJob job, IJobEngine jobEngine, IJobStatusChangedListener listener) {\r
+               super(job, jobEngine, listener);\r
+       }\r
+}\r
index 752bba977d33740e365e2c992b5d3efd6e4376b9..8c9a722af2416299bda588e1566c11cd00bf8d31 100644 (file)
@@ -3,6 +3,7 @@ package user.jobengine.server;
 import java.util.Map;
 
 import user.commons.Job;
+import user.commons.cluster.ClusteredJob;
 import user.jobengine.db.IItemManager;
 import user.jobengine.server.messagequeue.IUserMessageQueues;
 import user.jobengine.server.messages.IJobMessage;
@@ -10,6 +11,7 @@ import user.jobengine.server.scheduler.ScheduledJob;
 import user.jobengine.server.scheduler.SchedulerService;
 
 public interface IJobEngine {
+       static final int QUEUE_POLL_INTERVAL_MS = 1;
 
        void addJobChangedEventListener(IJobChangedListener listener);
 
@@ -19,6 +21,8 @@ public interface IJobEngine {
 
        void addToRunQueue(IJobRuntime jobRuntime);
 
+       void applyPriorityChange(IJobRuntime jobRuntime);
+
        void bindItemManagerService(IItemManager service);
 
        boolean deleteProgram(String fileName);
@@ -41,6 +45,8 @@ public interface IJobEngine {
 
        void executeSendMessageToUserInstruction(IJobRuntime jobRuntime);
 
+       void executeWaitForSpawnJobsInstruction(IJobRuntime jobRuntime);
+
        void fireJobChangedEvent(JobChangedEvent event);
 
        Map<String, IJobStepExecutor> getExecutors();
@@ -53,15 +59,13 @@ public interface IJobEngine {
 
        IJobRuntime getJobById(long jobId);
 
-       IJobRuntime getJobForRemote(String className) throws Exception;
-
        Map<Long, IJobRuntime> getJobs();
 
        IProgram getProgram(String name);
 
        Map<String, IProgram> getPrograms();
 
-       IJobRuntime getRemoteJob(String className);
+       JobEngineRemote getRemoteEngine();
 
        ScheduledJob getScheduledJob(String template);
 
@@ -109,9 +113,11 @@ public interface IJobEngine {
 
        void removeJobChangedEventListener(IJobChangedListener listener);
 
+       void removeSpanwChild(IJobRuntime jobRuntime);
+
        void removeSuspended();
 
-       void rePrioritization(IJobRuntime jobRuntime);
+       ClusteredJob requestJob(String className) throws Exception;
 
        void sendMessage(IJobMessage jobMessage);
 
@@ -125,6 +131,8 @@ public interface IJobEngine {
 
        void startup();
 
+       void storeJob(IJobRuntime runtime);
+
        IJobRuntime submit(IJobRuntime parent, IJobStatusChangedListener statusListener, String template, String name, Map<String, Object> parameters)
                        throws JobEngineException;
 
index 5f2f6c930637010216eaee4cfe1999f99d601a42..c5d57844e31f6aa6e09f86eb724e79a266131e11 100644 (file)
@@ -9,21 +9,29 @@ import org.apache.logging.log4j.Marker;
 import user.commons.IEntityPersister;\r
 import user.commons.IJob;\r
 import user.commons.JobStatus;\r
+import user.jobengine.server.instructions.CallJobStepInstruction;\r
 import user.jobengine.server.instructions.IInstruction;\r
 \r
 public interface IJobRuntime extends IJob {\r
+\r
        void addChild(JobRuntime c);\r
 \r
        void addEventListener(IJobStatusChangedListener listener);\r
 \r
+       void addSpawnChild(IJobRuntime runtime);\r
+\r
        void addVariable(String name, Class<?> type);\r
 \r
        void arrangeStack();\r
 \r
        void cancelForkPrepare() throws InterruptedException;\r
 \r
+       boolean canContinueExecution();\r
+\r
        void checkStackParameter() throws RuntimeException, IllegalArgumentException;\r
 \r
+       IJobRuntime createCopy();\r
+\r
        void decrementInstructionPointer();\r
 \r
        public void done();\r
@@ -32,12 +40,18 @@ public interface IJobRuntime extends IJob {
 \r
        void forkWaitComplete() throws InterruptedException;\r
 \r
+       CallJobStepInstruction getCurrentCallJobStepInstruction();\r
+\r
        IInstruction getCurrentInstruction();\r
 \r
+       String getCurrentStep();\r
+\r
        Marker getFinishMarker();\r
 \r
        int getIp();\r
 \r
+       IJobEngine getJobEngine();\r
+\r
        Marker getMarker();\r
 \r
        IInstruction getNextInstruction() throws NoSuchElementException;\r
@@ -52,12 +66,16 @@ public interface IJobRuntime extends IJob {
 \r
        JobStatus getSavedStatus();\r
 \r
+       int getSpawnOrder();\r
+\r
        Stack<Object> getStack();\r
 \r
        Object getVariable(String name);\r
 \r
        Map<String, Object> getVariables();\r
 \r
+       int getWeight();\r
+\r
        boolean hasNextInstruction();\r
 \r
        void incrementProgress(int progress);\r
@@ -66,6 +84,8 @@ public interface IJobRuntime extends IJob {
 \r
        boolean isService();\r
 \r
+       boolean isWaitFinish();\r
+\r
        boolean isWaitingCancel();\r
 \r
        boolean isWaitingExecutor();\r
@@ -78,6 +98,8 @@ public interface IJobRuntime extends IJob {
 \r
        void removeEventListener(IJobStatusChangedListener listener);\r
 \r
+       void removeSpanwChild(long id);\r
+\r
        void reset();\r
 \r
        void restoreStack();\r
@@ -86,11 +108,15 @@ public interface IJobRuntime extends IJob {
 \r
        void saveStatus();\r
 \r
+       void setCurrentStep(String currentStep);\r
+\r
        @Override\r
        void setParameters(Map<String, Object> parameters);\r
 \r
        void setService(boolean isService);\r
 \r
+       void setSpawnOrder(int spawnOrder);\r
+\r
        void setVariable(String name, Object value);\r
 \r
        void swapStack();\r
index ac4a6c58ba5372998507eabe6e4e2bbf6a42ece3..5ee69913bde56537416778c5fabf367010c96616 100644 (file)
@@ -2,10 +2,11 @@ package user.jobengine.server;
 
 import java.util.concurrent.PriorityBlockingQueue;
 
+import user.commons.cluster.ClusteredJob;
 import user.jobengine.server.steps.IJobStep;
 
 /**
- * Folyamat l�p�s v�grehajt� interface.
+ * Folyamat lepes vegrehajto interface.
  */
 public interface IJobStepExecutor {
        static final String PROCESSING_LOCALLY = "Processing locally";
@@ -23,12 +24,14 @@ public interface IJobStepExecutor {
        PriorityBlockingQueue<IJobRuntime> getQueue();
 
        /**
-        * V�grehajt� l�p�s implement�ci�j�nak lek�rdez�se.
+        * Vegrehajte lepes implementaciojanak lekerdezese.
         *
-        * @return L�p�s implement�ci�.
+        * @return Lepes implementacio.
         */
        Class<IJobStep> getStepClass();
 
+       String getStepUnitName();
+
        boolean isRemoteEnabled();
 
        void revoke(IJobRuntime jobRuntime);
@@ -43,7 +46,7 @@ public interface IJobStepExecutor {
         */
        void startup(IJobEngine jobEngine) throws Exception;
 
-       IJobRuntime steelJob() throws InterruptedException;
+       ClusteredJob steelJob() throws InterruptedException;
 
        /**
         * Folyamat elhelyez�se a v�grehajt� v�rakoz�si sor�ba.
@@ -51,7 +54,7 @@ public interface IJobStepExecutor {
         * @param job
         *            Folyamat.
         */
-       void submit(IJobRuntime job);
+       void submit(IJobRuntime... job);
 
        void waitShutdown();
 
index 9eb30de7dfbf2e80ac42d3134b1062bc38a4c7e7..6519b314b8359d4de9ac493a770c36497d5ee3ad 100644 (file)
@@ -23,20 +23,18 @@ import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import javax.ws.rs.core.Response;
-
+import org.apache.commons.lang.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.jboss.resteasy.client.jaxrs.ResteasyClient;
-import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder;
-import org.jboss.resteasy.client.jaxrs.ResteasyWebTarget;
 
+import com.ibm.nosql.json.api.BasicDBList;
 import com.ibm.nosql.json.api.BasicDBObject;
 
 import user.commons.Job;
 import user.commons.JobStatus;
 import user.commons.RemoteFile;
 import user.commons.StoreUri;
+import user.commons.cluster.ClusteredJob;
 import user.commons.nosql.NoSQLUtils;
 import user.commons.remotestore.DirectoryUtils;
 import user.commons.remotestore.RemoteStoreProtocol;
@@ -48,6 +46,7 @@ import user.jobengine.server.actions.StatusMachine;
 import user.jobengine.server.ast.Encoder;
 import user.jobengine.server.ast.JobTemplate;
 import user.jobengine.server.ast.Parser;
+import user.jobengine.server.instructions.CallJobStepInstruction;
 import user.jobengine.server.instructions.IInstruction;
 import user.jobengine.server.messagequeue.IUserMessage;
 import user.jobengine.server.messagequeue.IUserMessageQueues;
@@ -57,7 +56,6 @@ import user.jobengine.server.messages.JobStepCompletedMessage;
 import user.jobengine.server.messages.UserReplyMessage;
 import user.jobengine.server.scheduler.ScheduledJob;
 import user.jobengine.server.scheduler.SchedulerService;
-import user.jobengine.server.steps.IJobStep;
 import user.tsm.client.TSMClient;
 import user.tsm.client.TSMException;
 
@@ -78,20 +76,20 @@ public class JobEngine implements IJobEngine {
 
                        while (!shutdown) {
                                try {
-                                       IJobMessage message = messageQueue.poll(50, TimeUnit.MILLISECONDS);
-                                       if (message != null) {
+                                       IJobMessage message = messageQueue.poll(QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+                                       if (message != null)
                                                message.process(JobEngine.this);
-                                       }
+
                                } catch (InterruptedException e) {
                                        shutdown = true;
                                }
                        }
+                       //a leallitas utan az osszes fuggo uzenet vegrehajtasa
                        while (!messageQueue.isEmpty()) {
                                try {
-                                       IJobMessage message = messageQueue.poll(50, TimeUnit.MILLISECONDS);
-                                       if (message != null) {
+                                       IJobMessage message = messageQueue.poll(QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+                                       if (message != null)
                                                message.process(JobEngine.this);
-                                       }
                                } catch (InterruptedException e) {
                                        shutdown = true;
                                }
@@ -120,14 +118,26 @@ public class JobEngine implements IJobEngine {
 
                        while (!shutdown) {
                                try {
-                                       IJobRuntime jobRuntime = runQueue.poll(10, TimeUnit.MILLISECONDS);
+                                       Thread.sleep(QUEUE_POLL_INTERVAL_MS);
+                                       //IJobRuntime jobRuntime = runQueue.poll(QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+                                       IJobRuntime jobRuntime = runQueue.poll();
                                        if (jobRuntime != null) {
-                                               while (jobRuntime.hasNextInstruction() && jobRuntime.isRunable()) {
+                                               logger.debug("Processing {}", jobRuntime.getId());
+                                               //varakozo esetben vegrehajtjuk a kovetkezo utasitast
+                                               if (jobRuntime.hasNextInstruction() && jobRuntime.isWaitFinish()) {
                                                        ir = jobRuntime.getNextInstruction();
                                                        ir.execute(JobEngine.this, jobRuntime);
+                                               } else {
+                                                       //normal esetben elfutunk a kovetkezo job step-ig, vagy vegig
+                                                       while (jobRuntime.hasNextInstruction() && jobRuntime.isRunable()) {
+                                                               ir = jobRuntime.getNextInstruction();
+                                                               ir.execute(JobEngine.this, jobRuntime);
+                                                       }
                                                }
+
                                                if (!jobRuntime.hasNextInstruction() && jobRuntime.isRunable())
                                                        jobCleanup(jobRuntime);
+
                                        }
                                } catch (Exception e) {
                                        logger.error("Critical VM error!", e);
@@ -187,6 +197,7 @@ public class JobEngine implements IJobEngine {
        private List<IJobChangedListener> jobChangedListenerList = new CopyOnWriteArrayList<>();
        private Map<String, LocalDate> remoteWorkers;
        private String masterServerAddress = System.getProperty("jobengine.master.server", "");
+       private final JobEngineRemote remoteEngine;
 
        /**
         * A MAM motor példányosítása. Ennek során létrejönnek a várakozási sorok, az ütemező szál, az üzenet kezelő szál.
@@ -213,6 +224,10 @@ public class JobEngine implements IJobEngine {
 
                remoteWorkers = new ConcurrentHashMap<>();
                //logger.info("JobEngine created");
+               if (isWorker())
+                       remoteEngine = createRemoteEngine();
+               else
+                       remoteEngine = null;
        }
 
        public void activate() {
@@ -227,13 +242,13 @@ public class JobEngine implements IJobEngine {
 
        @Override
        public void addStepExecutor(IJobStepExecutor executor) {
-               Class<IJobStep> stepClass = executor.getStepClass();
-               String canonicalName = stepClass.getCanonicalName();
-               if (!executors.containsKey(canonicalName)) {
-                       executors.put(canonicalName, executor);
-                       logger.debug("Executor registered: " + stepClass);
+               //Class<IJobStep> stepClass = executor.getStepClass();
+               String unitName = executor.getStepUnitName();
+               if (!executors.containsKey(unitName)) {
+                       logger.info("Executor registered {}", unitName);
+                       executors.put(unitName, executor);
                } else
-                       logger.debug("Executor already registered: " + stepClass);
+                       logger.debug("Executor already registered {}", unitName);
 
        }
 
@@ -244,10 +259,15 @@ public class JobEngine implements IJobEngine {
                        if (typeName == null)
                                throw new Exception(jobRuntime.toString() + " illegal execution state detected: executor name is null.");
                        String executorName = String.valueOf(typeName);
-                       if (!executors.containsKey(executorName)) {
+                       if (!executors.containsKey(executorName))
                                throw new Exception(jobRuntime.toString() + " executor is unavailable: " + executorName);
-                       }
-                       executors.get(executorName).submit(jobRuntime);
+
+                       //a lista mindig letezik, de sima futtataskor csak 1 elemu, az aktualis van benne
+                       //ha van parhuzamos futtatas, akkor letrejonnek benne a step szellemek is, es meghivodik a fork
+                       List<IJobRuntime> jobs = spawnJobs(jobRuntime, executorName);
+                       executors.get(executorName).submit(jobs.toArray(new IJobRuntime[] {}));
+                       jobs.forEach(r -> fireJobChangedEvent(new JobChangedEvent(r, SignalType.EXECUTE)));
+
                } catch (Exception e) {
                        logger.catching(e);
                        suspendWaitExecutorJob(e, jobRuntime);
@@ -266,6 +286,41 @@ public class JobEngine implements IJobEngine {
                }
        }
 
+       /**
+        * Job prioritasa megvaltozott. Job main queue-ban es a JobStepExecutorok soraiban is ujra kell rendezni a jobokat. Az ujrarendezes a put-nal is megy
+        * automatikusan, mivel eltero prioritasu jobot dobhatnak be vagy default prioritasut a korabban atrendezett listaba.
+        *
+        * @param jobRuntime
+        */
+       @Override
+       public void applyPriorityChange(IJobRuntime jobRuntime) {
+               logger.info("rePrioritization start for {}", jobRuntime.getId());
+               //              synchronized(this.runQueue){
+
+               //job main queue reorder
+
+               if (this.runQueue.contains(jobRuntime)) {
+                       logger.info("runQueue");
+                       this.runQueue.remove(jobRuntime);
+                       try {
+                               this.runQueue.put(jobRuntime);
+                       } catch (InterruptedException e) {
+                       }
+               }
+
+               //JobStepExecutor reorder
+               if (this.executors != null) {
+                       for (IJobStepExecutor exec : executors.values()) {
+                               if (exec.containsRuntime(jobRuntime)) {
+                                       logger.info("executor");
+                                       exec.changePriority(jobRuntime);
+                               }
+                       }
+               }
+
+               //              }               logger.info("rePrioritization end");
+       }
+
        @Override
        public synchronized void bindItemManagerService(IItemManager service) {
                setItemManager(service);
@@ -288,6 +343,10 @@ public class JobEngine implements IJobEngine {
                return new ConcurrentHashMap<Long, IJobRuntime>();
        }
 
+       protected JobEngineRemote createRemoteEngine() {
+               return new JobEngineRemote(masterServerAddress);
+       }
+
        protected IStatusMachine createStatusMachine() {
                return new StatusMachine(this);
        }
@@ -319,9 +378,20 @@ public class JobEngine implements IJobEngine {
        public void executeAssignVariableInstruction(IJobRuntime jobRuntime) {
                Object value = jobRuntime.popFromStack();
                String name = (String) jobRuntime.popFromStack();
+
+               //a gyerek job-ok visszateresi erteket a spawn sorszammal kiegeszitve mentjuk
+               //TODO el kell tudni erni a kovetkezo lepesekbol ezeket az ertekeket
+               if (jobRuntime.getParentJobId() > 0) {
+                       IJobRuntime parentRuntime = getJobById(jobRuntime.getParentJobId());
+                       parentRuntime.setVariable(name + jobRuntime.getSpawnOrder(), value);
+               }
+
                jobRuntime.setVariable(name, value);
        }
 
+       /***
+        * Fuggetlen (beagyazott) alfolyamat letrehozasa
+        */
        @Override
        public void executeCallConcurrentJobStepInstruction(IJobRuntime jobRuntime, IProgram subProgram) {
                JobRuntime c = new JobRuntime(this, jobRuntime, subProgram);
@@ -379,6 +449,21 @@ public class JobEngine implements IJobEngine {
                getUserMessageQueues().addMessage(jobRuntime, catalogName, messageNumber, true, inputs);
        }
 
+       @Override
+       public void executeWaitForSpawnJobsInstruction(IJobRuntime jobRuntime) {
+               //logger.info("Processing {} {}", jobRuntime.getId(), jobRuntime.canContinueExecution());
+
+               if (jobRuntime.canContinueExecution()) {
+                       jobRuntime.setStatus(JobStatus.RUNABLE);
+               } else {
+                       if (!JobStatus.WAIT_FINISH.equals(jobRuntime.getStatus())) {
+                               jobRuntime.setStatus(JobStatus.WAIT_FINISH);
+                       }
+                       jobRuntime.decrementInstructionPointer();
+               }
+               addToRunQueue(jobRuntime);
+       }
+
        @Override
        public void fireJobChangedEvent(JobChangedEvent event) {
                for (IJobChangedListener listener : jobChangedListenerList) {
@@ -423,17 +508,6 @@ public class JobEngine implements IJobEngine {
                return submittedJobs.get(jobId);
        }
 
-       @Override
-       public IJobRuntime getJobForRemote(String className) throws Exception {
-               if (!executors.containsKey(className))
-                       throw new Exception("Unregistered executor request: " + className);
-
-               IJobStepExecutor executor = executors.get(className);
-               if (!executor.isRemoteEnabled())
-                       throw new Exception("Job is not registered for remote workers: " + className);
-               return executor.steelJob();
-       }
-
        @Override
        public Map<Long, IJobRuntime> getJobs() {
                return submittedJobs;
@@ -454,23 +528,8 @@ public class JobEngine implements IJobEngine {
        }
 
        @Override
-       public IJobRuntime getRemoteJob(String className) {
-               IJobRuntime result = null;
-               ResteasyClient client = new ResteasyClientBuilder().build();
-               ResteasyWebTarget target = client.target(masterServerAddress).path("/services/rest/cluster/getjob").queryParam("className", className);
-               Response response = null;
-               try {
-                       response = target.request().get();
-                       if (response.getEntity() instanceof IJobRuntime)
-                               result = (IJobRuntime) response.getEntity();
-               } catch (Exception e) {
-                       logger.error(e.getMessage());
-               } finally {
-                       logger.debug("Response status: {}", response.getStatus());
-                       if (response != null)
-                               response.close();
-               }
-               return result;
+       public JobEngineRemote getRemoteEngine() {
+               return remoteEngine;
        }
 
        @Override
@@ -535,9 +594,11 @@ public class JobEngine implements IJobEngine {
        }
 
        private void jobCleanup(IJobRuntime jobRuntime) {
+               logger.info("Cleanup {}", jobRuntime.getId());
                statusMachine.processAction(JobAction.FINISH, jobRuntime);
        }
 
+       @Deprecated
        @Override
        public void keepAliveWorker(String remoteAddr) {
                remoteWorkers.put(remoteAddr, LocalDate.now());
@@ -585,12 +646,13 @@ public class JobEngine implements IJobEngine {
                                throw new Exception("File not exists: " + filePath);
                        logger.info("Loading template: " + filePath);
                        stream = new FileInputStream(filePath);
+
                        Parser parser = new Parser(stream);
-                       Encoder encoder = new Encoder();
                        JobTemplate template = parser.parse();
                        template.validate();
                        template.setFileName(fileName);
 
+                       Encoder encoder = new Encoder();
                        IProgram program = (IProgram) encoder.visitJobTemplate(template, null);
 
                        if (programs.containsKey(fileName))
@@ -633,6 +695,7 @@ public class JobEngine implements IJobEngine {
                                try {
                                        String filePath = templateRoot + name;
                                        logger.info("Loading template: " + name);
+                                       //                                      System.out.println(name);
                                        stream = new FileInputStream(filePath);
                                        Parser parser = new Parser(stream);
                                        Encoder encoder = new Encoder();
@@ -670,11 +733,25 @@ public class JobEngine implements IJobEngine {
        public void processJobStepCompletedMessage(IJobMessage message) {
                // TODO cancel nem megy, valszeg itt van gubasz
                IJobRuntime jobRuntime = getJobById(message.getJobId());
+
+               if (jobRuntime.getParentJobId() > 0)
+                       removeSpanwChild(jobRuntime);
+
+               JobStepCompletedMessage m = (JobStepCompletedMessage) message;
+               //kesz vagyunk, jelezni
+               if (isWorker()) {
+                       statusMachine.processAction(JobAction.DONE, jobRuntime);
+                       return;
+               }
+
+               //a cancel hamarabb megjott?
+               //ha remote akkot tuti
                if (jobRuntime == null) {
-                       //a cancel hamarabb megjott?
+
                }
-               JobStepCompletedMessage m = (JobStepCompletedMessage) message;
-               putOutputsToStack(jobRuntime, m.getOutputs());
+               Object[] outputs = m.getOutputs();
+               putOutputsToStack(jobRuntime, outputs);
+
                statusMachine.processAction(JobAction.DONE, jobRuntime);
        }
 
@@ -761,6 +838,16 @@ public class JobEngine implements IJobEngine {
                }
        }
 
+       @Override
+       public void removeSpanwChild(IJobRuntime jobRuntime) {
+               IJobRuntime parent = getJobById(jobRuntime.getParentJobId());
+               if (parent == null)
+                       return;
+
+               parent.removeSpanwChild(jobRuntime.getId());
+
+       }
+
        @Override
        public void removeSuspended() {
                List<Long> removeId = new ArrayList<>();
@@ -772,37 +859,17 @@ public class JobEngine implements IJobEngine {
                        submittedJobs.remove(id);
        }
 
-       /**
-        * Job prioritasa megvaltozott. Job main queue-ban es a JobStepExecutorok soraiban is ujra kell rendezni a jobokat. Az ujrarendezes a put-nal is megy
-        * automatikusan, mivel eltero prioritasu jobot dobhatnak be vagy default prioritasut a korabban atrendezett listaba.
-        *
-        * @param jobRuntime
-        */
        @Override
-       public void rePrioritization(IJobRuntime jobRuntime) {
-               //              synchronized(this.runQueue){
-
-               //1. JobStepExecutor reorder
-               if (this.executors != null) {
-                       for (IJobStepExecutor exec : executors.values()) {
-                               if (exec.containsRuntime(jobRuntime)) {
-                                       exec.changePriority(jobRuntime);
-                               }
-                       }
-               }
+       public ClusteredJob requestJob(String className) throws Exception {
+               if (!executors.containsKey(className))
+                       throw new Exception("Unregistered executor request: " + className);
 
-               //2. job main queue reorder
-               if (this.runQueue.contains(jobRuntime)) {
-                       this.runQueue.remove(jobRuntime);
-                       try {
-                               this.runQueue.put(jobRuntime);
-                       } catch (InterruptedException e) {
-                               // TODO Auto-generated catch block
-                               e.printStackTrace();
-                       }
-               }
+               IJobStepExecutor executor = executors.get(className);
+               if (!executor.isRemoteEnabled())
+                       throw new Exception("Job is not registered for remote workers: " + className);
 
-               //              }
+               ClusteredJob job = executor.steelJob();
+               return job;
        }
 
        @Override
@@ -847,16 +914,50 @@ public class JobEngine implements IJobEngine {
                if (executors == null)
                        return;
                for (IJobStepExecutor executor : executors.values()) {
-                       logger.trace("Notify executor {}", executor.getStepClass());
+                       logger.trace("Notify executor {}", executor.getStepUnitName());
                        executor.shutdown();
                }
                for (IJobStepExecutor executor : executors.values()) {
-                       logger.info("Stopping executor {}", executor.getStepClass());
+                       logger.info("Stopping executor {}", executor.getStepUnitName());
                        executor.waitShutdown();
                }
 
        }
 
+       private List<IJobRuntime> spawnJobs(IJobRuntime jobRuntime, String executorName) throws InterruptedException {
+               List<IJobRuntime> result = new ArrayList<>();
+
+               CallJobStepInstruction currentInstruction = jobRuntime.getCurrentCallJobStepInstruction();
+               if (currentInstruction != null) {
+                       String forEach = currentInstruction.getForEach();
+                       if (StringUtils.isNotBlank(forEach)) {
+                               Object parameter = jobRuntime.getParameter(forEach);
+                               if (parameter == null)
+                                       parameter = jobRuntime.getVariable(forEach);
+
+                               //a sima array helyett ezt jobb hasznalni
+                               if (parameter != null && parameter instanceof BasicDBList) {
+
+                                       BasicDBList iter = (BasicDBList) parameter;
+                                       for (int i = 1; i < iter.size(); i++) {
+                                               IJobRuntime jobRuntimeCopy = new JobRuntime(jobRuntime);
+                                               jobRuntimeCopy.setSpawnOrder(i);
+                                               jobRuntimeCopy.add();
+
+                                               jobRuntime.addSpawnChild(jobRuntimeCopy);
+
+                                               storeJob(jobRuntimeCopy);
+                                               result.add(jobRuntimeCopy);
+                                       }
+                               }
+                       }
+
+               }
+
+               result.add(jobRuntime);
+               return result;
+       }
+
        @Override
        public void startup() {
                try {
@@ -888,6 +989,12 @@ public class JobEngine implements IJobEngine {
 
        }
 
+       @Override
+       public void storeJob(IJobRuntime runtime) {
+               submittedJobs.put(runtime.getId(), runtime);
+               logger.debug("+++ {} stored in VM ", runtime);
+       }
+
        private void submit(IJobRuntime runtime) {
                runtime.setSubmitted(new Timestamp(System.currentTimeMillis()));
                runtime.add();
diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngineRemote.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngineRemote.java
new file mode 100644 (file)
index 0000000..76ee445
--- /dev/null
@@ -0,0 +1,72 @@
+package user.jobengine.server;\r
+\r
+import javax.ws.rs.client.Client;\r
+import javax.ws.rs.client.Entity;\r
+import javax.ws.rs.client.WebTarget;\r
+import javax.ws.rs.core.MediaType;\r
+import javax.ws.rs.core.Response;\r
+import javax.ws.rs.core.Response.Status;\r
+\r
+import org.apache.logging.log4j.LogManager;\r
+import org.apache.logging.log4j.Logger;\r
+import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder;\r
+\r
+import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider;\r
+\r
+import user.commons.cluster.ClusteredJob;\r
+import user.commons.rest.ServiceObjectMapper;\r
+\r
+public class JobEngineRemote {\r
+       private static final Logger logger = LogManager.getLogger();\r
+       private final Client client;\r
+       private final WebTarget root;\r
+\r
+       public JobEngineRemote(String masterServerAddress) {\r
+               //config\r
+               //https://www.programcreek.com/java-api-examples/?class=org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder&method=register\r
+               //trace\r
+               //https://docs.jboss.org/resteasy/docs/4.0.0.Final/userguide/html/Tracing_Feature.html\r
+\r
+               JacksonJaxbJsonProvider jaxbProvider = new JacksonJaxbJsonProvider(ServiceObjectMapper.getMapper(), JacksonJaxbJsonProvider.DEFAULT_ANNOTATIONS);\r
+               ResteasyClientBuilder builder = new ResteasyClientBuilder();\r
+               builder.register(jaxbProvider);\r
+               client = builder.build();\r
+               root = client.target(masterServerAddress);\r
+       }\r
+\r
+       public ClusteredJob getRemoteJob(String className) {\r
+               ClusteredJob result = null;\r
+               WebTarget target = root.path("/services/rest/cluster/getjob").queryParam("className", className);\r
+               Response response = null;\r
+               try {\r
+                       response = target.request().get();\r
+                       if (Status.OK.getStatusCode() != response.getStatus() && response.getEntity() instanceof ClusteredJob)\r
+                               result = (ClusteredJob) response.getEntity();\r
+               } catch (Exception e) {\r
+                       logger.error(e.getMessage());\r
+               } finally {\r
+                       logger.debug("Response status: {}", response.getStatus());\r
+                       if (response != null)\r
+                               response.close();\r
+               }\r
+               return result;\r
+       }\r
+\r
+       public void reportJobStatus(ClusteredJob job) {\r
+               WebTarget target = root.path("/services/rest/cluster/notifyjob");\r
+               Response response = null;\r
+               try {\r
+                       //response = target.request().post(Entity.entity(mapper.writeValueAsString(job), MediaType.APPLICATION_JSON));\r
+                       response = target.request().post(Entity.entity(job, MediaType.APPLICATION_JSON));\r
+                       if (Status.OK.getStatusCode() != response.getStatus())\r
+                               throw new Exception("Unexpected reponse occured");\r
+               } catch (Exception e) {\r
+                       logger.error(e.getMessage());\r
+               } finally {\r
+                       logger.debug("Response status: {}", response.getStatus());\r
+                       if (response != null)\r
+                               response.close();\r
+               }\r
+       }\r
+\r
+}\r
index 33a4b99bfc342094638ae8fa3043181e7af9ff67..7af28c3a186383a36bce33913d7fe665051ebabe 100644 (file)
@@ -28,6 +28,7 @@ import user.commons.Job;
 import user.commons.JobStatus;
 import user.commons.MediaCubeFinishMarker;
 import user.commons.MediaCubeMarker;
+import user.commons.cluster.ClusteredJob;
 import user.jobengine.db.ItemManagerData.SignalType;
 import user.jobengine.db.JobParameters;
 import user.jobengine.server.instructions.CallJobStepInstruction;
@@ -42,34 +43,51 @@ public class JobRuntime extends Job implements IJobRuntime {
        private Stack<Object> stack = null;
        private Stack<Object> savedStack = null;
        private int ip;
-       private final EventListenerList listeners;
+       private final EventListenerList listeners = new EventListenerList();
        private final Map<String, Object> variables;
        private Map<String, Object> parameters;
        private JobStatus savedStatus;
        private final IJobEngine jobEngine;
        private double currentProgress;
-       private int runtimeWeight;
-       private List<JobRuntime> children;
+       private int weight;
        private MediaCubeMarker sessionMarker;
        private MediaCubeMarker finishMarker;
        private boolean isService;
        private Semaphore forkSempahore;
-       private IJobChangedListener jobChangedListener;
+       private IJobChangedListener spawnJobListener;
+       private String currentStep;
+       //Fuggetlen (beagyazott) alfolyamatok
+       private List<JobRuntime> children;
+       //Parhuzamosan futtatot komplett job-ok, amiket bevarunk, ha a megfelelo step hivja + a forEach parhuzamositas
        private List<Long> childrenIDs;
+       private int spawnOrder;
 
        public JobRuntime() {
-               this.listeners = new EventListenerList();
                variables = null;
                jobEngine = null;
                program = null;
                status = JobStatus.RUNABLE;
        }
 
+       public JobRuntime(ClusteredJob job, IJobEngine jobEngine, IJobStatusChangedListener listener) {
+               this.jobEngine = jobEngine;
+               this.persister = jobEngine.getItemManager();
+               variables = null;
+               program = null;
+               name = job.getName();
+               description = job.getDescription();
+               submitted = job.getSubmitted();
+               template = job.getTemplate();
+               status = JobStatus.RUNABLE;
+               //CREATE notifikacio miatt
+               setId(job.getId());
+               addEventListener(listener);
+       }
+
        public JobRuntime(IJob job) {
                variables = null;
                jobEngine = null;
                program = null;
-               listeners = null;
                id = job.getId();
                description = job.getDescription();
                name = job.getName();
@@ -85,7 +103,6 @@ public class JobRuntime extends Job implements IJobRuntime {
        public JobRuntime(IJobEngine jobEngine, IJobRuntime runtime, IProgram program) {
                this.program = program;
                this.jobEngine = jobEngine;
-               this.listeners = new EventListenerList();
                this.submitted = new Timestamp(System.currentTimeMillis());
                this.stack = new Stack<Object>();
                this.status = JobStatus.RUNABLE;
@@ -99,7 +116,6 @@ public class JobRuntime extends Job implements IJobRuntime {
                if (program == null)
                        throw new NullPointerException("program");
                this.jobEngine = jobEngine;
-               this.listeners = new EventListenerList();
                this.program = program;
                this.stack = new Stack<Object>();
                this.variables = new HashMap<String, Object>();
@@ -111,7 +127,6 @@ public class JobRuntime extends Job implements IJobRuntime {
                        throw new NullPointerException("program");
                this.jobEngine = jobEngine;
                this.ip = 0;
-               this.listeners = new EventListenerList();
                this.program = program;
                this.stack = new Stack<Object>();
                this.variables = new HashMap<String, Object>();
@@ -120,6 +135,30 @@ public class JobRuntime extends Job implements IJobRuntime {
                fromJob(job);
        }
 
+       /***
+        * Parhuzamosan es blokkoltan futtatando lepesek letrehozasa
+        *
+        * @param runtime
+        */
+       @SuppressWarnings("unchecked")
+       public JobRuntime(IJobRuntime runtime) {
+               this.program = new Program(runtime.getProgram());
+               this.jobEngine = runtime.getJobEngine();
+               this.submitted = runtime.getSubmitted();
+               this.stack = (Stack<Object>) runtime.getStack().clone();
+               this.status = runtime.getStatus();
+               this.ip = runtime.getIp();
+               this.variables = new HashMap<>(runtime.getVariables());
+               this.parameters = new HashMap<>(runtime.getParameters());
+               this.persister = runtime.getPersister();
+               this.template = runtime.getTemplate();
+               this.weight = runtime.getWeight();
+               this.name = runtime.getName();
+               this.owner = runtime.getOwner();
+               program.removeAfter(getIp());
+               //logger.info("Program {}", program);
+       }
+
        @Override
        public void add() {
                IJob job = toJob();
@@ -145,6 +184,14 @@ public class JobRuntime extends Job implements IJobRuntime {
                listeners.add(IJobStatusChangedListener.class, listener);
        }
 
+       @Override
+       public void addSpawnChild(IJobRuntime runtime) {
+               if (childrenIDs == null)
+                       childrenIDs = Collections.synchronizedList(new ArrayList<>());
+               childrenIDs.add(runtime.getId());
+               runtime.setParentJobId(id);
+       }
+
        @Override
        public void addVariable(String name, Class<?> type) {
                if (variables.containsKey(name))
@@ -171,6 +218,11 @@ public class JobRuntime extends Job implements IJobRuntime {
                        forkSempahore.release();
        }
 
+       @Override
+       public boolean canContinueExecution() {
+               return childrenIDs == null || childrenIDs.size() == 0;
+       }
+
        @Override
        public void checkStackParameter() throws RuntimeException, IllegalArgumentException {
                Class<?> requiredType = (Class<?>) popFromStack();
@@ -189,6 +241,11 @@ public class JobRuntime extends Job implements IJobRuntime {
 
        }
 
+       @Override
+       public IJobRuntime createCopy() {
+               return null;
+       }
+
        @Override
        public void decrementInstructionPointer() {
                if (this.ip == 0)
@@ -207,11 +264,11 @@ public class JobRuntime extends Job implements IJobRuntime {
        public boolean forkPrepare() throws InterruptedException {
                boolean result = false;
                forkSempahore = new Semaphore(1);
-               if (jobChangedListener == null) {
-                       logger.info("Preparing fork");
+               if (spawnJobListener == null) {
+                       logger.info("Preparing spawn");
                        childrenIDs = Collections.synchronizedList(new ArrayList<>());
 
-                       jobChangedListener = event -> {
+                       spawnJobListener = event -> {
                                IJobRuntime child = event.getJob();
                                if (event.getSignalType().equals(SignalType.CREATE)) {
                                        if (child.getParentJobId() == getId()) {
@@ -231,7 +288,7 @@ public class JobRuntime extends Job implements IJobRuntime {
                                        if (childrenIDs.size() == 0)
                                                forkSempahore.release();
                                }
-                               if (!jobEngine.isRunning())
+                               if (!getJobEngine().isRunning())
                                        forkSempahore.release();
 
                                //A gyerek(ek) el sem indultak, pl. nem letezik a template
@@ -240,7 +297,7 @@ public class JobRuntime extends Job implements IJobRuntime {
 
                        };
                        logger.info("Adding job changed listener");
-                       jobEngine.addJobChangedEventListener(jobChangedListener);
+                       getJobEngine().addJobChangedEventListener(spawnJobListener);
                        result = true;
                        forkSempahore.acquire();
                } else {
@@ -252,13 +309,16 @@ public class JobRuntime extends Job implements IJobRuntime {
 
        @Override
        public void forkWaitComplete() throws InterruptedException {
-               logger.info("Waiting for semaphore" + forkSempahore);
+               //atlagos mukodes
+               if (forkSempahore == null)
+                       return;
+               logger.info("Waiting for semaphore {}", forkSempahore);
                forkSempahore.acquire();
                logger.info("Removing job changed listener");
-               if (jobEngine.isRunning()) {
+               if (getJobEngine().isRunning()) {
                        logger.info("Removing job changed listener");
-                       jobEngine.removeJobChangedEventListener(jobChangedListener);
-                       jobChangedListener = null;
+                       getJobEngine().removeJobChangedEventListener(spawnJobListener);
+                       spawnJobListener = null;
                        childrenIDs = null;
                } else {
                        logger.info("Instruction pointer repositioned");
@@ -282,9 +342,40 @@ public class JobRuntime extends Job implements IJobRuntime {
                parametersFromByteArray();
        }
 
+       @Override
+       public CallJobStepInstruction getCurrentCallJobStepInstruction() {
+               CallJobStepInstruction result = null;
+
+               int i = getIp();
+               if (i < program.getInstructionsCount()) {
+                       while (true) {
+                               IInstruction instruction = program.get(i);
+                               if (instruction instanceof CallJobStepInstruction) {
+                                       result = (CallJobStepInstruction) instruction;
+                                       break;
+                               }
+
+                               i--;
+                               if (i < 0)
+                                       break;
+                       }
+
+               }
+
+               return result;
+       }
+
        @Override
        public IInstruction getCurrentInstruction() {
-               return program.get(getIp());
+               IInstruction result = null;
+               if (getIp() < program.getInstructionsCount())
+                       result = program.get(getIp());
+               return result;
+       }
+
+       @Override
+       public String getCurrentStep() {
+               return currentStep;
        }
 
        @Override
@@ -299,6 +390,11 @@ public class JobRuntime extends Job implements IJobRuntime {
                return ip;
        }
 
+       @Override
+       public IJobEngine getJobEngine() {
+               return jobEngine;
+       }
+
        /***
         * Log session marker. A teljes folyamat osszes naplobejegyzese osszegyujtheto a segitsegevel. MediaCubeMarker tipusu, folyamatonkent uj peldany jon letre.
         */
@@ -356,6 +452,11 @@ public class JobRuntime extends Job implements IJobRuntime {
                return savedStatus;
        }
 
+       @Override
+       public int getSpawnOrder() {
+               return spawnOrder;
+       }
+
        @Override
        public Stack<Object> getStack() {
                return stack;
@@ -381,23 +482,35 @@ public class JobRuntime extends Job implements IJobRuntime {
                return variables;
        }
 
+       @Override
+       public int getWeight() {
+               return weight;
+       }
+
        @Override
        public boolean hasNextInstruction() {
                boolean result = false;
-               if (program.getInstructionsCount() > 0)
+               if (program != null && program.getInstructionsCount() > 0)
                        result = (this.ip == (program.getInstructionsCount())) ? false : true;
                return result;
        }
 
        @Override
        public void incrementProgress(int progress) {
+               //remote ghost
+               if (program == null) {
+                       setProgress(progress);
+                       NotifyUpdate();
+                       return;
+               }
+
                List<IInstruction> instructions = program.getInstructions();
                IInstruction currentInstruction = program.get(ip - 1);
-               if (runtimeWeight == 0) {
+               if (getWeight() == 0) {
                        for (IInstruction instruction : instructions)
                                if (instruction.getClass().equals(CallJobStepInstruction.class)) {
                                        int weight = instruction.getWeight();
-                                       runtimeWeight += weight;
+                                       this.weight = getWeight() + weight;
                                }
                }
                currentProgress = 0;
@@ -406,9 +519,9 @@ public class JobRuntime extends Job implements IJobRuntime {
                        if (instruction.getClass().equals(CallJobStepInstruction.class)) {
                                if (instruction == currentInstruction)
                                        break;
-                               currentProgress += (double) instruction.getWeight() * 100 / runtimeWeight;
+                               currentProgress += (double) instruction.getWeight() * 100 / getWeight();
                        }
-               double currentDelta = (double) currentWeight * progress / runtimeWeight;
+               double currentDelta = (double) currentWeight * progress / getWeight();
                currentProgress = Math.ceil(currentProgress + currentDelta);
 
                if (currentProgress - getProgress() > 4 || currentProgress == 100) {
@@ -423,7 +536,7 @@ public class JobRuntime extends Job implements IJobRuntime {
 
        @Override
        public boolean isRunable() {
-               return (status == JobStatus.RUNABLE) ? true : false;
+               return JobStatus.RUNABLE.equals(status);
        }
 
        private boolean isRuntimeAssignable(Class<?> fromType, Class<?> toType) {
@@ -438,6 +551,11 @@ public class JobRuntime extends Job implements IJobRuntime {
                return isService;
        }
 
+       @Override
+       public boolean isWaitFinish() {
+               return JobStatus.WAIT_FINISH.equals(status);
+       }
+
        @Override
        public boolean isWaitingCancel() {
                return getStatus() == JobStatus.WAIT_CANCEL;
@@ -518,6 +636,11 @@ public class JobRuntime extends Job implements IJobRuntime {
                listeners.remove(IJobStatusChangedListener.class, listener);
        }
 
+       @Override
+       public void removeSpanwChild(long id) {
+               childrenIDs.remove(id);
+       }
+
        @Override
        public void reset() {
                currentProgress = 0;
@@ -545,11 +668,28 @@ public class JobRuntime extends Job implements IJobRuntime {
                savedStatus = status;
        }
 
+       @Override
+       public void setCurrentStep(String currentStep) {
+               this.currentStep = currentStep;
+       }
+
        @Override
        public void setDescription(String description) {
                super.setDescription(description);
        }
 
+       /*
+               private final EventListenerList listeners = new EventListenerList();
+               private List<JobRuntime> children;
+               private MediaCubeMarker sessionMarker;
+               private MediaCubeMarker finishMarker;
+               private boolean isService;
+               private Semaphore forkSempahore;
+               private IJobChangedListener jobChangedListener;
+               private List<Long> childrenIDs;
+
+        * */
+
        @Override
        public void setId(long id) {
                super.setId(id);
@@ -573,6 +713,11 @@ public class JobRuntime extends Job implements IJobRuntime {
                this.isService = isService;
        }
 
+       @Override
+       public void setSpawnOrder(int spawnOrder) {
+               this.spawnOrder = spawnOrder;
+       }
+
        @Override
        public void setStatus(JobStatus status) {
                if (this.status != status) {
@@ -585,23 +730,12 @@ public class JobRuntime extends Job implements IJobRuntime {
 
        @Override
        public void setVariable(String name, Object value) {
-               /*
-                               //castnal elszall
-                               Class<?> type = null;
-                               try {
-                                       type = (Class<?>) getVariable(name);
-                               } catch (ClassCastException e) {
-                                       throw new IllegalStateException("multiple set");
-                               }
-                               if (value != null && !type.equals(value.getClass()))
-                                       throw new IllegalArgumentException("name " + name + " value " + value);
-               */
                variables.put(name, value);
        }
 
        private void signal(SignalType signalType) {
-               if (jobEngine != null)
-                       jobEngine.fireJobChangedEvent(new JobChangedEvent(this, signalType));
+               if (getJobEngine() != null)
+                       getJobEngine().fireJobChangedEvent(new JobChangedEvent(this, signalType));
        }
 
        @Override
@@ -633,4 +767,5 @@ public class JobRuntime extends Job implements IJobRuntime {
                job.setParentJobId(getParentJobId());
                return job;
        }
+
 }
index b992c3d3affc2e8921686935b72c3a9160563430..56275397fd05db1575301119dee92ceed7cc89b0 100644 (file)
@@ -1,6 +1,7 @@
 package user.jobengine.server;
 
 import java.net.URLClassLoader;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -10,19 +11,18 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.osgi.framework.Bundle;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.FrameworkUtil;
-import org.osgi.framework.wiring.BundleWiring;
 
+import groovy.lang.GroovyClassLoader;
 import user.commons.JobStatus;
+import user.commons.cluster.ClusteredJob;
 import user.jobengine.server.messages.JobStepCompletedMessage;
 import user.jobengine.server.messages.JobStepSkippedMessage;
 import user.jobengine.server.steps.IJobStep;
 
 public class JobStepExecutor implements IJobStepExecutor {
+
        private class Worker extends Thread {
-               private static final int WAIT_FOR_REMOTE = 30000;
+               private static final int WAIT_FOR_REMOTE = 3000;
                private volatile boolean shutdown = false;
 
                @Override
@@ -36,64 +36,70 @@ public class JobStepExecutor implements IJobStepExecutor {
                        IJobStep step = null;
                        while (true) {
                                try {
-                                       if (jobEngine.isWorker()) {
-                                               //a worker is csak azokat akarja vegrehajtani
-                                               if (isRemote) {
-                                                       Object o = jobEngine.getRemoteJob(className);
+                                       Thread.sleep(IJobEngine.QUEUE_POLL_INTERVAL_MS);
 
-                                                       if (o == null) {
-                                                               Thread.sleep(1000);
-                                                               continue;
-                                                       }
-                                                       jobRuntime = (IJobRuntime) o;
+                                       if (jobEngine.isWorker() && isRemote) {
+                                               //a worker is csak azokat akarja vegrehajtani
+                                               ClusteredJob job = jobEngine.getRemoteEngine().getRemoteJob(getStepUnitName());
 
-                                                       jobRuntime.setDescription(PROCESSING_REMOTLY);
-                                               }
-                                       } else {
-                                               //SessionUtil.getMediaCubeConfig().getJobQueuePollInterval()
-                                               jobRuntime = queue.poll(10, TimeUnit.MILLISECONDS);
-                                               if (jobRuntime == null && shutdown) {
-                                                       logger.trace("Shutting down");
-                                                       break;
-                                               }
-                                               if (jobRuntime == null)
-                                                       continue;
+                                               //TODO remote-ba jelezni, hogy nem sikerult
                                                if (shutdown) {
                                                        logger.trace("{} skipping by shutdown", jobRuntime);
                                                        jobEngine.sendMessage(new JobStepSkippedMessage(jobRuntime.getId()));
-                                                       continue;
+                                                       break;
                                                }
 
+                                               if (job != null) {
+                                                       //TODO set job accepted = PROCESSING_REMOTLY + if error then feedback?
+                                                       //jobRuntime.setDescription(PROCESSING_REMOTLY);
+                                                       jobRuntime = new ClusteredJobRuntime(job, jobEngine, e -> {
+                                                               IJobRuntime runtime = (IJobRuntime) e.getSource();
+                                                               job.setStatus(runtime.getStatus());
+                                                               job.setProgress(runtime.getProgress());
+                                                               jobEngine.getRemoteEngine().reportJobStatus(job);
+                                                       });
+                                                       jobEngine.storeJob(jobRuntime);
+
+                                                       Object[] inputs = job.getInputs();
+                                                       runStepObject(jobRuntime, inputs);
+                                               }
+                                       }
+
+                                       jobRuntime = queue.poll();
+
+                                       if (shutdown) {
+                                               logger.trace("Shutting down");
+                                               break;
+                                       }
+
+                                       if (jobRuntime != null) {
                                                long submitted = jobRuntime.getSubmitted().getTime();
                                                long current = System.currentTimeMillis();
                                                boolean timeout = current - submitted > WAIT_FOR_REMOTE;
+
+                                               //ha remote, de nem jelentkezik senki, akkor helyi vegrehajtas
                                                if (isRemote) {
                                                        if (timeout) {
                                                                logger.info("Remote JobStep timed out, processing locally.");
                                                        } else {
-                                                               //logger.info("JobStep is remote, waiting for remote processor");
-                                                               if (!WAIT_REMOTE_PROCESSOR.equals(jobRuntime.getDescription()))
-                                                                       jobRuntime.setDescription(WAIT_REMOTE_PROCESSOR);
+                                                               //                                                                      if (!WAIT_REMOTE_PROCESSOR.equals(jobRuntime.getDescription()))
+                                                               //                                                                              jobRuntime.setDescription(WAIT_REMOTE_PROCESSOR);
                                                                queue.put(jobRuntime);
-                                                               //skip local processor
                                                                continue;
                                                        }
                                                }
-                                               jobRuntime.setDescription(PROCESSING_LOCALLY);
+
+                                               logger.info("Executing locally {}", jobRuntime.getId());
+                                               //jobRuntime.setDescription(PROCESSING_LOCALLY);
+                                               Object[] inputs = jobEngine.getInputsFromStack(jobRuntime);
+                                               runStepObject(jobRuntime, inputs);
+                                       }
+
+                                       if (shutdown) {
+                                               logger.trace("Shutting down");
+                                               break;
                                        }
 
-                                       //processing locally
-                                       Object[] inputs = jobEngine.getInputsFromStack(jobRuntime);
-                                       Object[] outputs = null;
-                                       jobRuntime.setStatus(JobStatus.EXECUTING);
-                                       jobRuntime.NotifyUpdate();
-                                       step = createStepObject();
-                                       if (step == null)
-                                               throw new Exception("Step object is null");
-                                       logger.debug("{} executing", jobRuntime);
-                                       jobRuntime.IncrementPriority();
-                                       outputs = step.run(jobEngine, jobRuntime, inputs);
-                                       jobEngine.sendMessage(new JobStepCompletedMessage(jobRuntime.getId(), outputs));
                                } catch (Throwable e) {
                                        logger.error("Error in {}", jobRuntime);
                                        Throwable t = e.getCause() == null ? e : e.getCause();
@@ -109,6 +115,20 @@ public class JobStepExecutor implements IJobStepExecutor {
                        }
                }
 
+               private void runStepObject(IJobRuntime jobRuntime, Object[] inputs) throws Throwable {
+                       IJobStep step = createStepObject();
+                       if (step == null)
+                               throw new Exception("Step object is null");
+                       jobRuntime.setStatus(JobStatus.EXECUTING);
+                       jobRuntime.NotifyUpdate();
+                       logger.debug("{} executing", jobRuntime);
+                       jobRuntime.incrementPriority();
+                       Object[] outputs = step.run(jobEngine, jobRuntime, inputs);
+
+                       //TODO itt lekezelni a remote notification-t
+                       jobEngine.sendMessage(new JobStepCompletedMessage(jobRuntime.getId(), outputs));
+               }
+
                public void shutdown() {
                        this.shutdown = true;
                }
@@ -129,7 +149,7 @@ public class JobStepExecutor implements IJobStepExecutor {
        private CountDownLatch barrier;
        private Class<IJobStep> stepClass;
        private int maxConcurrent;
-       private String className;
+       private String stepUnitName;
        private boolean isRemote;
 
        public JobStepExecutor() {
@@ -149,10 +169,9 @@ public class JobStepExecutor implements IJobStepExecutor {
 
        @Override
        public void changePriority(IJobRuntime runtime) {
-               if (queue != null && (runtime != null)) {
-                       if (queue.remove(runtime)) {
+               if (queue != null && runtime != null) {
+                       if (queue.remove(runtime))
                                queue.put(runtime);
-                       }
                }
        }
 
@@ -167,20 +186,23 @@ public class JobStepExecutor implements IJobStepExecutor {
 
        @Override
        @SuppressWarnings("unchecked")
-       public void create(String className, int maxConcurrent, boolean isRemote) throws JobEngineException {
-               this.className = className;
+       public void create(String stepUnitName, int maxConcurrent, boolean isRemote) throws JobEngineException {
+               this.stepUnitName = stepUnitName;
                this.isRemote = isRemote;
-               logger = LogManager.getLogger(getClass().getSimpleName() + ":" + className);
-               logger.debug("Creating executor {}, instances {}", className, maxConcurrent);
-               if (StringUtils.isEmpty(className))
-                       throw new JobEngineException("Step class name can't be null.");
-
-               try {
-                       URLClassLoader loader = URLClassLoader.newInstance(DynamicClassLocator.makeURLs(), getClass().getClassLoader());
-                       stepClass = (Class<IJobStep>) loader.loadClass(className);
-               } catch (ClassNotFoundException e) {
-                       logger.catching(e);
-                       throw new JobEngineException("System can't load JobStep implementation: " + className);
+               logger = LogManager.getLogger(getClass().getSimpleName() + ":" + stepUnitName);
+               logger.debug("Creating executor {}, instances {}", stepUnitName, maxConcurrent);
+               if (StringUtils.isEmpty(stepUnitName))
+                       throw new JobEngineException("Step unit name can't be null.");
+
+               if (!isGroovyStep()) {
+                       //a groovy-nak nem kell
+                       try {
+                               URLClassLoader loader = URLClassLoader.newInstance(DynamicClassLocator.makeURLs(), getClass().getClassLoader());
+                               stepClass = (Class<IJobStep>) loader.loadClass(stepUnitName);
+                       } catch (ClassNotFoundException e1) {
+                               logger.catching(e1);
+                               throw new JobEngineException("System can't load JobStep implementation: " + stepUnitName);
+                       }
                }
 
                queue = new PriorityBlockingQueue<IJobRuntime>();
@@ -193,8 +215,16 @@ public class JobStepExecutor implements IJobStepExecutor {
                }
        }
 
-       protected IJobStep createStepObject() throws InstantiationException, IllegalAccessException {
-               return stepClass.newInstance();
+       protected IJobStep createStepObject() throws Exception {
+               IJobStep result = null;
+
+               if (isGroovyStep()) {
+                       GroovyClassLoader gcl = new GroovyClassLoader();
+                       Class myClass = gcl.parseClass(Paths.get(System.getProperty("jobengine.jobsteps.groovy.root", ""), stepUnitName).toFile());
+                       result = (IJobStep) myClass.newInstance();
+               } else
+                       result = stepClass.newInstance();
+               return result;
        }
 
        @Override
@@ -202,47 +232,25 @@ public class JobStepExecutor implements IJobStepExecutor {
                return maxConcurrent;
        }
 
-       private ClassLoader getParentClassLoader() {
-               ClassLoader parentClassLoader = getClass().getClassLoader();
-               Bundle bundle = FrameworkUtil.getBundle(getClass());
-               if (bundle != null) {
-                       BundleContext bundleContext = bundle.getBundleContext();
-                       if (bundleContext != null) {
-                               BundleWiring bundleWiring = bundle.adapt(BundleWiring.class);
-                               parentClassLoader = bundleWiring.getClassLoader();
-                       }
-               }
-               return parentClassLoader;
-       }
-
        @Override
        public PriorityBlockingQueue<IJobRuntime> getQueue() {
                return this.queue;
        }
 
-       // @Override
-       // public void synchronize() {
-       // if (priorityQueue.size() > 0 && workers.size() > queue.size()) {
-       // IJobRuntime jobRuntime = priorityQueue.poll();
-       // try {
-       // queue.put(jobRuntime);
-       // } catch (InterruptedException e) {
-       // e.printStackTrace();
-       // }
-       // }
-       // }
-
-       @SuppressWarnings("unchecked")
        @Override
        public Class<IJobStep> getStepClass() {
-               //TODO miért hozunk létre mindíg újat
-               //              if (stepClass != null) {
-               //                      DynamicClassLoader loader = new DynamicClassLoader(getClass().getClassLoader());
-               //                      stepClass = (Class<IJobStep>) loader.loadClass(stepClass.getCanonicalName());
-               //              }
                return stepClass;
        }
 
+       @Override
+       public String getStepUnitName() {
+               return stepUnitName;
+       }
+
+       private boolean isGroovyStep() {
+               return stepUnitName.toLowerCase().endsWith(".java");
+       }
+
        @Override
        public boolean isRemoteEnabled() {
                return isRemote;
@@ -250,9 +258,7 @@ public class JobStepExecutor implements IJobStepExecutor {
 
        @Override
        public void revoke(IJobRuntime jobRuntime) {
-               //synchronized(queue){
                queue.remove(jobRuntime);
-               //}
        }
 
        @Override
@@ -280,15 +286,30 @@ public class JobStepExecutor implements IJobStepExecutor {
        }
 
        @Override
-       public IJobRuntime steelJob() throws InterruptedException {
-               return queue.poll(5, TimeUnit.MILLISECONDS);
+       public ClusteredJob steelJob() throws InterruptedException {
+               ClusteredJob result = null;
+               IJobRuntime jobRuntime = queue.poll(IJobEngine.QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+
+               if (jobRuntime != null) {
+                       result = new ClusteredJob();
+                       result.setId(jobRuntime.getId());
+                       result.setName(jobRuntime.getName());
+                       result.setDescription(jobRuntime.getDescription());
+                       result.setTemplate(jobRuntime.getTemplate());
+                       Object[] inputs = jobEngine.getInputsFromStack(jobRuntime);
+                       result.setInputs(inputs);
+               }
+
+               return result;
        }
 
        @Override
-       public void submit(IJobRuntime jobRuntime) {
-               queue.put(jobRuntime);
-               jobRuntime.setDescription(stepClass.getSimpleName());
-               //logger.info("Executor got ! {}", jobRuntime);
+       public void submit(IJobRuntime... jobRuntime) {
+               for (IJobRuntime r : jobRuntime) {
+                       logger.info("Adding job {} to executor queue", r.getId());
+                       r.setCurrentStep(stepUnitName);
+                       queue.put(r);
+               }
        }
 
        @Override
@@ -296,5 +317,4 @@ public class JobStepExecutor implements IJobStepExecutor {
                for (Worker w : workers)
                        w.waitShutdown();
        }
-
 }
index f17572602f877aa6213477481a475d0c593c0bb6..213a0d11c73b6847a38fa9da81349d87057d99f2 100644 (file)
@@ -30,7 +30,7 @@ public class StatusMachine implements IStatusMachine {
                StatusMachineAction machineAction = new StatusMachineAction(jobAction, jobRuntime.getStatus());\r
                if (actions.containsKey(machineAction)) {\r
                        IJobStatusAction action = actions.get(machineAction);\r
-                       logger.info("{} status change {} -> {}", jobRuntime, jobRuntime.getStatus(), jobAction);\r
+                       logger.debug("{} changes {} -> {}", jobRuntime.getId(), jobRuntime.getStatus(), jobAction);\r
                        action.processAction(jobEngine, jobRuntime);\r
                } else {\r
                        logger.warn("No status processor registered for {} -> {}", jobAction, jobRuntime.getStatus());\r
index c3f0b6477b3e4affa1abadb264d349a18d8cdda6..e55498b0ed40d7d0c52fae295728997b65c66d3a 100644 (file)
@@ -40,8 +40,8 @@ public class JobListModel extends AsyncBaseModel implements IJobChangedListener
        @Command\r
        public void cancelJobs() {\r
                if (this.jobList.getSelection() != null) {\r
-                       for (IJobRuntime job : this.jobList.getSelection()) {\r
-                               jobEngine.sendMessage(new CancelRequest(job.getId()));\r
+                       for (IJobRuntime jobRuntime : this.jobList.getSelection()) {\r
+                               jobEngine.sendMessage(new CancelRequest(jobRuntime.getId()));\r
                        }\r
                }\r
        }\r
@@ -49,9 +49,9 @@ public class JobListModel extends AsyncBaseModel implements IJobChangedListener
        @Command\r
        public void changeJobsPriority() {\r
                if (this.jobList.getSelection() != null) {\r
-                       for (IJobRuntime job : this.jobList.getSelection()) {\r
-                               job.setPriority(newPriority);\r
-                               jobEngine.rePrioritization(job);\r
+                       for (IJobRuntime jobRuntime : this.jobList.getSelection()) {\r
+                               jobRuntime.setPriority(newPriority);\r
+                               jobEngine.applyPriorityChange(jobRuntime);\r
                        }\r
                }\r
        }\r
index e634fc715150e36830ca7d1efa60bd703da41abd..b44bf7026d13eae21ed47c09a7be5e775fc87987 100644 (file)
@@ -99,6 +99,9 @@ public class RetrieveBatchSelectorModel extends BaseModel {
 \r
                        IJobEngine jobEngine = JobEngine.getInstance();\r
                        ScheduledJob scheduledJob = jobEngine.getScheduledJob(JOBTEMPLATE);\r
+                       if (scheduledJob == null)\r
+                               throw new Exception("A sablon nem található: " + JOBTEMPLATE);\r
+\r
                        Map<String, Object> parameters = scheduledJob.getJobParameters();\r
                        parameters.put(HOUSEID, houseId);\r
                        parameters.put(RECIPIENT, email);\r
index b3b59398ab4725ee7f743fecd5e0ed6b9b1c436b..4564a71aabe101c0f7c3e8c7e565f85e476cc76d 100644 (file)
@@ -7,14 +7,17 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.sql.Timestamp;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Stack;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import user.commons.JobStatus;
+import user.commons.cluster.ClusteredJob;
 import user.jobengine.db.IItemManager;
 import user.jobengine.db.ItemManager;
 import user.jobengine.db.ItemManagerData.SignalType;
@@ -23,6 +26,7 @@ import user.jobengine.server.IJobRuntime;
 import user.jobengine.server.IJobStatusChangedListener;
 import user.jobengine.server.JobEngine;
 import user.jobengine.server.JobEngineException;
+import user.jobengine.server.JobEngineRemote;
 import user.jobengine.server.JobStatusChangedEvent;
 import user.jobengine.server.ThreadSynchronizer;
 
@@ -174,24 +178,58 @@ public class JobengineIT {
         * @throws Exception
         */
        @Test
-       public void remote() throws Exception {
+       public void remote_worker() throws Exception {
+               //ez barmi lehet
+               System.setProperty("jobengine.master.server", "http://localhost:8888");
                final ThreadSynchronizer sync = new ThreadSynchronizer();
-               final IJobEngine jobEngine = new JobEngine();
-               jobEngine.startup();
-               jobEngine.bindItemManagerService(manager);
-               /*
-                               Map<String, Object> parameters = new HashMap<>();
-                               parameters.put("itemID", 100);
-                               IJobRuntime runtime = jobEngine.submit(null, e -> {
-                                       if (JobStatus.FINISHED.equals(e.getStatus()) || JobStatus.SUSPENDED.equals(e.getStatus()))
-                                               sync.suspend();
-                               }, "fake.xml", "Fake", parameters);
-                               jobEngine.addJobChangedEventListener(e -> {
-                                       if (e.getSignalType().equals(SignalType.UPDATE)) {
-                                               System.out.println(e.getJob().getProgress());
+
+               ClusteredJob job = new ClusteredJob();
+               job.setId(1);
+               job.setName("Teszt");
+               job.setSubmitted(new Timestamp(System.currentTimeMillis()));
+               job.setTemplate("teszt.xml");
+               //itemId
+               job.setInputs(new Object[] { 100 });
+
+               final Stack<ClusteredJob> jobs = new Stack<>();
+               jobs.push(job);
+               final IJobEngine jobEngine = new JobEngine() {
+
+                       @Override
+                       protected JobEngineRemote createRemoteEngine() {
+                               return new JobEngineRemote("") {
+                                       @Override
+                                       public ClusteredJob getRemoteJob(String className) {
+                                               ClusteredJob job = null;
+                                               if (!jobs.isEmpty())
+                                                       job = jobs.pop();
+                                               return job;
+                                       }
+
+                                       @Override
+                                       public void reportJobStatus(ClusteredJob job) {
+                                               System.out.println("Report:" + job.getStatus());
                                        }
-                               });
-               */
+                               };
+                       }
+
+               };
+               jobEngine.bindItemManagerService(manager);
+               jobEngine.addJobChangedEventListener(e -> {
+                       //                      if (e.getSignalType().equals(SignalType.UPDATE)) {
+                       //                              System.out.println(e.getJob().getProgress());
+                       //                      }
+
+                       if (SignalType.CREATE.equals(e.getSignalType()))
+                               System.out.println(e.getSignalType());
+                       if (SignalType.UPDATE.equals(e.getSignalType())) {
+                               JobStatus status = e.getJob().getStatus();
+                               System.out.println(status);
+                               if (JobStatus.FINISHED.equals(status) || JobStatus.SUSPENDED.equals(status))
+                                       sync.suspend();
+                       }
+               });
+               jobEngine.startup();
                sync.waitSuspend();
                sync.resume();
                jobEngine.shutdown();
@@ -199,6 +237,14 @@ public class JobengineIT {
                //assertEquals(JobStatus.FINISHED, runtime.getStatus());
        }
 
+       @Test
+       public void reportJobStatus() throws Exception {
+               JobEngineRemote sut = new JobEngineRemote("http://localhost:8888");
+               ClusteredJob job = new ClusteredJob();
+               job.setId(1);
+               sut.reportJobStatus(job);
+       }
+
        /***
         * NEXIO adatok szinkronizalo folyamat futtatasa
         *
diff --git a/server/user.jobengine.osgi.server/test/user/jobengine/server/IT/ProrityChangeTests.java b/server/user.jobengine.osgi.server/test/user/jobengine/server/IT/ProrityChangeTests.java
new file mode 100644 (file)
index 0000000..bd2b99c
--- /dev/null
@@ -0,0 +1,187 @@
+package user.jobengine.server.IT;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.FileInputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import user.commons.JobStatus;
+import user.jobengine.db.IItemManager;
+import user.jobengine.db.ItemManager;
+import user.jobengine.db.ItemManagerData.SignalType;
+import user.jobengine.server.IJobEngine;
+import user.jobengine.server.IJobRuntime;
+import user.jobengine.server.JobEngine;
+
+public class ProrityChangeTests {
+       private static IItemManager manager;
+       private static IJobEngine jobEngine;
+       private static Map<String, Object> jobParams = new HashMap<>();
+       private static int JOB_COUNT = 10;
+
+       @BeforeClass
+       public static void initialize() throws Exception {
+               //Kornyezeti valtozok betoltese
+               Properties properties = new Properties();
+               URL srcLocation = ProrityChangeTests.class.getProtectionDomain().getCodeSource().getLocation();
+               URL location = new URL(srcLocation, "../../-configuration/mediacube-dev-user.properties");
+               properties.load(new FileInputStream(location.toURI().getPath().toString()));
+               System.getProperties().putAll(properties);
+
+               manager = new ItemManager();
+               manager.connect();
+
+               jobEngine = new JobEngine();
+               jobEngine.startup();
+               jobEngine.bindItemManagerService(manager);
+
+               jobParams.put("itemID", 100);
+       }
+
+       @AfterClass
+       public static void terminate() throws Exception {
+               jobEngine.shutdown();
+               manager.disconnect();
+       }
+
+       @Test
+       public void testAfterExecutorSubmitCompleted() throws Exception {
+               CountDownLatch startLatch = new CountDownLatch(JOB_COUNT);
+               CountDownLatch finishLatch = new CountDownLatch(JOB_COUNT);
+               List<IJobRuntime> runtimes = new ArrayList<>();
+               List<IJobRuntime> results = new ArrayList<>();
+
+               jobEngine.addJobChangedEventListener(e -> {
+                       if (e.getSignalType().equals(SignalType.EXECUTE))
+                               startLatch.countDown();
+               });
+
+               for (int i = 0; i < JOB_COUNT; i++) {
+                       IJobRuntime jobRuntime = jobEngine.submit(null, e -> {
+                               if (JobStatus.FINISHED.equals(e.getStatus()) || JobStatus.SUSPENDED.equals(e.getStatus())) {
+                                       results.add((IJobRuntime) e.getSource());
+                                       finishLatch.countDown();
+                               }
+                       }, "fake.xml", "Fake", jobParams);
+                       runtimes.add(jobRuntime);
+               }
+
+               startLatch.await();
+
+               IJobRuntime highPriorityJob = runtimes.get(runtimes.size() - 1);
+               highPriorityJob.incrementPriority();
+               jobEngine.applyPriorityChange(highPriorityJob);
+
+               finishLatch.await();
+
+               for (int i = 0; i < JOB_COUNT; i++)
+                       assertEquals(JobStatus.FINISHED, runtimes.get(i).getStatus());
+
+               assertFalse(runtimes.indexOf(highPriorityJob) == results.indexOf(highPriorityJob));
+
+       }
+
+       @Test
+       public void testAfterSubmitCompleted() throws Exception {
+               CountDownLatch startLatch = new CountDownLatch(JOB_COUNT);
+               CountDownLatch finishLatch = new CountDownLatch(JOB_COUNT);
+               List<IJobRuntime> runtimes = new ArrayList<>();
+               List<IJobRuntime> results = new ArrayList<>();
+
+               jobEngine.addJobChangedEventListener(e -> {
+                       if (e.getSignalType().equals(SignalType.CREATE))
+                               startLatch.countDown();
+               });
+
+               for (int i = 0; i < JOB_COUNT; i++) {
+                       IJobRuntime jobRuntime = jobEngine.submit(null, e -> {
+                               if (JobStatus.FINISHED.equals(e.getStatus()) || JobStatus.SUSPENDED.equals(e.getStatus())) {
+                                       results.add((IJobRuntime) e.getSource());
+                                       finishLatch.countDown();
+                               }
+                       }, "fake.xml", "Fake", jobParams);
+                       runtimes.add(jobRuntime);
+               }
+
+               startLatch.await();
+
+               IJobRuntime highPriorityJob = runtimes.get(runtimes.size() - 1);
+               highPriorityJob.incrementPriority();
+               jobEngine.applyPriorityChange(highPriorityJob);
+
+               finishLatch.await();
+
+               for (int i = 0; i < JOB_COUNT; i++)
+                       assertEquals(JobStatus.FINISHED, runtimes.get(i).getStatus());
+
+               assertFalse(runtimes.indexOf(highPriorityJob) == results.indexOf(highPriorityJob));
+
+       }
+
+       @Test
+       public void testUnderSubmit() throws Exception {
+               CountDownLatch finishLatch = new CountDownLatch(JOB_COUNT);
+               List<IJobRuntime> runtimes = new ArrayList<>();
+               List<IJobRuntime> results = new ArrayList<>();
+
+               for (int i = 0; i < JOB_COUNT; i++) {
+                       IJobRuntime jobRuntime = jobEngine.submit(null, e -> {
+                               if (JobStatus.FINISHED.equals(e.getStatus()) || JobStatus.SUSPENDED.equals(e.getStatus())) {
+                                       results.add((IJobRuntime) e.getSource());
+                                       finishLatch.countDown();
+                               }
+                       }, "fake.xml", "Fake", jobParams);
+                       runtimes.add(jobRuntime);
+               }
+
+               IJobRuntime highPriorityJob = runtimes.get(runtimes.size() - 1);
+               highPriorityJob.incrementPriority();
+               jobEngine.applyPriorityChange(highPriorityJob);
+
+               finishLatch.await();
+
+               for (int i = 0; i < JOB_COUNT; i++)
+                       assertEquals(JobStatus.FINISHED, runtimes.get(i).getStatus());
+
+               assertFalse(runtimes.indexOf(highPriorityJob) == results.indexOf(highPriorityJob));
+       }
+
+       @Test
+       public void testUnderSubmitEqualPriority() throws Exception {
+               CountDownLatch finishLatch = new CountDownLatch(JOB_COUNT);
+               List<IJobRuntime> runtimes = new ArrayList<>();
+               List<IJobRuntime> results = new ArrayList<>();
+
+               for (int i = 0; i < JOB_COUNT; i++) {
+                       IJobRuntime jobRuntime = jobEngine.submit(null, e -> {
+                               if (JobStatus.FINISHED.equals(e.getStatus()) || JobStatus.SUSPENDED.equals(e.getStatus())) {
+                                       results.add((IJobRuntime) e.getSource());
+                                       finishLatch.countDown();
+                               }
+                       }, "fake.xml", "Fake", jobParams);
+                       runtimes.add(jobRuntime);
+               }
+
+               IJobRuntime highPriorityJob = runtimes.get(runtimes.size() - 1);
+
+               finishLatch.await();
+
+               for (int i = 0; i < JOB_COUNT; i++)
+                       assertEquals(JobStatus.FINISHED, runtimes.get(i).getStatus());
+
+               assertTrue(runtimes.indexOf(highPriorityJob) == results.indexOf(highPriorityJob));
+       }
+
+}
index f6f24d460adadedafe2fc92b77e0639695749144..e4b67fa0c736e00cc007289b375a65d0aaa75217 100644 (file)
@@ -12,13 +12,6 @@ import org.junit.Test;
 import user.commons.IJob;
 
 public class PriorityEntryTest {
-       // private PriorityEntry<Long> firstEntry;
-       // private PriorityEntry<Long> secondEntry;
-       private JobRuntime firstEntry;
-       private JobRuntime secondEntry;
-       private JobRuntime thirdEntry;
-       private BlockingQueue<IJobRuntime> queue;
-
        private class JobComparator implements Comparator<IJob> {
 
                @Override
@@ -32,6 +25,14 @@ public class PriorityEntryTest {
 
        }
 
+       // private PriorityEntry<Long> firstEntry;
+       // private PriorityEntry<Long> secondEntry;
+       private JobRuntime firstEntry;
+       private JobRuntime secondEntry;
+       private JobRuntime thirdEntry;
+
+       private BlockingQueue<IJobRuntime> queue;
+
        @Before
        public void setup() {
                firstEntry = new JobRuntime();
@@ -58,18 +59,36 @@ public class PriorityEntryTest {
        }
 
        @Test
-       public void testPriorityEntry_IncrementPriority() throws Exception {
+       public void testPriorityEntry_incrementPriority() throws Exception {
+               // Fixture
+               queue.add(firstEntry);
+               queue.add(secondEntry);
+               queue.add(thirdEntry);
+
+               queue.remove(secondEntry);
+               secondEntry.incrementPriority();
+               queue.add(secondEntry);
+
+               // Exercise
+               IJobRuntime current = queue.poll();
+
+               // Verify
+               assertEquals(secondEntry, current);
+       }
+
+       @Test
+       public void testPriorityEntry_incrementPriority_async_poll() throws Exception {
                // Fixture
                queue.add(firstEntry);
                queue.add(secondEntry);
                queue.add(thirdEntry);
 
+               secondEntry.incrementPriority();
                queue.remove(secondEntry);
-               secondEntry.IncrementPriority();
                queue.add(secondEntry);
 
                // Exercise
-               IJobRuntime current = queue.take();
+               IJobRuntime current = queue.poll();
 
                // Verify
                assertEquals(secondEntry, current);
index 3ba06f8e25cd2372c0505d69e631709be3d864bd..673fd67a64d229597c77e8049e54cc70ff86a360 100644 (file)
@@ -2,6 +2,7 @@ package user.jobengine.osgi.mediacube;
 \r
 import javax.servlet.http.HttpServletRequest;\r
 import javax.ws.rs.GET;\r
+import javax.ws.rs.POST;\r
 import javax.ws.rs.Path;\r
 import javax.ws.rs.Produces;\r
 import javax.ws.rs.QueryParam;\r
@@ -12,6 +13,7 @@ import javax.ws.rs.core.Response;
 import org.apache.logging.log4j.LogManager;\r
 import org.apache.logging.log4j.Logger;\r
 \r
+import user.commons.cluster.ClusteredJob;\r
 import user.jobengine.osgi.rest.ComponentBinder;\r
 import user.jobengine.server.IJobEngine;\r
 \r
@@ -31,11 +33,11 @@ public class ClusterService {
        public Response getJob(@QueryParam("className") String className) {\r
                Response result = null;\r
                try {\r
-                       //IJobRuntime job = jobEngine.getJobForRemote(className);\r
-                       ClusteredJob j = new ClusteredJob();\r
-                       j.setId(100);\r
-                       j.setName("Jobname");\r
-                       result = Response.ok().entity(j).build();\r
+                       ClusteredJob job = jobEngine.requestJob(className);\r
+                       if (job == null)\r
+                               result = Response.noContent().build();\r
+                       else\r
+                               result = Response.ok().entity(job).build();\r
                } catch (Exception e) {\r
                        result = Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build();\r
                }\r
@@ -56,4 +58,17 @@ public class ClusterService {
                }\r
                return result;\r
        }\r
+\r
+       @POST\r
+       @Path("/notifyjob")\r
+       @Produces({ MediaType.APPLICATION_JSON })\r
+       public Response notifyJob(ClusteredJob job) {\r
+               Response result = null;\r
+               try {\r
+                       result = Response.ok().build();\r
+               } catch (Exception e) {\r
+                       result = Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build();\r
+               }\r
+               return result;\r
+       }\r
 }\r
diff --git a/server/user.jobengine.osgi.services/src/user/jobengine/osgi/mediacube/ClusteredJob.java b/server/user.jobengine.osgi.services/src/user/jobengine/osgi/mediacube/ClusteredJob.java
deleted file mode 100644 (file)
index 85f2067..0000000
+++ /dev/null
@@ -1,23 +0,0 @@
-package user.jobengine.osgi.mediacube;\r
-\r
-public class ClusteredJob {\r
-       private long id;\r
-       private String name;\r
-\r
-       public long getId() {\r
-               return id;\r
-       }\r
-\r
-       public String getName() {\r
-               return name;\r
-       }\r
-\r
-       public void setId(long id) {\r
-               this.id = id;\r
-       }\r
-\r
-       public void setName(String name) {\r
-               this.name = name;\r
-       }\r
-\r
-}\r