diff --git a/langstream-agents/langstream-agents-atlassian/pom.xml b/langstream-agents/langstream-agents-atlassian/pom.xml new file mode 100644 index 000000000..46526a530 --- /dev/null +++ b/langstream-agents/langstream-agents-atlassian/pom.xml @@ -0,0 +1,109 @@ + + + + + langstream-agents + ai.langstream + 0.23.1-SNAPSHOT + + 4.0.0 + + langstream-agents-atlassian + + + + + + + + ${project.groupId} + langstream-api + ${project.version} + provided + + + ${project.groupId} + langstream-agents-commons + ${project.version} + + + ai.langstream + langstream-agents-commons-storage-provider + ${project.version} + + + org.projectlombok + lombok + provided + + + org.slf4j + slf4j-api + + + org.apache.logging.log4j + log4j-core + + + com.dropbox.core + dropbox-core-sdk + 7.0.0 + + + ch.qos.logback + logback-core + test + + + ch.qos.logback + logback-classic + test + + + org.junit.jupiter + junit-jupiter + test + + + com.azure + azure-core + + + + + + org.apache.nifi + nifi-nar-maven-plugin + true + + nar + + + + default-nar + package + + nar + + + + + + + \ No newline at end of file diff --git a/langstream-agents/langstream-agents-atlassian/src/main/java/ai/langstream/agents/atlassian/AtlassianAgentsCodeProvider.java b/langstream-agents/langstream-agents-atlassian/src/main/java/ai/langstream/agents/atlassian/AtlassianAgentsCodeProvider.java new file mode 100644 index 000000000..b09b83c34 --- /dev/null +++ b/langstream-agents/langstream-agents-atlassian/src/main/java/ai/langstream/agents/atlassian/AtlassianAgentsCodeProvider.java @@ -0,0 +1,42 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.atlassian; + +import ai.langstream.agents.atlassian.confluence.ConfluenceSource; +import ai.langstream.api.runner.code.AgentCode; +import ai.langstream.api.runner.code.AgentCodeProvider; +import java.util.List; + +public class AtlassianAgentsCodeProvider implements AgentCodeProvider { + + public static final String CONFLUENCE_SOURCE = "confluence-source"; + private static final List AGENTS = List.of(CONFLUENCE_SOURCE); + + @Override + public boolean supports(String agentType) { + return AGENTS.contains(agentType); + } + + @Override + public AgentCode createInstance(String agentType) { + switch (agentType) { + case CONFLUENCE_SOURCE: + return new ConfluenceSource(); + default: + throw new IllegalArgumentException("Unsupported agent type: " + agentType); + } + } +} diff --git a/langstream-agents/langstream-agents-atlassian/src/main/java/ai/langstream/agents/atlassian/confluence/ConfluenceSource.java b/langstream-agents/langstream-agents-atlassian/src/main/java/ai/langstream/agents/atlassian/confluence/ConfluenceSource.java new file mode 100644 index 000000000..68691dc50 --- /dev/null +++ b/langstream-agents/langstream-agents-atlassian/src/main/java/ai/langstream/agents/atlassian/confluence/ConfluenceSource.java @@ -0,0 +1,161 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.atlassian.confluence; + +import ai.langstream.agents.atlassian.confluence.client.ConfluenceRestAPIClient; +import ai.langstream.agents.atlassian.confluence.client.ConfluenceSpace; +import ai.langstream.ai.agents.commons.storage.provider.StorageProviderObjectReference; +import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSource; +import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSourceState; +import ai.langstream.api.runner.code.Header; +import ai.langstream.api.runner.code.SimpleRecord; +import ai.langstream.api.util.ConfigurationUtils; +import com.dropbox.core.v2.files.*; +import java.util.*; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ConfluenceSource + extends StorageProviderSource { + + public static class ConfluenceSourceState extends StorageProviderSourceState {} + + private ConfluenceRestAPIClient client; + + private List spaces; + private Set rootParents; + + @Override + public Class getStateClass() { + return ConfluenceSourceState.class; + } + + @Override + public void initializeClientAndConfig(Map configuration) { + String username = + ConfigurationUtils.requiredField( + configuration, "username", () -> "confluence source"); + String apiToken = + ConfigurationUtils.requiredField( + configuration, "api-token", () -> "confluence source"); + String domain = + ConfigurationUtils.requiredField( + configuration, "domain", () -> "confluence source"); + client = new ConfluenceRestAPIClient(username, apiToken, domain); + initializeConfig(configuration); + } + + void initializeConfig(Map configuration) { + spaces = ConfigurationUtils.getList("spaces", configuration); + if (spaces.isEmpty()) { + throw new IllegalArgumentException( + "At least one space (name or key) must be specified"); + } + rootParents = ConfigurationUtils.getSet("root-parents", configuration); + } + + @Override + public String getBucketName() { + return ""; + } + + @Override + public boolean isDeleteObjects() { + return false; + } + + @Override + public Collection listObjects() throws Exception { + List collect = new ArrayList<>(); + for (String space : spaces) { + List spacesForSpace = client.findSpaceByNameOrKeyOrId(space); + if (spacesForSpace.isEmpty()) { + log.error( + "Space {} not found, make sure you inserted the name or the key or the id of the space", + space); + continue; + } + for (ConfluenceSpace confluenceSpace : spacesForSpace) { + log.info("Found space {}", confluenceSpace); + int before = collect.size(); + client.visitSpacePages( + confluenceSpace.id(), + rootParents, + confluencePage -> + collect.add( + new StorageProviderObjectReference() { + @Override + public String id() { + return confluencePage.id(); + } + + @Override + public long size() { + return -1; + } + + @Override + public String contentDigest() { + return confluencePage.pageVersion(); + } + + @Override + public Collection
additionalRecordHeaders() { + return List.of( + SimpleRecord.SimpleHeader.of( + "confluence-space-name", + confluenceSpace.name()), + SimpleRecord.SimpleHeader.of( + "confluence-space-key", + confluenceSpace.key()), + SimpleRecord.SimpleHeader.of( + "confluence-space-id", + confluenceSpace.key()), + SimpleRecord.SimpleHeader.of( + "confluence-page-title", + confluencePage.title())); + } + })); + log.info("Found {} pages in space {}", collect.size() - before, confluenceSpace); + } + } + return collect; + } + + @Override + public byte[] downloadObject(StorageProviderObjectReference object) throws Exception { + try { + return client.exportPage(object.id()); + } catch (Exception e) { + log.error( + "Error downloading page {} ({})", + object.id(), + object.additionalRecordHeaders(), + e); + throw e; + } + } + + @Override + public void deleteObject(String id) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isStateStorageRequired() { + return true; + } +} diff --git a/langstream-agents/langstream-agents-atlassian/src/main/java/ai/langstream/agents/atlassian/confluence/client/ConfluencePage.java b/langstream-agents/langstream-agents-atlassian/src/main/java/ai/langstream/agents/atlassian/confluence/client/ConfluencePage.java new file mode 100644 index 000000000..c857a55c2 --- /dev/null +++ b/langstream-agents/langstream-agents-atlassian/src/main/java/ai/langstream/agents/atlassian/confluence/client/ConfluencePage.java @@ -0,0 +1,3 @@ +package ai.langstream.agents.atlassian.confluence.client; + +public record ConfluencePage(long spaceId, String id, String title, String pageVersion) {} diff --git a/langstream-agents/langstream-agents-atlassian/src/main/java/ai/langstream/agents/atlassian/confluence/client/ConfluenceRestAPIClient.java b/langstream-agents/langstream-agents-atlassian/src/main/java/ai/langstream/agents/atlassian/confluence/client/ConfluenceRestAPIClient.java new file mode 100644 index 000000000..b6d0d75cc --- /dev/null +++ b/langstream-agents/langstream-agents-atlassian/src/main/java/ai/langstream/agents/atlassian/confluence/client/ConfluenceRestAPIClient.java @@ -0,0 +1,246 @@ +package ai.langstream.agents.atlassian.confluence.client; + +import ai.langstream.api.util.ObjectMapperFactory; +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.function.Consumer; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ConfluenceRestAPIClient { + private final String domain; + private final String basicHeader; + private final HttpClient httpClient = HttpClient.newHttpClient(); + + public ConfluenceRestAPIClient(String username, String apiToken, String domain) { + this.domain = domain; + this.basicHeader = + "Basic " + + Base64.getEncoder() + .encodeToString( + (username + ":" + apiToken) + .getBytes(StandardCharsets.UTF_8)); + } + + private record Links(String next) {} + + private record SpaceObject(long id, String key, String name) {} + + private record SpacesResponse(List results, Links _links) {} + + public List findSpaceByNameOrKeyOrId(String nameOrKeyOrId) + throws IOException, InterruptedException { + + List result = new ArrayList<>(); + + String uri = "https://%s/wiki/api/v2/spaces?limit=250".formatted(domain); + while (true) { + HttpResponse response = executeGet(uri); + SpacesResponse spaces = + ObjectMapperFactory.getDefaultMapper() + .readValue(response.body(), SpacesResponse.class); + for (SpaceObject space : spaces.results) { + if (space.name.equals(nameOrKeyOrId) + || space.key.equals(nameOrKeyOrId) + || (space.id + "").equals(nameOrKeyOrId)) { + result.add(new ConfluenceSpace(space.id(), space.name(), space.key())); + } + } + if (spaces._links().next() == null) { + break; + } else { + uri = "https://%s%s".formatted(domain, spaces._links().next()); + } + } + + return result; + } + + private HttpResponse executeGet(String uri) throws IOException, InterruptedException { + HttpRequest request = + HttpRequest.newBuilder() + .uri(URI.create(uri)) + .header("Authorization", basicHeader) + .GET() + .build(); + + HttpResponse response = + httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + if (response.statusCode() >= 300) { + throw new RuntimeException( + "GET failed to " + + uri + + " with status " + + response.statusCode() + + " and body " + + response.body()); + } + return response; + } + + private record PageVersion(int number) {} + + private record PageObject(String id, String title, PageVersion version, String parentId) {} + + private record PagesResponse(List results, Links _links) {} + + @SneakyThrows + public void visitSpacePages( + long spaceId, Set rootParents, Consumer consumer) { + List result = new ArrayList<>(); + String uri = + "https://%s/wiki/api/v2/spaces/%d/pages?limit=250&sort=modified-date" + .formatted(domain, spaceId); + Map pageToParent = new HashMap<>(); + while (true) { + HttpResponse response = executeGet(uri); + PagesResponse pages = + ObjectMapperFactory.getDefaultMapper() + .readValue(response.body(), PagesResponse.class); + for (PageObject page : pages.results) { + if (!rootParents.isEmpty()) { + if (page.parentId() == null) { + continue; + } + pageToParent.put(page.id(), page.parentId()); + result.add( + new ConfluencePage( + spaceId, + page.id(), + page.title(), + page.version().number() + "")); + } else { + consumer.accept( + new ConfluencePage( + spaceId, + page.id(), + page.title(), + page.version().number() + "")); + } + } + if (pages._links().next() == null) { + break; + } else { + uri = "https://%s%s".formatted(domain, pages._links().next()); + } + } + if (rootParents.isEmpty()) { + return; + } + for (ConfluencePage confluencePage : result) { + String currentPageId = confluencePage.id(); + boolean include = false; + while (true) { + if (rootParents.contains(currentPageId)) { + include = true; + break; + } + String parentId = pageToParent.get(currentPageId); + if (parentId == null) { + break; + } + currentPageId = parentId; + } + if (include) { + consumer.accept(confluencePage); + } else { + log.debug("Skipping page {}", confluencePage); + } + } + } + + private record ExportView(String value) {} + + private record ExportViewBody(ExportView export_view) {} + + private record PageObjectWithBody(ExportViewBody body) {} + + @SneakyThrows + public byte[] exportPage(String pageId) { + String pageUri = + "https://%s/wiki/api/v2/pages/%s?body-format=export_view".formatted(domain, pageId); + HttpResponse pageBody = executeGet(pageUri); + PageObjectWithBody pageObjectWithBody = + ObjectMapperFactory.getDefaultMapper() + .readValue(pageBody.body(), PageObjectWithBody.class); + String value = pageObjectWithBody.body().export_view().value(); + return value.getBytes(StandardCharsets.UTF_8); + } + + public void deletePageByTitle(long spaceId, String title) { + visitSpacePages( + spaceId, + Set.of(), + page -> { + if (page.title().equals(title)) { + deletePage(page.id()); + } + }); + } + + @SneakyThrows + public void deletePage(String pageId) { + String uri = "https://%s/wiki/api/v2/pages/%s".formatted(domain, pageId); + HttpRequest request = + HttpRequest.newBuilder() + .uri(URI.create(uri)) + .header("Authorization", basicHeader) + .DELETE() + .build(); + + HttpResponse response = + httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + if (response.statusCode() >= 300) { + throw new RuntimeException( + "DELETE failed to " + + uri + + " with status " + + response.statusCode() + + " and body " + + response.body()); + } + } + + @SneakyThrows + public String createPage(long spaceId, String title, String content, String parentId) { + Map payloadMap = new HashMap<>(); + payloadMap.put("spaceId", spaceId + ""); + payloadMap.put("status", "current"); + payloadMap.put("title", title); + payloadMap.put( + "body", Map.of("storage", Map.of("value", content, "representation", "storage"))); + if (parentId != null) { + payloadMap.put("parentId", parentId); + } + String payload = ObjectMapperFactory.getDefaultMapper().writeValueAsString(payloadMap); + + String uri = "https://%s/wiki/api/v2/pages".formatted(domain); + HttpRequest request = + HttpRequest.newBuilder() + .uri(URI.create(uri)) + .header("Authorization", basicHeader) + .header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(payload)) + .build(); + + HttpResponse response = + httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + if (response.statusCode() >= 300) { + throw new RuntimeException( + "POST failed to " + + uri + + " with status " + + response.statusCode() + + " and body " + + response.body()); + } + String body = response.body(); + return ObjectMapperFactory.getDefaultMapper().readValue(body, PageObject.class).id(); + } +} diff --git a/langstream-agents/langstream-agents-atlassian/src/main/java/ai/langstream/agents/atlassian/confluence/client/ConfluenceSpace.java b/langstream-agents/langstream-agents-atlassian/src/main/java/ai/langstream/agents/atlassian/confluence/client/ConfluenceSpace.java new file mode 100644 index 000000000..0b6ed7c05 --- /dev/null +++ b/langstream-agents/langstream-agents-atlassian/src/main/java/ai/langstream/agents/atlassian/confluence/client/ConfluenceSpace.java @@ -0,0 +1,3 @@ +package ai.langstream.agents.atlassian.confluence.client; + +public record ConfluenceSpace(long id, String name, String key) {} diff --git a/langstream-agents/langstream-agents-atlassian/src/main/resources/META-INF/ai.langstream.agents.index b/langstream-agents/langstream-agents-atlassian/src/main/resources/META-INF/ai.langstream.agents.index new file mode 100644 index 000000000..a9fc3d07f --- /dev/null +++ b/langstream-agents/langstream-agents-atlassian/src/main/resources/META-INF/ai.langstream.agents.index @@ -0,0 +1 @@ +confluence-source diff --git a/langstream-agents/langstream-agents-atlassian/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider b/langstream-agents/langstream-agents-atlassian/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider new file mode 100644 index 000000000..c810d4761 --- /dev/null +++ b/langstream-agents/langstream-agents-atlassian/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider @@ -0,0 +1 @@ +ai.langstream.agents.atlassian.AtlassianAgentsCodeProvider \ No newline at end of file diff --git a/langstream-agents/pom.xml b/langstream-agents/pom.xml index 64870c51f..724f42b35 100644 --- a/langstream-agents/pom.xml +++ b/langstream-agents/pom.xml @@ -45,5 +45,6 @@ langstream-agents-google langstream-agents-ms365 langstream-agents-dropbox + langstream-agents-atlassian diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/StorageProviderSourceAgentProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/StorageProviderSourceAgentProvider.java index 207e39b18..ba71a0fc5 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/StorageProviderSourceAgentProvider.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/StorageProviderSourceAgentProvider.java @@ -41,6 +41,7 @@ public class StorageProviderSourceAgentProvider extends AbstractComposableAgentP protected static final String MS_365_SHAREPOINT_SOURCE = "ms365-sharepoint-source"; protected static final String MS_365_ONEDRIVE_SOURCE = "ms365-onedrive-source"; protected static final String DROPBOX_SOURCE = "dropbox-source"; + protected static final String CONFLUENCE_SOURCE = "confluence-source"; public StorageProviderSourceAgentProvider() { super( @@ -51,7 +52,8 @@ public StorageProviderSourceAgentProvider() { GOOGLE_DRIVE_SOURCE, MS_365_SHAREPOINT_SOURCE, MS_365_ONEDRIVE_SOURCE, - DROPBOX_SOURCE), + DROPBOX_SOURCE, + CONFLUENCE_SOURCE), List.of(KubernetesClusterRuntime.CLUSTER_TYPE, "none")); } @@ -77,6 +79,8 @@ protected Class getAgentConfigModelClass(String type) { return MS365OneDriveSourceConfiguration.class; case DROPBOX_SOURCE: return DropboxSourceConfiguration.class; + case CONFLUENCE_SOURCE: + return ConfluenceSourceConfiguration.class; default: throw new IllegalArgumentException("Unknown agent type: " + type); } @@ -566,4 +570,59 @@ public static class DropboxSourceConfiguration extends StorageProviderSourceBase @JsonProperty("path-prefix") private String pathPrefix; } + + @AgentConfig( + name = "Confluence Source", + description = + """ + Reads data from Confluence Spaces. + """) + @Data + @EqualsAndHashCode(callSuper = true) + public static class ConfluenceSourceConfiguration + extends StorageProviderSourceBaseConfiguration { + + @ConfigProperty( + required = true, + description = + """ + Username to authenticate with. + """) + @JsonProperty("username") + private String username; + + @ConfigProperty( + required = true, + description = + """ + API token to authenticate with. + """) + @JsonProperty("api-token") + private String apiToken; + + @ConfigProperty( + required = true, + description = + """ + Confluence domain (e.g. https://my-domain.atlassian.net or confluence..com). + """) + private String domain; + + @ConfigProperty( + required = true, + description = + """ + Confluence Spaces to read from. A space can be referenced by name, key or numeric id. + """) + private List spaces; + + @ConfigProperty( + description = + """ + Filter by parent pages. By default, all pages are included. + Set a list of parent page ids to filter by. + """) + @JsonProperty("root-parents") + private List rootParents; + } } diff --git a/langstream-runtime/langstream-runtime-impl/pom.xml b/langstream-runtime/langstream-runtime-impl/pom.xml index cbab2f976..65e9b1b9d 100644 --- a/langstream-runtime/langstream-runtime-impl/pom.xml +++ b/langstream-runtime/langstream-runtime-impl/pom.xml @@ -352,6 +352,13 @@ provided + + ${project.groupId} + langstream-agents-atlassian + ${project.version} + provided + + ${project.groupId} langstream-agent-webcrawler @@ -692,6 +699,16 @@ ${project.build.directory}/agents + + ${project.groupId} + langstream-agents-atlassian + ${project.version} + nar + nar + false + ${project.build.directory}/agents + + ${project.groupId} diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/agents/ConfluenceSourceIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/agents/ConfluenceSourceIT.java new file mode 100644 index 000000000..da0e05d3e --- /dev/null +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/agents/ConfluenceSourceIT.java @@ -0,0 +1,160 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents; + +import static ai.langstream.testrunners.AbstractApplicationRunner.INTEGRATION_TESTS_GROUP1; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3; + +import ai.langstream.agents.atlassian.confluence.client.ConfluenceRestAPIClient; +import ai.langstream.api.runner.topics.TopicConsumer; +import ai.langstream.testrunners.AbstractGenericStreamingApplicationRunner; +import java.util.*; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Slf4j +@Testcontainers +@Tag(INTEGRATION_TESTS_GROUP1) +@Disabled +class ConfluenceSourceIT extends AbstractGenericStreamingApplicationRunner { + @Container + private static final LocalStackContainer localstack = + new LocalStackContainer(DockerImageName.parse("localstack/localstack:2.2.0")) + .withServices(S3); + + private static final String USERNAME = ""; + private static final String API_TOKEN = ""; + private static final String DOMAIN = ""; + private static final String SPACE = ""; + + @Test + public void test() throws Exception { + ConfluenceRestAPIClient confluence = + new ConfluenceRestAPIClient(USERNAME, API_TOKEN, DOMAIN); + + long spaceId = confluence.findSpaceByNameOrKeyOrId(SPACE).iterator().next().id(); + confluence.visitSpacePages( + spaceId, + Set.of(), + page -> { + if (page.title().equals("Langstream Parent")) { + confluence.deletePage(page.id()); + } + }); + String parentPageId = + confluence.createPage(spaceId, "Langstream Parent", "Parent page", null); + + final String appId = "app-" + UUID.randomUUID().toString().substring(0, 4); + + String tenant = "tenant"; + String[] expectedAgents = new String[] {appId + "-step1", appId + "-step2"}; + Map application = + Map.of( + "module.yaml", + """ + module: "module-1" + id: "pipeline-1" + topics: + - name: "${globals.output-topic}" + creation-mode: create-if-not-exists + - name: "deleted-documents" + creation-mode: create-if-not-exists + pipeline: + - type: "confluence-source" + id: "step1" + configuration: + username: %s + api-token: %s + domain: %s + spaces: [%s] + state-storage: s3 + state-storage-s3-bucket: "test-state-bucket" + state-storage-s3-endpoint: "%s" + deleted-objects-topic: "deleted-objects" + idle-time: 1 + root-parents: ["%s"] + - type: text-extractor + id: step2 + output: "${globals.output-topic}" + """ + .formatted( + USERNAME, + API_TOKEN, + DOMAIN, + SPACE, + localstack.getEndpointOverride(S3), + parentPageId)); + + List pageIds = new ArrayList<>(); + pageIds.add(parentPageId); + for (int i = 0; i < 2; i++) { + String title = "Langstream test-" + i; + confluence.deletePageByTitle(spaceId, title); + String pageId = confluence.createPage(spaceId, title, "document " + i, parentPageId); + pageIds.add(pageId); + } + + try (ApplicationRuntime applicationRuntime = + deployApplication( + tenant, appId, application, buildInstanceYaml(), expectedAgents)) { + + try (TopicConsumer deletedDocumentsConsumer = createConsumer("deleted-objects"); + TopicConsumer consumer = + createConsumer(applicationRuntime.getGlobal("output-topic")); ) { + + executeAgentRunners(applicationRuntime, 5); + waitForMessages( + consumer, + 3, + (consumerRecords, objects) -> { + assertEquals(3, consumerRecords.size()); + assertEquals(pageIds.get(0), consumerRecords.get(0).key()); + assertTrue( + ((String) consumerRecords.get(0).value()) + .contains("Parent page")); + assertEquals(pageIds.get(1), consumerRecords.get(1).key()); + assertTrue( + ((String) consumerRecords.get(1).value()) + .contains("document 0")); + assertEquals(pageIds.get(2), consumerRecords.get(2).key()); + assertTrue( + ((String) consumerRecords.get(2).value()) + .contains("document 1")); + }); + + confluence.deletePage(pageIds.get(1)); + executeAgentRunners(applicationRuntime, 5); + waitForMessages(deletedDocumentsConsumer, List.of(pageIds.get(1))); + } + } finally { + for (String pageId : pageIds) { + try { + confluence.deletePage(pageId); + } catch (Exception e) { + log.error("Error deleting page {}", pageId, e); + } + } + } + } +}