From dea2c2e95be0b62094306ed888df60391bc7086a Mon Sep 17 00:00:00 2001 From: Sweidan Omar Date: Mon, 6 Dec 2021 12:30:36 +0000 Subject: [PATCH] git-tfs-id: [http://tfs.userrendszerhaz.hu:8080/tfs/DefaultCollection]$/MediaCube;C32492 --- .../src/user/jobengine/server/JobEngine.java | 97 +++++++++++-------- 1 file changed, 55 insertions(+), 42 deletions(-) diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java index 7857e8e1..239059b9 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java @@ -99,7 +99,7 @@ public class JobEngine implements IJobEngine { shutdown = true; } } - //a leallitas utan az osszes fuggo uzenet vegrehajtasa + // a leallitas utan az osszes fuggo uzenet vegrehajtasa while (!messageQueue.isEmpty()) { try { IJobMessage message = messageQueue.poll(QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); @@ -134,16 +134,17 @@ public class JobEngine implements IJobEngine { while (!shutdown) { try { Thread.sleep(QUEUE_POLL_INTERVAL_MS); - //IJobRuntime jobRuntime = runQueue.poll(QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); + // IJobRuntime jobRuntime = runQueue.poll(QUEUE_POLL_INTERVAL_MS, + // TimeUnit.MILLISECONDS); IJobRuntime jobRuntime = runQueue.poll(); if (jobRuntime != null) { logger.debug("Processing {}", jobRuntime.getId()); - //varakozo esetben vegrehajtjuk a kovetkezo utasitast + // varakozo esetben vegrehajtjuk a kovetkezo utasitast if (jobRuntime.hasNextInstruction() && jobRuntime.isWaitFinish()) { ir = jobRuntime.getNextInstruction(); ir.execute(JobEngine.this, jobRuntime); } else { - //normal esetben elfutunk a kovetkezo job step-ig, vagy vegig + // normal esetben elfutunk a kovetkezo job step-ig, vagy vegig while (jobRuntime.hasNextInstruction() && jobRuntime.isRunable()) { ir = jobRuntime.getNextInstruction(); ir.execute(JobEngine.this, jobRuntime); @@ -211,7 +212,8 @@ public class JobEngine implements IJobEngine { private IJobEngineConfiguration jobEngineConfiguration; /** - * A MAM motor példányosítása. Ennek során létrejönnek a várakozási sorok, az ütemező szál, az üzenet kezelő szál. + * A MAM motor példányosítása. Ennek során létrejönnek a várakozási sorok, az + * ütemező szál, az üzenet kezelő szál. */ public JobEngine() { @@ -226,7 +228,7 @@ public class JobEngine implements IJobEngine { instance = this; remoteWorkers = new ConcurrentHashMap<>(); - //logger.info("JobEngine created"); + // logger.info("JobEngine created"); if (isWorker()) remoteEngine = createRemoteEngine(); else @@ -250,13 +252,16 @@ public class JobEngine implements IJobEngine { try { Object typeName = jobRuntime.popFromStack(); if (typeName == null) - throw new Exception(jobRuntime.toString() + " illegal execution state detected: executor name is null."); + throw new Exception( + jobRuntime.toString() + " illegal execution state detected: executor name is null."); String executorName = String.valueOf(typeName); if (!jobEngineConfiguration.getExecutors().containsKey(executorName)) throw new Exception(jobRuntime.toString() + " executor is unavailable: " + executorName); - //a lista mindig letezik, de sima futtataskor csak 1 elemu, az aktualis van benne - //ha van parhuzamos futtatas, akkor letrejonnek benne a step szellemek is, es meghivodik a fork + // a lista mindig letezik, de sima futtataskor csak 1 elemu, az aktualis van + // benne + // ha van parhuzamos futtatas, akkor letrejonnek benne a step szellemek is, es + // meghivodik a fork List jobs = spawnJobs(jobRuntime, executorName); jobEngineConfiguration.getExecutors().get(executorName).submit(jobs.toArray(new IJobRuntime[] {})); jobs.forEach(r -> fireJobChangedEvent(new JobChangedEvent(r, SignalType.EXECUTE))); @@ -280,17 +285,19 @@ public class JobEngine implements IJobEngine { } /** - * Job prioritasa megvaltozott. Job main queue-ban es a JobStepExecutorok soraiban is ujra kell rendezni a jobokat. Az ujrarendezes a put-nal is megy - * automatikusan, mivel eltero prioritasu jobot dobhatnak be vagy default prioritasut a korabban atrendezett listaba. + * Job prioritasa megvaltozott. Job main queue-ban es a JobStepExecutorok + * soraiban is ujra kell rendezni a jobokat. Az ujrarendezes a put-nal is megy + * automatikusan, mivel eltero prioritasu jobot dobhatnak be vagy default + * prioritasut a korabban atrendezett listaba. * * @param jobRuntime */ @Override public void applyPriorityChange(IJobRuntime jobRuntime) { logger.info("rePrioritization start for {}", jobRuntime.getId()); - // synchronized(this.runQueue){ + // synchronized(this.runQueue){ - //job main queue reorder + // job main queue reorder if (this.runQueue.contains(jobRuntime)) { logger.info("runQueue"); @@ -301,7 +308,7 @@ public class JobEngine implements IJobEngine { } } - //JobStepExecutor reorder + // JobStepExecutor reorder if (jobEngineConfiguration.getExecutors() != null) { for (IJobStepExecutor exec : jobEngineConfiguration.getExecutors().values()) { if (exec.containsRuntime(jobRuntime)) { @@ -311,7 +318,7 @@ public class JobEngine implements IJobEngine { } } - // } logger.info("rePrioritization end"); + // } logger.info("rePrioritization end"); } @Override @@ -328,13 +335,14 @@ public class JobEngine implements IJobEngine { } private void bootstrap() throws JobEngineException { - //submit("fake-noparams.xml", "Bootstrap", null); + // submit("fake-noparams.xml", "Bootstrap", null); } private void closeSessionLog(IJobRuntime jobRuntime) { if (!jobRuntime.isService() && jobRuntime.getParentJobId() == 0) { if (JobStatus.FINISHED.equals(jobRuntime.getStatus())) - logger.info(jobRuntime.getFinishMarker(), "A '{}' folyamat futása sikeresen véget ért.", jobRuntime.getName()); + logger.info(jobRuntime.getFinishMarker(), "A '{}' folyamat futása sikeresen véget ért.", + jobRuntime.getName()); else logger.error(jobRuntime.getFinishMarker(), "A '{}' folyamat futása megszakadt.", jobRuntime.getName()); } @@ -360,8 +368,8 @@ public class JobEngine implements IJobEngine { Object value = jobRuntime.popFromStack(); String name = (String) jobRuntime.popFromStack(); - //a gyerek job-ok visszateresi erteket a spawn sorszammal kiegeszitve mentjuk - //TODO el kell tudni erni a kovetkezo lepesekbol ezeket az ertekeket + // a gyerek job-ok visszateresi erteket a spawn sorszammal kiegeszitve mentjuk + // TODO el kell tudni erni a kovetkezo lepesekbol ezeket az ertekeket if (jobRuntime.getParentJobId() > 0) { IJobRuntime parentRuntime = getJobById(jobRuntime.getParentJobId()); parentRuntime.setVariable(name + jobRuntime.getSpawnOrder(), value); @@ -444,7 +452,8 @@ public class JobEngine implements IJobEngine { @Override public void executeWaitForSpawnJobsInstruction(IJobRuntime jobRuntime) { - //logger.info("Processing {} {}", jobRuntime.getId(), jobRuntime.canContinueExecution()); + // logger.info("Processing {} {}", jobRuntime.getId(), + // jobRuntime.canContinueExecution()); if (jobRuntime.canContinueExecution()) { jobRuntime.setStatus(JobStatus.RUNABLE); @@ -546,7 +555,7 @@ public class JobEngine implements IJobEngine { private void isRunnable(IProgram program) throws JobEngineException { JobTemplate template = program.getTemplate(); - //A JOB xml-ben beállítható, hogy futhatnak e párhuzamosan. + // A JOB xml-ben beállítható, hogy futhatnak e párhuzamosan. if (template.isMultiInstance()) return; List runningJobs = itemManager.getRunningJobs(template.getFileName()); @@ -554,8 +563,8 @@ public class JobEngine implements IJobEngine { for (Job job : runningJobs) { Job runningJob = getJob(job.getId()); if (runningJob != null && runningJob.getStatus() != JobStatus.SUSPENDED) - throw new JobEngineException(String.format("Can not submit job. Job with %s.%s already running", template.getFileName(), - template.getName())); + throw new JobEngineException(String.format("Can not submit job. Job with %s.%s already running", + template.getFileName(), template.getName())); } } } @@ -586,7 +595,7 @@ public class JobEngine implements IJobEngine { boolean result = false; if (keepAliveJobChangedListeners != null) { long now = System.currentTimeMillis(); - //ha mar hozza van adva, nem adja hozza + // ha mar hozza van adva, nem adja hozza result = addJobChangedEventListener(listener); keepAliveJobChangedListeners.put(listener, now); logger.debug("Refreshing listener {}, now {} ({})", listener, now, keepAliveJobChangedListeners.size()); @@ -610,7 +619,7 @@ public class JobEngine implements IJobEngine { return; } - //a gyerekek miatt nem az! + // a gyerekek miatt nem az! if (!jobRuntime.isCancelable()) return; @@ -629,20 +638,20 @@ public class JobEngine implements IJobEngine { return; } - //TODO ez hibat okoz az archivalasnal, mert hamarabb eltavolitja a childUd-ket + // TODO ez hibat okoz az archivalasnal, mert hamarabb eltavolitja a childUd-ket - // if (jobRuntime.getParentJobId() > 0) - // removeSpanwChild(jobRuntime); + // if (jobRuntime.getParentJobId() > 0) + // removeSpanwChild(jobRuntime); JobStepCompletedMessage m = (JobStepCompletedMessage) message; - //kesz vagyunk, jelezni + // kesz vagyunk, jelezni if (isWorker()) { statusMachine.processAction(JobAction.DONE, jobRuntime); return; } - //a cancel hamarabb megjott? - //ha remote akkot tuti + // a cancel hamarabb megjott? + // ha remote akkor tuti if (jobRuntime == null) { } @@ -736,7 +745,8 @@ public class JobEngine implements IJobEngine { List removeId = new ArrayList<>(); for (Long id : submittedJobs.keySet()) { IJobRuntime runtime = submittedJobs.get(id); - if (runtime != null && (JobStatus.SUSPENDED.equals(runtime.getStatus()) || JobStatus.CANCELED.equals(runtime.getStatus()))) + if (runtime != null && (JobStatus.SUSPENDED.equals(runtime.getStatus()) + || JobStatus.CANCELED.equals(runtime.getStatus()))) removeId.add(id); } for (Long id : removeId) @@ -890,7 +900,7 @@ public class JobEngine implements IJobEngine { if (parameter == null) parameter = jobRuntime.getVariable(forEach); - //a sima array helyett ezt jobb hasznalni + // a sima array helyett ezt jobb hasznalni if (parameter != null && parameter instanceof BasicDBList) { BasicDBList iter = (BasicDBList) parameter; @@ -965,8 +975,8 @@ public class JobEngine implements IJobEngine { } @Override - public IJobRuntime submit(IJobRuntime parent, IJobStatusChangedListener statusListener, String template, String name, int priority, String owner, - Map parameters) throws JobEngineException { + public IJobRuntime submit(IJobRuntime parent, IJobStatusChangedListener statusListener, String template, + String name, int priority, String owner, Map parameters) throws JobEngineException { IJobRuntime result = null; IProgram program = getProgram(template); if (program != null) { @@ -991,15 +1001,16 @@ public class JobEngine implements IJobEngine { } @Override - public IJobRuntime submit(IJobRuntime parent, IJobStatusChangedListener statusListener, String template, String name, Map parameters) - throws JobEngineException { + public IJobRuntime submit(IJobRuntime parent, IJobStatusChangedListener statusListener, String template, + String name, Map parameters) throws JobEngineException { IJobRuntime result = null; result = submit(parent, statusListener, template, name, 0, DEFAULT_OWNER, parameters); return result; } @Override - public IJobRuntime submit(IJobRuntime parent, String template, String name, int priority, Map parameters) throws JobEngineException { + public IJobRuntime submit(IJobRuntime parent, String template, String name, int priority, + Map parameters) throws JobEngineException { IJobRuntime result = null; result = submit(parent, null, template, name, 0, DEFAULT_OWNER, parameters); return result; @@ -1014,19 +1025,21 @@ public class JobEngine implements IJobEngine { @Override public IJobRuntime submit(String template, String name, Map parameters) throws JobEngineException { - //Az ütemezett task-okat configból a Quartz futtatja + // Az ütemezett task-okat configból a Quartz futtatja if (isAllExecutionDisabled) { logger.info("JobEngine is disabled, can not submit job '{}'", name); return null; } IJobRuntime result = null; IProgram program = getProgram(template); - result = submit(null, null, template, name == null ? program.getTemplate().getName() : name, 0, DEFAULT_OWNER, parameters); + result = submit(null, null, template, name == null ? program.getTemplate().getName() : name, 0, DEFAULT_OWNER, + parameters); return result; } @Override - public IJobRuntime submit(String template, String name, Map parameters, String owner) throws JobEngineException { + public IJobRuntime submit(String template, String name, Map parameters, String owner) + throws JobEngineException { IJobRuntime result = null; result = submit(null, null, template, name, 0, owner, parameters); return result; @@ -1037,7 +1050,7 @@ public class JobEngine implements IJobEngine { String description = t.getClass().getSimpleName() + " : " + t.getMessage(); jobRuntime.setDescription(description); logger.error(description); - //TODO itt miert FINISH a kovetkezo allapot, miert nem SUSPEND + // TODO itt miert FINISH a kovetkezo allapot, miert nem SUSPEND statusMachine.processAction(JobAction.FINISH, jobRuntime); closeSessionLog(jobRuntime); } -- 2.54.0