package com.ltkj.web.config.timer; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.ltkj.common.enums.DataSourceType; import com.ltkj.db.DataSourceConfig; import com.ltkj.db.DataSourceContextHolder; import com.ltkj.framework.datasource.DynamicDataSourceContextHolder; import com.ltkj.hosp.domain.DictHosp; import com.ltkj.hosp.domain.TjJcycxm; import com.ltkj.hosp.mapper.TestMapper; import com.ltkj.hosp.service.*; import com.ltkj.hosp.sqlDomain.*; import com.ltkj.system.service.ISysConfigService; import jodd.util.StringUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Profile; import org.springframework.scheduling.TaskScheduler; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.text.SimpleDateFormat; import java.time.Instant; import java.time.LocalTime; import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; @Slf4j @Profile("!dev & !test") @Component public class AsyncResultTask { @Autowired private TaskScheduler taskScheduler; // 定时任务调度器 @Resource private ITjOrderService orderService; @Autowired private ISysConfigService configService; @Resource private LtkjExamJcbgdService jcbgdService; @Resource private LtkjExamJcsqdService jcsqdService; @Resource private LtkjHybgdService ltkjHybgdService; @Resource private LtkjHysqdService ltkjHysqdService; @Resource private TestMapper testMapper; @Autowired private TjJcycxmService jcycxmService; @Autowired private AwsService awsService; @Resource private IDictHospService dictHospService; @Autowired private DataSourceConfig dataSourceConfig; private final Map lockMap = new ConcurrentHashMap<>(); // 用来跟踪已调度任务的用户ID集合,避免重复添加任务 private final Set scheduledTasks = new HashSet<>(); private static final SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 初始化方法,启动时调用,获取所有用户并创建任务 @PostConstruct public void init() { DataSourceContextHolder.setDataSourceKey(DataSourceType.MASTER.name()); // 获取所有需要同步日志的用户列表 LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); wrapper.eq(DictHosp::getIsAutoAsyncJg,1); List hospList = dictHospService.list(wrapper); hospList.forEach(this::createTask); // 为每个用户创建任务 // 定时检查是否有新的用户需要添加任务 schedulePeriodicTaskCheck(); } // 日志文件名称列表 // 创建定时任务的方法,用户ID不存在任务时才会添加 public void createTask(DictHosp dictHosp) { log.info("进入创建任务方法 ->{}",dictHosp.getCode()); log.info("已有任务集合 ->{}", scheduledTasks); // 如果任务已经存在,则不重复添加 if (scheduledTasks.contains(dictHosp.getCode())) { return; } // 将该用户ID添加到已调度任务集合中 scheduledTasks.add(dictHosp.getCode()); // 创建定时任务,任务将在15分钟后执行 /* taskScheduler.schedule(() -> { try { } catch (Exception e) { log.error("创建任务时发生异常:{}", e.getMessage()); } }, Instant.now().plus(15, ChronoUnit.MINUTES));*/ log.info("任务创建成功 ->{}",dictHosp.getCode()); // 执行任务的超时控制 taskScheduler.schedule(() -> { executeTask(dictHosp); }, Instant.now().plus(15, ChronoUnit.MINUTES)); } private ReentrantLock getLock(String id) { return lockMap.computeIfAbsent(id, k -> new ReentrantLock()); } // 执行日志同步任务的方法 public void executeTask(DictHosp dictHosp) { // 获取锁,确保同一时间只有一个任务执行 log.info("{}执行结果同步加锁前",JSONUtil.toJsonStr(dictHosp)); ReentrantLock reentrantLock = getLock(dictHosp.getCode()); boolean lock = reentrantLock.tryLock(); try { log.info("{}执行结果同步加锁后",JSONUtil.toJsonStr(dictHosp)); log.info("{}执行结果同步", dictHosp.getCode()); dataSourceConfig.addDataSource(dictHosp.getDbname()); DataSourceContextHolder.setDataSourceKey(dictHosp.getDbname()); String dsqkssj = configService.selectConfigByKey("dsqkssj"); String dsqjssj = configService.selectConfigByKey("dsqjssj"); boolean panduandangqianshijian = panduandangqianshijian(dsqkssj, dsqjssj); if (!panduandangqianshijian) { if (lock) { log.info("{}任务开始执行时间 -> {}",dictHosp.getCode(), dateTimeFormat.format(new Date())); long start = System.currentTimeMillis(); // 查询任务数据 log.info("开始执行同步 执行院区->{}", dictHosp.getCode()); List jy = orderService.getDingShiTongBuJianYanTjNum(); List jc = orderService.getDingShiTongBuJianChaTjNum(); if (null != jy && !jy.isEmpty()) { log.info("{}同步检验人员 执行数量->{}{}",dictHosp.getCode(), jy.size(),JSONUtil.toJsonStr(jy)); for (String s : jy) { extractedjianyan(s,dictHosp.getCode()); } } if (null != jc && !jc.isEmpty()) { log.info("{}同步检查人员 执行数量->{}{}",dictHosp.getCode(), jc.size(),JSONUtil.toJsonStr(jc)); for (String s : jc) { extractedjiancha(s,dictHosp.getCode()); } } Map map = new HashMap<>(); testMapper.tjplUpdateDetatilByVi(map); Integer object = (Integer) map.get("total"); log.info("{}执行存储过程后出参->{}",dictHosp.getCode(), object); if (null != object && object.equals(1)) { if (null != jc && !jc.isEmpty()) { for (String s : jc) { jcycxmService.deletedByTjh(s); List list = jcsqdService.getLtkjJcsqdByTjh(s); if(null !=list && !list.isEmpty()){ for (LtkjExamJcsqd jcsqd : list) { if(StringUtil.isNotBlank(jcsqd.getJgzt())){ String regex = configService.selectConfigByKey("jcycxmzz"); if(StringUtil.isBlank(regex)) regex= "。|;|;|,|,"; String[] split = jcsqd.getJgzt().replaceAll("\n", "").split(regex); for (String jg : split) { if(StringUtil.isNotBlank(jg) && !jg.contains("未见异常") && !jg.contains("未见明显异常") && !jg.contains("未见占位") && !jg.contains("未见") && !jg.contains("未见明显")&& !jg.contains("正常")&& !jg.contains("双侧椎间孔无狭窄")&& !jg.contains("无殊")){ TjJcycxm jcycxm=new TjJcycxm(); jcycxm.setTjh(s); jcycxm.setYqid(dictHosp.getCode()); String[] split1 = jcsqd.getJcxmid().split(";"); jcycxm.setProId(split1[0]); jcycxm.setProName(jcsqd.getJcxmmc()); jcycxm.setCreateTime(new Date()); jcycxm.setJcjg(jg); jcycxmService.save(jcycxm); } } } } } } } long end = System.currentTimeMillis(); log.info("{}同步结束 耗时:{}秒",dictHosp.getCode(), (end - start) / 1000); }else { log.info("执行存储过程出参失败"); } } }else { log.info("不在定时器执行时间范围之内!!!"); } } catch (Exception e) { log.error("定时器执行报错"); log.error(String.valueOf(e),e.getMessage()); } finally { reentrantLock.unlock(); scheduledTasks.remove(dictHosp.getCode()); // DataSourceContextHolder.setDataSourceKey(DataSourceType.MASTER.name()); // DataSourceContextHolder.clear(); log.info("任务{}已释放锁",dictHosp.getCode()); } } private boolean panduandangqianshijian(String kssj, String jssj) { ZonedDateTime now = ZonedDateTime.now(); LocalTime time = now.toLocalTime(); return time.isAfter(LocalTime.of(Integer.parseInt(kssj), 0)) && time.isBefore(LocalTime.of(Integer.parseInt(jssj), 0)); } // 定时检查是否有新的用户需要添加任务 private void schedulePeriodicTaskCheck() { checkAndAddNewTasks(); } // 检查并添加新的用户任务 public void checkAndAddNewTasks() { DataSourceContextHolder.setDataSourceKey(DataSourceType.MASTER.name()); // 获取所有需要同步日志的用户列表 LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); wrapper.eq(DictHosp::getIsAutoAsyncJg,1); List hospList = dictHospService.list(wrapper); log.info("检查并添加需要同步的医院列表:{}", JSONUtil.toJsonStr(hospList)); hospList.forEach(this::createTask); taskScheduler.schedule(this::checkAndAddNewTasks, Instant.now().plus(2, ChronoUnit.MINUTES)); } private void extractedjianyan(String s,String hospId) { try { List awsList = testMapper.getWsxmLtkjHysqdByLisViBySLAVEWS(s); if(null !=awsList && !awsList.isEmpty()){ log.info(awsList.toString()); testMapper.delWsxmjg(s); log.info("删除外送项目成功"); awsService.saveBatch(awsList); log.info("保存外送项目成功"); } List wssq = testMapper.getWsxmLtkjHysqdByLisVi(s); if(null !=wssq && !wssq.isEmpty()){ for (LtkjHysqd hysqd : wssq) { LtkjHysqd jybgid = ltkjHysqdService.getLtkjHysqdByTjhAndTmh(hysqd.getTjh(), hysqd.getTmh()); if (null != jybgid) { ltkjHysqdService.deletedLtkjHysqdByTjhAndTmh(hysqd.getTjh(), hysqd.getTmh()); ltkjHybgdService.deletedLtkjHybgdByTjhAndTmh(hysqd.getTmh()); } if (ltkjHysqdService.save(hysqd)) { List wsbg = testMapper.getWsxmLtkjHybgdByLisVi(s); ltkjHybgdService.saveBatch(wsbg); } } } String isWsByView = configService.selectConfigByKey("is_ws_by_view"); if (StrUtil.isNotBlank(isWsByView) && isWsByView.equals("Y")){ List sqd = testMapper.getWsHySqdByView(s); if (sqd != null && !sqd.isEmpty()){ for (LtkjHysqd hysqd : sqd) { LtkjHysqd jybgid = ltkjHysqdService.getLtkjHysqdByTjhAndTmh(hysqd.getTjh(), hysqd.getTmh()); if (null != jybgid) { ltkjHysqdService.deletedLtkjHysqdByTjhAndTmh(hysqd.getTjh(), hysqd.getTmh()); ltkjHybgdService.deletedLtkjHybgdByTjhAndTmh(hysqd.getTmh()); } if (ltkjHysqdService.save(hysqd)) { List wsbg = testMapper.getWsHyBgdByView(hysqd.getTmh()); ltkjHybgdService.saveBatch(wsbg); } } } } List hysqdList = testMapper.getCcXZxYyLtkjHysqdByLisVi(s,hospId); if (null != hysqdList && !hysqdList.isEmpty()) { for (LtkjHysqd hysqd : hysqdList) { LtkjHysqd jybgid = ltkjHysqdService.getLtkjHysqdByTjhAndTmh(hysqd.getTjh(), hysqd.getTmh()); if (null != jybgid) { ltkjHysqdService.deletedLtkjHysqdByTjhAndTmh(hysqd.getTjh(), hysqd.getTmh()); ltkjHybgdService.deletedLtkjHybgdByTjhAndTmh(hysqd.getTmh()); } if (ltkjHysqdService.save(hysqd)) { List hybgdList = testMapper.getCcXZxYyLtkjHybgdByLisVi(hysqd.getTmh()); // List wsbg = testMapper.getWsxmLtkjHybgdByLisVi(s); // if(null !=wsbg && !wsbg.isEmpty())hybgdList.addAll(wsbg); ltkjHybgdService.saveBatch(hybgdList); } } DynamicDataSourceContextHolder.clearDataSourceType(); } } catch (Exception e) { log.info("同步检验数据失败"); // throw new RuntimeException(e); log.error(String.valueOf(e)); } } private void extractedjiancha(String s,String hospId) { try { List jcsqdList = testMapper.getCcXZxYyPacsLtkjExamJcsqd(s,hospId); log.info("{}自动同步检查申请单数据:{}",hospId,JSONUtil.toJsonStr(jcsqdList)); if (null != jcsqdList && !jcsqdList.isEmpty()) { DynamicDataSourceContextHolder.clearDataSourceType(); jcsqdService.deletedLtkjJcsqdByTjhAndTmh(s); jcbgdService.deletedLtkjJcbgdByTjhAndTmh(s); jcsqdService.saveBatch(jcsqdList); List jcbgdList = testMapper.getCcXZxYyPacsLtkjExamJcbgd(s,hospId); // DynamicDataSourceContextHolder.clearDataSourceType(); log.info("{}自动同步检查报告单数据:{}",hospId,JSONUtil.toJsonStr(jcbgdList)); if (null != jcbgdList && !jcbgdList.isEmpty()) jcbgdService.saveBatch(jcbgdList); DynamicDataSourceContextHolder.clearDataSourceType(); } } catch (Exception e) { log.info("同步检查数据失败"); // throw new RuntimeException(e); log.error(String.valueOf(e)); } } }