Commit 3a23a4cb by zhangxingmin

push

parent 6945ad0a
......@@ -24,13 +24,12 @@ public class ApiNotificationTaskController implements ApiNotificationTaskFeignCl
private ApiNotificationTaskService apiNotificationTaskService;
/**
* 发送企业微信消息
* 通用发送接口(支持所有渠道)
* @param request
* @return
*/
@Override
public Result<ApiSendResponse> sendWecomMessage(ApiSendRequest request) {
return apiNotificationTaskService.sendWecomMessage(request);
public Result<ApiSendResponse> send(@RequestBody @Validated ApiSendRequest request) {
return apiNotificationTaskService.send(request);
}
// /**
......
......@@ -5,5 +5,5 @@ import com.yd.notice.feign.request.ApiSendRequest;
import com.yd.notice.feign.response.ApiSendResponse;
public interface ApiNotificationTaskService {
Result<ApiSendResponse> sendWecomMessage(ApiSendRequest request);
Result<ApiSendResponse> send(ApiSendRequest request);
}
......@@ -8,7 +8,6 @@ import com.yd.notice.service.service.INotificationTaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Service
......@@ -18,13 +17,12 @@ public class ApiNotificationTaskServiceImpl implements ApiNotificationTaskServic
private INotificationTaskService iNotificationTaskService;
/**
* 发送企业微信消息
* 通用发送接口(支持所有渠道)
* @param request
* @return
*/
@Override
@Transactional(rollbackFor = Exception.class)
public Result<ApiSendResponse> sendWecomMessage(ApiSendRequest request) {
public Result<ApiSendResponse> send(ApiSendRequest request) {
String taskBizId = iNotificationTaskService.createAndSendTask(
request.getChannelBizId(),
request.getTemplateBizId(),
......@@ -36,5 +34,4 @@ public class ApiNotificationTaskServiceImpl implements ApiNotificationTaskServic
return Result.success(response);
}
}
......@@ -16,14 +16,13 @@ import org.springframework.web.bind.annotation.RequestBody;
@FeignClient(name = "yd-notice-api",path = "/notice/api/notificationTask",fallbackFactory = ApiNotificationTaskFeignFallbackFactory.class)
public interface ApiNotificationTaskFeignClient {
/**
* 发送企业微信消息
* 通用发送接口(支持所有渠道)
* @param request
* @return
*/
@PostMapping("/send/wecom")
Result<ApiSendResponse> sendWecomMessage(@Validated @RequestBody ApiSendRequest request);
@PostMapping("/send")
Result<ApiSendResponse> send(@Validated @RequestBody ApiSendRequest request);
// /**
// * 查询消息任务状态
......
......@@ -90,11 +90,28 @@
<version>4.6.0</version>
</dependency>
<!-- Redis 依赖(用于 AccessToken 缓存) -->
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-data-redis</artifactId>-->
<!-- </dependency>-->
<dependency>
<groupId>com.sun.mail</groupId>
<artifactId>javax.mail</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.6.3</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-dysmsapi</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea-openapi</artifactId>
<version>0.3.6</version>
</dependency>
<!-- Hutool 工具类(可选) -->
<dependency>
......
package com.yd.notice.service.send;
import com.alibaba.fastjson.JSONObject;
import com.yd.notice.service.model.ChannelConfig;
import com.yd.notice.service.model.NotificationTask;
import com.yd.notice.service.service.IChannelConfigService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.mail.javamail.JavaMailSenderImpl;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.stereotype.Component;
import javax.mail.internet.MimeMessage;
import java.util.Properties;
@Slf4j
@Component
@RequiredArgsConstructor
public class EmailMessageSender implements MessageSender {
private final IChannelConfigService channelConfigService;
@Override
public SendResult send(NotificationTask task) {
try {
ChannelConfig config = channelConfigService.getByChannelBizId(task.getChannelBizId());
if (config == null || config.getStatus() != 1) {
return SendResult.failNonRetryable("CONFIG_NOT_EXIST", "渠道配置不存在或已禁用");
}
JSONObject emailConfig = JSONObject.parseObject(config.getConfigValue());
String host = emailConfig.getString("host");
int port = emailConfig.getInteger("port");
String username = emailConfig.getString("username");
String password = emailConfig.getString("password");
String from = emailConfig.getString("from");
JavaMailSenderImpl mailSender = new JavaMailSenderImpl();
mailSender.setHost(host);
mailSender.setPort(port);
mailSender.setUsername(username);
mailSender.setPassword(password);
Properties props = mailSender.getJavaMailProperties();
props.put("mail.transport.protocol", "smtp");
props.put("mail.smtp.auth", "true");
props.put("mail.smtp.starttls.enable", "true");
props.put("mail.smtp.ssl.enable", port == 465);
MimeMessage mimeMessage = mailSender.createMimeMessage();
MimeMessageHelper helper = new MimeMessageHelper(mimeMessage, true, "UTF-8");
helper.setFrom(from);
helper.setTo(task.getReceiver());
helper.setSubject(task.getTitle());
helper.setText(task.getContent(), true); // true表示HTML内容
mailSender.send(mimeMessage);
log.info("邮件发送成功, taskBizId={}, receiver={}", task.getTaskBizId(), task.getReceiver());
return SendResult.success();
} catch (Exception e) {
log.error("邮件发送异常, taskBizId={}", task.getTaskBizId(), e);
// 邮件发送异常一般可重试(网络、超时等),但配置错误不可重试
if (e.getMessage() != null && e.getMessage().contains("Authentication failed")) {
return SendResult.failNonRetryable("AUTH_FAILED", e.getMessage());
}
return SendResult.failRetryable("SEND_EXCEPTION", e.getMessage());
}
}
@Override
public String getSupportedChannelType() {
return "email";
}
}
\ No newline at end of file
package com.yd.notice.service.send;
import com.yd.notice.service.model.NotificationTask;
/**
* 消息发送器接口
*/
public interface MessageSender {
/**
* 发送消息
* @param task 消息任务(含标题、内容、接收人、渠道ID等)
* @return 发送结果(成功/失败+是否可重试)
*/
SendResult send(NotificationTask task);
/**
* 支持的渠道类型,与 channel_config.channel_type 对应
* 例如:wecom, sms, email, wechat, miniprogram, app
*/
String getSupportedChannelType();
}
\ No newline at end of file
package com.yd.notice.service.send;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 发送器路由工厂
*/
@Slf4j
@Component
public class MessageSenderRouter {
private final Map<String, MessageSender> senderMap = new ConcurrentHashMap<>();
private final Map<String, MessageSender> senderCache = new ConcurrentHashMap<>();
// Spring 会自动注入所有 MessageSender 的实现 Bean
public MessageSenderRouter(Map<String, MessageSender> senders) {
for (MessageSender sender : senders.values()) {
String type = sender.getSupportedChannelType();
if (senderMap.containsKey(type)) {
log.warn("渠道类型 {} 重复注册,旧: {}, 新: {}", type, senderMap.get(type).getClass().getName(), sender.getClass().getName());
}
senderMap.put(type, sender);
log.info("注册消息发送器: {} -> {}", type, sender.getClass().getSimpleName());
}
}
public MessageSender getSender(String channelType) {
MessageSender sender = senderCache.get(channelType);
if (sender != null) {
return sender;
}
sender = senderMap.get(channelType);
if (sender == null) {
throw new IllegalArgumentException("不支持的渠道类型: " + channelType);
}
senderCache.put(channelType, sender);
return sender;
}
}
\ No newline at end of file
package com.yd.notice.service.send;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SendResult {
private boolean success;
private boolean retryable; // true: 可重试, false: 永久失败
private String errorCode;
private String errorMsg;
public static SendResult success() {
return new SendResult(true, false, null, null);
}
public static SendResult failRetryable(String errorCode, String errorMsg) {
return new SendResult(false, true, errorCode, errorMsg);
}
public static SendResult failNonRetryable(String errorCode, String errorMsg) {
return new SendResult(false, false, errorCode, errorMsg);
}
}
\ No newline at end of file
package com.yd.notice.service.send;
import com.alibaba.fastjson.JSONObject;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dysmsapi.model.v20170525.SendSmsRequest;
import com.aliyuncs.dysmsapi.model.v20170525.SendSmsResponse;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import com.yd.notice.service.model.ChannelConfig;
import com.yd.notice.service.model.NotificationTask;
import com.yd.notice.service.service.IChannelConfigService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class SmsMessageSender implements MessageSender {
private final IChannelConfigService channelConfigService;
@Override
public SendResult send(NotificationTask task) {
try {
ChannelConfig config = channelConfigService.getByChannelBizId(task.getChannelBizId());
if (config == null || config.getStatus() != 1) {
return SendResult.failNonRetryable("CONFIG_NOT_EXIST", "渠道配置不存在或已禁用");
}
JSONObject smsConfig = JSONObject.parseObject(config.getConfigValue());
String accessKeyId = smsConfig.getString("accessKeyId");
String accessSecret = smsConfig.getString("accessSecret");
String signName = smsConfig.getString("signName");
String templateCode = smsConfig.getString("templateCode");
// 创建旧版客户端
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", accessKeyId, accessSecret);
IAcsClient client = new DefaultAcsClient(profile);
SendSmsRequest request = new SendSmsRequest();
request.setPhoneNumbers(task.getReceiver());
request.setSignName(signName);
request.setTemplateCode(templateCode);
// 模板参数必须是 JSON 字符串,例如 {"code":"123456"}
request.setTemplateParam(task.getContent());
SendSmsResponse response = client.getAcsResponse(request);
String code = response.getCode();
if ("OK".equals(code)) {
log.info("短信发送成功, taskBizId={}, receiver={}", task.getTaskBizId(), task.getReceiver());
return SendResult.success();
} else {
String errMsg = response.getMessage();
log.error("短信发送失败, taskBizId={}, errCode={}, errMsg={}", task.getTaskBizId(), code, errMsg);
if ("isv.MOBILE_NUMBER_ILLEGAL".equals(code) || "isv.INVALID_PARAMETERS".equals(code)) {
return SendResult.failNonRetryable(code, errMsg);
}
return SendResult.failRetryable(code, errMsg);
}
} catch (ClientException e) {
log.error("短信发送异常, taskBizId={}", task.getTaskBizId(), e);
String errCode = e.getErrCode();
if ("isv.MOBILE_NUMBER_ILLEGAL".equals(errCode)) {
return SendResult.failNonRetryable(errCode, e.getMessage());
}
return SendResult.failRetryable(errCode, e.getMessage());
} catch (Exception e) {
log.error("短信发送异常, taskBizId={}", task.getTaskBizId(), e);
return SendResult.failRetryable("SEND_EXCEPTION", e.getMessage());
}
}
@Override
public String getSupportedChannelType() {
return "sms";
}
}
\ No newline at end of file
......@@ -2,49 +2,55 @@ package com.yd.notice.service.send;
import com.alibaba.fastjson.JSONObject;
import com.yd.notice.service.model.ChannelConfig;
import com.yd.notice.service.service.IChannelConfigService;
import me.chanjar.weixin.cp.api.impl.WxCpServiceImpl;
import me.chanjar.weixin.cp.bean.article.NewArticle;
import com.yd.notice.service.model.NotificationTask;
import com.yd.notice.service.service.IChannelConfigService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.common.error.WxErrorException;
import me.chanjar.weixin.cp.api.WxCpService;
import me.chanjar.weixin.cp.api.impl.WxCpServiceImpl;
import me.chanjar.weixin.cp.bean.message.WxCpMessage;
import me.chanjar.weixin.cp.config.impl.WxCpDefaultConfigImpl;
import org.springframework.stereotype.Service;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
@Slf4j
@Service
@Component
@RequiredArgsConstructor
public class WecomMessageSender {
public class WecomMessageSender implements MessageSender {
private final IChannelConfigService channelConfigService;
/**
* 发送文本消息(动态获取渠道配置)
*/
public boolean sendTextMessage(NotificationTask task) {
// 不可重试的错误码(永久失败)
private static final List<String> NON_RETRYABLE_ERROR_CODES = Arrays.asList(
"81013", // UserID 无效或不存在
"60020", // 应用无权限发送给该用户
"60102", // UserID 不存在
"40003", // 不合法的 UserID
"40014" // 不合法的 access_token
);
@Override
public SendResult send(NotificationTask task) {
try {
// 1. 根据 channelBizId 查询渠道配置
// 1. 查询渠道配置
ChannelConfig config = channelConfigService.getByChannelBizId(task.getChannelBizId());
if (config == null || config.getStatus() != 1) {
log.error("渠道配置不存在或已禁用,channelBizId: {}", task.getChannelBizId());
return false;
return SendResult.failNonRetryable("CONFIG_NOT_EXIST", "渠道配置不存在或已禁用");
}
if (!"wecom".equals(config.getChannelType())) {
log.error("渠道类型不是企业微信,channelBizId: {}", task.getChannelBizId());
return false;
return SendResult.failNonRetryable("CHANNEL_TYPE_ERROR", "渠道类型不是企业微信");
}
// 2. 解析 config_value(JSON格式,需解密)
String configValueJson = config.getConfigValue();
// TODO: 若 config_value 是加密的,此处需要调用解密工具解密
JSONObject wecomConfig = JSONObject.parseObject(configValueJson);
// 2. 解析配置(JSON格式,生产环境需解密)
JSONObject wecomConfig = JSONObject.parseObject(config.getConfigValue());
String corpId = wecomConfig.getString("corpId");
Integer agentId = wecomConfig.getInteger("agentId");
String secret = wecomConfig.getString("secret");
// 3. 动态创建 WxCpService(每次新建,可增加缓存优化)
// 3. 动态创建 WxCpService(可增加缓存优化)
WxCpDefaultConfigImpl wxConfig = new WxCpDefaultConfigImpl();
wxConfig.setCorpId(corpId);
wxConfig.setAgentId(agentId);
......@@ -59,46 +65,26 @@ public class WecomMessageSender {
.content(task.getContent())
.build();
wxCpService.getMessageService().send(message);
log.info("企业微信消息发送成功,taskBizId: {}, channelBizId: {}", task.getTaskBizId(), task.getChannelBizId());
return true;
log.info("企业微信消息发送成功, taskBizId={}, receiver={}", task.getTaskBizId(), task.getReceiver());
return SendResult.success();
} catch (WxErrorException e) {
String errCode = String.valueOf(e.getError().getErrorCode());
String errMsg = e.getMessage();
log.error("企业微信发送失败, taskBizId={}, errCode={}, errMsg={}", task.getTaskBizId(), errCode, errMsg);
if (NON_RETRYABLE_ERROR_CODES.contains(errCode)) {
return SendResult.failNonRetryable(errCode, errMsg);
}
return SendResult.failRetryable(errCode, errMsg);
} catch (Exception e) {
log.error("企业微信消息发送失败,taskBizId: {}, error: {}", task.getTaskBizId(), e.getMessage(), e);
return false;
log.error("企业微信发送异常, taskBizId={}", task.getTaskBizId(), e);
return SendResult.failRetryable("SEND_EXCEPTION", e.getMessage());
}
}
// /**
// * 发送图文卡片消息(带跳转链接)
// */
// public boolean sendNewsMessage(NotificationTask task, String url, String description) {
// try {
// // 使用 builder() 构建 NewArticle 对象
// NewArticle article = NewArticle.builder()
// .title(task.getTitle()) // 消息标题
// .description(description) // 消息描述
// .url(url) // 点击后跳转的链接
// // .picUrl("https://your-domain.com/cover.jpg") // 可选:封面图片
// // .btnText("查看详情") // 可选:按钮文字
// .build();
//
// // 构建 WxCpMessage 消息体
// WxCpMessage message = WxCpMessage.NEWS()
// .agentId(wxCpService.getWxCpConfigStorage().getAgentId())
// .toUser(task.getReceiver())
// .addArticle(article) // 使用NewArticle对象
// .build();
//
// // 发送消息
// wxCpService.getMessageService().send(message);
// log.info("企业微信图文消息发送成功,taskBizId: {}", task.getTaskBizId());
// return true;
//
// } catch (Exception e) {
// log.error("企业微信图文消息发送失败,taskBizId: {}", task.getTaskBizId(), e);
// return false;
// }
// }
@Override
public String getSupportedChannelType() {
return "wecom";
}
}
\ No newline at end of file
......@@ -2,15 +2,19 @@ package com.yd.notice.service.service.impl;
import cn.hutool.core.date.LocalDateTimeUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.yd.common.exception.BusinessException;
import com.yd.notice.service.dao.NotificationTaskMapper;
import com.yd.notice.service.model.ChannelConfig;
import com.yd.notice.service.model.NotificationRecord;
import com.yd.notice.service.model.NotificationTask;
import com.yd.notice.service.dao.NotificationTaskMapper;
import com.yd.notice.service.model.NotificationTemplate;
import com.yd.notice.service.send.WecomMessageSender;
import com.yd.notice.service.send.MessageSender;
import com.yd.notice.service.send.MessageSenderRouter;
import com.yd.notice.service.send.SendResult;
import com.yd.notice.service.service.IChannelConfigService;
import com.yd.notice.service.service.INotificationRecordService;
import com.yd.notice.service.service.INotificationTaskService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.yd.notice.service.service.INotificationTemplateService;
import com.yd.notice.service.utils.TemplateRenderUtil;
import lombok.RequiredArgsConstructor;
......@@ -33,24 +37,22 @@ import java.util.concurrent.Executor;
@Slf4j
@Service
@RequiredArgsConstructor
public class NotificationTaskServiceImpl extends ServiceImpl<NotificationTaskMapper, NotificationTask> implements INotificationTaskService{
public class NotificationTaskServiceImpl extends ServiceImpl<NotificationTaskMapper, NotificationTask> implements INotificationTaskService {
private final INotificationTemplateService templateService;
private final TemplateRenderUtil templateRenderUtil;
private final WecomMessageSender wecomMessageSender;
private final IChannelConfigService channelConfigService;
private final MessageSenderRouter senderRouter;
private final NotificationTaskMapper taskMapper;
private final INotificationRecordService recordService;
@Qualifier("noticeExecutor")
private final Executor noticeExecutor;
/**
* 创建消息任务并立即异步发送
*/
@Override
@Transactional(rollbackFor = Exception.class)
public String createAndSendTask(String channelBizId, String templateBizId,
String receiver, String params) {
// 获取模板并渲染内容
public String createAndSendTask(String channelBizId, String templateBizId, String receiver, String params) {
// 1. 查询模板
NotificationTemplate template = templateService.getOne(
new LambdaQueryWrapper<NotificationTemplate>()
.eq(NotificationTemplate::getTemplateBizId, templateBizId)
......@@ -60,10 +62,11 @@ public class NotificationTaskServiceImpl extends ServiceImpl<NotificationTaskMap
throw new BusinessException("模板不存在或已禁用");
}
// 2. 渲染标题和内容
String title = templateRenderUtil.render(template.getTitleTemplate(), params);
String content = templateRenderUtil.render(template.getContentTemplate(), params);
// 创建任务记录(状态为待发送)
// 3. 创建任务
NotificationTask task = new NotificationTask();
String taskBizId = UUID.randomUUID().toString();
task.setTaskBizId(taskBizId);
......@@ -72,7 +75,7 @@ public class NotificationTaskServiceImpl extends ServiceImpl<NotificationTaskMap
task.setTitle(title);
task.setContent(content);
task.setReceiver(receiver);
task.setReceiverType("userid");
task.setReceiverType("userid"); // 可根据实际动态设置
task.setStatus(0); // 待发送
task.setRetryCount(0);
task.setMaxRetry(3);
......@@ -83,122 +86,106 @@ public class NotificationTaskServiceImpl extends ServiceImpl<NotificationTaskMap
log.info("创建消息任务成功,taskBizId: {},准备异步发送", taskBizId);
// 3. 异步发送(不阻塞接口响应)
// 4. 异步发送
noticeExecutor.execute(() -> doSendTask(task));
return taskBizId;
}
/**
* 实际发送逻辑(支持重试,每次尝试都会记录发送记录)
*/
private void doSendTask(NotificationTask task) {
// NotificationTask task = taskMapper.selectOne(
// new LambdaQueryWrapper<NotificationTask>()
// .eq(NotificationTask::getTaskBizId, taskBizId)
// );
String taskBizId = task.getTaskBizId();
if (task == null) {
log.error("任务不存在: {}", taskBizId);
return;
}
// 防止并发重复发送(状态不是待发送则跳过)
// 防止并发重复发送
if (task.getStatus() != 0) {
log.warn("任务状态不是待发送,跳过执行: {}", taskBizId);
return;
}
// 更新任务状态为“发送中”
// 更新状态为发送中
task.setStatus(1);
task.setUpdateTime(LocalDateTime.now());
taskMapper.updateById(task);
// 带重试的发送(每次尝试都会产生一条发送记录)
// 带重试的发送
boolean success = sendWithRetryAndRecord(task);
// 最终结果处理
task.setStatus(success ? 2 : 3);
task.setUpdateTime(LocalDateTime.now());
taskMapper.updateById(task);
if (success) {
task.setStatus(2); // 成功
log.info("消息发送成功,taskBizId: {}", taskBizId);
} else {
task.setStatus(3); // 失败(已达最大重试次数)
log.error("消息发送最终失败,taskBizId: {}", taskBizId);
}
task.setUpdateTime(LocalDateTime.now());
taskMapper.updateById(task);
}
/**
* 带重试机制的发送,每次尝试都记录 NotificationRecord
* @param task 任务实体
* @return 是否最终成功
*/
private boolean sendWithRetryAndRecord(NotificationTask task) {
int maxRetry = task.getMaxRetry();
int currentRetry = 0;
long waitMillis = 1000; // 初始等待1秒
long waitMillis = 1000;
// 获取渠道类型
ChannelConfig channelConfig = channelConfigService.getByChannelBizId(task.getChannelBizId());
if (channelConfig == null) {
log.error("渠道配置不存在,taskBizId: {}, channelBizId: {}", task.getTaskBizId(), task.getChannelBizId());
return false;
}
String channelType = channelConfig.getChannelType();
MessageSender sender = senderRouter.getSender(channelType);
while (currentRetry <= maxRetry) {
// 创建一条发送记录(状态为待发送)
// 创建发送记录(待发送)
NotificationRecord record = buildInitialRecord(task, currentRetry);
recordService.save(record);
Long recordId = record.getId(); // 获取自增ID,便于后续更新
LocalDateTime startTime = LocalDateTime.now();
boolean success = false;
String errorCode = null;
String errorMsg = null;
SendResult result = null;
Integer costTime = null;
try {
// 更新记录状态为“发送中”
// 更新记录状态为发送中
record.setStatus(1);
record.setUpdateTime(LocalDateTime.now());
recordService.updateById(record);
// 实际调用企业微信API
success = wecomMessageSender.sendTextMessage(task);
// 调用具体发送器
result = sender.send(task);
if (success) {
errorCode = null;
errorMsg = null;
}
costTime = (int) (LocalDateTimeUtil.between(startTime, LocalDateTime.now()).toMillis());
} catch (Exception e) {
log.error("发送异常,taskBizId: {}, retry: {}", task.getTaskBizId(), currentRetry, e);
errorCode = "SEND_EXCEPTION";
errorMsg = e.getMessage();
success = false;
} finally {
// 计算耗时
result = SendResult.failRetryable("UNKNOWN_ERROR", e.getMessage());
costTime = (int) (LocalDateTimeUtil.between(startTime, LocalDateTime.now()).toMillis());
}
// 更新记录最终状态
record.setStatus(success ? 2 : 3);
record.setErrorCode(errorCode);
record.setErrorMsg(errorMsg);
record.setStatus(result.isSuccess() ? 2 : 3);
record.setErrorCode(result.getErrorCode());
record.setErrorMsg(result.getErrorMsg());
record.setCostTime(costTime);
record.setSendTime(LocalDateTime.now());
record.setUpdateTime(LocalDateTime.now());
recordService.updateById(record);
if (success) {
if (result.isSuccess()) {
return true;
}
// 失败处理:增加任务的重试计数
// 失败处理
currentRetry++;
task.setRetryCount(currentRetry);
task.setUpdateTime(LocalDateTime.now());
taskMapper.updateById(task);
if (currentRetry <= maxRetry) {
if (!result.isRetryable()) {
log.warn("检测到不可重试错误,放弃重试,taskBizId: {}, errorCode: {}", task.getTaskBizId(), result.getErrorCode());
break;
}
log.warn("发送失败,{}秒后进行第{}次重试,taskBizId: {}", waitMillis / 1000, currentRetry, task.getTaskBizId());
try {
Thread.sleep(waitMillis);
waitMillis = Math.min(waitMillis * 2, 30000); // 退避,最大30秒
waitMillis = Math.min(waitMillis * 2, 30000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
......@@ -208,30 +195,21 @@ public class NotificationTaskServiceImpl extends ServiceImpl<NotificationTaskMap
return false;
}
/**
* 构建初始的发送记录(状态为待发送)
* @param task 任务实体
* @param retryIndex 第几次尝试(0表示首次)
* @return 初始记录
*/
private NotificationRecord buildInitialRecord(NotificationTask task, int retryIndex) {
NotificationRecord record = new NotificationRecord();
record.setTaskBizId(task.getTaskBizId());
record.setChannelBizId(task.getChannelBizId());
record.setTemplateBizId(task.getTemplateBizId());
// 发送人信息:企业微信场景下,发送人通常是应用名称,可以从配置获取,这里先固定
record.setSender("企业微信应用");
record.setSender("系统");
record.setSenderType("sys");
record.setReceiver(task.getReceiver());
record.setReceiverType(task.getReceiverType());
record.setTitle(task.getTitle());
record.setContent(task.getContent());
record.setStatus(0); // 待发送
record.setStatus(0);
record.setCreateTime(LocalDateTime.now());
record.setUpdateTime(LocalDateTime.now());
// 可选:备注重试次数
record.setRemark("retry_" + retryIndex);
return record;
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment