git-tfs-id: [http://tfs.userrendszerhaz.hu:8080/tfs/DefaultCollection]$/MediaCube...
authorSweidan Omar <TFS\sweidan.omar>
Mon, 6 Dec 2021 12:30:36 +0000 (12:30 +0000)
committerSweidan Omar <TFS\sweidan.omar>
Mon, 6 Dec 2021 12:30:36 +0000 (12:30 +0000)
server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java

index 7857e8e19e03fd3959c6ecd23a6104d9bfdf5414..239059b9bee07626b9fbda7aae122d87271efe55 100644 (file)
@@ -99,7 +99,7 @@ public class JobEngine implements IJobEngine {
                                        shutdown = true;
                                }
                        }
-                       //a leallitas utan az osszes fuggo uzenet vegrehajtasa
+                       // a leallitas utan az osszes fuggo uzenet vegrehajtasa
                        while (!messageQueue.isEmpty()) {
                                try {
                                        IJobMessage message = messageQueue.poll(QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
@@ -134,16 +134,17 @@ public class JobEngine implements IJobEngine {
                        while (!shutdown) {
                                try {
                                        Thread.sleep(QUEUE_POLL_INTERVAL_MS);
-                                       //IJobRuntime jobRuntime = runQueue.poll(QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+                                       // IJobRuntime jobRuntime = runQueue.poll(QUEUE_POLL_INTERVAL_MS,
+                                       // TimeUnit.MILLISECONDS);
                                        IJobRuntime jobRuntime = runQueue.poll();
                                        if (jobRuntime != null) {
                                                logger.debug("Processing {}", jobRuntime.getId());
-                                               //varakozo esetben vegrehajtjuk a kovetkezo utasitast
+                                               // varakozo esetben vegrehajtjuk a kovetkezo utasitast
                                                if (jobRuntime.hasNextInstruction() && jobRuntime.isWaitFinish()) {
                                                        ir = jobRuntime.getNextInstruction();
                                                        ir.execute(JobEngine.this, jobRuntime);
                                                } else {
-                                                       //normal esetben elfutunk a kovetkezo job step-ig, vagy vegig
+                                                       // normal esetben elfutunk a kovetkezo job step-ig, vagy vegig
                                                        while (jobRuntime.hasNextInstruction() && jobRuntime.isRunable()) {
                                                                ir = jobRuntime.getNextInstruction();
                                                                ir.execute(JobEngine.this, jobRuntime);
@@ -211,7 +212,8 @@ public class JobEngine implements IJobEngine {
        private IJobEngineConfiguration jobEngineConfiguration;
 
        /**
-        * A MAM motor példányosítása. Ennek során létrejönnek a várakozási sorok, az ütemező szál, az üzenet kezelő szál.
+        * A MAM motor példányosítása. Ennek során létrejönnek a várakozási sorok, az
+        * ütemező szál, az üzenet kezelő szál.
         */
        public JobEngine() {
 
@@ -226,7 +228,7 @@ public class JobEngine implements IJobEngine {
                instance = this;
 
                remoteWorkers = new ConcurrentHashMap<>();
-               //logger.info("JobEngine created");
+               // logger.info("JobEngine created");
                if (isWorker())
                        remoteEngine = createRemoteEngine();
                else
@@ -250,13 +252,16 @@ public class JobEngine implements IJobEngine {
                try {
                        Object typeName = jobRuntime.popFromStack();
                        if (typeName == null)
-                               throw new Exception(jobRuntime.toString() + " illegal execution state detected: executor name is null.");
+                               throw new Exception(
+                                               jobRuntime.toString() + " illegal execution state detected: executor name is null.");
                        String executorName = String.valueOf(typeName);
                        if (!jobEngineConfiguration.getExecutors().containsKey(executorName))
                                throw new Exception(jobRuntime.toString() + " executor is unavailable: " + executorName);
 
-                       //a lista mindig letezik, de sima futtataskor csak 1 elemu, az aktualis van benne
-                       //ha van parhuzamos futtatas, akkor letrejonnek benne a step szellemek is, es meghivodik a fork
+                       // a lista mindig letezik, de sima futtataskor csak 1 elemu, az aktualis van
+                       // benne
+                       // ha van parhuzamos futtatas, akkor letrejonnek benne a step szellemek is, es
+                       // meghivodik a fork
                        List<IJobRuntime> jobs = spawnJobs(jobRuntime, executorName);
                        jobEngineConfiguration.getExecutors().get(executorName).submit(jobs.toArray(new IJobRuntime[] {}));
                        jobs.forEach(r -> fireJobChangedEvent(new JobChangedEvent(r, SignalType.EXECUTE)));
@@ -280,17 +285,19 @@ public class JobEngine implements IJobEngine {
        }
 
        /**
-        * Job prioritasa megvaltozott. Job main queue-ban es a JobStepExecutorok soraiban is ujra kell rendezni a jobokat. Az ujrarendezes a put-nal is megy
-        * automatikusan, mivel eltero prioritasu jobot dobhatnak be vagy default prioritasut a korabban atrendezett listaba.
+        * Job prioritasa megvaltozott. Job main queue-ban es a JobStepExecutorok
+        * soraiban is ujra kell rendezni a jobokat. Az ujrarendezes a put-nal is megy
+        * automatikusan, mivel eltero prioritasu jobot dobhatnak be vagy default
+        * prioritasut a korabban atrendezett listaba.
         *
         * @param jobRuntime
         */
        @Override
        public void applyPriorityChange(IJobRuntime jobRuntime) {
                logger.info("rePrioritization start for {}", jobRuntime.getId());
-               //              synchronized(this.runQueue){
+               // synchronized(this.runQueue){
 
-               //job main queue reorder
+               // job main queue reorder
 
                if (this.runQueue.contains(jobRuntime)) {
                        logger.info("runQueue");
@@ -301,7 +308,7 @@ public class JobEngine implements IJobEngine {
                        }
                }
 
-               //JobStepExecutor reorder
+               // JobStepExecutor reorder
                if (jobEngineConfiguration.getExecutors() != null) {
                        for (IJobStepExecutor exec : jobEngineConfiguration.getExecutors().values()) {
                                if (exec.containsRuntime(jobRuntime)) {
@@ -311,7 +318,7 @@ public class JobEngine implements IJobEngine {
                        }
                }
 
-               //              }               logger.info("rePrioritization end");
+               // } logger.info("rePrioritization end");
        }
 
        @Override
@@ -328,13 +335,14 @@ public class JobEngine implements IJobEngine {
        }
 
        private void bootstrap() throws JobEngineException {
-               //submit("fake-noparams.xml", "Bootstrap", null);
+               // submit("fake-noparams.xml", "Bootstrap", null);
        }
 
        private void closeSessionLog(IJobRuntime jobRuntime) {
                if (!jobRuntime.isService() && jobRuntime.getParentJobId() == 0) {
                        if (JobStatus.FINISHED.equals(jobRuntime.getStatus()))
-                               logger.info(jobRuntime.getFinishMarker(), "A '{}' folyamat futása sikeresen véget ért.", jobRuntime.getName());
+                               logger.info(jobRuntime.getFinishMarker(), "A '{}' folyamat futása sikeresen véget ért.",
+                                               jobRuntime.getName());
                        else
                                logger.error(jobRuntime.getFinishMarker(), "A '{}' folyamat futása megszakadt.", jobRuntime.getName());
                }
@@ -360,8 +368,8 @@ public class JobEngine implements IJobEngine {
                Object value = jobRuntime.popFromStack();
                String name = (String) jobRuntime.popFromStack();
 
-               //a gyerek job-ok visszateresi erteket a spawn sorszammal kiegeszitve mentjuk
-               //TODO el kell tudni erni a kovetkezo lepesekbol ezeket az ertekeket
+               // a gyerek job-ok visszateresi erteket a spawn sorszammal kiegeszitve mentjuk
+               // TODO el kell tudni erni a kovetkezo lepesekbol ezeket az ertekeket
                if (jobRuntime.getParentJobId() > 0) {
                        IJobRuntime parentRuntime = getJobById(jobRuntime.getParentJobId());
                        parentRuntime.setVariable(name + jobRuntime.getSpawnOrder(), value);
@@ -444,7 +452,8 @@ public class JobEngine implements IJobEngine {
 
        @Override
        public void executeWaitForSpawnJobsInstruction(IJobRuntime jobRuntime) {
-               //logger.info("Processing {} {}", jobRuntime.getId(), jobRuntime.canContinueExecution());
+               // logger.info("Processing {} {}", jobRuntime.getId(),
+               // jobRuntime.canContinueExecution());
 
                if (jobRuntime.canContinueExecution()) {
                        jobRuntime.setStatus(JobStatus.RUNABLE);
@@ -546,7 +555,7 @@ public class JobEngine implements IJobEngine {
 
        private void isRunnable(IProgram program) throws JobEngineException {
                JobTemplate template = program.getTemplate();
-               //A JOB xml-ben beállítható, hogy futhatnak e párhuzamosan.
+               // A JOB xml-ben beállítható, hogy futhatnak e párhuzamosan.
                if (template.isMultiInstance())
                        return;
                List<Job> runningJobs = itemManager.getRunningJobs(template.getFileName());
@@ -554,8 +563,8 @@ public class JobEngine implements IJobEngine {
                        for (Job job : runningJobs) {
                                Job runningJob = getJob(job.getId());
                                if (runningJob != null && runningJob.getStatus() != JobStatus.SUSPENDED)
-                                       throw new JobEngineException(String.format("Can not submit job. Job with %s.%s already running", template.getFileName(),
-                                                       template.getName()));
+                                       throw new JobEngineException(String.format("Can not submit job. Job with %s.%s already running",
+                                                       template.getFileName(), template.getName()));
                        }
                }
        }
@@ -586,7 +595,7 @@ public class JobEngine implements IJobEngine {
                boolean result = false;
                if (keepAliveJobChangedListeners != null) {
                        long now = System.currentTimeMillis();
-                       //ha mar hozza van adva, nem adja hozza
+                       // ha mar hozza van adva, nem adja hozza
                        result = addJobChangedEventListener(listener);
                        keepAliveJobChangedListeners.put(listener, now);
                        logger.debug("Refreshing listener {}, now {} ({})", listener, now, keepAliveJobChangedListeners.size());
@@ -610,7 +619,7 @@ public class JobEngine implements IJobEngine {
                        return;
                }
 
-               //a gyerekek miatt nem az!
+               // a gyerekek miatt nem az!
                if (!jobRuntime.isCancelable())
                        return;
 
@@ -629,20 +638,20 @@ public class JobEngine implements IJobEngine {
                        return;
                }
 
-               //TODO ez hibat okoz az archivalasnal, mert hamarabb eltavolitja a childUd-ket
+               // TODO ez hibat okoz az archivalasnal, mert hamarabb eltavolitja a childUd-ket
 
-               //              if (jobRuntime.getParentJobId() > 0)
-               //                      removeSpanwChild(jobRuntime);
+               // if (jobRuntime.getParentJobId() > 0)
+               // removeSpanwChild(jobRuntime);
 
                JobStepCompletedMessage m = (JobStepCompletedMessage) message;
-               //kesz vagyunk, jelezni
+               // kesz vagyunk, jelezni
                if (isWorker()) {
                        statusMachine.processAction(JobAction.DONE, jobRuntime);
                        return;
                }
 
-               //a cancel hamarabb megjott?
-               //ha remote akkot tuti
+               // a cancel hamarabb megjott?
+               // ha remote akkor tuti
                if (jobRuntime == null) {
 
                }
@@ -736,7 +745,8 @@ public class JobEngine implements IJobEngine {
                List<Long> removeId = new ArrayList<>();
                for (Long id : submittedJobs.keySet()) {
                        IJobRuntime runtime = submittedJobs.get(id);
-                       if (runtime != null && (JobStatus.SUSPENDED.equals(runtime.getStatus()) || JobStatus.CANCELED.equals(runtime.getStatus())))
+                       if (runtime != null && (JobStatus.SUSPENDED.equals(runtime.getStatus())
+                                       || JobStatus.CANCELED.equals(runtime.getStatus())))
                                removeId.add(id);
                }
                for (Long id : removeId)
@@ -890,7 +900,7 @@ public class JobEngine implements IJobEngine {
                                if (parameter == null)
                                        parameter = jobRuntime.getVariable(forEach);
 
-                               //a sima array helyett ezt jobb hasznalni
+                               // a sima array helyett ezt jobb hasznalni
                                if (parameter != null && parameter instanceof BasicDBList) {
 
                                        BasicDBList iter = (BasicDBList) parameter;
@@ -965,8 +975,8 @@ public class JobEngine implements IJobEngine {
        }
 
        @Override
-       public IJobRuntime submit(IJobRuntime parent, IJobStatusChangedListener statusListener, String template, String name, int priority, String owner,
-                       Map<String, Object> parameters) throws JobEngineException {
+       public IJobRuntime submit(IJobRuntime parent, IJobStatusChangedListener statusListener, String template,
+                       String name, int priority, String owner, Map<String, Object> parameters) throws JobEngineException {
                IJobRuntime result = null;
                IProgram program = getProgram(template);
                if (program != null) {
@@ -991,15 +1001,16 @@ public class JobEngine implements IJobEngine {
        }
 
        @Override
-       public IJobRuntime submit(IJobRuntime parent, IJobStatusChangedListener statusListener, String template, String name, Map<String, Object> parameters)
-                       throws JobEngineException {
+       public IJobRuntime submit(IJobRuntime parent, IJobStatusChangedListener statusListener, String template,
+                       String name, Map<String, Object> parameters) throws JobEngineException {
                IJobRuntime result = null;
                result = submit(parent, statusListener, template, name, 0, DEFAULT_OWNER, parameters);
                return result;
        }
 
        @Override
-       public IJobRuntime submit(IJobRuntime parent, String template, String name, int priority, Map<String, Object> parameters) throws JobEngineException {
+       public IJobRuntime submit(IJobRuntime parent, String template, String name, int priority,
+                       Map<String, Object> parameters) throws JobEngineException {
                IJobRuntime result = null;
                result = submit(parent, null, template, name, 0, DEFAULT_OWNER, parameters);
                return result;
@@ -1014,19 +1025,21 @@ public class JobEngine implements IJobEngine {
 
        @Override
        public IJobRuntime submit(String template, String name, Map<String, Object> parameters) throws JobEngineException {
-               //Az ütemezett task-okat configból a Quartz futtatja
+               // Az ütemezett task-okat configból a Quartz futtatja
                if (isAllExecutionDisabled) {
                        logger.info("JobEngine is disabled, can not submit job '{}'", name);
                        return null;
                }
                IJobRuntime result = null;
                IProgram program = getProgram(template);
-               result = submit(null, null, template, name == null ? program.getTemplate().getName() : name, 0, DEFAULT_OWNER, parameters);
+               result = submit(null, null, template, name == null ? program.getTemplate().getName() : name, 0, DEFAULT_OWNER,
+                               parameters);
                return result;
        }
 
        @Override
-       public IJobRuntime submit(String template, String name, Map<String, Object> parameters, String owner) throws JobEngineException {
+       public IJobRuntime submit(String template, String name, Map<String, Object> parameters, String owner)
+                       throws JobEngineException {
                IJobRuntime result = null;
                result = submit(null, null, template, name, 0, owner, parameters);
                return result;
@@ -1037,7 +1050,7 @@ public class JobEngine implements IJobEngine {
                String description = t.getClass().getSimpleName() + " : " + t.getMessage();
                jobRuntime.setDescription(description);
                logger.error(description);
-               //TODO itt miert FINISH a kovetkezo allapot, miert nem SUSPEND
+               // TODO itt miert FINISH a kovetkezo allapot, miert nem SUSPEND
                statusMachine.processAction(JobAction.FINISH, jobRuntime);
                closeSessionLog(jobRuntime);
        }