Commit 9574bb8d by zhangxingmin

push

parent a9695ca8
...@@ -2,8 +2,9 @@ package com.yd.ai.api.controller; ...@@ -2,8 +2,9 @@ package com.yd.ai.api.controller;
import com.yd.ai.api.service.ApiAiStreamService; import com.yd.ai.api.service.ApiAiStreamService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono; import reactor.core.publisher.Flux;
@RestController @RestController
@RequestMapping("/api/ai") @RequestMapping("/api/ai")
...@@ -13,11 +14,13 @@ public class ApiAiStreamController { ...@@ -13,11 +14,13 @@ public class ApiAiStreamController {
private ApiAiStreamService apiAiStreamService; private ApiAiStreamService apiAiStreamService;
/** /**
* 调用大模型接口获取AI回答(流式) * 调用大模型接口获取AI回答(流式)
*/ */
@CrossOrigin(origins = "*") @CrossOrigin(origins = "*")
@GetMapping(value = "/stream") @GetMapping(value = "/stream-sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Mono<String> streamChat(@RequestParam String question) { public Flux<String> streamChatSse(@RequestParam String question) {
return apiAiStreamService.streamChat(question); return apiAiStreamService.streamChatWithSensitiveCheck(question)
.onErrorResume(e -> Flux.just("系统繁忙,请稍后重试"));
} }
} }
\ No newline at end of file
package com.yd.ai.api.service; package com.yd.ai.api.service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
public interface ApiAiStreamService { public interface ApiAiStreamService {
Mono<String> streamChat(String question); Flux<String> streamChatWithSensitiveCheck(String question);
} }
\ No newline at end of file
...@@ -15,12 +15,13 @@ import com.yd.common.enums.ResultCode; ...@@ -15,12 +15,13 @@ import com.yd.common.enums.ResultCode;
import com.yd.common.exception.BusinessException; import com.yd.common.exception.BusinessException;
import com.yd.notice.feign.client.ApiNotificationTaskFeignClient; import com.yd.notice.feign.client.ApiNotificationTaskFeignClient;
import com.yd.notice.feign.request.ApiSendRequest; import com.yd.notice.feign.request.ApiSendRequest;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers; // 使用 RxJava3 的 Schedulers
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono; import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.util.Arrays; import java.util.Arrays;
@Slf4j @Slf4j
...@@ -34,18 +35,18 @@ public class ApiAiStreamServiceImpl implements ApiAiStreamService { ...@@ -34,18 +35,18 @@ public class ApiAiStreamServiceImpl implements ApiAiStreamService {
private ApiNotificationTaskFeignClient apiNotificationTaskFeignClient; private ApiNotificationTaskFeignClient apiNotificationTaskFeignClient;
/** /**
* 调用大模型接口获取AI回答(非流式,返回完整字符串) * 流式对话(SSE),保留敏感词检测逻辑
*/ */
@Override @Override
public Mono<String> streamChat(String question) { public Flux<String> streamChatWithSensitiveCheck(String question) {
// 敏感词校验 // 1. 敏感词校验(与原非流式方法完全一致)
try { try {
apiSensitiveWordDetailService.checkWord(question); apiSensitiveWordDetailService.checkWord(question);
} catch (BusinessException e) { } catch (BusinessException e) {
int code = e.getCode(); int code = e.getCode();
if (code == ResultCode.SENSITIVE_WORDS_EXIST.getCode()) { if (code == ResultCode.SENSITIVE_WORDS_EXIST.getCode()) {
log.info("检测到禁用敏感词,返回提示语"); log.info("检测到禁用敏感词,返回提示语");
return Mono.just("抱歉,您输入的内容包含敏感词汇,无法为您提供服务。请调整后重新提问。"); return Flux.just("抱歉,您输入的内容包含敏感词汇,无法为您提供服务。请调整后重新提问。");
} else if (code == ResultCode.SENSITIVE_TZ_WORDS_EXIST.getCode()) { } else if (code == ResultCode.SENSITIVE_TZ_WORDS_EXIST.getCode()) {
log.info("检测到通知类型敏感词,发送企业微信通知"); log.info("检测到通知类型敏感词,发送企业微信通知");
AuthUserDto authUserDto = SecurityUtil.getCurrentLoginUser(); AuthUserDto authUserDto = SecurityUtil.getCurrentLoginUser();
...@@ -57,15 +58,13 @@ public class ApiAiStreamServiceImpl implements ApiAiStreamService { ...@@ -57,15 +58,13 @@ public class ApiAiStreamServiceImpl implements ApiAiStreamService {
request.setReceiver("zxm|Sweet"); request.setReceiver("zxm|Sweet");
request.setParams(params); request.setParams(params);
apiNotificationTaskFeignClient.send(request); apiNotificationTaskFeignClient.send(request);
// 返回特殊标记,前端识别后展示产品列表
// 注意:此处前端需要特殊处理,可通过自定义响应头或约定特定字符串 return Flux.just("__SENSITIVE_NOTIFICATION__");
// 简便做法:返回一个特殊标记字符串,前端检测到后展示产品列表
return Mono.just("__SENSITIVE_NOTIFICATION__");
} }
throw e; throw e;
} }
// 正常调用大模型非流式接口 // 2. 正常调用大模型流式接口
Generation gen = new Generation(); Generation gen = new Generation();
Message systemMsg = Message.builder() Message systemMsg = Message.builder()
.role(Role.SYSTEM.getValue()) .role(Role.SYSTEM.getValue())
...@@ -77,29 +76,36 @@ public class ApiAiStreamServiceImpl implements ApiAiStreamService { ...@@ -77,29 +76,36 @@ public class ApiAiStreamServiceImpl implements ApiAiStreamService {
.build(); .build();
GenerationParam param = GenerationParam.builder() GenerationParam param = GenerationParam.builder()
.apiKey("sk-d6551c67cfbe4a759a78dc3625729291") // 生产环境请改用配置注入 .apiKey("sk-d6551c67cfbe4a759a78dc3625729291") // 生产环境建议从配置读取
.model("qwen3-max") .model("qwen3-max") // 使用可用的模型
.messages(Arrays.asList(systemMsg, userMsg)) .messages(Arrays.asList(systemMsg, userMsg))
.resultFormat(GenerationParam.ResultFormat.MESSAGE) .resultFormat(GenerationParam.ResultFormat.MESSAGE)
.incrementalOutput(false) // 关闭增量输出 .incrementalOutput(true) // 必须开启流式
.build(); .build();
return Mono.fromCallable(() -> { return Flux.create(sink -> {
Publisher<GenerationResult> publisher = null;
try { try {
GenerationResult result = gen.call(param); publisher = gen.streamCall(param);
return result.getOutput().getChoices().get(0).getMessage().getContent();
} catch (NoApiKeyException e) { } catch (NoApiKeyException e) {
log.error("NoApiKeyException: {}", e.getMessage()); e.printStackTrace();
return "系统错误:API密钥配置异常";
} catch (InputRequiredException e) { } catch (InputRequiredException e) {
log.error("InputRequiredException: {}", e.getMessage()); e.printStackTrace();
return "系统错误:输入参数异常";
} }
}) Flowable<GenerationResult> flowable = Flowable.fromPublisher(publisher)
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(Schedulers.io()); // 使用 RxJava3 的 Schedulers.io()
.onErrorResume(e -> {
log.error("非流式响应处理出错: {}", e.getMessage()); flowable.subscribe(
return Mono.just("系统繁忙,请稍后重试"); result -> {
}); String delta = result.getOutput().getChoices().get(0).getMessage().getContent();
sink.next(delta);
},
error -> {
log.error("流式调用出错", error);
sink.error(error);
},
sink::complete
);
});
} }
} }
\ No newline at end of file
package com.yd.ai.api.service.impl;
import org.reactivestreams.Publisher;
import com.alibaba.dashscope.aigc.generation.Generation;
import com.alibaba.dashscope.aigc.generation.GenerationParam;
import com.alibaba.dashscope.aigc.generation.GenerationResult;
import com.alibaba.dashscope.common.Message;
import com.alibaba.dashscope.common.Role;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.Arrays;
import java.util.concurrent.Semaphore;
public class Test {
public static final String RESET = "\u001B[0m";
public static final String CYAN = "\u001B[36m";
public static final String GREEN = "\u001B[32m";
public static void main(String[] args) throws Exception {
Generation gen = new Generation();
Message systemMsg = Message.builder()
.role(Role.SYSTEM.getValue())
.content("You are a helpful assistant.")
.build();
Message userMsg = Message.builder()
.role(Role.USER.getValue())
.content("家庭如何制定年度慈善预算 给出表格和简短内容")
.build();
GenerationParam param = GenerationParam.builder()
.apiKey("sk-d6551c67cfbe4a759a78dc3625729291")
.model("qwen3-max")
.messages(Arrays.asList(systemMsg, userMsg))
.resultFormat(GenerationParam.ResultFormat.MESSAGE)
.incrementalOutput(true)
.build();
Semaphore semaphore = new Semaphore(0);
System.out.println(CYAN + "===== 开始实时流式输出 =====" + RESET);
Publisher<GenerationResult> publisher = gen.streamCall(param);
Flowable<GenerationResult> flowable = Flowable.fromPublisher(publisher)
.subscribeOn(Schedulers.io());
flowable.subscribe(
result -> {
String delta = result.getOutput().getChoices().get(0).getMessage().getContent();
// 实时着色并打印
printWithStyle(delta);
System.out.flush();
},
error -> {
System.err.println("\n调用出错: " + error.getMessage());
semaphore.release();
},
() -> {
System.out.println("\n" + CYAN + "===== 流式输出完成 =====" + RESET);
semaphore.release();
}
);
semaphore.acquire();
}
private static void printWithStyle(String text) {
for (char ch : text.toCharArray()) {
if (ch == '|' || ch == '-' || ch == '=') {
System.out.print(CYAN + ch + RESET);
} else {
System.out.print(GREEN + ch + RESET);
}
}
}
}
\ No newline at end of file
<configuration>
<!-- 控制台输出 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%msg%n</pattern>
</encoder>
</appender>
<!-- 设置根日志级别为 INFO,屏蔽所有 DEBUG 日志 -->
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
<!-- 明确将 DashScope 和 OkHttp 的日志级别提高到 WARN,彻底屏蔽 DEBUG -->
<logger name="com.alibaba.dashscope" level="WARN" />
<logger name="okhttp3" level="WARN" />
</configuration>
\ No newline at end of file
...@@ -58,12 +58,36 @@ ...@@ -58,12 +58,36 @@
<artifactId>freemarker</artifactId> <artifactId>freemarker</artifactId>
</dependency> </dependency>
<!-- Source: https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java --> <!-- 升级DashScope SDK至最新版本 (以2.22.15为例) -->
<dependency> <dependency>
<groupId>com.alibaba</groupId> <groupId>com.alibaba</groupId>
<artifactId>dashscope-sdk-java</artifactId> <artifactId>dashscope-sdk-java</artifactId>
<version>2.22.15</version> <version>2.22.15</version>
<scope>compile</scope> <exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 添加 Reactive Streams 依赖 (兼容Java 8) -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.4</version>
</dependency>
<!-- 为Java 8添加 Reactive Streams 的 Flow API 兼容库 -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-flow-adapters</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.21</version>
</dependency> </dependency>
<dependency> <dependency>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment