Commit 048c02c8 by zhangxingmin

push

parent 3c484189
...@@ -8,6 +8,7 @@ import com.alibaba.dashscope.common.Role; ...@@ -8,6 +8,7 @@ import com.alibaba.dashscope.common.Role;
import com.alibaba.dashscope.exception.InputRequiredException; import com.alibaba.dashscope.exception.InputRequiredException;
import com.alibaba.dashscope.exception.NoApiKeyException; import com.alibaba.dashscope.exception.NoApiKeyException;
import io.reactivex.Flowable; import io.reactivex.Flowable;
import lombok.var;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent; import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
...@@ -21,9 +22,19 @@ public class ApiAiStreamController { ...@@ -21,9 +22,19 @@ public class ApiAiStreamController {
@CrossOrigin(origins = "*") // 开发时允许跨域 @CrossOrigin(origins = "*") // 开发时允许跨域
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamChat(@RequestParam String question) { public Flux<ServerSentEvent<String>> streamChat(@RequestParam String question) {
Generation gen = new Generation(); if (question == null || question.trim().isEmpty()) {
Message systemMsg = Message.builder().role(Role.SYSTEM.getValue()).content("You are a helpful assistant.").build(); return Flux.error(new IllegalArgumentException("question cannot be empty"));
Message userMsg = Message.builder().role(Role.USER.getValue()).content(question).build(); }
Message systemMsg = Message.builder()
.role(Role.SYSTEM.getValue())
.content("You are a helpful assistant.")
.build();
Message userMsg = Message.builder()
.role(Role.USER.getValue())
.content(question)
.build();
GenerationParam param = GenerationParam.builder() GenerationParam param = GenerationParam.builder()
.apiKey("sk-d6551c67cfbe4a759a78dc3625729291") .apiKey("sk-d6551c67cfbe4a759a78dc3625729291")
.model("qwen-plus") .model("qwen-plus")
...@@ -32,19 +43,45 @@ public class ApiAiStreamController { ...@@ -32,19 +43,45 @@ public class ApiAiStreamController {
.incrementalOutput(true) .incrementalOutput(true)
.build(); .build();
Flowable<GenerationResult> flowable = null; Generation gen = new Generation();
try {
flowable = gen.streamCall(param); // 使用 defer 延迟执行,避免 flowable 为 null
} catch (NoApiKeyException e) { return Flux.from(Flowable.defer(() -> {
e.printStackTrace(); try {
} catch (InputRequiredException e) { return gen.streamCall(param);
e.printStackTrace(); } catch (NoApiKeyException e) {
} return Flowable.error(new RuntimeException("API Key missing", e));
// 将 RxJava Flowable 转为 Reactor Flux } catch (InputRequiredException e) {
return Flux.from(flowable) return Flowable.error(new RuntimeException("Invalid input", e));
} catch (Exception e) {
return Flowable.error(new RuntimeException("DashScope call failed", e));
}
}))
.map(result -> { .map(result -> {
String delta = result.getOutput().getChoices().get(0).getMessage().getContent(); String delta = "";
return ServerSentEvent.builder(delta).build(); if (result.getOutput() != null && !result.getOutput().getChoices().isEmpty()) {
}); var choice = result.getOutput().getChoices().get(0);
if (choice.getMessage() != null) {
delta = choice.getMessage().getContent();
}
}
// 发送标准 SSE 事件,包含 event 字段便于前端区分
return ServerSentEvent.<String>builder()
.event("message")
.data(delta)
.build();
})
.concatWith(Flux.just(
ServerSentEvent.<String>builder()
.event("complete")
.data("[DONE]")
.build()
))
.onErrorResume(error ->
Flux.just(ServerSentEvent.<String>builder()
.event("error")
.data("Error: " + error.getMessage())
.build())
);
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment