diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f047ebd..399ce8f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -62,28 +62,43 @@ jobs: runs-on: ubuntu-latest needs: lint timeout-minutes: 10 - # services: - # cassandra: - # image: cassandra:5 # Use a specific version if needed - # ports: - # - 9042:9042 - # options: >- - # --hostname cassandra - # --health-cmd "cqlsh --debug -e 'DESCRIBE KEYSPACES' || exit 1" - # --health-interval 10s - # --health-timeout 5s - # --health-retries 7 + services: + cassandra: + image: cassandra:5@sha256:1614e9d798651aa0c57adb1be04a6e6e07fcc4661334dc77393d7844ce51ec27 + ports: + - 9042:9042 + options: >- + --hostname cassandra + --health-cmd "cqlsh --debug -e 'DESCRIBE KEYSPACES' || exit 1" + --health-interval 10s + --health-timeout 5s + --health-retries 7 + cosmosdb: + image: mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator:vnext-preview@sha256:10272f84fb9f39eadce9bfd6705d48f3080f6d353fdab753de01fbf9cbaaa156 + ports: + - 8081:8081 + env: + ENABLE_EXPLORER: "false" + options: >- + --hostname cosmosdb + --health-cmd "curl -f http://localhost:8080/ready" + --health-interval 10s + --health-timeout 5s + --health-retries 7 steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4 with: fetch-depth: 1 - - uses: actions/setup-go@v5 + - uses: actions/setup-go@40f1582b2485089dde7abd97c1529aa768e1baff # v5 with: go-version: '1.25.3' cache: true - name: Run tests run: go test -v -cover ./... - # env: - # CASSANDRA_HOSTS: "localhost" - # CASSANDRA_KEYSPACE: "testkeyspace" + env: + CASSANDRA_HOSTS: "localhost" + CASSANDRA_KEYSPACE: "testkeyspace" + COSMOSDB_ENDPOINT: "http://localhost:8081/" + COSMOSDB_KEY: "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGg==" + COSMOSDB_DATABASE: "magic" diff --git a/examples/main.go b/examples/main.go index 4d22eb8..ae44c70 100644 --- a/examples/main.go +++ b/examples/main.go @@ -50,7 +50,7 @@ func main() { // "database": "magic", // } - // config := map[string]string{ + // config = map[string]string{ // "provider": "cassandra", // "endpoint": "localhost", // "keyspace": "todo", diff --git a/go.mod b/go.mod index 0039d51..f401c1e 100644 --- a/go.mod +++ b/go.mod @@ -11,12 +11,13 @@ require ( github.com/aws/aws-sdk-go-v2/credentials v1.19.7 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.55.0 github.com/aws/aws-sdk-go-v2/service/sns v1.39.11 + github.com/gocql/gocql v1.7.0 + github.com/scylladb/go-reflectx v1.0.1 + github.com/scylladb/gocqlx/v2 v2.8.0 gopkg.in/yaml.v3 v3.0.1 gorm.io/gorm v1.31.1 ) -// replace github.com/gocql/gocql => github.com/scylladb/gocql v1.17.0 - require ( filippo.io/edwards25519 v1.1.0 // indirect github.com/Azure/azure-sdk-for-go v68.0.0+incompatible // indirect @@ -36,19 +37,21 @@ require ( github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.13 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.41.6 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/go-sql-driver/mysql v1.8.1 // indirect - github.com/golang-jwt/jwt/v5 v5.3.0 // indirect + github.com/go-sql-driver/mysql v1.9.3 // indirect + github.com/golang-jwt/jwt/v5 v5.3.1 // indirect + github.com/golang/snappy v1.0.0 // indirect + github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect - github.com/jackc/pgx/v5 v5.6.0 // indirect + github.com/jackc/pgx/v5 v5.8.0 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/kylelemons/godebug v1.1.0 // indirect - github.com/mattn/go-sqlite3 v1.14.22 // indirect + github.com/mattn/go-sqlite3 v1.14.33 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect + github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect golang.org/x/crypto v0.47.0 // indirect golang.org/x/net v0.49.0 // indirect @@ -56,6 +59,7 @@ require ( golang.org/x/sys v0.40.0 // indirect golang.org/x/text v0.33.0 // indirect gopkg.in/go-jose/go-jose.v2 v2.6.3 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect ) require ( diff --git a/go.sum b/go.sum index b736785..16665d1 100644 --- a/go.sum +++ b/go.sum @@ -60,6 +60,10 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.41.6 h1:5fFjR/ToSOzB2OQ/XqWpZBmNvmP/ github.com/aws/aws-sdk-go-v2/service/sts v1.41.6/go.mod h1:qgFDZQSD/Kys7nJnVqYlWKnh0SSdMjAi0uSwON4wgYQ= github.com/aws/smithy-go v1.24.0 h1:LpilSUItNPFr1eY85RYgTIg5eIEPtvFbskaFcmmIUnk= github.com/aws/smithy-go v1.24.0/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -67,20 +71,27 @@ github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug= github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0= github.com/go-chi/render v1.0.3 h1:AsXqd2a1/INaIfUSKq3G5uA8weYx20FOsM7uSoCyyt4= github.com/go-chi/render v1.0.3/go.mod h1:/gr3hVkmYR0YlEy3LxCuVRFzEu9Ruok+gFqbIofjao0= -github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= -github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= -github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo= -github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= +github.com/go-sql-driver/mysql v1.9.3 h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1aweo= +github.com/go-sql-driver/mysql v1.9.3/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU= +github.com/gocql/gocql v1.7.0 h1:O+7U7/1gSN7QTEAaMEsJc1Oq2QHXvCWoF3DFK9HDHus= +github.com/gocql/gocql v1.7.0/go.mod h1:vnlvXyFZeLBF0Wy+RS8hrOdbn0UWsWtdg07XJnFxZ+4= +github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY= +github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= -github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= +github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo= +github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= @@ -89,14 +100,17 @@ github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/keybase/go-keychain v0.0.1 h1:way+bWYa6lDppZoZcgMbYsvC7GxljxrskdNInRtuthU= github.com/keybase/go-keychain v0.0.1/go.mod h1:PdEILRW3i9D8JcdM+FmY6RwkHGnhHxXwkPPMeUgOK1k= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= -github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= -github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/mattn/go-sqlite3 v1.14.33 h1:A5blZ5ulQo2AtayQ9/limgHEkFreKj1Dv226a1K73s0= +github.com/mattn/go-sqlite3 v1.14.33/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -104,13 +118,18 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/scylladb/go-reflectx v1.0.1 h1:b917wZM7189pZdlND9PbIJ6NQxfDPfBvUaQ7cjj1iZQ= +github.com/scylladb/go-reflectx v1.0.1/go.mod h1:rWnOfDIRWBGN0miMLIcoPt/Dhi2doCMZqwMCJ3KupFc= +github.com/scylladb/gocqlx/v2 v2.8.0 h1:f/oIgoEPjKDKd+RIoeHqexsIQVIbalVmT+axwvUqQUg= +github.com/scylladb/gocqlx/v2 v2.8.0/go.mod h1:4/+cga34PVqjhgSoo5Nr2fX1MQIqZB5eCE5DK4xeDig= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= -github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f h1:J9EGpcZtP0E/raorCMxlFGSTBrsSlaDGf3jU/qvAE2c= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= +github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo= +github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74= @@ -131,6 +150,8 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/go-jose/go-jose.v2 v2.6.3 h1:nt80fvSDlhKWQgSWyHyy5CfmlQr+asih51R8PTWNKKs= gopkg.in/go-jose/go-jose.v2 v2.6.3/go.mod h1:zzZDPkNNw/c9IE7Z9jr11mBZQhKQTMzoEEIoEdZlFBI= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/storage/cassandra.go b/storage/cassandra.go new file mode 100644 index 0000000..55ec313 --- /dev/null +++ b/storage/cassandra.go @@ -0,0 +1,399 @@ +package storage + +import ( + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "log/slog" + "slices" + "strconv" + "strings" + "sync" + "time" + + "github.com/gocql/gocql" + "github.com/scylladb/go-reflectx" + "github.com/scylladb/gocqlx/v2" + "github.com/scylladb/gocqlx/v2/qb" + "github.com/scylladb/gocqlx/v2/table" + "github.com/tink3rlabs/magic/logger" +) + +var ( + cassandraAdapterLock = &sync.Mutex{} + cassandraAdapterInstance *CassandraAdapter +) + +const ( + hosts = "hosts" + keyspace = "keyspace" + tables = "tables" + provider = "provider" + username = "username" + password = "password" + protocolVersion = "protocolVersion" + port = "port" +) + +type CassandraAdapter struct { + StorageAdapter + config map[string]string + clusterConfig *gocql.ClusterConfig + provider StorageProviders + tables map[string]*table.Table +} + +func GetCassandraAdapter(config map[string]string) (*CassandraAdapter, error) { + if cassandraAdapterInstance == nil { + cassandraAdapterLock.Lock() + defer cassandraAdapterLock.Unlock() + if cassandraAdapterInstance == nil { + cassandraAdapterInstance = &CassandraAdapter{config: config} + cassandraAdapterInstance.initConfig() + // The call to createSchema will set clusterConfig.Keyspace to the + // actual Keyspace, this is why its here. + if createSchemaErr := cassandraAdapterInstance.CreateSchema(); createSchemaErr != nil { + errMessage := "failed to call CreateSchema" + slog.Error(errMessage, slog.Any("error", createSchemaErr)) + return nil, errors.Join(fmt.Errorf("%s", errMessage), createSchemaErr) + } + if err := cassandraAdapterInstance.initializeTableMappers(); err != nil { + errMessage := "failed to call initializeTableMappers" + slog.Error(errMessage, slog.Any("error", err)) + return nil, errors.Join(fmt.Errorf("%s", errMessage), err) + } + } + } + return cassandraAdapterInstance, nil + +} + +func (c *CassandraAdapter) GetSchemaName() string { + return c.config[keyspace] +} + +func (c *CassandraAdapter) GetProvider() StorageProviders { + return c.provider +} + +func (c *CassandraAdapter) GetType() StorageAdapterType { + return CASSANDRA +} + +func (c *CassandraAdapter) initConfig() { + c.clusterConfig = gocql.NewCluster(strings.Split(c.config[hosts], ",")...) + + /** + * Setting to "system" to handle the case where the actual + * Keyspace does not exist yet, so createSchema() will create it, then + * set the actual Keyspace in c.clusterConfig + */ + c.clusterConfig.Keyspace = "system" + if username, ok := c.config[username]; ok { + c.clusterConfig.Authenticator = gocql.PasswordAuthenticator{ + Username: username, + Password: c.config[password], + } + } + if protoVersion, ok := c.config[protocolVersion]; ok { + parsedProtoVer, parseErr := strconv.Atoi(protoVersion) + if parseErr != nil { + logger.Fatal("failed to parse protocol version %s", protoVersion) + } + c.clusterConfig.ProtoVersion = parsedProtoVer + } + if portStr, ok := c.config[port]; ok { + parsedPort, parseErr := strconv.Atoi(portStr) + if parseErr != nil { + logger.Fatal("failed to parse port %s", portStr) + } + c.clusterConfig.Port = parsedPort + } +} + +func (c *CassandraAdapter) createSession() (gocqlx.Session, error) { + return gocqlx.WrapSession(c.clusterConfig.CreateSession()) +} + +func (c *CassandraAdapter) initializeTableMappers() error { + s, e := c.createSession() + if e != nil { + return errors.Join( + fmt.Errorf("failed creating a session"), + e, + ) + } + metadata, mErr := s.KeyspaceMetadata(c.config[keyspace]) + if mErr != nil { + return errors.Join( + fmt.Errorf("failed reading tables metadata"), + mErr, + ) + } + + for _, t := range metadata.Tables { + tableMetadata := table.Metadata{ + Name: t.Name, + Columns: t.OrderedColumns, + } + if len(t.PartitionKey) > 0 { + partitionKeys := []string{} + for _, k := range t.PartitionKey { + partitionKeys = append(partitionKeys, k.Name) + } + tableMetadata.PartKey = partitionKeys + } + if len(t.ClusteringColumns) > 0 { + sortKeys := []string{} + for _, k := range t.ClusteringColumns { + sortKeys = append(sortKeys, k.Name) + } + tableMetadata.SortKey = sortKeys + } + c.tables[t.Name] = table.New(tableMetadata) + } + return nil +} + +func (c *CassandraAdapter) getTableForItem(item any) (*table.Table, error) { + itemName := typeName(item) + tableName := reflectx.CamelToSnakeASCII(itemName) + if t, ok := c.tables[tableName]; !ok { + return nil, fmt.Errorf("no table metadata for [%s]", tableName) + } else { + return t, nil + } +} + +func (c *CassandraAdapter) Execute(statement string) error { + if s, err := c.createSession(); err != nil { + return errors.Join( + fmt.Errorf("failed creating a session"), + err, + ) + } else { + defer s.Close() + return s.ExecStmt(statement) + } +} + +func (c *CassandraAdapter) Ping() error { + if s, err := c.createSession(); err != nil { + return errors.Join( + fmt.Errorf("failed creating a session to %v", c.clusterConfig.Hosts), + err, + ) + } else { + defer s.Close() + return nil + } +} + +// CreateSchema is a function that will create the application Cassandra Keyspace +// if it doesn't exist. +// To handle the initial/first run of the app, when the Keyspace hasn't been created +// The clusterConfig.Keyspace is set to 'system' during initConfig function call +// If CreateSchema "CREATE KEYSPACE" query succeeded, clusterConfig.Keyspace will be +// set to the actual Keyspace name from the c.config . +func (c *CassandraAdapter) CreateSchema() error { + replicationClass := "SimpleStrategy" + replicationFactor := 1 + createKeyspaceErr := c.Execute( + fmt.Sprintf( + "CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = {'class':'%s', 'replication_factor': %d}", + c.GetSchemaName(), + replicationClass, + replicationFactor, + ), + ) + if createKeyspaceErr != nil { + return errors.Join( + fmt.Errorf("failed creating keyspace %s", c.GetSchemaName()), + createKeyspaceErr, + ) + } + slog.Debug("schema created, setting clusterConfig.Keyspace", + slog.String("keyspace", c.GetSchemaName()), + ) + c.clusterConfig.Keyspace = c.config[keyspace] + return nil +} + +func (c *CassandraAdapter) CreateMigrationTable() error { + statement := fmt.Sprintf( + `CREATE TABLE IF NOT EXISTS %s.migrations ( + id DECIMAL PRIMARY KEY, + name TEXT, + description TEXT, + timestamp DECIMAL)`, + c.GetSchemaName()) + return c.Execute(statement) + +} + +func (c *CassandraAdapter) UpdateMigrationTable(id int, name string, desc string) error { + t, e := c.getTableForItem("migrations") + if e != nil { + return errors.Join( + fmt.Errorf("failed getting migration table"), + e, + ) + } + s, sErr := c.createSession() + if sErr != nil { + return errors.Join( + fmt.Errorf("failed creating a session"), + sErr, + ) + } + params := map[string]any{} + params["id"] = id + params["name"] = name + params["description"] = desc + params["timestamp"] = time.Now().UnixMilli() + q := t.InsertQuery(s) + q = q.BindMap(params) + return q.ExecRelease() +} + +func (c *CassandraAdapter) GetLatestMigration() (int, error) { + t, e := c.getTableForItem("migrations") + if e != nil { + return -1, errors.Join( + fmt.Errorf("failed getting migration table"), + e, + ) + } + s, sErr := c.createSession() + if sErr != nil { + return -1, errors.Join( + fmt.Errorf("failed creating a session"), + e, + ) + } + var latestMigration int + migrationErr := t.SelectBuilder("id").Max("id").Query(s).Bind(&latestMigration).ExecRelease() + if migrationErr != nil { + return -1, errors.Join( + fmt.Errorf("failed getLatestMigration"), + migrationErr, + ) + } + return latestMigration, nil +} + +func (c *CassandraAdapter) Create(item any, params ...map[string]any) error { + s, err := c.createSession() + if err != nil { + return err + } + defer s.Close() + + t, tableErr := c.getTableForItem(item) + if tableErr != nil { + return tableErr + } + return t.InsertQuery(s).BindStruct(item).ExecRelease() +} +func (c *CassandraAdapter) Get(dest any, filters map[string]any, params ...map[string]any) error { + s, sessionErr := c.createSession() + if sessionErr != nil { + return errors.Join(fmt.Errorf("get failed to create session"), sessionErr) + } + defer s.Close() + + t, tableErr := c.getTableForItem(dest) + if tableErr != nil { + return errors.Join(fmt.Errorf("get failed getting table for item"), tableErr) + } + return t.GetQuery(s).BindMap(filters).GetRelease(dest) +} +func (c *CassandraAdapter) Update(item any, filters map[string]any, params ...map[string]any) error { + s, sessionErr := c.createSession() + if sessionErr != nil { + return errors.Join(fmt.Errorf("update failed to create session"), sessionErr) + } + defer s.Close() + + t, tableErr := c.getTableForItem(item) + if tableErr != nil { + return errors.Join(fmt.Errorf("update failed getting table for item"), tableErr) + } + + marshalledItem, marshalErr := json.Marshal(item) + if marshalErr != nil { + return errors.Join(fmt.Errorf("updated failed json.Marshal"), marshalErr) + } + var jsonMap map[string]any + unmarshalErr := json.Unmarshal(marshalledItem, &jsonMap) + if unmarshalErr != nil { + return errors.Join(fmt.Errorf("update failed json.Unmarshal"), unmarshalErr) + } + columns := []string{} + for k := range jsonMap { + if !slices.Contains(t.PrimaryKeyCmp(), qb.Eq(k)) { + columns = append(columns, k) + } else { + slog.Debug("Column is part of primary key, excluding from columns slice", slog.String("pKeyColumn", k)) + } + } + return t.UpdateQuery(s, columns...).BindStruct(item).ExecRelease() +} +func (c *CassandraAdapter) Delete(item any, filters map[string]any, params ...map[string]any) error { + s, sessionErr := c.createSession() + if sessionErr != nil { + return errors.Join(fmt.Errorf("delete failed to create session"), sessionErr) + } + defer s.Close() + + t, tableErr := c.getTableForItem(item) + if tableErr != nil { + return errors.Join(fmt.Errorf("delete failed getting table for item"), tableErr) + } + return t.DeleteQuery(s).BindMap(filters).ExecRelease() +} +func (c *CassandraAdapter) List(dest any, sortKey string, filters map[string]any, limit int, cursor string, params ...map[string]any) (string, error) { + s, sessionErr := c.createSession() + if sessionErr != nil { + return "", errors.Join(fmt.Errorf("list failed to create session"), sessionErr) + } + defer s.Close() + + t, tableErr := c.getTableForItem(dest) + if tableErr != nil { + return "", errors.Join(fmt.Errorf("list failed getting table for item"), tableErr) + } + q := t.SelectQuery(s).BindMap(filters) + if cursor != "" { + bytes, err := base64.StdEncoding.DecodeString(cursor) + if err != nil { + return "", fmt.Errorf("invalid cursor: %w", err) + } + q = q.PageState(bytes) + } + if limit > 0 { + q = q.PageSize(limit) + } + // TODO: Verify pagination behavior + return cursor, q.SelectRelease(dest) +} +func (c *CassandraAdapter) Search(dest any, sortKey string, query string, limit int, cursor string, params ...map[string]any) (string, error) { + return "", fmt.Errorf("unimplemented") +} +func (c *CassandraAdapter) Count(dest any, filter map[string]any, params ...map[string]any) (int64, error) { + s, sessionErr := c.createSession() + if sessionErr != nil { + return -1, errors.Join(fmt.Errorf("count failed to create session"), sessionErr) + } + defer s.Close() + + t, tableErr := c.getTableForItem(dest) + if tableErr != nil { + return -1, errors.Join(fmt.Errorf("count failed getting table for item"), tableErr) + } + return -1, t.SelectBuilder().CountAll().Query(s).BindStruct(dest).ExecRelease() +} +func (c *CassandraAdapter) Query(dest any, statement string, limit int, cursor string, params ...map[string]any) (string, error) { + return "", fmt.Errorf("unimplemented") +} diff --git a/storage/cassandra.go.backup b/storage/cassandra.go.backup deleted file mode 100644 index ed71cd1..0000000 --- a/storage/cassandra.go.backup +++ /dev/null @@ -1,399 +0,0 @@ -// package storage - -// import ( -// "encoding/base64" -// "encoding/json" -// "errors" -// "fmt" -// "log/slog" -// "slices" -// "strconv" -// "strings" -// "sync" -// "time" - -// "github.com/gocql/gocql" -// "github.com/scylladb/go-reflectx" -// "github.com/scylladb/gocqlx/v3" -// "github.com/scylladb/gocqlx/v3/qb" -// "github.com/scylladb/gocqlx/v3/table" -// "github.com/tink3rlabs/magic/logger" -// ) - -// var ( -// cassandraAdapterLock = &sync.Mutex{} -// cassandraAdapterInstance *CassandraAdapter -// ) - -// const ( -// hosts = "hosts" -// keyspace = "keyspace" -// tables = "tables" -// provider = "provider" -// username = "username" -// password = "password" -// protocolVersion = "protocolVersion" -// port = "port" -// ) - -// type CassandraAdapter struct { -// StorageAdapter -// config map[string]string -// clusterConfig *gocql.ClusterConfig -// provider StorageProviders -// tables map[string]*table.Table -// } - -// func GetCassandraAdapter(config map[string]string) (*CassandraAdapter, error) { -// if cassandraAdapterInstance == nil { -// cassandraAdapterLock.Lock() -// defer cassandraAdapterLock.Unlock() -// if cassandraAdapterInstance == nil { -// cassandraAdapterInstance = &CassandraAdapter{config: config} -// cassandraAdapterInstance.initConfig() -// // The call to createSchema will set clusterConfig.Keyspace to the -// // actual Keyspace, this is why its here. -// if createSchemaErr := cassandraAdapterInstance.CreateSchema(); createSchemaErr != nil { -// errMessage := "failed to call CreateSchema" -// slog.Error(errMessage, slog.Any("error", createSchemaErr)) -// return nil, errors.Join(fmt.Errorf("%s", errMessage), createSchemaErr) -// } -// if err := cassandraAdapterInstance.initializeTableMappers(); err != nil { -// errMessage := "failed to call initializeTableMappers" -// slog.Error(errMessage, slog.Any("error", err)) -// return nil, errors.Join(fmt.Errorf("%s", errMessage), err) -// } -// } -// } -// return cassandraAdapterInstance, nil - -// } - -// func (c *CassandraAdapter) GetSchemaName() string { -// return c.config[keyspace] -// } - -// func (c *CassandraAdapter) GetProvider() StorageProviders { -// return c.provider -// } - -// func (c *CassandraAdapter) GetType() StorageAdapterType { -// return CASSANDRA -// } - -// func (c *CassandraAdapter) initConfig() { -// c.clusterConfig = gocql.NewCluster(strings.Split(c.config[hosts], ",")...) - -// /** -// * Setting to "system" to handle the case where the actual -// * Keyspace does not exist yet, so createSchema() will create it, then -// * set the actual Keyspace in c.clusterConfig -// */ -// c.clusterConfig.Keyspace = "system" -// if username, ok := c.config[username]; ok { -// c.clusterConfig.Authenticator = gocql.PasswordAuthenticator{ -// Username: username, -// Password: c.config[password], -// } -// } -// if protoVersion, ok := c.config[protocolVersion]; ok { -// parsedProtoVer, parseErr := strconv.Atoi(protoVersion) -// if parseErr != nil { -// logger.Fatal("failed to parse protocol version %s", protoVersion) -// } -// c.clusterConfig.ProtoVersion = parsedProtoVer -// } -// if portStr, ok := c.config[port]; ok { -// parsedPort, parseErr := strconv.Atoi(portStr) -// if parseErr != nil { -// logger.Fatal("failed to parse port %s", portStr) -// } -// c.clusterConfig.Port = parsedPort -// } -// } - -// func (c *CassandraAdapter) createSession() (gocqlx.Session, error) { -// return gocqlx.WrapSession(c.clusterConfig.CreateSession()) -// } - -// func (c *CassandraAdapter) initializeTableMappers() error { -// s, e := c.createSession() -// if e != nil { -// return errors.Join( -// fmt.Errorf("failed creating a session"), -// e, -// ) -// } -// metadata, mErr := s.KeyspaceMetadata(c.config[keyspace]) -// if mErr != nil { -// return errors.Join( -// fmt.Errorf("failed reading tables metadata"), -// mErr, -// ) -// } - -// for _, t := range metadata.Tables { -// tableMetadata := table.Metadata{ -// Name: t.Name, -// Columns: t.OrderedColumns, -// } -// if len(t.PartitionKey) > 0 { -// partitionKeys := []string{} -// for _, k := range t.PartitionKey { -// partitionKeys = append(partitionKeys, k.Name) -// } -// tableMetadata.PartKey = partitionKeys -// } -// if len(t.ClusteringColumns) > 0 { -// sortKeys := []string{} -// for _, k := range t.ClusteringColumns { -// sortKeys = append(sortKeys, k.Name) -// } -// tableMetadata.SortKey = sortKeys -// } -// c.tables[t.Name] = table.New(tableMetadata) -// } -// return nil -// } - -// func (c *CassandraAdapter) getTableForItem(item any) (*table.Table, error) { -// itemName := typeName(item) -// tableName := reflectx.CamelToSnakeASCII(itemName) -// if t, ok := c.tables[tableName]; !ok { -// return nil, fmt.Errorf("no table metadata for [%s]", tableName) -// } else { -// return t, nil -// } -// } - -// func (c *CassandraAdapter) Execute(statement string) error { -// if s, err := c.createSession(); err != nil { -// return errors.Join( -// fmt.Errorf("failed creating a session"), -// err, -// ) -// } else { -// defer s.Close() -// return s.ExecStmt(statement) -// } -// } - -// func (c *CassandraAdapter) Ping() error { -// if s, err := c.createSession(); err != nil { -// return errors.Join( -// fmt.Errorf("failed creating a session to %v", c.clusterConfig.Hosts), -// err, -// ) -// } else { -// defer s.Close() -// return nil -// } -// } - -// // CreateSchema is a function that will create the application Cassandra Keyspace -// // if it doesn't exist. -// // To handle the initial/first run of the app, when the Keyspace hasn't been created -// // The clusterConfig.Keyspace is set to 'system' during initConfig function call -// // If CreateSchema "CREATE KEYSPACE" query succeeded, clusterConfig.Keyspace will be -// // set to the actual Keyspace name from the c.config . -// func (c *CassandraAdapter) CreateSchema() error { -// replicationClass := "SimpleStrategy" -// replicationFactor := 1 -// createKeyspaceErr := c.Execute( -// fmt.Sprintf( -// "CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = {'class':'%s', 'replication_factor': %d}", -// c.GetSchemaName(), -// replicationClass, -// replicationFactor, -// ), -// ) -// if createKeyspaceErr != nil { -// return errors.Join( -// fmt.Errorf("failed creating keyspace %s", c.GetSchemaName()), -// createKeyspaceErr, -// ) -// } -// slog.Debug("schema created, setting clusterConfig.Keyspace", -// slog.String("keyspace", c.GetSchemaName()), -// ) -// c.clusterConfig.Keyspace = c.config[keyspace] -// return nil -// } - -// func (c *CassandraAdapter) CreateMigrationTable() error { -// statement := fmt.Sprintf( -// `CREATE TABLE IF NOT EXISTS %s.migrations ( -// id DECIMAL PRIMARY KEY, -// name TEXT, -// description TEXT, -// timestamp DECIMAL)`, -// c.GetSchemaName()) -// return c.Execute(statement) - -// } - -// func (c *CassandraAdapter) UpdateMigrationTable(id int, name string, desc string) error { -// t, e := c.getTableForItem("migrations") -// if e != nil { -// return errors.Join( -// fmt.Errorf("failed getting migration table"), -// e, -// ) -// } -// s, sErr := c.createSession() -// if sErr != nil { -// return errors.Join( -// fmt.Errorf("failed creating a session"), -// sErr, -// ) -// } -// params := map[string]any{} -// params["id"] = id -// params["name"] = name -// params["description"] = desc -// params["timestamp"] = time.Now().UnixMilli() -// q := t.InsertQuery(s) -// q = q.BindMap(params) -// return q.ExecRelease() -// } - -// func (c *CassandraAdapter) GetLatestMigration() (int, error) { -// t, e := c.getTableForItem("migrations") -// if e != nil { -// return -1, errors.Join( -// fmt.Errorf("failed getting migration table"), -// e, -// ) -// } -// s, sErr := c.createSession() -// if sErr != nil { -// return -1, errors.Join( -// fmt.Errorf("failed creating a session"), -// e, -// ) -// } -// var latestMigration int -// migrationErr := t.SelectBuilder("id").Max("id").Query(s).Bind(&latestMigration).ExecRelease() -// if migrationErr != nil { -// return -1, errors.Join( -// fmt.Errorf("failed getLatestMigration"), -// migrationErr, -// ) -// } -// return latestMigration, nil -// } - -// func (c *CassandraAdapter) Create(item any, params ...map[string]any) error { -// s, err := c.createSession() -// if err != nil { -// return err -// } -// defer s.Close() - -// t, tableErr := c.getTableForItem(item) -// if tableErr != nil { -// return tableErr -// } -// return t.InsertQuery(s).BindStruct(item).ExecRelease() -// } -// func (c *CassandraAdapter) Get(dest any, filters map[string]any, params ...map[string]any) error { -// s, sessionErr := c.createSession() -// if sessionErr != nil { -// return errors.Join(fmt.Errorf("get failed to create session"), sessionErr) -// } -// defer s.Close() - -// t, tableErr := c.getTableForItem(dest) -// if tableErr != nil { -// return errors.Join(fmt.Errorf("get failed getting table for item"), tableErr) -// } -// return t.GetQuery(s).BindMap(filters).GetRelease(dest) -// } -// func (c *CassandraAdapter) Update(item any, filters map[string]any, params ...map[string]any) error { -// s, sessionErr := c.createSession() -// if sessionErr != nil { -// return errors.Join(fmt.Errorf("update failed to create session"), sessionErr) -// } -// defer s.Close() - -// t, tableErr := c.getTableForItem(item) -// if tableErr != nil { -// return errors.Join(fmt.Errorf("update failed getting table for item"), tableErr) -// } - -// marshalledItem, marshalErr := json.Marshal(item) -// if marshalErr != nil { -// return errors.Join(fmt.Errorf("updated failed json.Marshal"), marshalErr) -// } -// var jsonMap map[string]any -// unmarshalErr := json.Unmarshal(marshalledItem, &jsonMap) -// if unmarshalErr != nil { -// return errors.Join(fmt.Errorf("update failed json.Unmarshal"), unmarshalErr) -// } -// columns := []string{} -// for k := range jsonMap { -// if !slices.Contains(t.PrimaryKeyCmp(), qb.Eq(k)) { -// columns = append(columns, k) -// } else { -// slog.Debug("Column is part of primary key, excluding from columns slice", slog.String("pKeyColumn", k)) -// } -// } -// return t.UpdateQuery(s, columns...).BindStruct(item).ExecRelease() -// } -// func (c *CassandraAdapter) Delete(item any, filters map[string]any, params ...map[string]any) error { -// s, sessionErr := c.createSession() -// if sessionErr != nil { -// return errors.Join(fmt.Errorf("delete failed to create session"), sessionErr) -// } -// defer s.Close() - -// t, tableErr := c.getTableForItem(item) -// if tableErr != nil { -// return errors.Join(fmt.Errorf("delete failed getting table for item"), tableErr) -// } -// return t.DeleteQuery(s).BindMap(filters).ExecRelease() -// } -// func (c *CassandraAdapter) List(dest any, sortKey string, filters map[string]any, limit int, cursor string, params ...map[string]any) (string, error) { -// s, sessionErr := c.createSession() -// if sessionErr != nil { -// return "", errors.Join(fmt.Errorf("list failed to create session"), sessionErr) -// } -// defer s.Close() - -// t, tableErr := c.getTableForItem(dest) -// if tableErr != nil { -// return "", errors.Join(fmt.Errorf("list failed getting table for item"), tableErr) -// } -// q := t.SelectQuery(s).BindMap(filters) -// if cursor != "" { -// bytes, err := base64.StdEncoding.DecodeString(cursor) -// if err != nil { -// return "", fmt.Errorf("invalid cursor: %w", err) -// } -// q = q.PageState(bytes) -// } -// if limit > 0 { -// q = q.PageSize(limit) -// } -// // TODO: Verify pagination behavior -// return cursor, q.SelectRelease(dest) -// } -// func (c *CassandraAdapter) Search(dest any, sortKey string, query string, limit int, cursor string, params ...map[string]any) (string, error) { -// return "", fmt.Errorf("unimplemented") -// } -// func (c *CassandraAdapter) Count(dest any, filter map[string]any, params ...map[string]any) (int64, error) { -// s, sessionErr := c.createSession() -// if sessionErr != nil { -// return -1, errors.Join(fmt.Errorf("count failed to create session"), sessionErr) -// } -// defer s.Close() - -// t, tableErr := c.getTableForItem(dest) -// if tableErr != nil { -// return -1, errors.Join(fmt.Errorf("count failed getting table for item"), tableErr) -// } -// return -1, t.SelectBuilder().CountAll().Query(s).BindStruct(dest).ExecRelease() -// } -// func (c *CassandraAdapter) Query(dest any, statement string, limit int, cursor string, params ...map[string]any) (string, error) { -// return "", fmt.Errorf("unimplemented") -// } diff --git a/storage/reflection.go b/storage/reflection.go new file mode 100644 index 0000000..39f1c31 --- /dev/null +++ b/storage/reflection.go @@ -0,0 +1,50 @@ +package storage + +import ( + "fmt" + "reflect" + "strings" +) + +// getValue is a function that uses reflection to extract the value +// of `field` from `item` +// Its useful when writing type-agnostic logic, such as Magic +// storage adapters that accept `any` type. +func GetValue(item any, field string) (any, error) { + val := reflect.ValueOf(item) + if val.Kind() == reflect.Pointer { + val = val.Elem() + } + + if val.Kind() != reflect.Struct { + return nil, fmt.Errorf("expected struct fot %s", val.Kind()) + } + + for fIdx := range val.NumField() { + f := val.Type().Field(fIdx) + if strings.EqualFold(f.Name, field) || strings.EqualFold(strings.Split(f.Tag.Get("json"), ",")[0], field) { + return val.Field(fIdx).Interface(), nil + } + } + return nil, nil +} + +// typeName is a function that uses reflection to resolve the +// `item` type name. +// It is useful when trying to match between the object passed to +// a storage adapter, and the target table with which it interacts. +func typeName(item any) string { + typeOf := reflect.TypeOf(item) + valueOf := reflect.ValueOf(item) + if valueOf.Kind() == reflect.Pointer { + elemVal := valueOf.Elem() + if elemVal.Kind() == reflect.Slice { + sliceType := elemVal.Type() + sliceElemType := sliceType.Elem() + if sliceElemType.Kind() == reflect.Pointer { + typeOf = sliceElemType.Elem() + } + } + } + return typeOf.Name() +} diff --git a/storage/reflection.go.backup b/storage/reflection.go.backup deleted file mode 100644 index f11cc32..0000000 --- a/storage/reflection.go.backup +++ /dev/null @@ -1,50 +0,0 @@ -// package storage - -// import ( -// "fmt" -// "reflect" -// "strings" -// ) - -// // getValue is a function that uses reflection to extract the value -// // of `field` from `item` -// // Its useful when writing type-agnostic logic, such as Magic -// // storage adapters that accept `any` type. -// func GetValue(item any, field string) (any, error) { -// val := reflect.ValueOf(item) -// if val.Kind() == reflect.Pointer { -// val = val.Elem() -// } - -// if val.Kind() != reflect.Struct { -// return nil, fmt.Errorf("expected struct fot %s", val.Kind()) -// } - -// for fIdx := range val.NumField() { -// f := val.Type().Field(fIdx) -// if strings.EqualFold(f.Name, field) || strings.EqualFold(strings.Split(f.Tag.Get("json"), ",")[0], field) { -// return val.Field(fIdx).Interface(), nil -// } -// } -// return nil, nil -// } - -// // typeName is a function that uses reflection to resolve the -// // `item` type name. -// // It is useful when trying to match between the object passed to -// // a storage adapter, and the target table with which it interacts. -// func typeName(item any) string { -// typeOf := reflect.TypeOf(item) -// valueOf := reflect.ValueOf(item) -// if valueOf.Kind() == reflect.Pointer { -// elemVal := valueOf.Elem() -// if elemVal.Kind() == reflect.Slice { -// sliceType := elemVal.Type() -// sliceElemType := sliceType.Elem() -// if sliceElemType.Kind() == reflect.Pointer { -// typeOf = sliceElemType.Elem() -// } -// } -// } -// return typeOf.Name() -// } diff --git a/storage/storage.go b/storage/storage.go index 2414a3f..17ffbfa 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -33,11 +33,11 @@ type StorageProviders string type StorageAdapterFactory struct{} const ( - // CASSANDRA StorageAdapterType = "cassandra" - COSMOSDB StorageAdapterType = "cosmosdb" - DYNAMODB StorageAdapterType = "dynamodb" - MEMORY StorageAdapterType = "memory" - SQL StorageAdapterType = "sql" + CASSANDRA StorageAdapterType = "cassandra" + COSMOSDB StorageAdapterType = "cosmosdb" + DYNAMODB StorageAdapterType = "dynamodb" + MEMORY StorageAdapterType = "memory" + SQL StorageAdapterType = "sql" ) const ( @@ -52,8 +52,8 @@ func (s StorageAdapterFactory) GetInstance(adapterType StorageAdapterType, confi config = make(map[string]string) } switch adapterType { - // case CASSANDRA: - // return GetCassandraAdapter(config.(map[string]string)) + case CASSANDRA: + return GetCassandraAdapter(config.(map[string]string)) case MEMORY: return GetMemoryAdapterInstance(), nil case SQL: diff --git a/storage/storage_test.go b/storage/storage_test.go index a25f69d..e533057 100644 --- a/storage/storage_test.go +++ b/storage/storage_test.go @@ -1,39 +1,30 @@ package storage_test import ( + "os" "testing" "github.com/tink3rlabs/magic/storage" ) func TestGetInstanceReturnsExpectedAdapterForEachType(t *testing.T) { - // host, keyspace := getCassandraHostAndKeyspace() tests := []struct { name string typ storage.StorageAdapterType config map[string]string }{ - // { - // "CASSANDRA", - // storage.CASSANDRA, - // map[string]string{ - // "hosts": host, - // "keyspace": keyspace, - // }, - // }, - // {"COSMOSDB", storage.COSMOSDB, - // map[string]string{ - // "endpoint": "https://your-cosmosdb-account.documents.azure.com:443/", - // "key": "your-cosmosdb-key", - // "database": "magic", - // }, - // }, - // {"DYNAMODB", storage.DYNAMODB, - // map[string]string{ - // "region": "us-west-2", - // "endpoint": "http://localhost:8000", - // }, - // }, + {"CASSANDRA", storage.CASSANDRA, + getCassandraHostAndKeyspace(), + }, + {"COSMOSDB", storage.COSMOSDB, + getCosmosDBConfig(), + }, + {"DYNAMODB", storage.DYNAMODB, + map[string]string{ + "region": "us-west-2", + "endpoint": "http://localhost:8000", + }, + }, {"MEMORY", storage.MEMORY, nil}, {"SQL", storage.SQL, map[string]string{"dsn": "test"}}, } @@ -54,15 +45,38 @@ func TestGetInstanceReturnsExpectedAdapterForEachType(t *testing.T) { } } -// func getCassandraHostAndKeyspace() (string, string) { -// host := os.Getenv("CASSANDRA_HOSTS") -// if host == "" { -// // Default for local DevContainer -// host = "host.docker.internal" -// } -// keyspace := os.Getenv("CASSANDRA_KEYSPACE") -// if keyspace == "" { -// keyspace = "testkeyspace" -// } -// return host, keyspace -// } +func getCassandraHostAndKeyspace() map[string]string { + host := os.Getenv("CASSANDRA_HOSTS") + if host == "" { + // Default for local DevContainer + host = "host.docker.internal" + } + keyspace := os.Getenv("CASSANDRA_KEYSPACE") + if keyspace == "" { + keyspace = "testkeyspace" + } + return map[string]string{ + "hosts": host, + "keyspace": keyspace, + } +} + +func getCosmosDBConfig() map[string]string { + endpoint := os.Getenv("COSMOSDB_ENDPOINT") + if endpoint == "" { + endpoint = "https://localhost:8081/" + } + key := os.Getenv("COSMOSDB_KEY") + if key == "" { + key = "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGg==" + } + database := os.Getenv("COSMOSDB_DATABASE") + if database == "" { + database = "magic" + } + return map[string]string{ + "endpoint": endpoint, + "key": key, + "database": database, + } +}