Scheduler service - schedlock

parent 4e812b11
...@@ -99,10 +99,12 @@ ...@@ -99,10 +99,12 @@
<dependency> <dependency>
<groupId>net.javacrumbs.shedlock</groupId> <groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-spring</artifactId> <artifactId>shedlock-spring</artifactId>
<version>4.30.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>net.javacrumbs.shedlock</groupId> <groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-provider-jdbc-template</artifactId> <artifactId>shedlock-provider-jdbc-template</artifactId>
<version>4.30.0</version>
</dependency> </dependency>
<!-- Fin de Lock para scheduling --> <!-- Fin de Lock para scheduling -->
......
package com.bytesw.bytebot.etl.batch.beans;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import javax.sql.DataSource;
@Configuration
public class DBConfig{
@Value("${spring.datasource.driverClassName}")
private String classDriver;
@Value("${spring.datasource.url}")
private String url;
@Value("${spring.datasource.username}")
private String user;
@Value("${spring.datasource.password}")
private String password;
@Bean
public DataSource dataSource(){
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName(classDriver);
dataSource.setUrl(url);
dataSource.setUsername(user);
dataSource.setPassword(password);
return dataSource;
}
}
\ No newline at end of file
...@@ -27,6 +27,10 @@ import com.bytesw.bytebot.repository.*; ...@@ -27,6 +27,10 @@ import com.bytesw.bytebot.repository.*;
import com.bytesw.xdf.multitenant.core.ThreadLocalStorage; import com.bytesw.xdf.multitenant.core.ThreadLocalStorage;
import com.google.gson.Gson; import com.google.gson.Gson;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import net.javacrumbs.shedlock.core.DefaultLockingTaskExecutor;
import net.javacrumbs.shedlock.core.LockingTaskExecutor;
import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.springframework.batch.core.*; import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
...@@ -37,11 +41,15 @@ import org.springframework.batch.item.ItemReader; ...@@ -37,11 +41,15 @@ import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger; import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger; import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.time.Duration; import java.time.Duration;
...@@ -55,6 +63,9 @@ import java.util.concurrent.ScheduledFuture; ...@@ -55,6 +63,9 @@ import java.util.concurrent.ScheduledFuture;
// Descomentar lo siguiente para que funcione // Descomentar lo siguiente para que funcione
@Service @Service
@Log4j2 @Log4j2
@Component
@EnableScheduling
@EnableSchedulerLock(defaultLockAtMostFor="PT30S")
public class ScheduleService implements SchedulingConfigurer { public class ScheduleService implements SchedulingConfigurer {
ScheduledTaskRegistrar scheduledTaskRegistrar; ScheduledTaskRegistrar scheduledTaskRegistrar;
...@@ -195,7 +206,8 @@ public class ScheduleService implements SchedulingConfigurer { ...@@ -195,7 +206,8 @@ public class ScheduleService implements SchedulingConfigurer {
/* ETL de Dashboard */ /* ETL de Dashboard */
String key = String.format("%s-%s", tenantIdentifier, identifier); String key = String.format("%s-%s", tenantIdentifier, identifier);
futureMap.put(key, taskRegistrar.getScheduler().schedule(() -> scheduleCron(createJob(tenantIdentifier), tenantIdentifier), trigger)); futureMap.put(key, taskRegistrar.getScheduler().schedule(() -> scheduleCron(createJob(), tenantIdentifier), trigger));
/* Extraer el id del proceso - Delete */ /* Extraer el id del proceso - Delete */
ProcessETL processDelete = processETLRepository.findByName(ProcessETLEnum.PROCESS_DELETE.getName()); ProcessETL processDelete = processETLRepository.findByName(ProcessETLEnum.PROCESS_DELETE.getName());
...@@ -280,12 +292,12 @@ public class ScheduleService implements SchedulingConfigurer { ...@@ -280,12 +292,12 @@ public class ScheduleService implements SchedulingConfigurer {
String keyDataSens = ""; String keyDataSens = "";
for (DeleteDataSensBean data : deleteDataSensBeans) { for (DeleteDataSensBean data : deleteDataSensBeans) {
Agent agent = agentRepository.findById(data.getAgenId()).get(); Agent agent = agentRepository.findById(data.getAgenId()).get();
Trigger trigger = new CronTrigger("0 0/1 * * * *", timeZone); Trigger trigger = new CronTrigger(cronExpression, timeZone);
keyDataSens = String.format("deleteSensible-%s-%s", tenantIdentifier, data.getAgenId()); keyDataSens = String.format("deleteSensible-%s-%s", tenantIdentifier, data.getAgenId());
if (!futureMap.containsKey(keyDataSens)) { if (!futureMap.containsKey(keyDataSens)) {
keys.add(keyDataSens); keys.add(keyDataSens);
futureMap.put(keyDataSens, taskRegistrar.getScheduler() futureMap.put(keyDataSens, taskRegistrar.getScheduler()
.schedule(() -> scheduleCron(createJobDataSens(tenantIdentifier, data), tenantIdentifier), trigger)); .schedule(() -> scheduleCron(createJobDataSens(data), tenantIdentifier), trigger));
} }
} }
} else { } else {
...@@ -338,12 +350,19 @@ public class ScheduleService implements SchedulingConfigurer { ...@@ -338,12 +350,19 @@ public class ScheduleService implements SchedulingConfigurer {
} }
/* Métodos utilizados para ETL de dashboard */ /* Métodos utilizados para ETL de dashboard */
@Scheduled(cron = "${application.byte-bot.batch.cron}")
private Job createJob(String tenantIdentifier) { @SchedulerLock(name = "processJob", lockAtLeastFor = "30S", lockAtMostFor = "40S")
ThreadLocalStorage.setTenantName(tenantIdentifier); protected Job createJob() {
return jobBuilderFactory.get("processJob") List<Agent> agentDeployed = agentRepository.findByStatus(AgentStatusEnum.DEPLOYED);
.incrementer(new RunIdIncrementer()).listener(listener) if (!agentDeployed.isEmpty()) {
.flow(createStep(tenantIdentifier)).end().build(); ThreadLocalStorage.setTenantName(tenant);
return jobBuilderFactory.get("processJob")
.incrementer(new RunIdIncrementer()).listener(listener)
.flow(createStep(tenant)).end().build();
} else {
log.info("No hay agentes deployados.");
return null;
}
} }
private Step createStep(String tenantIdentifier) { private Step createStep(String tenantIdentifier) {
...@@ -374,13 +393,12 @@ public class ScheduleService implements SchedulingConfigurer { ...@@ -374,13 +393,12 @@ public class ScheduleService implements SchedulingConfigurer {
} }
/* Métodos ETL de data sensible */ /* Métodos ETL de data sensible */
private Job createJobDataSens(DeleteDataSensBean data) {
private Job createJobDataSens(String tenantIdentifier, DeleteDataSensBean data) {
System.out.println("ETL de eliminacion"); System.out.println("ETL de eliminacion");
ThreadLocalStorage.setTenantName(tenantIdentifier); ThreadLocalStorage.setTenantName(tenant);
return jobBuilderFactory.get( String.format("processDataSensible-%d",+data.getAgenId())) return jobBuilderFactory.get( String.format("processDataSensible-%d",data.getAgenId()))
.incrementer(new RunIdIncrementer()).listener(listener) .incrementer(new RunIdIncrementer()).listener(listener)
.flow(createStepDataSens(tenantIdentifier, data)).end().build(); .flow(createStepDataSens(tenant, data)).end().build();
} }
private Step createStepDataSens(String tenantIdentifier, DeleteDataSensBean data) { private Step createStepDataSens(String tenantIdentifier, DeleteDataSensBean data) {
...@@ -442,6 +460,7 @@ public class ScheduleService implements SchedulingConfigurer { ...@@ -442,6 +460,7 @@ public class ScheduleService implements SchedulingConfigurer {
return properties.getTenants().stream().filter(x -> x.getId().equals(tenantId)).findFirst(); return properties.getTenants().stream().filter(x -> x.getId().equals(tenantId)).findFirst();
} }
private void scheduleCron(Job job, String tenantId) { private void scheduleCron(Job job, String tenantId) {
UUID traceID = UUID.randomUUID(); UUID traceID = UUID.randomUUID();
Map<String, JobParameter> maps = new HashMap<>(); Map<String, JobParameter> maps = new HashMap<>();
...@@ -451,13 +470,15 @@ public class ScheduleService implements SchedulingConfigurer { ...@@ -451,13 +470,15 @@ public class ScheduleService implements SchedulingConfigurer {
JobParameters parameters = new JobParameters(maps); JobParameters parameters = new JobParameters(maps);
Set<JobExecution> jobExecutions = new HashSet<>(); Set<JobExecution> jobExecutions = new HashSet<>();
try { try {
jobExecutions = jobExplorer.findRunningJobExecutions(job.getName()); if (job != null) {
String JobInfo = String.format("Jobs {} en Ejecución: {}", job.getName(), jobExecutions.size()); jobExecutions = jobExplorer.findRunningJobExecutions(job.getName());
log.info(JobInfo); String JobInfo = String.format("Jobs {} en Ejecución: {}", job.getName(), jobExecutions.size());
if (jobExecutions.isEmpty()) { log.info(JobInfo);
asyncJobLauncher.run(job, parameters); if (jobExecutions.isEmpty()) {
} else { asyncJobLauncher.run(job, parameters);
log.info("El Job " + job.getName() + " no se ejecutará porque hay jobs pendientes: " + jobExecutions.size()); } else {
log.info("El Job " + job.getName() + " no se ejecutará porque hay jobs pendientes: " + jobExecutions.size());
}
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
......
package com.bytesw.bytebot.etl.batch.service;
import com.bytesw.bytebot.etl.batch.beans.DBConfig;
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
@Configuration
public class SchedulerConfiguration {
@Bean
public LockProvider lockProvider(DataSource dataSource) {
DBConfig dbConfig = new DBConfig();
dataSource = dbConfig.dataSource();
return new JdbcTemplateLockProvider(dataSource);
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment