Commit f9b8b265 by zhangxingmin

push

parent e2ae91a5
......@@ -15,12 +15,14 @@ import com.yd.common.enums.ResultCode;
import com.yd.common.exception.BusinessException;
import com.yd.notice.feign.client.ApiNotificationTaskFeignClient;
import com.yd.notice.feign.request.ApiSendRequest;
import io.reactivex.Flowable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.Arrays;
@Slf4j
......@@ -34,24 +36,22 @@ public class ApiAiStreamServiceImpl implements ApiAiStreamService {
private ApiNotificationTaskFeignClient apiNotificationTaskFeignClient;
/**
* 调用大模型接口获取AI输出的流信息
* 调用大模型接口获取AI输出的流信息(非流式,一次性返回完整内容)
* @param question
* @return
*/
@Override
public Flux<ServerSentEvent<String>> streamChat(String question) {
// 敏感词校验(此部分代码保持不变)
// 敏感词校验(保持不变)
try {
apiSensitiveWordDetailService.checkWord(question);
} catch (BusinessException e) {
int code = e.getCode();
if (code == ResultCode.SENSITIVE_WORDS_EXIST.getCode()) {
// 禁用类型敏感词:返回提示语
log.info("检测到禁用敏感词,返回提示语");
String tipMsg = "抱歉,您输入的内容包含敏感词汇,无法为您提供服务。请调整后重新提问。";
return Flux.just(ServerSentEvent.builder(tipMsg).build());
} else if (code == ResultCode.SENSITIVE_TZ_WORDS_EXIST.getCode()) {
// 通知类型敏感词:发送企业微信通知,然后返回一个特殊事件告知前端展示产品列表
log.info("检测到通知类型敏感词,发送企业微信通知");
AuthUserDto authUserDto = SecurityUtil.getCurrentLoginUser();
String userName = authUserDto.getUsername();
......@@ -63,7 +63,6 @@ public class ApiAiStreamServiceImpl implements ApiAiStreamService {
request.setParams(params);
apiNotificationTaskFeignClient.send(request);
// 返回 SSE 事件,event 类型为 sensitive_notification,前端根据此事件调用产品列表接口
return Flux.just(
ServerSentEvent.<String>builder()
.event("sensitive_notification")
......@@ -71,53 +70,46 @@ public class ApiAiStreamServiceImpl implements ApiAiStreamService {
.build()
);
}
// 其他异常继续抛出
throw e;
}
// 正常调用大模型流式接口(使用 Qwen3-Max 模型)
// ========== 核心修改:改用非流式调用,一次性获取完整回答 ==========
Generation gen = new Generation();
Message systemMsg = Message.builder().role(Role.SYSTEM.getValue()).content("You are a helpful assistant.").build();
Message userMsg = Message.builder().role(Role.USER.getValue()).content(question).build();
Message systemMsg = Message.builder()
.role(Role.SYSTEM.getValue())
.content("You are a helpful assistant.")
.build();
Message userMsg = Message.builder()
.role(Role.USER.getValue())
.content(question)
.build();
GenerationParam param = GenerationParam.builder()
.apiKey("sk-d6551c67cfbe4a759a78dc3625729291") // 请使用安全的密钥管理方式
.model("qwen3-max") // 模型改为 qwen3-max(千问3旗舰版)
.apiKey("sk-d6551c67cfbe4a759a78dc3625729291") // 生产环境请改用配置注入
.model("qwen3-max")
.messages(Arrays.asList(systemMsg, userMsg))
.resultFormat(GenerationParam.ResultFormat.MESSAGE)
.incrementalOutput(true)
.incrementalOutput(false) // 关闭增量输出,一次性返回完整答案
.build();
Flowable<GenerationResult> flowable = null;
try {
flowable = gen.streamCall(param);
} catch (NoApiKeyException e) {
log.error("NoApiKeyException: {}", e.getMessage());
return Flux.just(ServerSentEvent.builder("系统错误:API密钥配置异常").build());
} catch (InputRequiredException e) {
log.error("InputRequiredException: {}", e.getMessage());
return Flux.just(ServerSentEvent.builder("系统错误:输入参数异常").build());
}
// 将 RxJava Flowable 转为 Reactor Flux
return Flux.from(flowable)
.filter(result -> {
// 过滤掉空结果或没有内容的事件
return result != null
&& result.getOutput() != null
&& result.getOutput().getChoices() != null
&& !result.getOutput().getChoices().isEmpty()
&& result.getOutput().getChoices().get(0).getMessage() != null
&& result.getOutput().getChoices().get(0).getMessage().getContent() != null;
})
.map(result -> {
String delta = result.getOutput().getChoices().get(0).getMessage().getContent();
// ===== 核心修复:在 delta 后追加换行符 =====
// 为了确保 Markdown 格式(特别是表格)不被破坏,我们强制在 delta 后追加一个换行符。
// 这确保了流式片段在拼接后,能够保留正确的换行结构。
return ServerSentEvent.builder(delta + "\n").build();
})
// 使用 Mono.fromCallable 包装同步调用,并指定在弹性线程池执行
return Mono.fromCallable(() -> {
try {
GenerationResult result = gen.call(param);
// 提取完整回答文本
return result.getOutput().getChoices().get(0).getMessage().getContent();
} catch (NoApiKeyException e) {
log.error("NoApiKeyException: {}", e.getMessage());
return "系统错误:API密钥配置异常";
} catch (InputRequiredException e) {
log.error("InputRequiredException: {}", e.getMessage());
return "系统错误:输入参数异常";
}
})
.subscribeOn(Schedulers.boundedElastic()) // 避免阻塞主线程
.flatMapMany(fullAnswer -> Flux.just(ServerSentEvent.builder(fullAnswer).build()))
.onErrorResume(e -> {
log.error("流式响应处理出错: {}", e.getMessage());
log.error("流式响应处理出错: {}", e.getMessage());
return Flux.just(ServerSentEvent.builder("系统繁忙,请稍后重试").build());
});
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment