Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,35 +121,23 @@ private Flux<ServerSentEvent<String>> streamResponse(Long userId, ConversationRe
return chatService.streamConversation(userId, request, accessToken)
.map(event -> {
try {
// ProovyAiStreamEvent를 프론트엔드 형식으로 변환
// 프론트엔드는 data 필드에 {"type":"...", ...} 형태를 기대
// V2: SSE 표준 event 필드를 사용하여 이벤트를 구분합니다.
// 프론트엔드에서는 addEventListener(eventType, ...)로 수신해야 합니다.
String eventType = event.getEvent();

// [DONE] 이벤트는 특별 처리
if ("[DONE]".equals(eventType)) {
return ServerSentEvent.<String>builder()
.data("[DONE]")
.build();
}

// 일반 이벤트: {"type": "...", ...data...} 형태로 병합
java.util.Map<String, Object> payload = new java.util.HashMap<>();
// event.getData()의 내용을 payload에 병합
if (event.getData() != null) {
payload.putAll(event.getData());
}
// event 데이터에 type이 있어도 컨트롤러에서 계산한 eventType이 우선한다.
payload.put("type", eventType);


// 데이터 Payload JSON 변환
Object payload = event.getData() != null ? event.getData() : java.util.Collections.emptyMap();
String dataJson = convertToJson(payload);

return ServerSentEvent.<String>builder()
.event(eventType) // SSE 표준 event 헤더 설정
.data(dataJson)
.build();
} catch (Exception e) {
log.error("Failed to build SSE event", e);
return ServerSentEvent.<String>builder()
.data("{\"type\":\"error\",\"content\":\"Failed to process event\"}")
.event("error")
.data("{\"message\":\"Failed to process event\"}")
.build();
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientRequestException;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.codec.ServerSentEvent;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
Expand Down Expand Up @@ -219,6 +221,16 @@ public Flux<ProovyAiStreamEvent> streamConversation(Long userId, ConversationReq
.publishOn(Schedulers.boundedElastic())
.doOnNext(event -> {
Map<String, Object> data = event.getData();
String eventType = event.getEvent();

// 에러 이벤트 처리
if ("run.failed".equals(eventType) || "error".equals(eventType)) {
String errMsg = "Stream Error";
if (data != null) {
errMsg = String.valueOf(data.getOrDefault("message", "Unknown error"));
}
throw new BusinessException(ErrorCode.CONV5002, "스트리밍 중 에러 발생: " + errMsg);
}

// thread_id 업데이트 (payload 내부에 포함되는 경우)
if (data != null && data.containsKey("thread_id")) {
Expand Down Expand Up @@ -248,10 +260,10 @@ public Flux<ProovyAiStreamEvent> streamConversation(Long userId, ConversationReq
}
}

// 토큰 스트림(type = token) 기준으로 내용 누적
// 토큰 스트림(event = llm.token.delta) 기준으로 내용 누적
// FinalResponse 노드에서 LLM이 생성하는 토큰을 실시간으로 수집
if ("token".equals(event.getEvent()) && data != null) {
Object content = data.get("content");
if ("llm.token.delta".equals(eventType) && data != null) {
Object content = data.get("delta");
if (content != null) {
contentBuilder.append(content.toString());
log.trace("[Chat] 토큰 수신 - content: {}", content);
Expand Down Expand Up @@ -346,10 +358,14 @@ public ConversationResponse invokeConversation(Long userId, ConversationRequest
/**
* Proovy-ai /stream 호출
*/
/**
* Proovy-ai /stream/v2 호출 (SSE v2)
*/
Comment on lines 358 to +363
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

이전 Javadoc 제거 누락

callProovyAiStream 메서드 앞에 이전 Javadoc (/** Proovy-ai /stream 호출 */, Lines 358-360)과 새 Javadoc (Lines 361-363)이 중복으로 존재합니다. 이전 Javadoc을 삭제해야 합니다.

🐛 수정 제안
-    /**
-     * Proovy-ai /stream 호출
-     */
     /**
      * Proovy-ai /stream/v2 호출 (SSE v2)
      */
     private Flux<ProovyAiStreamEvent> callProovyAiStream(ProovyAiRequest request) {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/**
* Proovy-ai /stream 호출
*/
/**
* Proovy-ai /stream/v2 호출 (SSE v2)
*/
/**
* Proovy-ai /stream/v2 호출 (SSE v2)
*/
private Flux<ProovyAiStreamEvent> callProovyAiStream(ProovyAiRequest request) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main/java/com/proovy/domain/conversation/service/ChatServiceImpl.java`
around lines 358 - 363, Remove the redundant/old Javadoc block that precedes the
callProovyAiStream method: delete the earlier comment /** Proovy-ai /stream 호출
*/ so only the new Javadoc /** Proovy-ai /stream/v2 호출 (SSE v2) */ remains
immediately above the callProovyAiStream(...) declaration; ensure there are no
duplicate JavaDoc blocks left for that method.

private Flux<ProovyAiStreamEvent> callProovyAiStream(ProovyAiRequest request) {
String streamUrl = proovyAiHost + "/stream";
// v2 엔드포인트 사용
String streamUrl = proovyAiHost + "/stream/v2";

log.info("Calling Proovy-ai stream: {}", streamUrl);
log.info("Calling Proovy-ai stream v2: {}", streamUrl);
log.debug("Request payload: {}", request);

return webClient.post()
Expand All @@ -358,11 +374,13 @@ private Flux<ProovyAiStreamEvent> callProovyAiStream(ProovyAiRequest request) {
.bodyValue(request)
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class)
// v2는 ServerSentEvent 객체로 받아서 이벤트 필드를 활용
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {})
.timeout(Duration.ofMinutes(5))
.flatMap(this::parseSseEvent)
.doOnNext(event -> log.debug("Received event: {}", event.getEvent()))
.takeUntil(event -> "[DONE]".equals(event.getEvent()));
// 완료 이벤트 체크 (run.completed 또는 run.failed)
.takeUntil(event -> "run.completed".equals(event.getEvent()) || "run.failed".equals(event.getEvent()));
}

/**
Expand Down Expand Up @@ -404,58 +422,42 @@ private void checkProovyAiHealth() {
/**
* SSE 이벤트 파싱
*/
private Mono<ProovyAiStreamEvent> parseSseEvent(String rawEvent) {
/**
* SSE 이벤트 파싱 (v2: ServerSentEvent -> ProovyAiStreamEvent)
*/
Comment on lines 422 to +427
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

parseSseEvent의 이전 Javadoc 제거 누락

Lines 422-424의 이전 Javadoc(/** SSE 이벤트 파싱 */)이 새 Javadoc(Lines 425-427)과 함께 중복으로 남아 있습니다. callProovyAiStream과 동일한 문제이며 이전 Javadoc을 삭제해야 합니다.

🐛 수정 제안
-    /**
-     * SSE 이벤트 파싱
-     */
     /**
      * SSE 이벤트 파싱 (v2: ServerSentEvent -> ProovyAiStreamEvent)
      */
     private Mono<ProovyAiStreamEvent> parseSseEvent(ServerSentEvent<String> sse) {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/**
* SSE 이벤트 파싱
*/
private Mono<ProovyAiStreamEvent> parseSseEvent(String rawEvent) {
/**
* SSE 이벤트 파싱 (v2: ServerSentEvent -> ProovyAiStreamEvent)
*/
/**
* SSE 이벤트 파싱 (v2: ServerSentEvent -> ProovyAiStreamEvent)
*/
private Mono<ProovyAiStreamEvent> parseSseEvent(ServerSentEvent<String> sse) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main/java/com/proovy/domain/conversation/service/ChatServiceImpl.java`
around lines 422 - 427, Remove the stray/duplicate Javadoc comment above the
parseSseEvent method in ChatServiceImpl: delete the old short Javadoc block "/**
SSE 이벤트 파싱 */" so only the new Javadoc "SSE 이벤트 파싱 (v2: ServerSentEvent ->
ProovyAiStreamEvent)" remains; apply the same cleanup pattern as was done for
callProovyAiStream to avoid duplicated Javadoc comments.

private Mono<ProovyAiStreamEvent> parseSseEvent(ServerSentEvent<String> sse) {
return Mono.fromCallable(() -> {
try {
if (rawEvent == null || rawEvent.trim().isEmpty()) {
return null; // null 이벤트는 건너뛰기
}
String eventName = sse.event();
String dataStr = sse.data();

String trimmed = rawEvent.trim();

// [DONE] 토큰 처리
if ("[DONE]".equals(trimmed)) {
if (dataStr == null || dataStr.trim().isEmpty()) {
// 데이터 없는 ping(heartbeat)이거나 payload 없는 이벤트
return ProovyAiStreamEvent.builder()
.event("[DONE]")
.event(eventName != null ? eventName : "heartbeat")
.data(Collections.emptyMap())
.build();
}

// "data: {..}" 형태인 경우 prefix 제거
if (trimmed.startsWith("data:")) {
trimmed = trimmed.substring(5).trim();

// data: [DONE] 형태도 처리
if ("[DONE]".equals(trimmed)) {
return ProovyAiStreamEvent.builder()
.event("[DONE]")
.data(Collections.emptyMap())
.build();
}
}

// JSON payload 파싱
@SuppressWarnings("unchecked")
Map<String, Object> payload = objectMapper.readValue(trimmed, Map.class);
Map<String, Object> payload = objectMapper.readValue(dataStr, Map.class);

// type 필드 추출 (없으면 "message"로 기본 설정)
String type = Objects.toString(payload.get("type"), "message");

log.debug("[Chat] SSE 이벤트 파싱 - type: {}, payload keys: {}", type, payload.keySet());
log.debug("[Chat] SSE v2 이벤트 수신 - event: {}, keys: {}", eventName, payload.keySet());

return ProovyAiStreamEvent.builder()
.event(type)
.event(eventName != null ? eventName : "message")
.data(payload)
.build();

} catch (Exception e) {
log.warn("[Chat] SSE 이벤트 파싱 실패: {}", rawEvent, e);
log.warn("[Chat] SSE 이벤트 파싱 실패: {}", sse, e);
return ProovyAiStreamEvent.builder()
.event("error")
.data(Map.of("error", "Failed to parse event", "raw", rawEvent))
.data(Map.of("error", "Failed to parse event", "raw", String.valueOf(sse.data())))
.build();
}
}).filter(event -> event != null); // null 이벤트 필터링
});
}

private String toPersistableText(Object messageContent) {
Expand Down