shutdown = true;
}
}
- //a leallitas utan az osszes fuggo uzenet vegrehajtasa
+ // a leallitas utan az osszes fuggo uzenet vegrehajtasa
while (!messageQueue.isEmpty()) {
try {
IJobMessage message = messageQueue.poll(QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
while (!shutdown) {
try {
Thread.sleep(QUEUE_POLL_INTERVAL_MS);
- //IJobRuntime jobRuntime = runQueue.poll(QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ // IJobRuntime jobRuntime = runQueue.poll(QUEUE_POLL_INTERVAL_MS,
+ // TimeUnit.MILLISECONDS);
IJobRuntime jobRuntime = runQueue.poll();
if (jobRuntime != null) {
logger.debug("Processing {}", jobRuntime.getId());
- //varakozo esetben vegrehajtjuk a kovetkezo utasitast
+ // varakozo esetben vegrehajtjuk a kovetkezo utasitast
if (jobRuntime.hasNextInstruction() && jobRuntime.isWaitFinish()) {
ir = jobRuntime.getNextInstruction();
ir.execute(JobEngine.this, jobRuntime);
} else {
- //normal esetben elfutunk a kovetkezo job step-ig, vagy vegig
+ // normal esetben elfutunk a kovetkezo job step-ig, vagy vegig
while (jobRuntime.hasNextInstruction() && jobRuntime.isRunable()) {
ir = jobRuntime.getNextInstruction();
ir.execute(JobEngine.this, jobRuntime);
private IJobEngineConfiguration jobEngineConfiguration;
/**
- * A MAM motor példányosítása. Ennek során létrejönnek a várakozási sorok, az ütemező szál, az üzenet kezelő szál.
+ * A MAM motor példányosítása. Ennek során létrejönnek a várakozási sorok, az
+ * ütemező szál, az üzenet kezelő szál.
*/
public JobEngine() {
instance = this;
remoteWorkers = new ConcurrentHashMap<>();
- //logger.info("JobEngine created");
+ // logger.info("JobEngine created");
if (isWorker())
remoteEngine = createRemoteEngine();
else
try {
Object typeName = jobRuntime.popFromStack();
if (typeName == null)
- throw new Exception(jobRuntime.toString() + " illegal execution state detected: executor name is null.");
+ throw new Exception(
+ jobRuntime.toString() + " illegal execution state detected: executor name is null.");
String executorName = String.valueOf(typeName);
if (!jobEngineConfiguration.getExecutors().containsKey(executorName))
throw new Exception(jobRuntime.toString() + " executor is unavailable: " + executorName);
- //a lista mindig letezik, de sima futtataskor csak 1 elemu, az aktualis van benne
- //ha van parhuzamos futtatas, akkor letrejonnek benne a step szellemek is, es meghivodik a fork
+ // a lista mindig letezik, de sima futtataskor csak 1 elemu, az aktualis van
+ // benne
+ // ha van parhuzamos futtatas, akkor letrejonnek benne a step szellemek is, es
+ // meghivodik a fork
List<IJobRuntime> jobs = spawnJobs(jobRuntime, executorName);
jobEngineConfiguration.getExecutors().get(executorName).submit(jobs.toArray(new IJobRuntime[] {}));
jobs.forEach(r -> fireJobChangedEvent(new JobChangedEvent(r, SignalType.EXECUTE)));
}
/**
- * Job prioritasa megvaltozott. Job main queue-ban es a JobStepExecutorok soraiban is ujra kell rendezni a jobokat. Az ujrarendezes a put-nal is megy
- * automatikusan, mivel eltero prioritasu jobot dobhatnak be vagy default prioritasut a korabban atrendezett listaba.
+ * Job prioritasa megvaltozott. Job main queue-ban es a JobStepExecutorok
+ * soraiban is ujra kell rendezni a jobokat. Az ujrarendezes a put-nal is megy
+ * automatikusan, mivel eltero prioritasu jobot dobhatnak be vagy default
+ * prioritasut a korabban atrendezett listaba.
*
* @param jobRuntime
*/
@Override
public void applyPriorityChange(IJobRuntime jobRuntime) {
logger.info("rePrioritization start for {}", jobRuntime.getId());
- // synchronized(this.runQueue){
+ // synchronized(this.runQueue){
- //job main queue reorder
+ // job main queue reorder
if (this.runQueue.contains(jobRuntime)) {
logger.info("runQueue");
}
}
- //JobStepExecutor reorder
+ // JobStepExecutor reorder
if (jobEngineConfiguration.getExecutors() != null) {
for (IJobStepExecutor exec : jobEngineConfiguration.getExecutors().values()) {
if (exec.containsRuntime(jobRuntime)) {
}
}
- // } logger.info("rePrioritization end");
+ // } logger.info("rePrioritization end");
}
@Override
}
private void bootstrap() throws JobEngineException {
- //submit("fake-noparams.xml", "Bootstrap", null);
+ // submit("fake-noparams.xml", "Bootstrap", null);
}
private void closeSessionLog(IJobRuntime jobRuntime) {
if (!jobRuntime.isService() && jobRuntime.getParentJobId() == 0) {
if (JobStatus.FINISHED.equals(jobRuntime.getStatus()))
- logger.info(jobRuntime.getFinishMarker(), "A '{}' folyamat futása sikeresen véget ért.", jobRuntime.getName());
+ logger.info(jobRuntime.getFinishMarker(), "A '{}' folyamat futása sikeresen véget ért.",
+ jobRuntime.getName());
else
logger.error(jobRuntime.getFinishMarker(), "A '{}' folyamat futása megszakadt.", jobRuntime.getName());
}
Object value = jobRuntime.popFromStack();
String name = (String) jobRuntime.popFromStack();
- //a gyerek job-ok visszateresi erteket a spawn sorszammal kiegeszitve mentjuk
- //TODO el kell tudni erni a kovetkezo lepesekbol ezeket az ertekeket
+ // a gyerek job-ok visszateresi erteket a spawn sorszammal kiegeszitve mentjuk
+ // TODO el kell tudni erni a kovetkezo lepesekbol ezeket az ertekeket
if (jobRuntime.getParentJobId() > 0) {
IJobRuntime parentRuntime = getJobById(jobRuntime.getParentJobId());
parentRuntime.setVariable(name + jobRuntime.getSpawnOrder(), value);
@Override
public void executeWaitForSpawnJobsInstruction(IJobRuntime jobRuntime) {
- //logger.info("Processing {} {}", jobRuntime.getId(), jobRuntime.canContinueExecution());
+ // logger.info("Processing {} {}", jobRuntime.getId(),
+ // jobRuntime.canContinueExecution());
if (jobRuntime.canContinueExecution()) {
jobRuntime.setStatus(JobStatus.RUNABLE);
private void isRunnable(IProgram program) throws JobEngineException {
JobTemplate template = program.getTemplate();
- //A JOB xml-ben beállítható, hogy futhatnak e párhuzamosan.
+ // A JOB xml-ben beállítható, hogy futhatnak e párhuzamosan.
if (template.isMultiInstance())
return;
List<Job> runningJobs = itemManager.getRunningJobs(template.getFileName());
for (Job job : runningJobs) {
Job runningJob = getJob(job.getId());
if (runningJob != null && runningJob.getStatus() != JobStatus.SUSPENDED)
- throw new JobEngineException(String.format("Can not submit job. Job with %s.%s already running", template.getFileName(),
- template.getName()));
+ throw new JobEngineException(String.format("Can not submit job. Job with %s.%s already running",
+ template.getFileName(), template.getName()));
}
}
}
boolean result = false;
if (keepAliveJobChangedListeners != null) {
long now = System.currentTimeMillis();
- //ha mar hozza van adva, nem adja hozza
+ // ha mar hozza van adva, nem adja hozza
result = addJobChangedEventListener(listener);
keepAliveJobChangedListeners.put(listener, now);
logger.debug("Refreshing listener {}, now {} ({})", listener, now, keepAliveJobChangedListeners.size());
return;
}
- //a gyerekek miatt nem az!
+ // a gyerekek miatt nem az!
if (!jobRuntime.isCancelable())
return;
return;
}
- //TODO ez hibat okoz az archivalasnal, mert hamarabb eltavolitja a childUd-ket
+ // TODO ez hibat okoz az archivalasnal, mert hamarabb eltavolitja a childUd-ket
- // if (jobRuntime.getParentJobId() > 0)
- // removeSpanwChild(jobRuntime);
+ // if (jobRuntime.getParentJobId() > 0)
+ // removeSpanwChild(jobRuntime);
JobStepCompletedMessage m = (JobStepCompletedMessage) message;
- //kesz vagyunk, jelezni
+ // kesz vagyunk, jelezni
if (isWorker()) {
statusMachine.processAction(JobAction.DONE, jobRuntime);
return;
}
- //a cancel hamarabb megjott?
- //ha remote akkot tuti
+ // a cancel hamarabb megjott?
+ // ha remote akkor tuti
if (jobRuntime == null) {
}
List<Long> removeId = new ArrayList<>();
for (Long id : submittedJobs.keySet()) {
IJobRuntime runtime = submittedJobs.get(id);
- if (runtime != null && (JobStatus.SUSPENDED.equals(runtime.getStatus()) || JobStatus.CANCELED.equals(runtime.getStatus())))
+ if (runtime != null && (JobStatus.SUSPENDED.equals(runtime.getStatus())
+ || JobStatus.CANCELED.equals(runtime.getStatus())))
removeId.add(id);
}
for (Long id : removeId)
if (parameter == null)
parameter = jobRuntime.getVariable(forEach);
- //a sima array helyett ezt jobb hasznalni
+ // a sima array helyett ezt jobb hasznalni
if (parameter != null && parameter instanceof BasicDBList) {
BasicDBList iter = (BasicDBList) parameter;
}
@Override
- public IJobRuntime submit(IJobRuntime parent, IJobStatusChangedListener statusListener, String template, String name, int priority, String owner,
- Map<String, Object> parameters) throws JobEngineException {
+ public IJobRuntime submit(IJobRuntime parent, IJobStatusChangedListener statusListener, String template,
+ String name, int priority, String owner, Map<String, Object> parameters) throws JobEngineException {
IJobRuntime result = null;
IProgram program = getProgram(template);
if (program != null) {
}
@Override
- public IJobRuntime submit(IJobRuntime parent, IJobStatusChangedListener statusListener, String template, String name, Map<String, Object> parameters)
- throws JobEngineException {
+ public IJobRuntime submit(IJobRuntime parent, IJobStatusChangedListener statusListener, String template,
+ String name, Map<String, Object> parameters) throws JobEngineException {
IJobRuntime result = null;
result = submit(parent, statusListener, template, name, 0, DEFAULT_OWNER, parameters);
return result;
}
@Override
- public IJobRuntime submit(IJobRuntime parent, String template, String name, int priority, Map<String, Object> parameters) throws JobEngineException {
+ public IJobRuntime submit(IJobRuntime parent, String template, String name, int priority,
+ Map<String, Object> parameters) throws JobEngineException {
IJobRuntime result = null;
result = submit(parent, null, template, name, 0, DEFAULT_OWNER, parameters);
return result;
@Override
public IJobRuntime submit(String template, String name, Map<String, Object> parameters) throws JobEngineException {
- //Az ütemezett task-okat configból a Quartz futtatja
+ // Az ütemezett task-okat configból a Quartz futtatja
if (isAllExecutionDisabled) {
logger.info("JobEngine is disabled, can not submit job '{}'", name);
return null;
}
IJobRuntime result = null;
IProgram program = getProgram(template);
- result = submit(null, null, template, name == null ? program.getTemplate().getName() : name, 0, DEFAULT_OWNER, parameters);
+ result = submit(null, null, template, name == null ? program.getTemplate().getName() : name, 0, DEFAULT_OWNER,
+ parameters);
return result;
}
@Override
- public IJobRuntime submit(String template, String name, Map<String, Object> parameters, String owner) throws JobEngineException {
+ public IJobRuntime submit(String template, String name, Map<String, Object> parameters, String owner)
+ throws JobEngineException {
IJobRuntime result = null;
result = submit(null, null, template, name, 0, owner, parameters);
return result;
String description = t.getClass().getSimpleName() + " : " + t.getMessage();
jobRuntime.setDescription(description);
logger.error(description);
- //TODO itt miert FINISH a kovetkezo allapot, miert nem SUSPEND
+ // TODO itt miert FINISH a kovetkezo allapot, miert nem SUSPEND
statusMachine.processAction(JobAction.FINISH, jobRuntime);
closeSessionLog(jobRuntime);
}