diff --git a/.dockerignore b/.dockerignore
index c9f9e5414..167eddca4 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -1,3 +1,3 @@
target
-Dockerfile
-arroyo-console/node_modules
+docker/Dockerfile
+webui/node_modules
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 7a9cacd64..df55e28ae 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -17,23 +17,9 @@ jobs:
args: [--frozen-lockfile, --strict-peer-dependencies]
- name: Run prettier
run: |
- cd arroyo-console
+ cd webui
pnpm check
- check-dockerfiles:
- runs-on: ubuntu-latest
- steps:
- - name: Check out
- uses: actions/checkout@v3
- - name: Set up Ruby
- uses: ruby/setup-ruby@ec02537da5712d66d4d50a0f33b7eb52773b5ed1
- with:
- ruby-version: '3.1'
- - name: Install dependencies
- run: gem install toml-rb
- - name: Check Dockerfiles
- run: ruby docker/check_dockerfiles.rb
-
build-rust:
runs-on: ubicloud-standard-16
env:
@@ -54,12 +40,16 @@ jobs:
with:
toolchain: stable
override: true
+ - name: Check Formatting
+ run: cargo fmt -- --check
- name: Setup npm
uses: actions/setup-node@v3
+ with:
+ node-version: '21'
- name: Install OpenAPI Generator
run: |
npm install @openapitools/openapi-generator-cli -g
- openapi-generator-cli version
+ cd crates/arroyo-openapi && openapi-generator-cli version
- name: Setup Postgres
run: |
sudo apt-get update
@@ -72,7 +62,7 @@ jobs:
tar xvfz refinery*.tar.gz
mv /tmp/refinery*-musl/refinery /tmp
popd
- /tmp/refinery migrate -e DATABASE_URL -p arroyo-api/migrations
+ /tmp/refinery migrate -e DATABASE_URL -p crates/arroyo-api/migrations
- name: Install dependencies
run: |
curl -LsSf https://get.nexte.st/latest/linux | tar zxf - -C ${CARGO_HOME:-~/.cargo}/bin
@@ -88,14 +78,12 @@ jobs:
mkdir /tmp/kraft-combined-logs
kafka_*/bin/kafka-storage.sh format -t 9v5PspiySuWU2l5NjTgRuA -c kafka_*/config/kraft/server.properties
kafka_*/bin/kafka-server-start.sh -daemon kafka_*/config/kraft/server.properties
- - name: Check Formatting
- run: cargo fmt -- --check
- name: Build
run: cargo build --all-features
- name: Validate API
run: |
npm install --global ibm-openapi-validator
- cd arroyo-openapi
+ cd crates/arroyo-openapi
lint-openapi --errors-only api-spec.json
- name: Test
run: cargo nextest run --jobs 4 --all-features
@@ -117,5 +105,5 @@ jobs:
args: [--frozen-lockfile, --strict-peer-dependencies]
- name: Build console
run: |
- cd arroyo-console
+ cd webui
pnpm build
diff --git a/.github/workflows/docker.yaml b/.github/workflows/docker.yaml
index 790e88280..38c4c2f46 100644
--- a/.github/workflows/docker.yaml
+++ b/.github/workflows/docker.yaml
@@ -4,6 +4,7 @@ on:
push:
branches:
- master
+ - dev
- '*docker*'
tags:
- 'v[0-9]+.[0-9]+.[0-9]+*' # Semver matching pattern with optional suffix
@@ -18,103 +19,112 @@ jobs:
platform:
- linux/amd64
- linux/arm64
- image_type:
- - worker
- - services
- - compiler
- - single
include:
- platform: linux/amd64
- mold_arch: x86_64
- proto_arch: x86_64
- prom_arch: amd64
- runs_on: buildjet-8vcpu-ubuntu-2204
+ runs_on: buildjet-32vcpu-ubuntu-2204
+ arch: amd64
- platform: linux/arm64
- mold_arch: aarch64
- proto_arch: aarch_64
- prom_arch: arm64
- runs_on: buildjet-8vcpu-ubuntu-2204-arm
- - image_type: worker
- dockerfile: docker/cluster/worker/Dockerfile
- - image_type: services
- dockerfile: docker/cluster/services/Dockerfile
- - image_type: compiler
- dockerfile: docker/cluster/compiler/Dockerfile
- - image_type: single
- dockerfile: docker/single/Dockerfile
+ runs_on: buildjet-32vcpu-ubuntu-2204-arm
+ arch: arm64
runs-on: ${{ matrix.runs_on }}
steps:
- # Get the repository's code
- - name: Checkout
- uses: actions/checkout@v2
- - name: Login to Docker Hub
- uses: docker/login-action@v2
- with:
- username: ${{ secrets.DOCKER_HUB_USERNAME }}
- password: ${{ secrets.DOCKER_HUB_PASSWORD }}
+ - name: Prepare
+ run: |
+ platform=${{ matrix.platform }}
+ echo "PLATFORM_PAIR=${platform//\//-}" >> $GITHUB_ENV
+ - name: Set up Docker Buildx
+ uses: docker/setup-buildx-action@v3
+
- name: Login to GHCR
if: github.event_name != 'pull_request'
- uses: docker/login-action@v1
+ uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
+
- name: Docker meta
- id: arroyo-docker # you'll use this in the next step
- uses: docker/metadata-action@v3
+ id: meta
+ uses: docker/metadata-action@v5
with:
- images: |
- ghcr.io/arroyosystems/arroyo-${{ matrix.image_type }}
- tags: |
- type=schedule
- type=ref,event=branch
- type=ref,event=pr
- type=semver,pattern={{version}}
- type=semver,pattern={{major}}.{{minor}}
- type=semver,pattern={{major}}
- type=sha
- flavor: |
- latest=false
- prefix=${{ matrix.prom_arch }}-
- - name: Set env
- run: echo "GIT_SHA=$(git rev-parse HEAD)" >> $GITHUB_ENV
- - name: Build and push
- uses: docker/build-push-action@v2
+ images: ghcr.io/arroyosystems/arroyo-single
+
+ - name: Build and push single
+ id: build-single
+ uses: docker/build-push-action@v5
+ with:
+ file: docker/Dockerfile
+ platforms: ${{ matrix.platform }}
+ build-args: |
+ GIT_SHA=${{ github.sha }}
+ push: ${{ github.event_name != 'pull_request' }}
+ cache-from: type=registry,ref=ghcr.io/arroyosystems/arroyo-builder:buildcache-${{ matrix.arch }}
+ cache-to: type=registry,ref=ghcr.io/arroyosystems/arroyo-builder:buildcache-${{ matrix.arch }}
+ target: arroyo-single
+ outputs: type=image,name=ghcr.io/arroyosystems/arroyo-single,push-by-digest=true,name-canonical=true,push=true,store=true
+
+ - name: Docker meta
+ id: meta-arroyo
+ uses: docker/metadata-action@v5
+ with:
+ images: ghcr.io/arroyosystems/arroyo
+
+ - name: Build and push arroyo
+ id: build-arroyo
+ uses: docker/build-push-action@v5
with:
- context: .
- file: ${{ matrix.dockerfile }}
+ file: docker/Dockerfile
platforms: ${{ matrix.platform }}
build-args: |
- MOLD_ARCH=${{ matrix.mold_arch }}
- PROTO_ARCH=${{ matrix.proto_arch }}
- PROM_ARCH=${{ matrix.prom_arch }}
- GIT_SHA=${{ env.GIT_SHA }}
+ GIT_SHA=${{ github.sha }}
push: ${{ github.event_name != 'pull_request' }}
- tags: ${{ steps.arroyo-docker.outputs.tags }}
- labels: ${{ steps.arroyo-docker.outputs.labels }}
+ cache-from: type=registry,ref=ghcr.io/arroyosystems/arroyo-builder:buildcache-${{ matrix.arch }}
+ cache-to: type=registry,ref=ghcr.io/arroyosystems/arroyo-builder:buildcache-${{ matrix.arch }}
+ target: arroyo
+ outputs: type=image,name=ghcr.io/arroyosystems/arroyo,push-by-digest=true,name-canonical=true,push=true,store=true
+
+ - name: Export digest
+ run: |
+ mkdir -p /tmp/digests/arroyo-single
+ digest="${{ steps.build-single.outputs.digest }}"
+ touch "/tmp/digests/arroyo-single/${digest#sha256:}"
+
+ mkdir -p /tmp/digests/arroyo
+ digest="${{ steps.build-arroyo.outputs.digest }}"
+ touch "/tmp/digests/arroyo/${digest#sha256:}"
+ - name: Upload digest
+ uses: actions/upload-artifact@v4
+ with:
+ name: digests-${{ env.PLATFORM_PAIR }}
+ path: /tmp/digests/*
+ if-no-files-found: error
+ retention-days: 1
+
manifest:
needs: build
strategy:
matrix:
- image_type:
- - worker
- - services
- - compiler
- - single
+ image_name:
+ - arroyo
+ - arroyo-single
runs-on: ubuntu-latest
steps:
- - name: Login to GHCR
- uses: docker/login-action@v1
+ -
+ name: Download digests
+ uses: actions/download-artifact@v4
with:
- registry: ghcr.io
- username: ${{ github.repository_owner }}
- password: ${{ secrets.GITHUB_TOKEN }}
- - name: Docker meta
- id: arroyo-docker # you'll use this in the next step
- uses: docker/metadata-action@v3
+ path: /tmp/digests
+ pattern: digests-*
+ merge-multiple: true
+ -
+ name: Set up Docker Buildx
+ uses: docker/setup-buildx-action@v3
+ -
+ name: Docker meta
+ id: meta
+ uses: docker/metadata-action@v5
with:
- images: |
- ghcr.io/arroyosystems/arroyo-${{ matrix.image_type }}
+ images: ghcr.io/arroyosystems/${{ matrix.image_name }}
tags: |
type=schedule
type=ref,event=branch
@@ -123,20 +133,28 @@ jobs:
type=semver,pattern={{major}}.{{minor}}
type=semver,pattern={{major}}
type=sha
- - name: Create and push Docker manifest
+ - name: Login to GHCR
+ if: github.event_name != 'pull_request'
+ uses: docker/login-action@v3
+ with:
+ registry: ghcr.io
+ username: ${{ github.repository_owner }}
+ password: ${{ secrets.GITHUB_TOKEN }}
+ -
+ name: Create manifest list and push
+ working-directory: /tmp/digests/${{ matrix.image_name }}
+ run: |
+ docker buildx imagetools create $(jq -cr '.tags | map("-t " + .) | join(" ")' <<< "$DOCKER_METADATA_OUTPUT_JSON") \
+ $(printf 'ghcr.io/arroyosystems/${{ matrix.image_name }}@sha256:%s ' *)
+ -
+ name: Inspect image
run: |
- TAG=${{ steps.arroyo-docker.outputs.version }}
- IMAGE=ghcr.io/arroyosystems/arroyo-${{ matrix.image_type }}
- docker manifest create ${IMAGE}:${TAG} ${IMAGE}:arm64-${TAG} ${IMAGE}:amd64-${TAG}
- docker manifest annotate ${IMAGE}:${TAG} ${IMAGE}:arm64-${TAG} --arch arm64
- docker manifest annotate ${IMAGE}:${TAG} ${IMAGE}:amd64-${TAG} --arch amd64
- docker manifest push ${IMAGE}:${TAG}
- - name: Push latest to tip tag.
- if: github.ref == 'refs/heads/master'
+ docker buildx imagetools inspect ghcr.io/arroyosystems/${{ matrix.image_name }}:${{ steps.meta.outputs.version }}
+ # TODO: change this to tip once merged into master
+ - name: Push to dev tag.
+ working-directory: /tmp/digests/${{ matrix.image_name }}
+ # change to dev before merging
+ if: github.ref == 'refs/heads/df-docker'
run: |
- TAG=${{ steps.arroyo-docker.outputs.version }}
- IMAGE=ghcr.io/arroyosystems/arroyo-${{ matrix.image_type }}
- docker manifest create ${IMAGE}:tip ${IMAGE}:arm64-${TAG} ${IMAGE}:amd64-${TAG}
- docker manifest annotate ${IMAGE}:tip ${IMAGE}:arm64-${TAG} --arch arm64
- docker manifest annotate ${IMAGE}:tip ${IMAGE}:amd64-${TAG} --arch amd64
- docker manifest push ${IMAGE}:tip
\ No newline at end of file
+ docker buildx imagetools create --tag ghcr.io/arroyosystems/${{ matrix.image_name }}:dev \
+ $(printf 'ghcr.io/arroyosystems/${{ matrix.image_name }}@sha256:%s ' *)
diff --git a/.gitignore b/.gitignore
index 28b7a41e2..274a8d4cc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,7 @@ target/
.idea
.env
.vscode/
-arroyo-openapi/api-spec.json
-arroyo-openapi/client
+crates/arroyo-openapi/api-spec.json
+crates/arroyo-openapi/client
+openapitools.json
diff --git a/Cargo.lock b/Cargo.lock
index c2f39852a..4910e0b5c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -723,6 +723,7 @@ dependencies = [
name = "arroyo-bin"
version = "0.10.0-dev"
dependencies = [
+ "anyhow",
"arroyo-api",
"arroyo-compiler-service",
"arroyo-connectors",
@@ -733,6 +734,7 @@ dependencies = [
"clap 4.4.18",
"deadpool-postgres",
"postgres-types",
+ "refinery",
"serde",
"serde_json",
"tokio",
@@ -1088,6 +1090,7 @@ dependencies = [
"tower-http",
"tracing",
"tracing-appender",
+ "tracing-log",
"tracing-logfmt",
"tracing-subscriber",
"vergen",
@@ -6946,9 +6949,9 @@ dependencies = [
[[package]]
name = "refinery"
-version = "0.8.11"
+version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "529664dbccc0a296947615c997a857912d72d1c44be1fafb7bae54ecfa7a8c24"
+checksum = "431fd1854421586de5e83614052cb0d2d82025647829db6d9499b27385e58a6b"
dependencies = [
"refinery-core",
"refinery-macros",
diff --git a/Cargo.toml b/Cargo.toml
index 8920b64e4..aafaaf583 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,28 +1,28 @@
[workspace]
members = [
- "arroyo",
- "arroyo-api",
- "arroyo-bin",
- "arroyo-compiler-service",
- "arroyo-controller",
- "arroyo-connectors",
- "arroyo-datastream",
- "arroyo-df",
- "arroyo-formats",
- "arroyo-macro",
- "arroyo-metrics",
- "arroyo-node",
- "arroyo-openapi",
- "arroyo-operator",
- "arroyo-rpc",
- "arroyo-sql-testing",
- "arroyo-server-common",
- "arroyo-state",
- "arroyo-storage",
- "arroyo-types",
- "arroyo-worker",
- "copy-artifacts",
- "integ",
+ "crates/arroyo",
+ "crates/arroyo-api",
+ "crates/arroyo-bin",
+ "crates/arroyo-compiler-service",
+ "crates/arroyo-controller",
+ "crates/arroyo-connectors",
+ "crates/arroyo-datastream",
+ "crates/arroyo-df",
+ "crates/arroyo-formats",
+ "crates/arroyo-macro",
+ "crates/arroyo-metrics",
+ "crates/arroyo-node",
+ "crates/arroyo-openapi",
+ "crates/arroyo-operator",
+ "crates/arroyo-rpc",
+ "crates/arroyo-sql-testing",
+ "crates/arroyo-server-common",
+ "crates/arroyo-state",
+ "crates/arroyo-storage",
+ "crates/arroyo-types",
+ "crates/arroyo-worker",
+ "crates/copy-artifacts",
+ "crates/integ",
]
resolver = "2"
diff --git a/README.md b/README.md
index 90c77b536..1a6450e0a 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
-
+
@@ -36,7 +36,7 @@ on both bounded and unbounded sources, emitting results as soon as they are avai
In short: Arroyo lets you ask complex questions of high-volume real-time data with subsecond results.
-
+
## Features
diff --git a/arroyo-console/.env b/arroyo-console/.env
deleted file mode 100644
index da8112756..000000000
--- a/arroyo-console/.env
+++ /dev/null
@@ -1,4 +0,0 @@
-# allow the API server to replace this with the correct endpoint
-VITE_API_ENDPOINT="{{API_ENDPOINT}}"
-VITE_CLUSTER_ID="{{CLUSTER_ID}}"
-VITE_DISABLE_TELEMETRY="{{DISABLE_TELEMETRY}}"
diff --git a/arroyo-console/openapitools.json b/arroyo-console/openapitools.json
deleted file mode 100644
index cd53ff4ca..000000000
--- a/arroyo-console/openapitools.json
+++ /dev/null
@@ -1,7 +0,0 @@
-{
- "$schema": "./node_modules/@openapitools/openapi-generator-cli/config.schema.json",
- "spaces": 2,
- "generator-cli": {
- "version": "6.6.0"
- }
-}
diff --git a/arroyo-openapi/openapitools.json b/arroyo-openapi/openapitools.json
deleted file mode 100644
index cd53ff4ca..000000000
--- a/arroyo-openapi/openapitools.json
+++ /dev/null
@@ -1,7 +0,0 @@
-{
- "$schema": "./node_modules/@openapitools/openapi-generator-cli/config.schema.json",
- "spaces": 2,
- "generator-cli": {
- "version": "6.6.0"
- }
-}
diff --git a/arroyo-sql-testing/outputs/.gitignore b/arroyo-sql-testing/outputs/.gitignore
deleted file mode 100644
index 72e8ffc0d..000000000
--- a/arroyo-sql-testing/outputs/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-*
diff --git a/arroyo-api/Cargo.toml b/crates/arroyo-api/Cargo.toml
similarity index 100%
rename from arroyo-api/Cargo.toml
rename to crates/arroyo-api/Cargo.toml
diff --git a/arroyo-api/build.rs b/crates/arroyo-api/build.rs
similarity index 100%
rename from arroyo-api/build.rs
rename to crates/arroyo-api/build.rs
diff --git a/arroyo-api/migrations/V10__merge_definitions.sql b/crates/arroyo-api/migrations/V10__merge_definitions.sql
similarity index 100%
rename from arroyo-api/migrations/V10__merge_definitions.sql
rename to crates/arroyo-api/migrations/V10__merge_definitions.sql
diff --git a/arroyo-api/migrations/V11__delete_cascade.sql b/crates/arroyo-api/migrations/V11__delete_cascade.sql
similarity index 100%
rename from arroyo-api/migrations/V11__delete_cascade.sql
rename to crates/arroyo-api/migrations/V11__delete_cascade.sql
diff --git a/arroyo-api/migrations/V12__add_committing.sql b/crates/arroyo-api/migrations/V12__add_committing.sql
similarity index 100%
rename from arroyo-api/migrations/V12__add_committing.sql
rename to crates/arroyo-api/migrations/V12__add_committing.sql
diff --git a/arroyo-api/migrations/V13__backfill_pub_id.sql b/crates/arroyo-api/migrations/V13__backfill_pub_id.sql
similarity index 100%
rename from arroyo-api/migrations/V13__backfill_pub_id.sql
rename to crates/arroyo-api/migrations/V13__backfill_pub_id.sql
diff --git a/arroyo-api/migrations/V14__unify_job_ids.sql b/crates/arroyo-api/migrations/V14__unify_job_ids.sql
similarity index 100%
rename from arroyo-api/migrations/V14__unify_job_ids.sql
rename to crates/arroyo-api/migrations/V14__unify_job_ids.sql
diff --git a/arroyo-api/migrations/V15__rename_connections.sql b/crates/arroyo-api/migrations/V15__rename_connections.sql
similarity index 100%
rename from arroyo-api/migrations/V15__rename_connections.sql
rename to crates/arroyo-api/migrations/V15__rename_connections.sql
diff --git a/arroyo-api/migrations/V16__restart_failed.sql b/crates/arroyo-api/migrations/V16__restart_failed.sql
similarity index 100%
rename from arroyo-api/migrations/V16__restart_failed.sql
rename to crates/arroyo-api/migrations/V16__restart_failed.sql
diff --git a/arroyo-api/migrations/V17__udfs_table.sql b/crates/arroyo-api/migrations/V17__udfs_table.sql
similarity index 100%
rename from arroyo-api/migrations/V17__udfs_table.sql
rename to crates/arroyo-api/migrations/V17__udfs_table.sql
diff --git a/arroyo-api/migrations/V18__udfs_drop_language.sql b/crates/arroyo-api/migrations/V18__udfs_drop_language.sql
similarity index 100%
rename from arroyo-api/migrations/V18__udfs_drop_language.sql
rename to crates/arroyo-api/migrations/V18__udfs_drop_language.sql
diff --git a/arroyo-api/migrations/V19__case_insensitive_connection_names.sql b/crates/arroyo-api/migrations/V19__case_insensitive_connection_names.sql
similarity index 100%
rename from arroyo-api/migrations/V19__case_insensitive_connection_names.sql
rename to crates/arroyo-api/migrations/V19__case_insensitive_connection_names.sql
diff --git a/arroyo-api/migrations/V1__initial.sql b/crates/arroyo-api/migrations/V1__initial.sql
similarity index 100%
rename from arroyo-api/migrations/V1__initial.sql
rename to crates/arroyo-api/migrations/V1__initial.sql
diff --git a/crates/arroyo-api/migrations/V20__pipeline_v2.sql b/crates/arroyo-api/migrations/V20__pipeline_v2.sql
new file mode 100644
index 000000000..e37ce866a
--- /dev/null
+++ b/crates/arroyo-api/migrations/V20__pipeline_v2.sql
@@ -0,0 +1,2 @@
+ALTER TABLE pipelines
+ADD COLUMN proto_version INT NOT NULL DEFAULT 1;
\ No newline at end of file
diff --git a/arroyo-api/migrations/V2__add_raw_json.sql b/crates/arroyo-api/migrations/V2__add_raw_json.sql
similarity index 100%
rename from arroyo-api/migrations/V2__add_raw_json.sql
rename to crates/arroyo-api/migrations/V2__add_raw_json.sql
diff --git a/arroyo-api/migrations/V3__add_udfs.sql b/crates/arroyo-api/migrations/V3__add_udfs.sql
similarity index 100%
rename from arroyo-api/migrations/V3__add_udfs.sql
rename to crates/arroyo-api/migrations/V3__add_udfs.sql
diff --git a/arroyo-api/migrations/V4__add_sources.sql b/crates/arroyo-api/migrations/V4__add_sources.sql
similarity index 100%
rename from arroyo-api/migrations/V4__add_sources.sql
rename to crates/arroyo-api/migrations/V4__add_sources.sql
diff --git a/arroyo-api/migrations/V5__add_http_connection.sql b/crates/arroyo-api/migrations/V5__add_http_connection.sql
similarity index 100%
rename from arroyo-api/migrations/V5__add_http_connection.sql
rename to crates/arroyo-api/migrations/V5__add_http_connection.sql
diff --git a/arroyo-api/migrations/V6__add_cluster_table.sql b/crates/arroyo-api/migrations/V6__add_cluster_table.sql
similarity index 100%
rename from arroyo-api/migrations/V6__add_cluster_table.sql
rename to crates/arroyo-api/migrations/V6__add_cluster_table.sql
diff --git a/arroyo-api/migrations/V7__add_job_log_messages.sql b/crates/arroyo-api/migrations/V7__add_job_log_messages.sql
similarity index 100%
rename from arroyo-api/migrations/V7__add_job_log_messages.sql
rename to crates/arroyo-api/migrations/V7__add_job_log_messages.sql
diff --git a/arroyo-api/migrations/V8__connector_redesign.sql b/crates/arroyo-api/migrations/V8__connector_redesign.sql
similarity index 100%
rename from arroyo-api/migrations/V8__connector_redesign.sql
rename to crates/arroyo-api/migrations/V8__connector_redesign.sql
diff --git a/arroyo-api/migrations/V9__add_pub_ids.sql b/crates/arroyo-api/migrations/V9__add_pub_ids.sql
similarity index 100%
rename from arroyo-api/migrations/V9__add_pub_ids.sql
rename to crates/arroyo-api/migrations/V9__add_pub_ids.sql
diff --git a/arroyo-api/queries/api_queries.sql b/crates/arroyo-api/queries/api_queries.sql
similarity index 99%
rename from arroyo-api/queries/api_queries.sql
rename to crates/arroyo-api/queries/api_queries.sql
index daa9ca9d7..507ba91cf 100644
--- a/arroyo-api/queries/api_queries.sql
+++ b/crates/arroyo-api/queries/api_queries.sql
@@ -131,8 +131,8 @@ WHERE organization_id = :organization_id AND pub_id = :pub_id;
--: DbPipeline (state?, ttl_micros?)
--! create_pipeline(udfs?, textual_repr?)
-INSERT INTO pipelines (pub_id, organization_id, created_by, name, type, textual_repr, udfs, program)
-VALUES (:pub_id, :organization_id, :created_by, :name, :type, :textual_repr, :udfs, :program)
+INSERT INTO pipelines (pub_id, organization_id, created_by, name, type, textual_repr, udfs, program, proto_version)
+VALUES (:pub_id, :organization_id, :created_by, :name, :type, :textual_repr, :udfs, :program, :proto_version)
RETURNING id;
--! get_pipelines : DbPipeline
diff --git a/arroyo-api/src/cloud.rs b/crates/arroyo-api/src/cloud.rs
similarity index 100%
rename from arroyo-api/src/cloud.rs
rename to crates/arroyo-api/src/cloud.rs
diff --git a/arroyo-api/src/connection_profiles.rs b/crates/arroyo-api/src/connection_profiles.rs
similarity index 100%
rename from arroyo-api/src/connection_profiles.rs
rename to crates/arroyo-api/src/connection_profiles.rs
diff --git a/arroyo-api/src/connection_tables.rs b/crates/arroyo-api/src/connection_tables.rs
similarity index 100%
rename from arroyo-api/src/connection_tables.rs
rename to crates/arroyo-api/src/connection_tables.rs
diff --git a/arroyo-api/src/connectors.rs b/crates/arroyo-api/src/connectors.rs
similarity index 100%
rename from arroyo-api/src/connectors.rs
rename to crates/arroyo-api/src/connectors.rs
diff --git a/arroyo-api/src/jobs.rs b/crates/arroyo-api/src/jobs.rs
similarity index 100%
rename from arroyo-api/src/jobs.rs
rename to crates/arroyo-api/src/jobs.rs
diff --git a/arroyo-api/src/lib.rs b/crates/arroyo-api/src/lib.rs
similarity index 100%
rename from arroyo-api/src/lib.rs
rename to crates/arroyo-api/src/lib.rs
diff --git a/arroyo-api/src/metrics.rs b/crates/arroyo-api/src/metrics.rs
similarity index 100%
rename from arroyo-api/src/metrics.rs
rename to crates/arroyo-api/src/metrics.rs
diff --git a/arroyo-api/src/pipelines.rs b/crates/arroyo-api/src/pipelines.rs
similarity index 99%
rename from arroyo-api/src/pipelines.rs
rename to crates/arroyo-api/src/pipelines.rs
index 9a22c4355..71c1bc85f 100644
--- a/arroyo-api/src/pipelines.rs
+++ b/crates/arroyo-api/src/pipelines.rs
@@ -369,6 +369,7 @@ pub(crate) async fn create_pipeline<'a>(
&text,
&udfs.map(|t| serde_json::to_value(t).unwrap()),
&program_bytes,
+ &2,
)
.one()
.await
diff --git a/arroyo-api/src/rest.rs b/crates/arroyo-api/src/rest.rs
similarity index 97%
rename from arroyo-api/src/rest.rs
rename to crates/arroyo-api/src/rest.rs
index bf65b7737..2c44ff08a 100644
--- a/arroyo-api/src/rest.rs
+++ b/crates/arroyo-api/src/rest.rs
@@ -63,11 +63,10 @@ pub async fn api_fallback() -> impl IntoResponse {
}
pub fn create_rest_app(pool: Pool, controller_addr: &str) -> Router {
- let asset_dir = env::var(ASSET_DIR_ENV).unwrap_or_else(|_| "arroyo-console/dist".to_string());
+ let asset_dir = env::var(ASSET_DIR_ENV).unwrap_or_else(|_| "webui/dist".to_string());
static INDEX_HTML: Lazy = Lazy::new(|| {
- let asset_dir =
- env::var(ASSET_DIR_ENV).unwrap_or_else(|_| "arroyo-console/dist".to_string());
+ let asset_dir = env::var(ASSET_DIR_ENV).unwrap_or_else(|_| "webui/dist".to_string());
let endpoint = env::var(API_ENDPOINT_ENV).unwrap_or_else(|_| String::new());
diff --git a/arroyo-api/src/rest_utils.rs b/crates/arroyo-api/src/rest_utils.rs
similarity index 100%
rename from arroyo-api/src/rest_utils.rs
rename to crates/arroyo-api/src/rest_utils.rs
diff --git a/arroyo-api/src/udfs.rs b/crates/arroyo-api/src/udfs.rs
similarity index 100%
rename from arroyo-api/src/udfs.rs
rename to crates/arroyo-api/src/udfs.rs
diff --git a/arroyo-bin/Cargo.toml b/crates/arroyo-bin/Cargo.toml
similarity index 89%
rename from arroyo-bin/Cargo.toml
rename to crates/arroyo-bin/Cargo.toml
index 9d0b4ff08..7fc2df30c 100644
--- a/arroyo-bin/Cargo.toml
+++ b/crates/arroyo-bin/Cargo.toml
@@ -5,7 +5,6 @@ edition = "2021"
[features]
kafka-sasl = ["arroyo-connectors/kafka-sasl", "arroyo-worker/kafka-sal"]
-k8s = ["arroyo-controller/k8s"]
[dependencies]
arroyo-types = { path ="../arroyo-types" }
@@ -26,3 +25,5 @@ postgres-types = { version = "*", features = ["derive"] }
tokio-postgres = { version = "*", features = ["with-serde_json-1", "with-time-0_3", "with-uuid-1"] }
deadpool-postgres = { version = "0.10" }
uuid = { version = "1.7.0", features = ["v4"] }
+refinery = { version = "=0.8.9" , features = ["tokio-postgres"] }
+anyhow = { version = "1.0.79", features = [] }
\ No newline at end of file
diff --git a/arroyo-bin/src/main.rs b/crates/arroyo-bin/src/main.rs
similarity index 56%
rename from arroyo-bin/src/main.rs
rename to crates/arroyo-bin/src/main.rs
index 4b9b46659..99301427b 100644
--- a/arroyo-bin/src/main.rs
+++ b/crates/arroyo-bin/src/main.rs
@@ -1,3 +1,4 @@
+use anyhow::{anyhow, bail};
use arroyo_compiler_service;
use arroyo_server_common::shutdown::Shutdown;
use arroyo_server_common::{log_event, start_admin_server};
@@ -6,9 +7,12 @@ use arroyo_worker::WorkerServer;
use clap::{Parser, Subcommand};
use deadpool_postgres::{ManagerConfig, Pool, RecyclingMethod};
use serde_json::json;
+use std::process::exit;
use std::time::Duration;
-use tokio_postgres::NoTls;
-use tracing::error;
+use tokio::io::{AsyncRead, AsyncWrite};
+use tokio::time::timeout;
+use tokio_postgres::{Client, Connection, NoTls};
+use tracing::{debug, error, info};
use uuid::Uuid;
#[derive(Parser)]
@@ -34,6 +38,13 @@ enum Commands {
/// Starts an Arroyo compiler
Compiler {},
+
+ /// Runs database migrations on the configure Postgres database
+ Migrate {
+ /// If set, waits for the specified number of seconds until Postgres is ready before running migrations
+ #[arg(long)]
+ wait: Option,
+ },
}
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
@@ -75,37 +86,39 @@ async fn main() {
Commands::Worker { .. } => {
start_worker().await;
}
+ Commands::Migrate { wait } => {
+ if let Err(e) = migrate(*wait).await {
+ error!("{}", e);
+ exit(1);
+ }
+ }
};
}
async fn db_pool() -> Pool {
let config = DatabaseConfig::load();
let mut cfg = deadpool_postgres::Config::new();
- cfg.dbname = Some(config.name);
- cfg.host = Some(config.host);
+ cfg.dbname = Some(config.name.clone());
+ cfg.host = Some(config.host.clone());
cfg.port = Some(config.port);
- cfg.user = Some(config.user);
- cfg.password = Some(config.password);
+ cfg.user = Some(config.user.clone());
+ cfg.password = Some(config.password.clone());
cfg.manager = Some(ManagerConfig {
recycling_method: RecyclingMethod::Fast,
});
let pool = cfg
.create_pool(Some(deadpool_postgres::Runtime::Tokio1), NoTls)
.unwrap_or_else(|e| {
- panic!(
- "Unable to connect to database {:?}@{:?}:{:?}/{:?} {}",
- cfg.user, cfg.host, cfg.port, cfg.dbname, e
- )
+ error!("Unable to connect to database {}: {:?}", config, e);
+ exit(1);
});
match pool
.get()
.await
.unwrap_or_else(|e| {
- panic!(
- "Unable to create database connection for {:?}@{:?}:{:?}/{:?} {}",
- cfg.user, cfg.host, cfg.port, cfg.dbname, e
- )
+ error!("Unable to create database connection for {} {}", config, e);
+ exit(1);
})
.query_one("select id from cluster_info", &[])
.await
@@ -122,6 +135,88 @@ async fn db_pool() -> Pool {
pool
}
+mod migrations {
+ use refinery::embed_migrations;
+ embed_migrations!("../arroyo-api/migrations");
+}
+
+async fn connect(
+ retry: bool,
+) -> anyhow::Result<(
+ Client,
+ Connection,
+)> {
+ let config = DatabaseConfig::load();
+
+ loop {
+ match tokio_postgres::config::Config::new()
+ .host(&config.host)
+ .port(config.port)
+ .user(&config.user)
+ .password(&config.password)
+ .dbname(&config.name)
+ .connect(NoTls)
+ .await
+ {
+ Ok(r) => {
+ return Ok(r);
+ }
+ Err(e) => {
+ if !e.to_string().contains("authentication") && retry {
+ debug!("Received error from database while waiting: {}", e);
+ tokio::time::sleep(Duration::from_millis(500)).await;
+ continue;
+ }
+
+ bail!("Failed to connect to database {}: {}", config, e);
+ }
+ }
+ }
+}
+
+async fn migrate(wait: Option) -> anyhow::Result<()> {
+ let _guard = arroyo_server_common::init_logging("migrate");
+
+ let (mut client, connection) = if let Some(wait) = wait {
+ info!("Waiting for database to be ready to run migrations");
+ timeout(Duration::from_secs(wait as u64), connect(true))
+ .await
+ .map_err(|e| anyhow!("Timed out waiting for database to connect after {}", e))??
+ } else {
+ connect(false).await?
+ };
+
+ tokio::spawn(async move {
+ if let Err(e) = connection.await {
+ eprintln!("connection error: {}", e);
+ }
+ });
+
+ info!("Running migrations on database {}", DatabaseConfig::load());
+
+ let report = migrations::migrations::runner()
+ .run_async(&mut client)
+ .await
+ .map_err(|e| {
+ anyhow!(
+ "Failed to run migrations on {}: {:?}",
+ DatabaseConfig::load(),
+ e
+ )
+ })?;
+
+ for migration in report.applied_migrations() {
+ info!("Applying V{} {}", migration.version(), migration.name());
+ }
+
+ info!(
+ "Successfully applied {} migration(s)",
+ report.applied_migrations().len()
+ );
+
+ Ok(())
+}
+
async fn start_control_plane(service: CPService) {
let _guard = arroyo_server_common::init_logging(service.name());
diff --git a/arroyo-compiler-service/Cargo.toml b/crates/arroyo-compiler-service/Cargo.toml
similarity index 100%
rename from arroyo-compiler-service/Cargo.toml
rename to crates/arroyo-compiler-service/Cargo.toml
diff --git a/arroyo-compiler-service/src/lib.rs b/crates/arroyo-compiler-service/src/lib.rs
similarity index 100%
rename from arroyo-compiler-service/src/lib.rs
rename to crates/arroyo-compiler-service/src/lib.rs
diff --git a/arroyo-connectors/Cargo.toml b/crates/arroyo-connectors/Cargo.toml
similarity index 100%
rename from arroyo-connectors/Cargo.toml
rename to crates/arroyo-connectors/Cargo.toml
diff --git a/arroyo-connectors/build.rs b/crates/arroyo-connectors/build.rs
similarity index 100%
rename from arroyo-connectors/build.rs
rename to crates/arroyo-connectors/build.rs
diff --git a/arroyo-connectors/resources/nexmark.svg b/crates/arroyo-connectors/resources/nexmark.svg
similarity index 100%
rename from arroyo-connectors/resources/nexmark.svg
rename to crates/arroyo-connectors/resources/nexmark.svg
diff --git a/arroyo-connectors/src/blackhole/blackhole.svg b/crates/arroyo-connectors/src/blackhole/blackhole.svg
similarity index 100%
rename from arroyo-connectors/src/blackhole/blackhole.svg
rename to crates/arroyo-connectors/src/blackhole/blackhole.svg
diff --git a/arroyo-connectors/src/blackhole/mod.rs b/crates/arroyo-connectors/src/blackhole/mod.rs
similarity index 100%
rename from arroyo-connectors/src/blackhole/mod.rs
rename to crates/arroyo-connectors/src/blackhole/mod.rs
diff --git a/arroyo-connectors/src/blackhole/operator.rs b/crates/arroyo-connectors/src/blackhole/operator.rs
similarity index 100%
rename from arroyo-connectors/src/blackhole/operator.rs
rename to crates/arroyo-connectors/src/blackhole/operator.rs
diff --git a/arroyo-connectors/src/confluent/confluent.svg b/crates/arroyo-connectors/src/confluent/confluent.svg
similarity index 100%
rename from arroyo-connectors/src/confluent/confluent.svg
rename to crates/arroyo-connectors/src/confluent/confluent.svg
diff --git a/arroyo-connectors/src/confluent/mod.rs b/crates/arroyo-connectors/src/confluent/mod.rs
similarity index 100%
rename from arroyo-connectors/src/confluent/mod.rs
rename to crates/arroyo-connectors/src/confluent/mod.rs
diff --git a/arroyo-connectors/src/confluent/profile.json b/crates/arroyo-connectors/src/confluent/profile.json
similarity index 100%
rename from arroyo-connectors/src/confluent/profile.json
rename to crates/arroyo-connectors/src/confluent/profile.json
diff --git a/arroyo-connectors/src/delta.rs b/crates/arroyo-connectors/src/delta.rs
similarity index 100%
rename from arroyo-connectors/src/delta.rs
rename to crates/arroyo-connectors/src/delta.rs
diff --git a/arroyo-connectors/src/filesystem/filesystem.svg b/crates/arroyo-connectors/src/filesystem/filesystem.svg
similarity index 100%
rename from arroyo-connectors/src/filesystem/filesystem.svg
rename to crates/arroyo-connectors/src/filesystem/filesystem.svg
diff --git a/arroyo-connectors/src/filesystem/mod.rs b/crates/arroyo-connectors/src/filesystem/mod.rs
similarity index 100%
rename from arroyo-connectors/src/filesystem/mod.rs
rename to crates/arroyo-connectors/src/filesystem/mod.rs
diff --git a/arroyo-connectors/src/filesystem/sink/arrow.rs b/crates/arroyo-connectors/src/filesystem/sink/arrow.rs
similarity index 100%
rename from arroyo-connectors/src/filesystem/sink/arrow.rs
rename to crates/arroyo-connectors/src/filesystem/sink/arrow.rs
diff --git a/arroyo-connectors/src/filesystem/sink/delta.rs b/crates/arroyo-connectors/src/filesystem/sink/delta.rs
similarity index 100%
rename from arroyo-connectors/src/filesystem/sink/delta.rs
rename to crates/arroyo-connectors/src/filesystem/sink/delta.rs
diff --git a/arroyo-connectors/src/filesystem/sink/json.rs b/crates/arroyo-connectors/src/filesystem/sink/json.rs
similarity index 100%
rename from arroyo-connectors/src/filesystem/sink/json.rs
rename to crates/arroyo-connectors/src/filesystem/sink/json.rs
diff --git a/arroyo-connectors/src/filesystem/sink/local.rs b/crates/arroyo-connectors/src/filesystem/sink/local.rs
similarity index 100%
rename from arroyo-connectors/src/filesystem/sink/local.rs
rename to crates/arroyo-connectors/src/filesystem/sink/local.rs
diff --git a/arroyo-connectors/src/filesystem/sink/mod.rs b/crates/arroyo-connectors/src/filesystem/sink/mod.rs
similarity index 100%
rename from arroyo-connectors/src/filesystem/sink/mod.rs
rename to crates/arroyo-connectors/src/filesystem/sink/mod.rs
diff --git a/arroyo-connectors/src/filesystem/sink/parquet.rs b/crates/arroyo-connectors/src/filesystem/sink/parquet.rs
similarity index 100%
rename from arroyo-connectors/src/filesystem/sink/parquet.rs
rename to crates/arroyo-connectors/src/filesystem/sink/parquet.rs
diff --git a/arroyo-connectors/src/filesystem/sink/two_phase_committer.rs b/crates/arroyo-connectors/src/filesystem/sink/two_phase_committer.rs
similarity index 100%
rename from arroyo-connectors/src/filesystem/sink/two_phase_committer.rs
rename to crates/arroyo-connectors/src/filesystem/sink/two_phase_committer.rs
diff --git a/arroyo-connectors/src/filesystem/source.rs b/crates/arroyo-connectors/src/filesystem/source.rs
similarity index 100%
rename from arroyo-connectors/src/filesystem/source.rs
rename to crates/arroyo-connectors/src/filesystem/source.rs
diff --git a/arroyo-connectors/src/filesystem/table.json b/crates/arroyo-connectors/src/filesystem/table.json
similarity index 100%
rename from arroyo-connectors/src/filesystem/table.json
rename to crates/arroyo-connectors/src/filesystem/table.json
diff --git a/arroyo-connectors/src/fluvio/fluvio.svg b/crates/arroyo-connectors/src/fluvio/fluvio.svg
similarity index 100%
rename from arroyo-connectors/src/fluvio/fluvio.svg
rename to crates/arroyo-connectors/src/fluvio/fluvio.svg
diff --git a/arroyo-connectors/src/fluvio/mod.rs b/crates/arroyo-connectors/src/fluvio/mod.rs
similarity index 100%
rename from arroyo-connectors/src/fluvio/mod.rs
rename to crates/arroyo-connectors/src/fluvio/mod.rs
diff --git a/arroyo-connectors/src/fluvio/sink.rs b/crates/arroyo-connectors/src/fluvio/sink.rs
similarity index 100%
rename from arroyo-connectors/src/fluvio/sink.rs
rename to crates/arroyo-connectors/src/fluvio/sink.rs
diff --git a/arroyo-connectors/src/fluvio/source.rs b/crates/arroyo-connectors/src/fluvio/source.rs
similarity index 100%
rename from arroyo-connectors/src/fluvio/source.rs
rename to crates/arroyo-connectors/src/fluvio/source.rs
diff --git a/arroyo-connectors/src/fluvio/table.json b/crates/arroyo-connectors/src/fluvio/table.json
similarity index 100%
rename from arroyo-connectors/src/fluvio/table.json
rename to crates/arroyo-connectors/src/fluvio/table.json
diff --git a/arroyo-connectors/src/impulse/impulse.svg b/crates/arroyo-connectors/src/impulse/impulse.svg
similarity index 100%
rename from arroyo-connectors/src/impulse/impulse.svg
rename to crates/arroyo-connectors/src/impulse/impulse.svg
diff --git a/arroyo-connectors/src/impulse/mod.rs b/crates/arroyo-connectors/src/impulse/mod.rs
similarity index 100%
rename from arroyo-connectors/src/impulse/mod.rs
rename to crates/arroyo-connectors/src/impulse/mod.rs
diff --git a/arroyo-connectors/src/impulse/operator.rs b/crates/arroyo-connectors/src/impulse/operator.rs
similarity index 100%
rename from arroyo-connectors/src/impulse/operator.rs
rename to crates/arroyo-connectors/src/impulse/operator.rs
diff --git a/arroyo-connectors/src/impulse/table.json b/crates/arroyo-connectors/src/impulse/table.json
similarity index 100%
rename from arroyo-connectors/src/impulse/table.json
rename to crates/arroyo-connectors/src/impulse/table.json
diff --git a/arroyo-connectors/src/kafka/kafka.svg b/crates/arroyo-connectors/src/kafka/kafka.svg
similarity index 100%
rename from arroyo-connectors/src/kafka/kafka.svg
rename to crates/arroyo-connectors/src/kafka/kafka.svg
diff --git a/arroyo-connectors/src/kafka/mod.rs b/crates/arroyo-connectors/src/kafka/mod.rs
similarity index 100%
rename from arroyo-connectors/src/kafka/mod.rs
rename to crates/arroyo-connectors/src/kafka/mod.rs
diff --git a/arroyo-connectors/src/kafka/profile.json b/crates/arroyo-connectors/src/kafka/profile.json
similarity index 100%
rename from arroyo-connectors/src/kafka/profile.json
rename to crates/arroyo-connectors/src/kafka/profile.json
diff --git a/arroyo-connectors/src/kafka/sink/mod.rs b/crates/arroyo-connectors/src/kafka/sink/mod.rs
similarity index 100%
rename from arroyo-connectors/src/kafka/sink/mod.rs
rename to crates/arroyo-connectors/src/kafka/sink/mod.rs
diff --git a/arroyo-connectors/src/kafka/sink/test.rs b/crates/arroyo-connectors/src/kafka/sink/test.rs
similarity index 100%
rename from arroyo-connectors/src/kafka/sink/test.rs
rename to crates/arroyo-connectors/src/kafka/sink/test.rs
diff --git a/arroyo-connectors/src/kafka/source/mod.rs b/crates/arroyo-connectors/src/kafka/source/mod.rs
similarity index 100%
rename from arroyo-connectors/src/kafka/source/mod.rs
rename to crates/arroyo-connectors/src/kafka/source/mod.rs
diff --git a/arroyo-connectors/src/kafka/source/test.rs b/crates/arroyo-connectors/src/kafka/source/test.rs
similarity index 100%
rename from arroyo-connectors/src/kafka/source/test.rs
rename to crates/arroyo-connectors/src/kafka/source/test.rs
diff --git a/arroyo-connectors/src/kafka/table.json b/crates/arroyo-connectors/src/kafka/table.json
similarity index 100%
rename from arroyo-connectors/src/kafka/table.json
rename to crates/arroyo-connectors/src/kafka/table.json
diff --git a/arroyo-connectors/src/kinesis/kinesis.svg b/crates/arroyo-connectors/src/kinesis/kinesis.svg
similarity index 100%
rename from arroyo-connectors/src/kinesis/kinesis.svg
rename to crates/arroyo-connectors/src/kinesis/kinesis.svg
diff --git a/arroyo-connectors/src/kinesis/mod.rs b/crates/arroyo-connectors/src/kinesis/mod.rs
similarity index 100%
rename from arroyo-connectors/src/kinesis/mod.rs
rename to crates/arroyo-connectors/src/kinesis/mod.rs
diff --git a/arroyo-connectors/src/kinesis/sink.rs b/crates/arroyo-connectors/src/kinesis/sink.rs
similarity index 100%
rename from arroyo-connectors/src/kinesis/sink.rs
rename to crates/arroyo-connectors/src/kinesis/sink.rs
diff --git a/arroyo-connectors/src/kinesis/source.rs b/crates/arroyo-connectors/src/kinesis/source.rs
similarity index 100%
rename from arroyo-connectors/src/kinesis/source.rs
rename to crates/arroyo-connectors/src/kinesis/source.rs
diff --git a/arroyo-connectors/src/kinesis/table.json b/crates/arroyo-connectors/src/kinesis/table.json
similarity index 100%
rename from arroyo-connectors/src/kinesis/table.json
rename to crates/arroyo-connectors/src/kinesis/table.json
diff --git a/arroyo-connectors/src/lib.rs b/crates/arroyo-connectors/src/lib.rs
similarity index 100%
rename from arroyo-connectors/src/lib.rs
rename to crates/arroyo-connectors/src/lib.rs
diff --git a/arroyo-connectors/src/nexmark.rs b/crates/arroyo-connectors/src/nexmark.rs
similarity index 100%
rename from arroyo-connectors/src/nexmark.rs
rename to crates/arroyo-connectors/src/nexmark.rs
diff --git a/arroyo-connectors/src/polling_http/connector.rs b/crates/arroyo-connectors/src/polling_http/connector.rs
similarity index 100%
rename from arroyo-connectors/src/polling_http/connector.rs
rename to crates/arroyo-connectors/src/polling_http/connector.rs
diff --git a/arroyo-connectors/src/polling_http/http.svg b/crates/arroyo-connectors/src/polling_http/http.svg
similarity index 100%
rename from arroyo-connectors/src/polling_http/http.svg
rename to crates/arroyo-connectors/src/polling_http/http.svg
diff --git a/arroyo-connectors/src/polling_http/mod.rs b/crates/arroyo-connectors/src/polling_http/mod.rs
similarity index 100%
rename from arroyo-connectors/src/polling_http/mod.rs
rename to crates/arroyo-connectors/src/polling_http/mod.rs
diff --git a/arroyo-connectors/src/polling_http/table.json b/crates/arroyo-connectors/src/polling_http/table.json
similarity index 100%
rename from arroyo-connectors/src/polling_http/table.json
rename to crates/arroyo-connectors/src/polling_http/table.json
diff --git a/arroyo-connectors/src/preview/mod.rs b/crates/arroyo-connectors/src/preview/mod.rs
similarity index 100%
rename from arroyo-connectors/src/preview/mod.rs
rename to crates/arroyo-connectors/src/preview/mod.rs
diff --git a/arroyo-connectors/src/preview/operator.rs b/crates/arroyo-connectors/src/preview/operator.rs
similarity index 100%
rename from arroyo-connectors/src/preview/operator.rs
rename to crates/arroyo-connectors/src/preview/operator.rs
diff --git a/arroyo-connectors/src/redis/mod.rs b/crates/arroyo-connectors/src/redis/mod.rs
similarity index 100%
rename from arroyo-connectors/src/redis/mod.rs
rename to crates/arroyo-connectors/src/redis/mod.rs
diff --git a/arroyo-connectors/src/redis/operator/mod.rs b/crates/arroyo-connectors/src/redis/operator/mod.rs
similarity index 100%
rename from arroyo-connectors/src/redis/operator/mod.rs
rename to crates/arroyo-connectors/src/redis/operator/mod.rs
diff --git a/arroyo-connectors/src/redis/operator/sink.rs b/crates/arroyo-connectors/src/redis/operator/sink.rs
similarity index 100%
rename from arroyo-connectors/src/redis/operator/sink.rs
rename to crates/arroyo-connectors/src/redis/operator/sink.rs
diff --git a/arroyo-connectors/src/redis/profile.json b/crates/arroyo-connectors/src/redis/profile.json
similarity index 100%
rename from arroyo-connectors/src/redis/profile.json
rename to crates/arroyo-connectors/src/redis/profile.json
diff --git a/arroyo-connectors/src/redis/redis.svg b/crates/arroyo-connectors/src/redis/redis.svg
similarity index 100%
rename from arroyo-connectors/src/redis/redis.svg
rename to crates/arroyo-connectors/src/redis/redis.svg
diff --git a/arroyo-connectors/src/redis/table.json b/crates/arroyo-connectors/src/redis/table.json
similarity index 100%
rename from arroyo-connectors/src/redis/table.json
rename to crates/arroyo-connectors/src/redis/table.json
diff --git a/arroyo-connectors/src/single_file/mod.rs b/crates/arroyo-connectors/src/single_file/mod.rs
similarity index 100%
rename from arroyo-connectors/src/single_file/mod.rs
rename to crates/arroyo-connectors/src/single_file/mod.rs
diff --git a/arroyo-connectors/src/single_file/sink.rs b/crates/arroyo-connectors/src/single_file/sink.rs
similarity index 100%
rename from arroyo-connectors/src/single_file/sink.rs
rename to crates/arroyo-connectors/src/single_file/sink.rs
diff --git a/arroyo-connectors/src/single_file/source.rs b/crates/arroyo-connectors/src/single_file/source.rs
similarity index 100%
rename from arroyo-connectors/src/single_file/source.rs
rename to crates/arroyo-connectors/src/single_file/source.rs
diff --git a/arroyo-connectors/src/single_file/table.json b/crates/arroyo-connectors/src/single_file/table.json
similarity index 100%
rename from arroyo-connectors/src/single_file/table.json
rename to crates/arroyo-connectors/src/single_file/table.json
diff --git a/arroyo-connectors/src/sse/mod.rs b/crates/arroyo-connectors/src/sse/mod.rs
similarity index 100%
rename from arroyo-connectors/src/sse/mod.rs
rename to crates/arroyo-connectors/src/sse/mod.rs
diff --git a/arroyo-connectors/src/sse/operator.rs b/crates/arroyo-connectors/src/sse/operator.rs
similarity index 100%
rename from arroyo-connectors/src/sse/operator.rs
rename to crates/arroyo-connectors/src/sse/operator.rs
diff --git a/arroyo-connectors/src/sse/sse.svg b/crates/arroyo-connectors/src/sse/sse.svg
similarity index 100%
rename from arroyo-connectors/src/sse/sse.svg
rename to crates/arroyo-connectors/src/sse/sse.svg
diff --git a/arroyo-connectors/src/sse/table.json b/crates/arroyo-connectors/src/sse/table.json
similarity index 100%
rename from arroyo-connectors/src/sse/table.json
rename to crates/arroyo-connectors/src/sse/table.json
diff --git a/arroyo-connectors/src/webhook/mod.rs b/crates/arroyo-connectors/src/webhook/mod.rs
similarity index 100%
rename from arroyo-connectors/src/webhook/mod.rs
rename to crates/arroyo-connectors/src/webhook/mod.rs
diff --git a/arroyo-connectors/src/webhook/operator.rs b/crates/arroyo-connectors/src/webhook/operator.rs
similarity index 100%
rename from arroyo-connectors/src/webhook/operator.rs
rename to crates/arroyo-connectors/src/webhook/operator.rs
diff --git a/arroyo-connectors/src/webhook/table.json b/crates/arroyo-connectors/src/webhook/table.json
similarity index 100%
rename from arroyo-connectors/src/webhook/table.json
rename to crates/arroyo-connectors/src/webhook/table.json
diff --git a/arroyo-connectors/src/webhook/webhook.svg b/crates/arroyo-connectors/src/webhook/webhook.svg
similarity index 100%
rename from arroyo-connectors/src/webhook/webhook.svg
rename to crates/arroyo-connectors/src/webhook/webhook.svg
diff --git a/arroyo-connectors/src/websocket/mod.rs b/crates/arroyo-connectors/src/websocket/mod.rs
similarity index 100%
rename from arroyo-connectors/src/websocket/mod.rs
rename to crates/arroyo-connectors/src/websocket/mod.rs
diff --git a/arroyo-connectors/src/websocket/operator.rs b/crates/arroyo-connectors/src/websocket/operator.rs
similarity index 100%
rename from arroyo-connectors/src/websocket/operator.rs
rename to crates/arroyo-connectors/src/websocket/operator.rs
diff --git a/arroyo-connectors/src/websocket/table.json b/crates/arroyo-connectors/src/websocket/table.json
similarity index 100%
rename from arroyo-connectors/src/websocket/table.json
rename to crates/arroyo-connectors/src/websocket/table.json
diff --git a/arroyo-connectors/src/websocket/websocket.svg b/crates/arroyo-connectors/src/websocket/websocket.svg
similarity index 100%
rename from arroyo-connectors/src/websocket/websocket.svg
rename to crates/arroyo-connectors/src/websocket/websocket.svg
diff --git a/arroyo-controller/Cargo.toml b/crates/arroyo-controller/Cargo.toml
similarity index 87%
rename from arroyo-controller/Cargo.toml
rename to crates/arroyo-controller/Cargo.toml
index e9aadb3f0..8ffe7f25c 100644
--- a/arroyo-controller/Cargo.toml
+++ b/crates/arroyo-controller/Cargo.toml
@@ -6,7 +6,6 @@ edition = "2021"
[features]
default = []
kafka-sasl = []
-k8s = ["kube", "k8s-openapi", "serde_yaml"]
[dependencies]
arroyo-types = { path = "../arroyo-types" }
@@ -43,9 +42,9 @@ serde = "1"
anyhow = "1.0.70"
# Kubernetes
-kube = { version = "0.84", features = ["runtime", "derive"], optional = true }
-k8s-openapi = { version = "0.18.0", features = ["v1_26"], optional = true }
-serde_yaml = {version = "0.9", optional = true}
+kube = { version = "0.84", features = ["runtime", "derive"] }
+k8s-openapi = { version = "0.18.0", features = ["v1_26"] }
+serde_yaml = {version = "0.9"}
# json-schema support
serde_json = "1.0"
diff --git a/arroyo-controller/build.rs b/crates/arroyo-controller/build.rs
similarity index 100%
rename from arroyo-controller/build.rs
rename to crates/arroyo-controller/build.rs
diff --git a/arroyo-controller/queries/controller_queries.sql b/crates/arroyo-controller/queries/controller_queries.sql
similarity index 97%
rename from arroyo-controller/queries/controller_queries.sql
rename to crates/arroyo-controller/queries/controller_queries.sql
index 4a568ffb9..4d8574a6d 100644
--- a/arroyo-controller/queries/controller_queries.sql
+++ b/crates/arroyo-controller/queries/controller_queries.sql
@@ -38,7 +38,7 @@ SET state = :state,
WHERE id = :job_id;
--! get_program
-SELECT program FROM pipelines WHERE id = :id;
+SELECT program, proto_version FROM pipelines WHERE id = :id;
--! mark_checkpoints_compacted
UPDATE checkpoints
diff --git a/arroyo-controller/src/compiler.rs b/crates/arroyo-controller/src/compiler.rs
similarity index 100%
rename from arroyo-controller/src/compiler.rs
rename to crates/arroyo-controller/src/compiler.rs
diff --git a/arroyo-controller/src/job_controller/checkpointer.rs b/crates/arroyo-controller/src/job_controller/checkpointer.rs
similarity index 100%
rename from arroyo-controller/src/job_controller/checkpointer.rs
rename to crates/arroyo-controller/src/job_controller/checkpointer.rs
diff --git a/arroyo-controller/src/job_controller/mod.rs b/crates/arroyo-controller/src/job_controller/mod.rs
similarity index 100%
rename from arroyo-controller/src/job_controller/mod.rs
rename to crates/arroyo-controller/src/job_controller/mod.rs
diff --git a/arroyo-controller/src/lib.rs b/crates/arroyo-controller/src/lib.rs
similarity index 98%
rename from arroyo-controller/src/lib.rs
rename to crates/arroyo-controller/src/lib.rs
index 6a99e3b22..981146d1c 100644
--- a/arroyo-controller/src/lib.rs
+++ b/crates/arroyo-controller/src/lib.rs
@@ -498,13 +498,8 @@ impl ControllerServer {
Arc::new(NodeScheduler::new())
}
Some("kubernetes") | Some("k8s") => {
- #[cfg(feature = "k8s")]
- {
- info!("Using kubernetes scheduler");
- Arc::new(schedulers::kubernetes::KubernetesScheduler::from_env().await)
- }
- #[cfg(not(feature = "k8s"))]
- panic!("Kubernetes not enabled -- compile with `--features k8s` to enable")
+ info!("Using kubernetes scheduler");
+ Arc::new(schedulers::kubernetes::KubernetesScheduler::from_env().await)
}
Some("embedded") => {
info!("Using embedded scheduler");
diff --git a/arroyo-controller/src/schedulers/embedded.rs b/crates/arroyo-controller/src/schedulers/embedded.rs
similarity index 100%
rename from arroyo-controller/src/schedulers/embedded.rs
rename to crates/arroyo-controller/src/schedulers/embedded.rs
diff --git a/arroyo-controller/src/schedulers/kubernetes.rs b/crates/arroyo-controller/src/schedulers/kubernetes.rs
similarity index 98%
rename from arroyo-controller/src/schedulers/kubernetes.rs
rename to crates/arroyo-controller/src/schedulers/kubernetes.rs
index 08f85abae..ca8a396b3 100644
--- a/arroyo-controller/src/schedulers/kubernetes.rs
+++ b/crates/arroyo-controller/src/schedulers/kubernetes.rs
@@ -63,10 +63,7 @@ impl KubernetesScheduler {
client,
namespace: string_config(K8S_NAMESPACE_ENV, "default"),
name: format!("{}-worker", string_config(K8S_WORKER_NAME_ENV, "arroyo")),
- image: string_config(
- K8S_WORKER_IMAGE_ENV,
- "ghcr.io/arroyosystems/arroyo-worker:latest",
- ),
+ image: string_config(K8S_WORKER_IMAGE_ENV, "ghcr.io/arroyosystems/arroyo:latest"),
labels: yaml_config(K8S_WORKER_LABELS_ENV, BTreeMap::new()),
annotations: yaml_config(K8S_WORKER_ANNOTATIONS_ENV, BTreeMap::new()),
image_pull_policy: string_config(K8S_WORKER_IMAGE_PULL_POLICY_ENV, "IfNotPresent"),
@@ -194,6 +191,7 @@ impl KubernetesScheduler {
{
"name": "worker",
"image": self.image,
+ "command": ["/app/arroyo-bin", "worker"],
"imagePullPolicy": self.image_pull_policy,
"resources": self.resources,
"ports": [
diff --git a/arroyo-controller/src/schedulers/mod.rs b/crates/arroyo-controller/src/schedulers/mod.rs
similarity index 99%
rename from arroyo-controller/src/schedulers/mod.rs
rename to crates/arroyo-controller/src/schedulers/mod.rs
index affbc108f..53744b6c7 100644
--- a/arroyo-controller/src/schedulers/mod.rs
+++ b/crates/arroyo-controller/src/schedulers/mod.rs
@@ -25,9 +25,7 @@ use tokio::process::Command;
use tokio::sync::{oneshot, Mutex};
use tonic::{Request, Status};
use tracing::{info, warn};
-
pub mod embedded;
-#[cfg(feature = "k8s")]
pub mod kubernetes;
lazy_static! {
diff --git a/arroyo-controller/src/states/checkpoint_stopping.rs b/crates/arroyo-controller/src/states/checkpoint_stopping.rs
similarity index 100%
rename from arroyo-controller/src/states/checkpoint_stopping.rs
rename to crates/arroyo-controller/src/states/checkpoint_stopping.rs
diff --git a/arroyo-controller/src/states/compiling.rs b/crates/arroyo-controller/src/states/compiling.rs
similarity index 100%
rename from arroyo-controller/src/states/compiling.rs
rename to crates/arroyo-controller/src/states/compiling.rs
diff --git a/arroyo-controller/src/states/finishing.rs b/crates/arroyo-controller/src/states/finishing.rs
similarity index 100%
rename from arroyo-controller/src/states/finishing.rs
rename to crates/arroyo-controller/src/states/finishing.rs
diff --git a/arroyo-controller/src/states/mod.rs b/crates/arroyo-controller/src/states/mod.rs
similarity index 89%
rename from arroyo-controller/src/states/mod.rs
rename to crates/arroyo-controller/src/states/mod.rs
index c464b6df2..01bf4de5c 100644
--- a/arroyo-controller/src/states/mod.rs
+++ b/crates/arroyo-controller/src/states/mod.rs
@@ -13,7 +13,7 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
use tracing::{error, info, warn};
-use anyhow::Result;
+use anyhow::{anyhow, Result};
use crate::job_controller::JobController;
use crate::queries::controller_queries;
@@ -536,24 +536,13 @@ async fn execute_state<'a>(
pub async fn run_to_completion(
config: Arc>,
+ mut program: LogicalProgram,
mut status: JobStatus,
mut state: Box,
pool: Pool,
mut rx: Receiver,
scheduler: Arc,
) {
- let c = pool.get().await.unwrap();
- let id = config.read().unwrap().pipeline_id;
- let mut program: LogicalProgram = {
- let res = controller_queries::get_program()
- .bind(&c, &id)
- .one()
- .await
- .unwrap();
-
- ArrowProgram::decode(&res[..]).unwrap().try_into().unwrap()
- };
-
let mut ctx = JobContext {
config: config.read().unwrap().clone(),
status: &mut status,
@@ -606,6 +595,38 @@ impl StateMachine {
this
}
+ fn decode_program(bs: &[u8]) -> anyhow::Result {
+ Ok(ArrowProgram::decode(&bs[..])
+ .map_err(|e| anyhow!("Failed to decode program: {:?}", e))?
+ .try_into()
+ .map_err(|e| anyhow!("Failed to construct graph from program: {:?}", e))?)
+ }
+
+ async fn get_program(
+ pool: &Pool,
+ job_id: &str,
+ id: i64,
+ ) -> anyhow::Result