diff --git a/src/main/java/com/proovy/domain/conversation/controller/ConversationController.java b/src/main/java/com/proovy/domain/conversation/controller/ConversationController.java index 7a9acf3..da8b8fd 100644 --- a/src/main/java/com/proovy/domain/conversation/controller/ConversationController.java +++ b/src/main/java/com/proovy/domain/conversation/controller/ConversationController.java @@ -121,35 +121,23 @@ private Flux> streamResponse(Long userId, ConversationRe return chatService.streamConversation(userId, request, accessToken) .map(event -> { try { - // ProovyAiStreamEvent를 프론트엔드 형식으로 변환 - // 프론트엔드는 data 필드에 {"type":"...", ...} 형태를 기대 + // V2: SSE 표준 event 필드를 사용하여 이벤트를 구분합니다. + // 프론트엔드에서는 addEventListener(eventType, ...)로 수신해야 합니다. String eventType = event.getEvent(); - - // [DONE] 이벤트는 특별 처리 - if ("[DONE]".equals(eventType)) { - return ServerSentEvent.builder() - .data("[DONE]") - .build(); - } - - // 일반 이벤트: {"type": "...", ...data...} 형태로 병합 - java.util.Map payload = new java.util.HashMap<>(); - // event.getData()의 내용을 payload에 병합 - if (event.getData() != null) { - payload.putAll(event.getData()); - } - // event 데이터에 type이 있어도 컨트롤러에서 계산한 eventType이 우선한다. - payload.put("type", eventType); - + + // 데이터 Payload JSON 변환 + Object payload = event.getData() != null ? event.getData() : java.util.Collections.emptyMap(); String dataJson = convertToJson(payload); return ServerSentEvent.builder() + .event(eventType) // SSE 표준 event 헤더 설정 .data(dataJson) .build(); } catch (Exception e) { log.error("Failed to build SSE event", e); return ServerSentEvent.builder() - .data("{\"type\":\"error\",\"content\":\"Failed to process event\"}") + .event("error") + .data("{\"message\":\"Failed to process event\"}") .build(); } }) diff --git a/src/main/java/com/proovy/domain/conversation/service/ChatServiceImpl.java b/src/main/java/com/proovy/domain/conversation/service/ChatServiceImpl.java index 39d3192..fe46433 100644 --- a/src/main/java/com/proovy/domain/conversation/service/ChatServiceImpl.java +++ b/src/main/java/com/proovy/domain/conversation/service/ChatServiceImpl.java @@ -37,6 +37,8 @@ import org.springframework.transaction.support.TransactionTemplate; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClientRequestException; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.codec.ServerSentEvent; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -219,6 +221,16 @@ public Flux streamConversation(Long userId, ConversationReq .publishOn(Schedulers.boundedElastic()) .doOnNext(event -> { Map data = event.getData(); + String eventType = event.getEvent(); + + // 에러 이벤트 처리 + if ("run.failed".equals(eventType) || "error".equals(eventType)) { + String errMsg = "Stream Error"; + if (data != null) { + errMsg = String.valueOf(data.getOrDefault("message", "Unknown error")); + } + throw new BusinessException(ErrorCode.CONV5002, "스트리밍 중 에러 발생: " + errMsg); + } // thread_id 업데이트 (payload 내부에 포함되는 경우) if (data != null && data.containsKey("thread_id")) { @@ -248,10 +260,10 @@ public Flux streamConversation(Long userId, ConversationReq } } - // 토큰 스트림(type = token) 기준으로 내용 누적 + // 토큰 스트림(event = llm.token.delta) 기준으로 내용 누적 // FinalResponse 노드에서 LLM이 생성하는 토큰을 실시간으로 수집 - if ("token".equals(event.getEvent()) && data != null) { - Object content = data.get("content"); + if ("llm.token.delta".equals(eventType) && data != null) { + Object content = data.get("delta"); if (content != null) { contentBuilder.append(content.toString()); log.trace("[Chat] 토큰 수신 - content: {}", content); @@ -346,10 +358,14 @@ public ConversationResponse invokeConversation(Long userId, ConversationRequest /** * Proovy-ai /stream 호출 */ + /** + * Proovy-ai /stream/v2 호출 (SSE v2) + */ private Flux callProovyAiStream(ProovyAiRequest request) { - String streamUrl = proovyAiHost + "/stream"; + // v2 엔드포인트 사용 + String streamUrl = proovyAiHost + "/stream/v2"; - log.info("Calling Proovy-ai stream: {}", streamUrl); + log.info("Calling Proovy-ai stream v2: {}", streamUrl); log.debug("Request payload: {}", request); return webClient.post() @@ -358,11 +374,13 @@ private Flux callProovyAiStream(ProovyAiRequest request) { .bodyValue(request) .accept(MediaType.TEXT_EVENT_STREAM) .retrieve() - .bodyToFlux(String.class) + // v2는 ServerSentEvent 객체로 받아서 이벤트 필드를 활용 + .bodyToFlux(new ParameterizedTypeReference>() {}) .timeout(Duration.ofMinutes(5)) .flatMap(this::parseSseEvent) .doOnNext(event -> log.debug("Received event: {}", event.getEvent())) - .takeUntil(event -> "[DONE]".equals(event.getEvent())); + // 완료 이벤트 체크 (run.completed 또는 run.failed) + .takeUntil(event -> "run.completed".equals(event.getEvent()) || "run.failed".equals(event.getEvent())); } /** @@ -404,58 +422,42 @@ private void checkProovyAiHealth() { /** * SSE 이벤트 파싱 */ - private Mono parseSseEvent(String rawEvent) { + /** + * SSE 이벤트 파싱 (v2: ServerSentEvent -> ProovyAiStreamEvent) + */ + private Mono parseSseEvent(ServerSentEvent sse) { return Mono.fromCallable(() -> { try { - if (rawEvent == null || rawEvent.trim().isEmpty()) { - return null; // null 이벤트는 건너뛰기 - } + String eventName = sse.event(); + String dataStr = sse.data(); - String trimmed = rawEvent.trim(); - - // [DONE] 토큰 처리 - if ("[DONE]".equals(trimmed)) { + if (dataStr == null || dataStr.trim().isEmpty()) { + // 데이터 없는 ping(heartbeat)이거나 payload 없는 이벤트 return ProovyAiStreamEvent.builder() - .event("[DONE]") + .event(eventName != null ? eventName : "heartbeat") .data(Collections.emptyMap()) .build(); } - // "data: {..}" 형태인 경우 prefix 제거 - if (trimmed.startsWith("data:")) { - trimmed = trimmed.substring(5).trim(); - - // data: [DONE] 형태도 처리 - if ("[DONE]".equals(trimmed)) { - return ProovyAiStreamEvent.builder() - .event("[DONE]") - .data(Collections.emptyMap()) - .build(); - } - } - // JSON payload 파싱 @SuppressWarnings("unchecked") - Map payload = objectMapper.readValue(trimmed, Map.class); + Map payload = objectMapper.readValue(dataStr, Map.class); - // type 필드 추출 (없으면 "message"로 기본 설정) - String type = Objects.toString(payload.get("type"), "message"); - - log.debug("[Chat] SSE 이벤트 파싱 - type: {}, payload keys: {}", type, payload.keySet()); + log.debug("[Chat] SSE v2 이벤트 수신 - event: {}, keys: {}", eventName, payload.keySet()); return ProovyAiStreamEvent.builder() - .event(type) + .event(eventName != null ? eventName : "message") .data(payload) .build(); } catch (Exception e) { - log.warn("[Chat] SSE 이벤트 파싱 실패: {}", rawEvent, e); + log.warn("[Chat] SSE 이벤트 파싱 실패: {}", sse, e); return ProovyAiStreamEvent.builder() .event("error") - .data(Map.of("error", "Failed to parse event", "raw", rawEvent)) + .data(Map.of("error", "Failed to parse event", "raw", String.valueOf(sse.data()))) .build(); } - }).filter(event -> event != null); // null 이벤트 필터링 + }); } private String toPersistableText(Object messageContent) {