스케줄러 구조 리팩토링
This commit is contained in:
@@ -1,433 +1,56 @@
|
||||
package com.caliverse.admin.scheduler;
|
||||
|
||||
import com.caliverse.admin.scheduler.config.MissedExecutionConfig;
|
||||
import com.caliverse.admin.scheduler.config.ScheduleExecutionConfig;
|
||||
import com.caliverse.admin.scheduler.service.NoticeScheduler;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.*;
|
||||
import com.caliverse.admin.scheduler.entity.SchedulerType;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.LocalTime;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class SchedulerManager {
|
||||
private final ScheduledExecutorService scheduler;
|
||||
|
||||
// 실행 중인 작업 추적을 위한 Map
|
||||
private final Map<String, ScheduledTaskInfo> scheduledTasks = new ConcurrentHashMap<>();
|
||||
|
||||
// 재시도 설정
|
||||
private static final int MAX_RETRY_COUNT = 3;
|
||||
private static final long RETRY_DELAY_MS = 1000;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
//스케줄 프로세스 관리
|
||||
private static class ScheduledTaskInfo {
|
||||
private final Scheduler task;
|
||||
private final ScheduledFuture<?> future;
|
||||
private final LocalDateTime startTime;
|
||||
private int retryCount;
|
||||
private ScheduleStatus status;
|
||||
// 실행 중인 작업 추적을 위한 Map
|
||||
private final Map<SchedulerType, Scheduler> schedulers = new ConcurrentHashMap<>();
|
||||
|
||||
public static ScheduledTaskInfo of(Scheduler task, ScheduledFuture<?> future) {
|
||||
return ScheduledTaskInfo.builder()
|
||||
.task(task)
|
||||
.future(future)
|
||||
.startTime(LocalDateTime.now())
|
||||
.retryCount(0)
|
||||
.status(ScheduleStatus.SCHEDULED)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum ScheduleStatus {
|
||||
SCHEDULED("예약됨"),
|
||||
RUNNING("실행 중"),
|
||||
COMPLETED("완료됨"),
|
||||
FAILED("실패"),
|
||||
CANCELLED("취소됨");
|
||||
|
||||
private final String description;
|
||||
}
|
||||
|
||||
//스케줄 실행
|
||||
public void scheduleTask(Scheduler task) {
|
||||
String taskId = task.getSchedulerConfig().getTaskId();
|
||||
|
||||
// 이미 존재하는 작업 체크
|
||||
if (scheduledTasks.containsKey(taskId)) {
|
||||
ScheduledTaskInfo existingTask = scheduledTasks.get(taskId);
|
||||
if (shouldReplaceExisting(existingTask)) {
|
||||
log.info("Replacing existing task: {}", taskId);
|
||||
cancelTask(taskId);
|
||||
} else {
|
||||
log.warn("Task {} is already scheduled and running", taskId);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
//타입별 스케줄러 등록
|
||||
ScheduledFuture<?> future = scheduleTaskByType(task);
|
||||
if (future != null) {
|
||||
ScheduledTaskInfo taskInfo = ScheduledTaskInfo.of(task, future);
|
||||
scheduledTasks.put(taskId, taskInfo);
|
||||
log.info("Successfully scheduled task: {}", taskId);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to schedule task: {}", taskId, e);
|
||||
throw new SchedulerException("Failed to schedule task: " + taskId, e);
|
||||
}
|
||||
}
|
||||
|
||||
//스케줄 상태 체크
|
||||
private boolean shouldReplaceExisting(ScheduledTaskInfo taskInfo) {
|
||||
return taskInfo.getStatus() == ScheduleStatus.FAILED ||
|
||||
taskInfo.getStatus() == ScheduleStatus.COMPLETED ||
|
||||
taskInfo.getFuture().isDone();
|
||||
}
|
||||
|
||||
//스케줄 타입 별 처리
|
||||
private ScheduledFuture<?> scheduleTaskByType(Scheduler task) {
|
||||
return switch (task.getScheduleType()) {
|
||||
case IMMEDIATE -> scheduleImmediate(task);
|
||||
case DELAYED -> scheduleDelayed(task);
|
||||
case RECURRING -> scheduleRecurring(task);
|
||||
case PERIODIC -> schedulePeriodic(task);
|
||||
};
|
||||
}
|
||||
|
||||
//즉시 실행
|
||||
private ScheduledFuture<?> scheduleImmediate(Scheduler task) {
|
||||
return (ScheduledFuture<?>) scheduler.submit(() -> executeTaskWithRetry(task));
|
||||
}
|
||||
|
||||
//지연 실행
|
||||
private ScheduledFuture<?> scheduleDelayed(Scheduler task) {
|
||||
ScheduleExecutionConfig config = task.getSchedulerConfig();
|
||||
long delay = calculateDelay(config.getStartTime());
|
||||
|
||||
if (delay < 0) {
|
||||
log.error("Task {} scheduled for past time", config.getTaskId());
|
||||
return null;
|
||||
}
|
||||
|
||||
return scheduler.schedule(
|
||||
() -> executeTaskWithRetry(task),
|
||||
delay,
|
||||
TimeUnit.MILLISECONDS
|
||||
);
|
||||
}
|
||||
|
||||
//반복 실행
|
||||
private ScheduledFuture<?> scheduleRecurring(Scheduler task) {
|
||||
ScheduleExecutionConfig config = task.getSchedulerConfig();
|
||||
long initialDelay = calculateDelay(config.getStartTime());
|
||||
|
||||
if (initialDelay < 0) {
|
||||
Duration timeSinceStart = Duration.between(config.getStartTime(), LocalDateTime.now());
|
||||
|
||||
// 종료 시간 체크
|
||||
if (config.getEndTime() != null && LocalDateTime.now().isAfter(config.getEndTime())) {
|
||||
log.warn("Task {} scheduled end time has passed", config.getTaskId());
|
||||
return null;
|
||||
}
|
||||
|
||||
// 다음 실행 시간 계산
|
||||
long interval = config.getInterval().toMillis();
|
||||
long missedExecutions = timeSinceStart.toMillis() / interval;
|
||||
initialDelay = interval - (timeSinceStart.toMillis() % interval);
|
||||
|
||||
log.info("Task {} was scheduled to start {} ago. Missed {} executions. " +
|
||||
"Next execution in {} ms",
|
||||
config.getTaskId(),
|
||||
timeSinceStart,
|
||||
missedExecutions,
|
||||
initialDelay);
|
||||
|
||||
if (shouldProcessMissedExecutions(task)) {
|
||||
processMissedExecutions(task, (int) missedExecutions);
|
||||
}
|
||||
}
|
||||
|
||||
return scheduler.scheduleAtFixedRate(
|
||||
() -> {
|
||||
String taskId = config.getTaskId();
|
||||
ScheduledTaskInfo taskInfo = scheduledTasks.get(taskId);
|
||||
|
||||
try {
|
||||
// NoticeSchedule 경우 추가 검증
|
||||
if (task instanceof NoticeScheduler noticeTask) {
|
||||
if (noticeTask.hasReachedMaxExecutions()) {
|
||||
cancelTask(taskId);
|
||||
log.info("Cancelling notice task - reached max executions. TaskId: {}", taskId);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!noticeTask.isValidExecutionCount()) {
|
||||
log.error("Invalid execution count detected. TaskId: {}", taskId);
|
||||
cancelTask(taskId);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (shouldStopRecurring(task)) {
|
||||
cancelTask(taskId);
|
||||
return;
|
||||
}
|
||||
|
||||
taskInfo.setStatus(ScheduleStatus.RUNNING);
|
||||
task.execute();
|
||||
taskInfo.setStatus(ScheduleStatus.SCHEDULED);
|
||||
|
||||
} catch (Exception e) {
|
||||
handleTaskExecutionFailure(task, taskInfo, e);
|
||||
}
|
||||
},
|
||||
initialDelay,
|
||||
config.getInterval().toMillis(),
|
||||
TimeUnit.MILLISECONDS
|
||||
);
|
||||
}
|
||||
|
||||
//주기적 실행
|
||||
private ScheduledFuture<?> schedulePeriodic(Scheduler task){
|
||||
ScheduleExecutionConfig config = task.getSchedulerConfig();
|
||||
LocalTime executionTime = config.getDailyExecutionTime();
|
||||
|
||||
if (executionTime == null) {
|
||||
log.error("Daily execution time is not set for task: {}", config.getTaskId());
|
||||
return null;
|
||||
}
|
||||
|
||||
// 현재 시간
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
|
||||
// 시작일이 미래인 경우, 시작일의 지정 시간을 첫 실행 시간으로 설정
|
||||
// 시작일이 과거인 경우, 오늘 또는 다음 날의 지정 시간을 첫 실행 시간으로 설정
|
||||
LocalDateTime firstExecution = calculateFirstExecution(now, config.getStartTime(), executionTime);
|
||||
|
||||
// 종료 시간 확인
|
||||
if (config.getEndTime() != null && firstExecution.isAfter(config.getEndTime())) {
|
||||
log.warn("Task {} end time has already passed", config.getTaskId());
|
||||
return null;
|
||||
}
|
||||
|
||||
// 첫 실행까지의 지연 시간 계산
|
||||
long initialDelay = Duration.between(now, firstExecution).toMillis();
|
||||
|
||||
// 24시간을 밀리초로 변환
|
||||
long dailyInterval = Duration.ofDays(1).toMillis();
|
||||
|
||||
return scheduler.scheduleAtFixedRate(
|
||||
() -> executeDaily(task),
|
||||
initialDelay,
|
||||
dailyInterval,
|
||||
TimeUnit.MILLISECONDS
|
||||
);
|
||||
}
|
||||
|
||||
private LocalDateTime calculateFirstExecution(
|
||||
LocalDateTime now,
|
||||
LocalDateTime startTime,
|
||||
LocalTime executionTime
|
||||
) {
|
||||
LocalDateTime candidateTime = now.with(executionTime);
|
||||
|
||||
// 시작 시간이 미래인 경우
|
||||
if (startTime.isAfter(now)) {
|
||||
candidateTime = startTime.with(executionTime);
|
||||
}
|
||||
|
||||
// 만약 오늘의 실행 시간이 이미 지났다면 다음 날로 설정
|
||||
if (candidateTime.isBefore(now)) {
|
||||
candidateTime = candidateTime.plusDays(1);
|
||||
}
|
||||
|
||||
return candidateTime;
|
||||
}
|
||||
|
||||
private void executeDaily(Scheduler task) {
|
||||
String taskId = task.getSchedulerConfig().getTaskId();
|
||||
ScheduledTaskInfo taskInfo = scheduledTasks.get(taskId);
|
||||
|
||||
try {
|
||||
// 현재 시간이 종료 시간을 지났는지 확인
|
||||
if (shouldStopDaily(task)) {
|
||||
cancelTask(taskId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 지정된 시간에만 실행
|
||||
LocalTime currentTime = LocalTime.now();
|
||||
LocalTime executionTime = task.getSchedulerConfig().getDailyExecutionTime();
|
||||
|
||||
if (currentTime.getHour() == executionTime.getHour() &&
|
||||
currentTime.getMinute() == executionTime.getMinute()) {
|
||||
|
||||
taskInfo.setStatus(ScheduleStatus.RUNNING);
|
||||
task.execute();
|
||||
taskInfo.setStatus(ScheduleStatus.SCHEDULED);
|
||||
|
||||
log.info("Daily task {} executed at scheduled time: {}",
|
||||
taskId, executionTime);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
handleTaskExecutionFailure(task, taskInfo, e);
|
||||
}
|
||||
}
|
||||
|
||||
//종료일자 체크
|
||||
private boolean shouldStopDaily(Scheduler task) {
|
||||
ScheduleExecutionConfig config = task.getSchedulerConfig();
|
||||
if (config.getEndTime() == null) {
|
||||
return false;
|
||||
}
|
||||
return LocalDateTime.now().isAfter(config.getEndTime());
|
||||
}
|
||||
|
||||
private boolean shouldProcessMissedExecutions(Scheduler task) {
|
||||
// 작업 유형에 따라 누락된 실행 처리 여부 결정
|
||||
return task.getSchedulerConfig().isProcessMissedExecutions();
|
||||
}
|
||||
|
||||
private void processMissedExecutions(Scheduler task, int missedCount) {
|
||||
// 누락된 실행 처리를 위한 설정 가져오기
|
||||
MissedExecutionConfig missedConfig = task.getSchedulerConfig().getMissedExecutionConfig();
|
||||
if (missedConfig == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
int executionsToProcess = Math.min(
|
||||
missedCount,
|
||||
missedConfig.getMaxMissedExecutionsToProcess()
|
||||
public SchedulerManager(List<Scheduler> schedulerList) {
|
||||
schedulerList.forEach(scheduler ->
|
||||
schedulers.put(scheduler.getSchedulerType(), scheduler)
|
||||
);
|
||||
|
||||
if (executionsToProcess <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
log.info("Processing {} missed executions for task {}", executionsToProcess, task.getSchedulerConfig().getTaskId());
|
||||
|
||||
for (int i = 0; i < executionsToProcess; i++) {
|
||||
if (missedConfig.isSequentialProcessing()) {
|
||||
task.execute();
|
||||
} else {
|
||||
// 마지막 실행만 처리
|
||||
if (i == executionsToProcess - 1) {
|
||||
task.execute();
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Error processing missed executions for task {}",
|
||||
task.getSchedulerConfig().getTaskId(), e);
|
||||
}
|
||||
}, scheduler);
|
||||
log.info("Initialized {} schedulers: {}",
|
||||
schedulerList.size(),
|
||||
schedulerList.stream()
|
||||
.map(Scheduler::getSchedulerType)
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
private void executeTaskWithRetry(Scheduler task) {
|
||||
String taskId = task.getSchedulerConfig().getTaskId();
|
||||
ScheduledTaskInfo taskInfo = scheduledTasks.get(taskId);
|
||||
|
||||
try {
|
||||
taskInfo.setStatus(ScheduleStatus.RUNNING);
|
||||
task.execute();
|
||||
taskInfo.setStatus(ScheduleStatus.COMPLETED);
|
||||
log.info("Task {} completed successfully", taskId);
|
||||
|
||||
} catch (Exception e) {
|
||||
handleTaskExecutionFailure(task, taskInfo, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleTaskExecutionFailure(Scheduler task, ScheduledTaskInfo taskInfo, Exception e) {
|
||||
String taskId = task.getSchedulerConfig().getTaskId();
|
||||
taskInfo.setRetryCount(taskInfo.getRetryCount() + 1);
|
||||
|
||||
if (taskInfo.getRetryCount() < MAX_RETRY_COUNT) {
|
||||
log.warn("Task {} failed, attempting retry {}/{}",
|
||||
taskId, taskInfo.getRetryCount(), MAX_RETRY_COUNT, e);
|
||||
|
||||
scheduler.schedule(
|
||||
() -> executeTaskWithRetry(task),
|
||||
RETRY_DELAY_MS,
|
||||
TimeUnit.MILLISECONDS
|
||||
);
|
||||
public void executeScheduler(SchedulerType type) {
|
||||
Scheduler scheduler = schedulers.get(type);
|
||||
if (scheduler != null) {
|
||||
scheduler.execute();
|
||||
} else {
|
||||
log.error("Task {} failed after {} retries", taskId, MAX_RETRY_COUNT, e);
|
||||
taskInfo.setStatus(ScheduleStatus.FAILED);
|
||||
cancelTask(taskId);
|
||||
log.warn("No scheduler found for type: {}", type);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean shouldStopRecurring(Scheduler task) {
|
||||
ScheduleExecutionConfig config = task.getSchedulerConfig();
|
||||
|
||||
// 종료 시간 체크
|
||||
if (config.getEndTime() != null && LocalDateTime.now().isAfter(config.getEndTime())) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// 반복 횟수 체크
|
||||
if (config.getRepeatCount() != null) {
|
||||
String taskId = config.getTaskId();
|
||||
ScheduledTaskInfo taskInfo = scheduledTasks.get(taskId);
|
||||
return taskInfo.getRetryCount() >= config.getRepeatCount();
|
||||
}
|
||||
|
||||
return false;
|
||||
public void executeAllSchedulers() {
|
||||
schedulers.values().forEach(Scheduler::execute);
|
||||
}
|
||||
|
||||
public void cancelTask(String taskId) {
|
||||
ScheduledTaskInfo taskInfo = scheduledTasks.get(taskId);
|
||||
if (taskInfo != null) {
|
||||
taskInfo.getFuture().cancel(false);
|
||||
taskInfo.setStatus(ScheduleStatus.CANCELLED);
|
||||
scheduledTasks.remove(taskId);
|
||||
log.info("Task {} cancelled and removed from scheduler", taskId);
|
||||
}
|
||||
public void addScheduler(Scheduler scheduler) {
|
||||
schedulers.put(scheduler.getSchedulerType(), scheduler);
|
||||
log.info("Added scheduler: {}", scheduler.getSchedulerType());
|
||||
}
|
||||
|
||||
public void cancelAllTasks() {
|
||||
scheduledTasks.keySet().forEach(this::cancelTask);
|
||||
}
|
||||
|
||||
private long calculateDelay(LocalDateTime startTime) {
|
||||
return Duration.between(LocalDateTime.now(), startTime).toMillis();
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void shutdown() {
|
||||
cancelAllTasks();
|
||||
scheduler.shutdown();
|
||||
try {
|
||||
if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
|
||||
scheduler.shutdownNow();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
scheduler.shutdownNow();
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
// 스케줄러 관련 예외 클래스
|
||||
public static class SchedulerException extends RuntimeException {
|
||||
public SchedulerException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
public void removeScheduler(SchedulerType type) {
|
||||
schedulers.remove(type);
|
||||
log.info("Removed scheduler: {}", type);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user