diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 2ccdef619..ed69bcd4e 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1 +1 @@ -.github/workflows @toptal/rogue-one +* @toptal/sre diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 000000000..eab5262c2 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,42 @@ +version: 2 +registries: + toptal-github: + type: "git" + url: "https://github.com" + username: "x-access-token" + password: "${{secrets.DEPENDABOT_GITHUB_TOKEN}}" + +updates: + - package-ecosystem: bundler + directory: "/" + schedule: + interval: "weekly" + day: "wednesday" + time: "07:00" + pull-request-branch-name: + separator: "-" + labels: + - "no-jira" + - "ruby" + - "dependencies" + reviewers: + - "toptal/sre" + registries: + - toptal-github + insecure-external-code-execution: allow + open-pull-requests-limit: 3 + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "weekly" + day: "wednesday" + time: "07:00" + pull-request-branch-name: + separator: "-" + labels: + - "no-jira" + - "dependencies" + - "gha" + reviewers: + - "toptal/sre" + open-pull-requests-limit: 3 diff --git a/.github/workflows/ruby.yml b/.github/workflows/ruby.yml index 2acb40f31..cd799c571 100644 --- a/.github/workflows/ruby.yml +++ b/.github/workflows/ruby.yml @@ -1,66 +1,63 @@ name: CI -on: [push, pull_request] +on: + push: + branches: [master] + pull_request: + types: [ + synchronize, # PR was updated + opened, # PR was open + reopened # PR was reopened + ] jobs: - ruby-2: - runs-on: ubuntu-latest - strategy: - fail-fast: false - matrix: - ruby: [2.6, 2.7] - gemfile: [rails.5.2.activerecord, rails.6.0.activerecord, rails.6.1.activerecord] - name: ${{ matrix.ruby }}-${{ matrix.gemfile }} - - env: - BUNDLE_GEMFILE: gemfiles/${{ matrix.gemfile }}.gemfile - - steps: - - uses: actions/checkout@v2 - - uses: ruby/setup-ruby@v1 - with: - ruby-version: ${{ matrix.ruby }} - bundler-cache: true - - name: Run Elasticsearch - uses: elastic/elastic-github-actions/elasticsearch@9de0f78f306e4ebc0838f057e6b754364685e759 - with: - stack-version: 7.10.1 - port: 9250 - - name: Tests - run: bundle exec rspec - ruby-3: runs-on: ubuntu-latest strategy: fail-fast: false matrix: - ruby: [ '3.0', 3.1 ] - gemfile: [ rails.6.1.activerecord, rails.7.0.activerecord ] + ruby: [ '3.0', '3.1', '3.2', '3.3' ] + gemfile: [rails.6.1.activerecord, rails.7.0.activerecord, rails.7.1.activerecord, rails.7.2.activerecord] + exclude: + - ruby: '3.0' + gemfile: rails.7.2.activerecord name: ${{ matrix.ruby }}-${{ matrix.gemfile }} env: BUNDLE_GEMFILE: gemfiles/${{ matrix.gemfile }}.gemfile + services: + redis: + # Docker Hub image + image: redis + ports: + - '6379:6379' + # Set health checks to wait until redis has started + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - uses: ruby/setup-ruby@v1 with: ruby-version: ${{ matrix.ruby }} bundler-cache: true - - name: Run Elasticsearch - uses: elastic/elastic-github-actions/elasticsearch@9de0f78f306e4ebc0838f057e6b754364685e759 - with: - stack-version: 7.10.1 - port: 9250 + - name: Start containers + run: | + docker compose up elasticsearch_test -d + sleep 15 + - name: Tests run: bundle exec rspec rubocop: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - uses: ruby/setup-ruby@v1 with: - ruby-version: 2.7 + ruby-version: 3.0 bundler-cache: true - 
run: bundle exec rubocop --format simple diff --git a/.rubocop.yml b/.rubocop.yml index ec0426c0e..6c2157253 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -2,7 +2,7 @@ inherit_from: .rubocop_todo.yml AllCops: NewCops: enable - TargetRubyVersion: 2.6 + TargetRubyVersion: 3.0 Layout/AccessModifierIndentation: EnforcedStyle: outdent @@ -59,3 +59,6 @@ Metrics/ModuleLength: Exclude: - 'lib/chewy/rake_helper.rb' - '**/*_spec.rb' + +Style/ArgumentsForwarding: + Enabled: false diff --git a/CHANGELOG.md b/CHANGELOG.md index 74c3a5573..a7963c22f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,140 @@ ### Bugs Fixed +* [#964](https://github.com/toptal/chewy/pull/964): Fix `delayed_sidekiq` worker to handle UUID primary keys correctly. + +## 8.0.0-beta (2024-08-27) + +### New Features + +* [#962](https://github.com/toptal/chewy/pull/962): ElasticSearch v.8 support added + +* `delete_all_enabled` setting introduced to align Chewy.massacre with wildcard indices deletion disabled in ES 8 by default + +### Changes + +### Bugs Fixed + +## 7.6.0 (2024-05-03) + +### Changes + +* [#933](https://github.com/toptal/chewy/pull/933): Relax allowed `elasticsearch` dependency versions. ([@mjankowski][]) + +### Bugs Fixed +* [#937](https://github.com/toptal/chewy/pull/937): Fix for race condition while using the `delayed_sidekiq` strategy. Also, fix for Redis bloating in case of reindexing error ([@skcc321](https://github.com/skcc321)) + +* [#947](https://github.com/toptal/chewy/pull/947): Fix intermittent time-based failure in delayed sidekiq spec. ([@mjankowski][]) + +## 7.5.1 (2024-01-30) + +### New Features + +* [#925](https://github.com/toptal/chewy/pull/925): Add configuration option for default scope cleanup behavior. ([@barthez][]) + +### Changes + +### Bugs Fixed + +## 7.5.0 (2024-01-15) + +### New Features + +* [#894](https://github.com/toptal/chewy/pull/894): Way of cleaning redis from artifacts left by `delayed_sidekiq` strategy which could potentially cause flaky tests. ([@Drowze](https://github.com/Drowze)) +* [#919](https://github.com/toptal/chewy/pull/919): Add pre-request filter ([@konalegi][https://github.com/konalegi]) + +## 7.4.0 (2023-12-13) + +### New Features + +### Changes + +* [#911](https://github.com/toptal/chewy/pull/911): Remove ruby 2.x. ([@konalegi][https://github.com/konalegi]) + +### Bugs Fixed + +## 7.3.6 (2023-12-13) + +### New Features + +* [#890](https://github.com/toptal/chewy/pull/890): Add the [`knn`](https://www.elastic.co/guide/en/elasticsearch/reference/current/knn-search.html) option to the request. 
([@jkostolansky][]) + +### Changes + +### Bugs Fixed + +## 7.3.5 (2023-12-06) + +### New Features + +* [#907](https://github.com/toptal/chewy/pull/907): Fix deprecation warning in LogSubscriber for Rails 7.1 ([@alejandroperea](https://github.com/alejandroperea)) + +### Changes + +### Bugs Fixed + +## 7.3.4 (2023-08-29) + +### New Features + +* [#888](https://github.com/toptal/chewy/pull/892): Rake task to create missing indexes ([@konalegi](https://github.com/konalegi)) + +### Changes + +### Bugs Fixed + +## 7.3.3 (2023-07-07) + +### New Features + +* [#888](https://github.com/toptal/chewy/pull/888/files): Skip journal creation on import ([@konalegi](https://github.com/konalegi)) + +### Changes + +### Bugs Fixed + +## 7.3.2 (2023-04-20) + +### New Features + +### Changes + +### Bugs Fixed + +* [#861](https://github.com/toptal/chewy/pull/861): Fix bug in mock_elasticsearch_response_sources ([@lafeber](https://github.com/lafeber)) + +## 7.3.1 (2023-04-20) + +### Bugs Fixed + +* [#874](https://github.com/toptal/chewy/pull/874): Fix `chewy:journal:clean` task for ruby 3.x. ([@muk-ai](https://github.com/muk-ai)) +* [#882](https://github.com/toptal/chewy/pull/882): Fix memory leak during `chewy:reset` for ruby 3.2 ([@konalegi](https://github.com/konalegi)) + +## 7.3.0 (2023-04-03) + +### New Features + +* [#869](https://github.com/toptal/chewy/pull/869): New strategy - `delayed_sidekiq`. Allow passing `strategy: :delayed_sidekiq` option to `SomeIndex.import([1, ...], strategy: :delayed_sidekiq)`. The strategy is compatible with `update_fields` option as well. ([@skcc321][]) +* [#879](https://github.com/toptal/chewy/pull/879): Configure CI to check for ruby 3.2 compatibility. ([@konalegi][]) + +### Changes + +### Bugs Fixed + +* [#856](https://github.com/toptal/chewy/pull/856): Fix return value of subscribed_task_stats used in rake tasks. ([@fabiormoura][]) + +## 7.2.7 (2022-11-15) + +### New Features + +* [#857](https://github.com/toptal/chewy/pull/857): Allow passing `wait_for_completion`, `request_per_second` and `scroll_size` options to `chewy:journal:clean` rake task and `delete_all` query builder method. ([@konalegi][])([@barthez][]) + +### Changes + +### Bugs Fixed + +* [#863](https://github.com/toptal/chewy/pull/863): Fix `crutches` call doesn't respect `update_fields` option. ([@skcc321][]) + ## 7.2.6 (2022-06-13) ### New Features diff --git a/Gemfile b/Gemfile index 969f51ad2..e838bbb9d 100644 --- a/Gemfile +++ b/Gemfile @@ -1,7 +1,5 @@ source 'https://rubygems.org' -gemspec - gem 'activerecord' gem 'activejob', require: false @@ -18,5 +16,7 @@ gem 'guard-rspec' gem 'redcarpet' gem 'yard' -gem 'rexml' if RUBY_VERSION >= '3.0.0' -gem 'ruby2_keywords' if RUBY_VERSION < '2.7' +gem 'rexml' + +eval_gemfile 'gemfiles/base.gemfile' +gemspec diff --git a/README.md b/README.md index 38c1431fc..096bdb578 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ # Chewy -Chewy is an ODM (Object Document Mapper), built on top of the [the official Elasticsearch client](https://github.com/elastic/elasticsearch-ruby). +Chewy is an ODM (Object Document Mapper), built on top of [the official Elasticsearch client](https://github.com/elastic/elasticsearch-ruby). ## Why Chewy? @@ -43,14 +43,15 @@ Or install it yourself as: ### Ruby -Chewy is compatible with MRI 2.6-3.0¹. +Chewy is compatible with MRI 3.0-3.3¹. 
-> ¹ Ruby 3 is only supported with Rails 6.1 +> ¹ Ruby 3 is supported with Rails 6.1, 7.0, 7.1 and 7.2 ### Elasticsearch compatibility matrix | Chewy version | Elasticsearch version | | ------------- | ---------------------------------- | +| 8.0.0 | 8.x | | 7.2.x | 7.x | | 7.1.x | 7.x | | 7.0.x | 6.8, 7.x | @@ -66,7 +67,7 @@ various Chewy versions. ### Active Record -5.2, 6.0, 6.1 Active Record versions are supported by all Chewy versions. +6.1, 7.0, 7.1, 7.2 Active Record versions are supported by Chewy. ## Getting Started @@ -97,7 +98,36 @@ development: Make sure you have Elasticsearch up and running. You can [install](https://www.elastic.co/guide/en/elasticsearch/reference/current/install-elasticsearch.html) it locally, but the easiest way is to use [Docker](https://www.docker.com/get-started): ```shell -$ docker run --rm --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.11.1 +$ docker run --rm --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e "xpack.security.enabled=false" elasticsearch:8.15.0 +``` + +### Security + +Please note that starting from version 8 ElasticSearch has security features enabled by default. +Docker command above has it disabled for local testing convenience. If you want to enable it, omit +`"xpack.security.enabled=false"` part from Docker command, and run these command after starting container (container name `es8` assumed): + +Reset password for `elastic` user: +``` +docker container exec es8 '/usr/share/elasticsearch/bin/elasticsearch-reset-password' -u elastic +``` + +Extract CA certificate generated by ElasticSearch on first run: +``` +docker container cp es8:/usr/share/elasticsearch/config/certs/http_ca.crt tmp/ +``` + +And then add them to settings: + +```yaml +# config/chewy.yml +development: + host: 'localhost:9200' + user: 'elastic' + password: 'SomeLongPassword' + transport_options: + ssl: + ca_file: './tmp/http_ca.crt' ``` ### Index @@ -458,7 +488,7 @@ field :hierarchy_link, type: :join, relations: {question: %i[answer comment], an ``` assuming you have `comment_type` and `commented_id` fields in your model. -Note that when you reindex a parent, it's children and grandchildren will be reindexed as well. +Note that when you reindex a parent, its children and grandchildren will be reindexed as well. This may require additional queries to the primary database and to elastisearch. Also note that the join field doesn't support crutches (it should be a field directly defined on the model). @@ -503,7 +533,7 @@ class ProductsIndex < Chewy::Index field :name # simply use crutch-fetched data as a value: - field :category_names, value: ->(product, crutches) { crutches.categories[product.id] } + field :category_names, value: ->(product, crutches) { crutches[:categories][product.id] } end ``` @@ -525,7 +555,7 @@ So Chewy Crutches™ technology is able to increase your indexing performance in ### Witchcraft™ technology -One more experimental technology to increase import performance. As far as you know, chewy defines value proc for every imported field in mapping, so at the import time each of this procs is executed on imported object to extract result document to import. It would be great for performance to use one huge whole-document-returning proc instead. So basically the idea or Witchcraft™ technology is to compile a single document-returning proc from the index definition. +One more experimental technology to increase import performance. 
As far as you know, chewy defines value proc for every imported field in mapping, so at the import time each of these procs is executed on imported object to extract result document to import. It would be great for performance to use one huge whole-document-returning proc instead. So basically the idea or Witchcraft™ technology is to compile a single document-returning proc from the index definition. ```ruby index_scope Product @@ -569,7 +599,7 @@ Obviously not every type of definition might be compiled. There are some restric end ``` -However, it is quite possible that your index definition will be supported by Witchcraft™ technology out of the box in the most of the cases. +However, it is quite possible that your index definition will be supported by Witchcraft™ technology out of the box in most of the cases. ### Raw Import @@ -675,7 +705,9 @@ end You may be wondering why do you need it? The answer is simple: not to lose the data. -Imagine that you reset your index in a zero-downtime manner (to separate index), and at the meantime somebody keeps updating the data frequently (to old index). So all these actions will be written to the journal index and you'll be able to apply them after index reset using the `Chewy::Journal` interface. +Imagine that you reset your index in a zero-downtime manner (to separate index), and in the meantime somebody keeps updating the data frequently (to old index). So all these actions will be written to the journal index and you'll be able to apply them after index reset using the `Chewy::Journal` interface. + +When enabled, journal can grow to enormous size, consider setting up cron job that would clean it occasionally using [`chewy:journal:clean` rake task](#chewyjournal). ### Index manipulation @@ -772,6 +804,89 @@ The default queue name is `chewy`, you can customize it in settings: `sidekiq.qu Chewy.settings[:sidekiq] = {queue: :low} ``` +#### `:delayed_sidekiq` + +It accumulates IDs of records to be reindexed during the latency window in Redis and then performs the reindexing of all accumulated records at once. +This strategy is very useful in the case of frequently mutated records. +It supports the `update_fields` option, so it will attempt to select just enough data from the database. + +Keep in mind, this strategy does not guarantee reindexing in the event of Sidekiq worker termination or an error during the reindexing phase. +This behavior is intentional to prevent continuous growth of Redis db. + +There are three options that can be defined in the index: +```ruby +class CitiesIndex... + strategy_config delayed_sidekiq: { + latency: 3, + margin: 2, + ttl: 60 * 60 * 24, + reindex_wrapper: ->(&reindex) { + ActiveRecord::Base.connected_to(role: :reading) { reindex.call } + } + # latency - will prevent scheduling identical jobs + # margin - main purpose is to cover db replication lag by the margin + # ttl - a chunk expiration time (in seconds) + # reindex_wrapper - lambda that accepts block to wrap that reindex process AR connection block. + } + + ... +end +``` + +Also you can define defaults in the `initializers/chewy.rb` +```ruby +Chewy.settings = { + strategy_config: { + delayed_sidekiq: { + latency: 3, + margin: 2, + ttl: 60 * 60 * 24, + reindex_wrapper: ->(&reindex) { + ActiveRecord::Base.connected_to(role: :reading) { reindex.call } + } + } + } +} + +``` +or in `config/chewy.yml` +```ruby + strategy_config: + delayed_sidekiq: + latency: 3 + margin: 2 + ttl: <%= 60 * 60 * 24 %> + # reindex_wrapper setting is not possible here!!! 
use the initializer instead
+```
+
+You can use the strategy identically to other strategies
+```ruby
+Chewy.strategy(:delayed_sidekiq) do
+  City.popular.map(&:do_some_update_action!)
+end
+```
+
+The default queue name is `chewy`, you can customize it in settings: `sidekiq.queue_name`
+```
+Chewy.settings[:sidekiq] = {queue: :low}
+```
+
+Explicit call of the reindex using the `:delayed_sidekiq` strategy
+```ruby
+CitiesIndex.import([1, 2, 3], strategy: :delayed_sidekiq)
+```
+
+Explicit call of the reindex using the `:delayed_sidekiq` strategy with `:update_fields` support
+```ruby
+CitiesIndex.import([1, 2, 3], update_fields: [:name], strategy: :delayed_sidekiq)
+```
+
+If you run tests with the `delayed_sidekiq` strategy and Sidekiq uses a real Redis instance that is NOT cleaned up between tests (via e.g. `Sidekiq.redis(&:flushdb)`), you'll want to clean up some Redis keys between tests to avoid state leaking and flaky tests. Chewy provides a convenience method for that:
+```ruby
+# it might be a good idea to add this to your testing setup, e.g. in an RSpec `before` hook
+Chewy::Strategy::DelayedSidekiq.clear_timechunks!
+```
+
 #### `:active_job`

 This does the same thing as `:atomic`, but using ActiveJob. This will inherit the ActiveJob configuration settings including the `active_job.queue_adapter` setting for the environment. Patch `Chewy::Strategy::ActiveJob::Worker` for index updates improving.
@@ -808,7 +923,9 @@ It is convenient for use in e.g. the Rails console with non-block notation:

 #### `:bypass`

-The bypass strategy simply silences index updates.
+When the bypass strategy is active, the index will not be automatically updated on object save.
+
+For example, on `City.first.save!` the cities index would not be updated.

 #### Nesting

@@ -854,6 +971,8 @@ Controller actions are wrapped with the configurable value of `Chewy.request_str

 It is also a good idea to set up the `:bypass` strategy inside your test suite and import objects manually only when needed, and use `Chewy.massacre` when needed to flush test ES indices before every example. This will allow you to minimize unnecessary ES requests and reduce overhead.

+Deprecation note: since version 8, wildcard removal of indices is disabled by default. You can enable it for a cluster by setting `action.destructive_requires_name` to `false`.
+
 ```ruby
 RSpec.configure do |config|
   config.before(:suite) do
@@ -886,7 +1005,7 @@ Chewy has notifying the following events:

     {index: 30, delete: 5}
     ```

-  * `payload[:errors]`: might not exists. Contains grouped errors with objects ids list:
+  * `payload[:errors]`: might not exist. Contains grouped errors with objects ids list:

     ```ruby
     {index: {
@@ -1018,7 +1137,7 @@ Request DSL also provides additional scope actions, like `delete_all`, `exists?`

 #### Pagination

-The request DSL supports pagination with `Kaminari`. An extension is enabled on initializtion if `Kaminari` is available. See [Chewy::Search](lib/chewy/search.rb) and [Chewy::Search::Pagination::Kaminari](lib/chewy/search/pagination/kaminari.rb) for details.
+The request DSL supports pagination with `Kaminari`. An extension is enabled on initialization if `Kaminari` is available. See [Chewy::Search](lib/chewy/search.rb) and [Chewy::Search::Pagination::Kaminari](lib/chewy/search/pagination/kaminari.rb) for details.

 #### Named scopes

@@ -1119,6 +1238,10 @@ Right now the approach is that if some data had been updated, but index definiti

 Also, there is always full reset alternative with `rake chewy:reset`.
+#### `chewy:create_missing_indexes`
+
+This rake task creates newly defined indexes in ElasticSearch and skips existing ones. Useful for production-like environments.
+
 #### Parallelizing rake tasks

 Every task described above has its own parallel version. Every parallel rake task takes the number for processes for execution as the first argument and the rest of the arguments are exactly the same as for the non-parallel task version.
@@ -1144,6 +1267,17 @@ rake chewy:journal:apply["$(date -v-1H -u +%FT%TZ)"] # apply journaled changes f
 rake chewy:journal:apply["$(date -v-1H -u +%FT%TZ)",users] # apply journaled changes for the past hour on UsersIndex only
 ```

+When the size of the journal becomes very large, the classical way of deletion would be obstructive and resource-consuming. Fortunately, Chewy internally uses the [delete-by-query](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-delete-by-query.html#docs-delete-by-query-task-api) ES function, which supports async execution with batching and [throttling](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html#docs-delete-by-query-throttle).
+
+The available options, which can be set by ENV variables, are listed below:
+* `WAIT_FOR_COMPLETION` - a boolean flag. It controls async execution. It waits by default. When set to `false` (`0`, `f`, `false` or `off` in any letter case is accepted as `false`), Elasticsearch performs some preflight checks, launches the request, and returns a task reference you can use to cancel the task or get its status.
+* `REQUESTS_PER_SECOND` - float. The throttle for this request in sub-requests per second. No throttling is enforced by default.
+* `SCROLL_SIZE` - integer. The number of documents to be deleted in a single sub-request. The default batch size is 1000.
+
+```bash
+rake chewy:journal:clean WAIT_FOR_COMPLETION=false REQUESTS_PER_SECOND=10 SCROLL_SIZE=5000
+```
+
 ### RSpec integration

 Just add `require 'chewy/rspec'` to your spec_helper.rb and you will get additional features:
@@ -1182,6 +1316,41 @@ If you use `DatabaseCleaner` in your tests with [the `transaction` strategy](htt
 Chewy.use_after_commit_callbacks = !Rails.env.test?
 ```

+### Pre-request Filter
+
+Should you need to inspect a query before it is dispatched to ElasticSearch, you can use the `before_es_request_filter`. `before_es_request_filter` is a callable object, as demonstrated below:
+
+```ruby
+Chewy.before_es_request_filter = -> (method_name, args, kw_args) { ... }
+```
+
+While using the `before_es_request_filter`, please consider the following:
+
+* `before_es_request_filter` acts as a simple proxy before any request made via the `ElasticSearch::Client`. The arguments passed to this filter include:
+  * `method_name` - The name of the method being called. Examples are `search`, `count`, `bulk`, etc.
+  * `args` and `kw_args` - These are the positional and keyword arguments provided in the method call.
+* The operation is synchronous, so avoid executing any heavy or time-consuming operations within the filter to prevent performance degradation.
+* The return value of the proc is disregarded. This filter is intended for inspection or modification of the query rather than generating a response.
+* Any exception raised inside the callback will propagate upward and halt the execution of the query. It is essential to handle potential errors adequately to ensure the stability of your search functionality.
+ +### Import scope clean-up behavior + +Whenever you set the `import_scope` for the index, in the case of ActiveRecord, +options for order, offset and limit will be removed. You can set the behavior of +chewy, before the clean-up itself. + +The default behavior is a warning sent to the Chewy logger (`:warn`). Another more +restrictive option is raising an exception (`:raise`). Both options have a +negative impact on performance since verifying whether the code uses any of +these options requires building AREL query. + +To avoid the loading time impact, you can ignore the check (`:ignore`) before +the clean-up. + +``` +Chewy.import_scope_cleanup_behavior = :ignore +``` + ## Contributing 1. Fork it (http://github.com/toptal/chewy/fork) diff --git a/chewy.gemspec b/chewy.gemspec index e85d12b84..decbe7355 100644 --- a/chewy.gemspec +++ b/chewy.gemspec @@ -2,7 +2,7 @@ lib = File.expand_path('lib', __dir__) $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) require 'chewy/version' -Gem::Specification.new do |spec| # rubocop:disable Metrics/BlockLength +Gem::Specification.new do |spec| spec.name = 'chewy' spec.version = Chewy::VERSION spec.authors = ['Toptal, LLC', 'pyromaniac'] @@ -11,26 +11,14 @@ Gem::Specification.new do |spec| # rubocop:disable Metrics/BlockLength spec.description = 'Chewy provides functionality for Elasticsearch index handling, documents import mappings and chainable query DSL' spec.homepage = 'https://github.com/toptal/chewy' spec.license = 'MIT' + spec.required_ruby_version = '~> 3.0' spec.files = `git ls-files`.split($RS) spec.executables = spec.files.grep(%r{^bin/}) { |f| File.basename(f) } - spec.test_files = spec.files.grep(%r{^(test|spec|features)/}) spec.require_paths = ['lib'] - spec.add_development_dependency 'database_cleaner' - spec.add_development_dependency 'elasticsearch-extensions' - spec.add_development_dependency 'rake' - spec.add_development_dependency 'rspec', '>= 3.7.0' - spec.add_development_dependency 'rspec-collection_matchers' - spec.add_development_dependency 'rspec-its' - spec.add_development_dependency 'rubocop', '1.11' - spec.add_development_dependency 'sqlite3' - spec.add_development_dependency 'timecop' - - spec.add_development_dependency 'method_source' - spec.add_development_dependency 'unparser' - - spec.add_dependency 'activesupport', '>= 5.2' - spec.add_dependency 'elasticsearch', '>= 7.12.0', '< 7.14.0' + spec.add_dependency 'activesupport', '>= 6.1' + spec.add_dependency 'elasticsearch', '>= 8.14', '< 9.0' spec.add_dependency 'elasticsearch-dsl' + spec.metadata['rubygems_mfa_required'] = 'true' end diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 000000000..0c1309de2 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,15 @@ +version: "3.4" +services: + elasticsearch_test: + image: "elasticsearch:8.15.0" + environment: + - bootstrap.memory_lock=${ES_MEMORY_LOCK:-false} + - "ES_JAVA_OPTS=-Xms${TEST_ES_HEAP_SIZE:-500m} -Xmx${TEST_ES_HEAP_SIZE:-500m}" + - discovery.type=single-node + - xpack.security.enabled=false + ports: + - "127.0.0.1:9250:9200" + ulimits: + nofile: + soft: 65536 + hard: 65536 diff --git a/gemfiles/base.gemfile b/gemfiles/base.gemfile new file mode 100644 index 000000000..83269e4bf --- /dev/null +++ b/gemfiles/base.gemfile @@ -0,0 +1,12 @@ +gem 'database_cleaner' +gem 'elasticsearch-extensions' +gem 'method_source' +gem 'rake' +gem 'redis', require: false +gem 'rspec', '>= 3.7.0' +gem 'rspec-collection_matchers' +gem 'rspec-its' +gem 'rubocop', '1.65.1' +gem 'sqlite3', '~> 1.4' +gem 
'timecop' +gem 'unparser' diff --git a/gemfiles/rails.6.1.activerecord.gemfile b/gemfiles/rails.6.1.activerecord.gemfile index cdc7b82cc..526db972f 100644 --- a/gemfiles/rails.6.1.activerecord.gemfile +++ b/gemfiles/rails.6.1.activerecord.gemfile @@ -8,6 +8,7 @@ gem 'parallel', require: false gem 'rspec_junit_formatter', '~> 0.4.1' gem 'sidekiq', require: false -gem 'rexml' if RUBY_VERSION >= '3.0.0' +gem 'rexml' gemspec path: '../' +eval_gemfile 'base.gemfile' diff --git a/gemfiles/rails.7.0.activerecord.gemfile b/gemfiles/rails.7.0.activerecord.gemfile index e90b18cab..1176622ac 100644 --- a/gemfiles/rails.7.0.activerecord.gemfile +++ b/gemfiles/rails.7.0.activerecord.gemfile @@ -8,6 +8,7 @@ gem 'parallel', require: false gem 'rspec_junit_formatter', '~> 0.4.1' gem 'sidekiq', require: false -gem 'rexml' if RUBY_VERSION >= '3.0.0' +gem 'rexml' gemspec path: '../' +eval_gemfile 'base.gemfile' diff --git a/gemfiles/rails.5.2.activerecord.gemfile b/gemfiles/rails.7.1.activerecord.gemfile similarity index 60% rename from gemfiles/rails.5.2.activerecord.gemfile rename to gemfiles/rails.7.1.activerecord.gemfile index 5838db590..eb39c6ee2 100644 --- a/gemfiles/rails.5.2.activerecord.gemfile +++ b/gemfiles/rails.7.1.activerecord.gemfile @@ -1,11 +1,14 @@ source 'https://rubygems.org' -gem 'activejob', '~> 5.2.0' -gem 'activerecord', '~> 5.2.0' -gem 'activesupport', '~> 5.2.0' +gem 'activejob', '~> 7.1.0' +gem 'activerecord', '~> 7.1.0' +gem 'activesupport', '~> 7.1.0' gem 'kaminari-core', '~> 1.1.0', require: false gem 'parallel', require: false gem 'rspec_junit_formatter', '~> 0.4.1' gem 'sidekiq', require: false +gem 'rexml' + gemspec path: '../' +eval_gemfile 'base.gemfile' diff --git a/gemfiles/rails.6.0.activerecord.gemfile b/gemfiles/rails.7.2.activerecord.gemfile similarity index 60% rename from gemfiles/rails.6.0.activerecord.gemfile rename to gemfiles/rails.7.2.activerecord.gemfile index 1f0696be9..2cbbb94c0 100644 --- a/gemfiles/rails.6.0.activerecord.gemfile +++ b/gemfiles/rails.7.2.activerecord.gemfile @@ -1,11 +1,14 @@ source 'https://rubygems.org' -gem 'activejob', '~> 6.0.0' -gem 'activerecord', '~> 6.0.0' -gem 'activesupport', '~> 6.0.0' +gem 'activejob', '~> 7.2.0' +gem 'activerecord', '~> 7.2.0' +gem 'activesupport', '~> 7.2.0' gem 'kaminari-core', '~> 1.1.0', require: false gem 'parallel', require: false gem 'rspec_junit_formatter', '~> 0.4.1' gem 'sidekiq', require: false +gem 'rexml' + gemspec path: '../' +eval_gemfile 'base.gemfile' diff --git a/lib/chewy.rb b/lib/chewy.rb index a6239be31..dfeed3416 100644 --- a/lib/chewy.rb +++ b/lib/chewy.rb @@ -1,3 +1,4 @@ +require 'active_support' require 'active_support/version' require 'active_support/concern' require 'active_support/deprecation' @@ -47,7 +48,8 @@ def try_require(path) require 'chewy/fields/base' require 'chewy/fields/root' require 'chewy/journal' -require 'chewy/railtie' if defined?(::Rails::Railtie) +require 'chewy/railtie' if defined?(Rails::Railtie) +require 'chewy/elastic_client' ActiveSupport.on_load(:active_record) do include Chewy::Index::Observe::ActiveRecordMethods @@ -96,12 +98,7 @@ def derive_name(index_name) # Main elasticsearch-ruby client instance # def client - Chewy.current[:chewy_client] ||= begin - client_configuration = configuration.deep_dup - client_configuration.delete(:prefix) # used by Chewy, not relevant to Elasticsearch::Client - block = client_configuration[:transport_options].try(:delete, :proc) - ::Elasticsearch::Client.new(client_configuration, &block) - end + Chewy.current[:chewy_client] 
||= Chewy::ElasticClient.new end # Sends wait_for_status request to ElasticSearch with status @@ -119,6 +116,10 @@ def wait_for_status # Be careful, if current prefix is blank, this will destroy all the indexes. # def massacre + unless Chewy.settings[:delete_all_enabled] + raise FeatureDisabled, 'Feature disabled by default in ES 8. You can enable it in the cluster and set `delete_all_enabled` option in settings' + end + Chewy.client.indices.delete(index: [Chewy.configuration[:prefix], '*'].reject(&:blank?).join('_')) Chewy.wait_for_status end diff --git a/lib/chewy/config.rb b/lib/chewy/config.rb index 29f71749b..fdae4ae45 100644 --- a/lib/chewy/config.rb +++ b/lib/chewy/config.rb @@ -38,7 +38,12 @@ class Config # for type mappings like `_all`. :default_root_options, # Default field type for any field in any Chewy type. Defaults to 'text'. - :default_field_type + :default_field_type, + # Callback called on each search request to be done into ES + :before_es_request_filter, + # Behavior when import scope for index includes order, offset or limit. + # Can be :ignore, :warn, :raise. Defaults to :warn + :import_scope_cleanup_behavior attr_reader :transport_logger, :transport_tracer, # Chewy search request DSL base class, used by every index. @@ -60,6 +65,7 @@ def initialize @indices_path = 'app/chewy' @default_root_options = {} @default_field_type = 'text'.freeze + @import_scope_cleanup_behavior = :warn @search_class = build_search_class(Chewy::Search::Request) end @@ -127,17 +133,19 @@ def configuration private def yaml_settings - @yaml_settings ||= begin - if defined?(Rails::VERSION) - file = Rails.root.join('config', 'chewy.yml') + @yaml_settings ||= build_yaml_settings || {} + end - if File.exist?(file) - yaml = ERB.new(File.read(file)).result - hash = YAML.respond_to?(:unsafe_load) ? 
YAML.unsafe_load(yaml) : YAML.load(yaml) # rubocop:disable Security/YAMLLoad - hash[Rails.env].try(:deep_symbolize_keys) if hash - end - end || {} - end + def build_yaml_settings + return unless defined?(Rails::VERSION) + + file = Rails.root.join('config', 'chewy.yml') + + return unless File.exist?(file) + + yaml = ERB.new(File.read(file)).result + hash = YAML.unsafe_load(yaml) + hash[Rails.env].try(:deep_symbolize_keys) if hash end def build_search_class(base) diff --git a/lib/chewy/elastic_client.rb b/lib/chewy/elastic_client.rb new file mode 100644 index 000000000..41a985ccc --- /dev/null +++ b/lib/chewy/elastic_client.rb @@ -0,0 +1,31 @@ +module Chewy + # Replacement for Chewy.client + class ElasticClient + def self.build_es_client(configuration = Chewy.configuration) + client_configuration = configuration.deep_dup + client_configuration.delete(:prefix) # used by Chewy, not relevant to Elasticsearch::Client + block = client_configuration[:transport_options].try(:delete, :proc) + ::Elasticsearch::Client.new(client_configuration, &block) + end + + def initialize(elastic_client = self.class.build_es_client) + @elastic_client = elastic_client + end + + private + + def method_missing(name, *args, **kwargs, &block) + inspect_payload(name, args, kwargs) + + @elastic_client.__send__(name, *args, **kwargs, &block) + end + + def respond_to_missing?(name, _include_private = false) + @elastic_client.respond_to?(name) || super + end + + def inspect_payload(name, args, kwargs) + Chewy.config.before_es_request_filter&.call(name, args, kwargs) + end + end +end diff --git a/lib/chewy/errors.rb b/lib/chewy/errors.rb index 5b198ed26..e16848794 100644 --- a/lib/chewy/errors.rb +++ b/lib/chewy/errors.rb @@ -7,7 +7,7 @@ class UndefinedIndex < Error class UndefinedUpdateStrategy < Error def initialize(_type) - super <<-MESSAGE + super(<<-MESSAGE) Index update strategy is undefined for current context. 
Please wrap your code with `Chewy.strategy(:strategy_name) block.` MESSAGE @@ -27,7 +27,7 @@ def initialize(type, import_errors) message << " on #{documents.count} documents: #{documents}\n" end end - super message + super(message) end end @@ -36,4 +36,10 @@ def initialize(join_field_type, join_field_name, relations) super("`#{join_field_type}` set for the join field `#{join_field_name}` is not on the :relations list (#{relations})") end end + + class ImportScopeCleanupError < Error + end + + class FeatureDisabled < Error + end end diff --git a/lib/chewy/fields/root.rb b/lib/chewy/fields/root.rb index c37044bf3..e821cb9ca 100644 --- a/lib/chewy/fields/root.rb +++ b/lib/chewy/fields/root.rb @@ -4,7 +4,7 @@ class Root < Chewy::Fields::Base attr_reader :dynamic_templates, :id def initialize(name, **options) - super(name, **options) + super @value ||= -> { self } @dynamic_templates = [] @@ -27,7 +27,7 @@ def mappings_hash mappings[name] end - ruby2_keywords def dynamic_template(*args) + def dynamic_template(*args) options = args.extract_options!.deep_symbolize_keys if args.first template_name = :"template_#{dynamic_templates.count.next}" diff --git a/lib/chewy/index.rb b/lib/chewy/index.rb index cdb3fa055..4c63cd8e4 100644 --- a/lib/chewy/index.rb +++ b/lib/chewy/index.rb @@ -20,6 +20,10 @@ class Index pipeline raw_import refresh replication ].freeze + STRATEGY_OPTIONS = { + delayed_sidekiq: %i[latency margin ttl reindex_wrapper] + }.freeze + include Search include Actions include Aliases @@ -221,6 +225,27 @@ def default_import_options(params) params.assert_valid_keys(IMPORT_OPTIONS_KEYS) self._default_import_options = _default_import_options.merge(params) end + + def strategy_config(params = {}) + @strategy_config ||= begin + config_struct = Struct.new(*STRATEGY_OPTIONS.keys).new + + STRATEGY_OPTIONS.each_with_object(config_struct) do |(strategy, options), res| + res[strategy] = case strategy + when :delayed_sidekiq + Struct.new(*STRATEGY_OPTIONS[strategy]).new.tap do |config| + options.each do |option| + config[option] = params.dig(strategy, option) || Chewy.configuration.dig(:strategy_config, strategy, option) + end + + config[:reindex_wrapper] ||= ->(&reindex) { reindex.call } # default wrapper + end + else + raise NotImplementedError, "Unsupported strategy: '#{strategy}'" + end + end + end + end end end end diff --git a/lib/chewy/index/actions.rb b/lib/chewy/index/actions.rb index afc7debcc..a146f47cc 100644 --- a/lib/chewy/index/actions.rb +++ b/lib/chewy/index/actions.rb @@ -32,7 +32,7 @@ def exists? 
# def create(*args, **kwargs) create!(*args, **kwargs) - rescue Elasticsearch::Transport::Transport::Errors::BadRequest + rescue Elastic::Transport::Transport::Errors::BadRequest false end @@ -83,9 +83,9 @@ def delete(suffix = nil) result = client.indices.delete index: index_names.join(',') Chewy.wait_for_status if result result - # es-ruby >= 1.0.10 handles Elasticsearch::Transport::Transport::Errors::NotFound + # es-ruby >= 1.0.10 handles Elastic::Transport::Transport::Errors::NotFound # by itself, rescue is for previous versions - rescue Elasticsearch::Transport::Transport::Errors::NotFound + rescue Elastic::Transport::Transport::Errors::NotFound false end @@ -99,9 +99,9 @@ def delete(suffix = nil) # UsersIndex.delete '01-2014' # deletes `users_01-2014` index # def delete!(suffix = nil) - # es-ruby >= 1.0.10 handles Elasticsearch::Transport::Transport::Errors::NotFound + # es-ruby >= 1.0.10 handles Elastic::Transport::Transport::Errors::NotFound # by itself, so it is raised here - delete(suffix) or raise Elasticsearch::Transport::Transport::Errors::NotFound + delete(suffix) or raise Elastic::Transport::Transport::Errors::NotFound end # Deletes and recreates index. Supports suffixes. diff --git a/lib/chewy/index/adapter/active_record.rb b/lib/chewy/index/adapter/active_record.rb index b69e11b9a..8682b3b13 100644 --- a/lib/chewy/index/adapter/active_record.rb +++ b/lib/chewy/index/adapter/active_record.rb @@ -6,16 +6,26 @@ module Adapter class ActiveRecord < Orm def self.accepts?(target) defined?(::ActiveRecord::Base) && ( - target.is_a?(Class) && target < ::ActiveRecord::Base || + (target.is_a?(Class) && target < ::ActiveRecord::Base) || target.is_a?(::ActiveRecord::Relation)) end private def cleanup_default_scope! - if Chewy.logger && (@default_scope.arel.orders.present? || + behavior = Chewy.config.import_scope_cleanup_behavior + + if behavior != :ignore && (@default_scope.arel.orders.present? || @default_scope.arel.limit.present? || @default_scope.arel.offset.present?) - Chewy.logger.warn('Default type scope order, limit and offset are ignored and will be nullified') + if behavior == :warn && Chewy.logger + gem_dir = File.realpath('../..', __dir__) + source = caller.grep_v(Regexp.new(gem_dir)).first + Chewy.logger.warn( + "Default type scope order, limit and offset are ignored and will be nullified (called from: #{source})" + ) + elsif behavior == :raise + raise ImportScopeCleanupError, 'Default type scope order, limit and offset are ignored and will be nullified' + end end @default_scope = @default_scope.reorder(nil).limit(nil).offset(nil) diff --git a/lib/chewy/index/adapter/object.rb b/lib/chewy/index/adapter/object.rb index f8df7995c..156a10232 100644 --- a/lib/chewy/index/adapter/object.rb +++ b/lib/chewy/index/adapter/object.rb @@ -85,7 +85,7 @@ def identify(collection) # @param args [Array<#to_json>] # @option options [Integer] :batch_size import processing batch size # @return [true, false] - ruby2_keywords def import(*args, &block) + def import(*args, &block) collection, options = import_args(*args) import_objects(collection, options, &block) end @@ -113,7 +113,7 @@ def identify(collection) # end # # @see Chewy::Index::Adapter::Base#import_fields - ruby2_keywords def import_fields(*args, &block) + def import_fields(*args, &block) return enum_for(:import_fields, *args) unless block_given? options = args.extract_options! @@ -139,7 +139,7 @@ def identify(collection) # For the Object adapter returns the objects themselves in batches. 
# # @see Chewy::Index::Adapter::Base#import_references - ruby2_keywords def import_references(*args, &block) + def import_references(*args, &block) return enum_for(:import_references, *args) unless block_given? collection, options = import_args(*args) diff --git a/lib/chewy/index/adapter/orm.rb b/lib/chewy/index/adapter/orm.rb index 57e53e82b..c86b6d06b 100644 --- a/lib/chewy/index/adapter/orm.rb +++ b/lib/chewy/index/adapter/orm.rb @@ -72,7 +72,7 @@ def identify(collection) # # or # UsersIndex.import users.map(&:id) # user ids will be deleted from index # - ruby2_keywords def import(*args, &block) + def import(*args, &block) collection, options = import_args(*args) if !collection.is_a?(relation_class) || options[:direct_import] @@ -82,7 +82,7 @@ def identify(collection) end end - ruby2_keywords def import_fields(*args, &block) + def import_fields(*args, &block) return enum_for(:import_fields, *args) unless block_given? collection, options = import_args(*args) diff --git a/lib/chewy/index/aliases.rb b/lib/chewy/index/aliases.rb index cfcf74732..9b4a2caef 100644 --- a/lib/chewy/index/aliases.rb +++ b/lib/chewy/index/aliases.rb @@ -22,7 +22,7 @@ def aliases def empty_if_not_found yield - rescue Elasticsearch::Transport::Transport::Errors::NotFound + rescue Elastic::Transport::Transport::Errors::NotFound [] end end diff --git a/lib/chewy/index/crutch.rb b/lib/chewy/index/crutch.rb index a36fc4ef8..4377187af 100644 --- a/lib/chewy/index/crutch.rb +++ b/lib/chewy/index/crutch.rb @@ -12,13 +12,21 @@ class Crutches def initialize(index, collection) @index = index @collection = collection - @index._crutches.each_key do |name| - singleton_class.class_eval <<-METHOD, __FILE__, __LINE__ + 1 - def #{name} - @#{name} ||= @index._crutches[:#{name}].call @collection - end - METHOD - end + @crutches_instances = {} + end + + def method_missing(name, *, **) + return self[name] if @index._crutches.key?(name) + + super + end + + def respond_to_missing?(name, include_private = false) + @index._crutches.key?(name) || super + end + + def [](name) + @crutches_instances[name] ||= @index._crutches[:"#{name}"].call(@collection) end end diff --git a/lib/chewy/index/import.rb b/lib/chewy/index/import.rb index d9a23aaee..8de6a877a 100644 --- a/lib/chewy/index/import.rb +++ b/lib/chewy/index/import.rb @@ -72,8 +72,8 @@ module ClassMethods # @option options [true, false] update_failover enables full objects reimport in cases of partial update errors, `true` by default # @option options [true, Integer, Hash] parallel enables parallel import processing with the Parallel gem, accepts the number of workers or any Parallel gem acceptable options # @return [true, false] false in case of errors - ruby2_keywords def import(*args) - import_routine(*args).blank? + def import(*args) + intercept_import_using_strategy(*args).blank? end # @!method import!(*collection, **options) @@ -83,8 +83,9 @@ module ClassMethods # in case of any import errors. # # @raise [Chewy::ImportFailed] in case of errors - ruby2_keywords def import!(*args) - errors = import_routine(*args) + def import!(*args) + errors = intercept_import_using_strategy(*args) + raise Chewy::ImportFailed.new(self, errors) if errors.present? true @@ -126,6 +127,32 @@ def compose(object, crutches = nil, fields: []) private + def intercept_import_using_strategy(*args) + args_clone = args.deep_dup + options = args_clone.extract_options! + strategy = options.delete(:strategy) + + return import_routine(*args) if strategy.blank? + + ids = args_clone.flatten + return {} if ids.blank? 
+ return {argument: {"#{strategy} supports ids only!" => ids}} unless ids.all? do |id| + id.respond_to?(:to_i) + end + + case strategy + when :delayed_sidekiq + begin + Chewy::Strategy::DelayedSidekiq::Scheduler.new(self, ids, options).postpone + {} # success. errors handling convention + rescue StandardError => e + {scheduler: {e.message => ids}} + end + else + {argument: {"unsupported strategy: '#{strategy}'" => ids}} + end + end + def import_routine(*args) return if !args.first.nil? && empty_objects_or_scope?(args.first) diff --git a/lib/chewy/index/import/bulk_builder.rb b/lib/chewy/index/import/bulk_builder.rb index c39a30cfd..52e3fdcfd 100644 --- a/lib/chewy/index/import/bulk_builder.rb +++ b/lib/chewy/index/import/bulk_builder.rb @@ -48,12 +48,11 @@ def crutches_for_index def index_entry(object) entry = {} entry[:_id] = index_object_ids[object] if index_object_ids[object] + entry[:routing] = routing(object) if join_field? - data = data_for(object) parent = cache(entry[:_id]) - - entry[:routing] = routing(object) if join_field? - if parent_changed?(data, parent) + data = data_for(object) if parent.present? + if parent.present? && parent_changed?(data, parent) reindex_entries(object, data) + reindex_descendants(object) elsif @fields.present? return [] unless entry[:_id] @@ -61,7 +60,7 @@ def index_entry(object) entry[:data] = {doc: data_for(object, fields: @fields)} [{update: entry}] else - entry[:data] = data + entry[:data] = data || data_for(object) [{index: entry}] end end @@ -163,12 +162,12 @@ def load_cache .filter(ids: {values: ids_for_cache}) .order('_doc') .pluck(:_id, :_routing, join_field) - .map do |id, routing, join| + .to_h do |id, routing, join| [ id, {routing: routing, parent_id: join['parent']} ] - end.to_h + end end def existing_routing(id) diff --git a/lib/chewy/index/import/routine.rb b/lib/chewy/index/import/routine.rb index 556510572..61004955a 100644 --- a/lib/chewy/index/import/routine.rb +++ b/lib/chewy/index/import/routine.rb @@ -64,7 +64,7 @@ def initialize(index, **options) # Creates the journal index and the corresponding index if necessary. # @return [Object] whatever def create_indexes! - Chewy::Stash::Journal.create if @options[:journal] + Chewy::Stash::Journal.create if @options[:journal] && !Chewy.configuration[:skip_journal_creation_on_import] return if Chewy.configuration[:skip_index_creation_on_import] @index.create!(**@bulk_options.slice(:suffix)) unless @index.exists? 
diff --git a/lib/chewy/index/observe/active_record_methods.rb b/lib/chewy/index/observe/active_record_methods.rb index ca5834efc..7f4897705 100644 --- a/lib/chewy/index/observe/active_record_methods.rb +++ b/lib/chewy/index/observe/active_record_methods.rb @@ -71,7 +71,7 @@ def initialize_chewy_callbacks end end - ruby2_keywords def update_index(type_name, *args, &block) + def update_index(type_name, *args, &block) callback_options = Observe.extract_callback_options!(args) update_proc = Observe.update_proc(type_name, *args, &block) callback = Chewy::Index::Observe::Callback.new(update_proc, callback_options) diff --git a/lib/chewy/index/syncer.rb b/lib/chewy/index/syncer.rb index 4016989da..47408c9af 100644 --- a/lib/chewy/index/syncer.rb +++ b/lib/chewy/index/syncer.rb @@ -27,7 +27,7 @@ class Index # @see Chewy::Index::Actions::ClassMethods#sync class Syncer DEFAULT_SYNC_BATCH_SIZE = 20_000 - ISO_DATETIME = /\A(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)(\.\d+)?\z/.freeze + ISO_DATETIME = /\A(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)(\.\d+)?\z/ OUTDATED_IDS_WORKER = lambda do |outdated_sync_field_type, source_data_hash, index, total, index_data| ::Process.setproctitle("chewy [#{index}]: sync outdated calculation (#{::Parallel.worker_number + 1}/#{total})") if index index_data.each_with_object([]) do |(id, index_sync_value), result| @@ -213,7 +213,7 @@ def outdated_sync_field_type @outdated_sync_field_type = mappings .fetch('properties', {}) .fetch(@index.outdated_sync_field.to_s, {})['type'] - rescue Elasticsearch::Transport::Transport::Errors::NotFound + rescue Elastic::Transport::Transport::Errors::NotFound nil end end diff --git a/lib/chewy/journal.rb b/lib/chewy/journal.rb index da351fb63..1fdbb9a5d 100644 --- a/lib/chewy/journal.rb +++ b/lib/chewy/journal.rb @@ -43,8 +43,12 @@ def apply(since_time, fetch_limit: 10, **import_options) # # @param until_time [Time, DateTime] time to clean up until it # @return [Hash] delete_by_query ES API call result - def clean(until_time = nil) - Chewy::Stash::Journal.clean(until_time, only: @only) + def clean(until_time = nil, delete_by_query_options: {}) + Chewy::Stash::Journal.clean( + until_time, + only: @only, + delete_by_query_options: delete_by_query_options.merge(refresh: false) + ) end private diff --git a/lib/chewy/log_subscriber.rb b/lib/chewy/log_subscriber.rb index c35d63fbf..368c0813b 100644 --- a/lib/chewy/log_subscriber.rb +++ b/lib/chewy/log_subscriber.rb @@ -24,7 +24,11 @@ def render_action(action, event) subject = payload[:type].presence || payload[:index] action = "#{subject} #{action} (#{event.duration.round(1)}ms)" - action = color(action, GREEN, true) + action = if ActiveSupport.version >= Gem::Version.new('7.1') + color(action, GREEN, bold: true) + else + color(action, GREEN, true) + end debug(" #{action} #{description}") end diff --git a/lib/chewy/minitest/helpers.rb b/lib/chewy/minitest/helpers.rb index 26ff95e3f..0254c8f97 100644 --- a/lib/chewy/minitest/helpers.rb +++ b/lib/chewy/minitest/helpers.rb @@ -97,7 +97,7 @@ def mock_elasticsearch_response_sources(index, hits, &block) { '_index' => index.index_name, '_type' => '_doc', - '_id' => (i + 1).to_s, + '_id' => hit[:id] || (i + 1).to_s, '_score' => 3.14, '_source' => hit } @@ -142,7 +142,7 @@ def index_everything! 
teardown do # always destroy indexes between tests # Prevent croll pollution of test cases due to indexing - Chewy.massacre + drop_indices end end end diff --git a/lib/chewy/minitest/search_index_receiver.rb b/lib/chewy/minitest/search_index_receiver.rb index d8f0bd4f8..69c95b55e 100644 --- a/lib/chewy/minitest/search_index_receiver.rb +++ b/lib/chewy/minitest/search_index_receiver.rb @@ -6,6 +6,8 @@ # The class will capture the data from the *param on the Chewy::Index.bulk method and # aggregate the data for test analysis. class SearchIndexReceiver + MUTATION_FOR_CLASS = Struct.new(:indexes, :deletes, keyword_init: true) + def initialize @mutations = {} end @@ -71,6 +73,6 @@ def updated_indexes # @param index [Chewy::Index] the index to fetch. # @return [#indexes, #deletes] an object with a list of indexes and a list of deletes. def mutation_for(index) - @mutations[index] ||= OpenStruct.new(indexes: [], deletes: []) + @mutations[index] ||= MUTATION_FOR_CLASS.new(indexes: [], deletes: []) end end diff --git a/lib/chewy/rake_helper.rb b/lib/chewy/rake_helper.rb index 8e0de510d..32de31f6e 100644 --- a/lib/chewy/rake_helper.rb +++ b/lib/chewy/rake_helper.rb @@ -19,6 +19,9 @@ module RakeHelper output.puts " Applying journal to #{targets}, #{count} entries, stage #{payload[:stage]}" end + DELETE_BY_QUERY_OPTIONS = %w[WAIT_FOR_COMPLETION REQUESTS_PER_SECOND SCROLL_SIZE].freeze + FALSE_VALUES = %w[0 f false off].freeze + class << self # Performs zero-downtime reindexing of all documents for the specified indexes # @@ -162,7 +165,7 @@ def journal_apply(time: nil, only: nil, except: nil, output: $stdout) subscribed_task_stats(output) do output.puts "Applying journal entries created after #{time}" - count = Chewy::Journal.new(indexes_from(only: only, except: except)).apply(time) + count = Chewy::Journal.new(journal_indexes_from(only: only, except: except)).apply(time) output.puts 'No journal entries were created after the specified time' if count.zero? end end @@ -181,12 +184,29 @@ def journal_apply(time: nil, only: nil, except: nil, output: $stdout) # @param except [Array, Chewy::Index, String] indexes to exclude from processing # @param output [IO] output io for logging # @return [Array] indexes that were actually updated - def journal_clean(time: nil, only: nil, except: nil, output: $stdout) + def journal_clean(time: nil, only: nil, except: nil, delete_by_query_options: {}, output: $stdout) subscribed_task_stats(output) do output.puts "Cleaning journal entries created before #{time}" if time - response = Chewy::Journal.new(indexes_from(only: only, except: except)).clean(time) - count = response['deleted'] || response['_indices']['_all']['deleted'] - output.puts "Cleaned up #{count} journal entries" + response = Chewy::Journal.new(journal_indexes_from(only: only, except: except)).clean(time, delete_by_query_options: delete_by_query_options) + if response.key?('task') + output.puts "Task to cleanup the journal has been created, #{response['task']}" + else + count = response['deleted'] || response['_indices']['_all']['deleted'] + output.puts "Cleaned up #{count} journal entries" + end + end + end + + # Creates journal index. + # + # @example + # Chewy::RakeHelper.journal_create # creates journal + # + # @param output [IO] output io for logging + # @return Chewy::Index Returns instance of chewy index + def journal_create(output: $stdout) + subscribed_task_stats(output) do + Chewy::Stash::Journal.create! 
end end @@ -228,6 +248,44 @@ def update_mapping(name:, output: $stdout) end end + # Reads options that are required to run journal cleanup asynchronously from ENV hash + # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html + # + # @example + # Chewy::RakeHelper.delete_by_query_options_from_env({'WAIT_FOR_COMPLETION' => 'false','REQUESTS_PER_SECOND' => '10','SCROLL_SIZE' => '5000'}) + # # => { wait_for_completion: false, requests_per_second: 10.0, scroll_size: 5000 } + # + def delete_by_query_options_from_env(env) + env + .slice(*DELETE_BY_QUERY_OPTIONS) + .transform_keys { |k| k.downcase.to_sym } + .to_h do |key, value| + case key + when :wait_for_completion then [key, !FALSE_VALUES.include?(value.downcase)] + when :requests_per_second then [key, value.to_f] + when :scroll_size then [key, value.to_i] + end + end + end + + def create_missing_indexes!(output: $stdout, env: ENV) + subscribed_task_stats(output) do + Chewy.eager_load! + all_indexes = Chewy::Index.descendants + all_indexes -= [Chewy::Stash::Journal] unless Chewy.configuration[:journal] + all_indexes.each do |index| + if index.exists? + output.puts "#{index.name} already exists, skipping" if env['VERBOSE'] + next + end + + index.create! + + output.puts "#{index.name} index successfully created" + end + end + end + def normalize_indexes(*identifiers) identifiers.flatten(1).map { |identifier| normalize_index(identifier) } end @@ -243,11 +301,18 @@ def subscribed_task_stats(output = $stdout, &block) ActiveSupport::Notifications.subscribed(JOURNAL_CALLBACK.curry[output], 'apply_journal.chewy') do ActiveSupport::Notifications.subscribed(IMPORT_CALLBACK.curry[output], 'import_objects.chewy', &block) end + ensure output.puts "Total: #{human_duration(Time.now - start)}" end private + def journal_indexes_from(only: nil, except: nil) + return if Array.wrap(only).empty? && Array.wrap(except).empty? + + indexes_from(only: only, except: except) + end + def indexes_from(only: nil, except: nil) indexes = if only.present? normalize_indexes(Array.wrap(only)) @@ -255,11 +320,7 @@ def indexes_from(only: nil, except: nil) all_indexes end - indexes = if except.present? - indexes - normalize_indexes(Array.wrap(except)) - else - indexes - end + indexes -= normalize_indexes(Array.wrap(except)) if except.present? indexes.sort_by(&:derivable_name) end @@ -282,9 +343,9 @@ def warn_missing_index(output) return if journal_exists? output.puts "############################################################\n" \ - "WARN: You are risking to lose some changes during the reset.\n" \ - " Please consider enabling journaling.\n" \ - " See https://github.com/toptal/chewy#journaling\n" \ + "WARN: You are risking to lose some changes during the reset.\n " \ + "Please consider enabling journaling.\n " \ + "See https://github.com/toptal/chewy#journaling\n" \ '############################################################' end diff --git a/lib/chewy/rspec/update_index.rb b/lib/chewy/rspec/update_index.rb index bd6b2a3f2..3a16ece2a 100644 --- a/lib/chewy/rspec/update_index.rb +++ b/lib/chewy/rspec/update_index.rb @@ -108,7 +108,7 @@ def supports_block_expectations? params_matcher = @no_refresh ? 
has_entry(refresh: false) : any_parameters Chewy::Index::Import::BulkRequest.stubs(:new).with(index, params_matcher).returns(mock_bulk_request) else - mocked_already = ::RSpec::Mocks.space.proxy_for(Chewy::Index::Import::BulkRequest).method_double_if_exists_for_message(:new) + mocked_already = RSpec::Mocks.space.proxy_for(Chewy::Index::Import::BulkRequest).method_double_if_exists_for_message(:new) allow(Chewy::Index::Import::BulkRequest).to receive(:new).and_call_original unless mocked_already params_matcher = @no_refresh ? hash_including(refresh: false) : any_args allow(Chewy::Index::Import::BulkRequest).to receive(:new).with(index, params_matcher).and_return(mock_bulk_request) @@ -220,7 +220,7 @@ def extract_documents(*args) expected_count = options[:times] || options[:count] expected_attributes = (options[:with] || options[:attributes] || {}).deep_symbolize_keys - args.flatten.map do |document| + args.flatten.to_h do |document| id = document.respond_to?(:id) ? document.id.to_s : document.to_s [id, { document: document, @@ -229,7 +229,7 @@ def extract_documents(*args) real_count: 0, real_attributes: {} }] - end.to_h + end end def compare_attributes(expected, real) diff --git a/lib/chewy/runtime/version.rb b/lib/chewy/runtime/version.rb index 1154ec0dc..a96880544 100644 --- a/lib/chewy/runtime/version.rb +++ b/lib/chewy/runtime/version.rb @@ -5,7 +5,7 @@ class Version attr_reader :major, :minor, :patch def initialize(version) - @major, @minor, @patch = *(version.to_s.split('.', 3) + [0] * 3).first(3).map(&:to_i) + @major, @minor, @patch = *(version.to_s.split('.', 3) + ([0] * 3)).first(3).map(&:to_i) end def to_s diff --git a/lib/chewy/search.rb b/lib/chewy/search.rb index 78dc59a1a..be2ba29a3 100644 --- a/lib/chewy/search.rb +++ b/lib/chewy/search.rb @@ -56,7 +56,7 @@ def search_string(query, options = {}) # # @example # PlacesIndex.query(match: {name: 'Moscow'}) - ruby2_keywords def method_missing(name, *args, &block) + def method_missing(name, *args, &block) if search_class::DELEGATED_METHODS.include?(name) all.send(name, *args, &block) else @@ -84,10 +84,12 @@ def build_search_class(base) def delegate_scoped(source, destination, methods) methods.each do |method| destination.class_eval do - define_method method do |*args, &block| - scoping { source.public_send(method, *args, &block) } + define_method method do |*args, **kwargs, &block| + scoping do + source.public_send(method, *args, **kwargs, &block) + end end - ruby2_keywords method + method end end end diff --git a/lib/chewy/search/parameters.rb b/lib/chewy/search/parameters.rb index 1f15e6168..3f55e0384 100644 --- a/lib/chewy/search/parameters.rb +++ b/lib/chewy/search/parameters.rb @@ -1,5 +1,5 @@ -Dir.glob(File.join(File.dirname(__FILE__), 'parameters', 'concerns', '*.rb')).sort.each { |f| require f } -Dir.glob(File.join(File.dirname(__FILE__), 'parameters', '*.rb')).sort.each { |f| require f } +Dir.glob(File.join(File.dirname(__FILE__), 'parameters', 'concerns', '*.rb')).each { |f| require f } +Dir.glob(File.join(File.dirname(__FILE__), 'parameters', '*.rb')).each { |f| require f } module Chewy module Search @@ -53,7 +53,7 @@ def initialize(initial = {}, **kinitial) # @param other [Object] any object # @return [true, false] def ==(other) - super || other.is_a?(self.class) && compare_storages(other) + super || (other.is_a?(self.class) && compare_storages(other)) end # Clones the specified storage, performs the operation diff --git a/lib/chewy/search/parameters/indices.rb b/lib/chewy/search/parameters/indices.rb index 
dc3dc28ca..d3a59b292 100644 --- a/lib/chewy/search/parameters/indices.rb +++ b/lib/chewy/search/parameters/indices.rb @@ -17,7 +17,7 @@ class Indices < Storage # @param other [Chewy::Search::Parameters::Storage] any storage instance # @return [true, false] the result of comparison def ==(other) - super || other.class == self.class && other.render == render + super || (other.class == self.class && other.render == render) end # Just adds indices to indices. diff --git a/lib/chewy/search/parameters/knn.rb b/lib/chewy/search/parameters/knn.rb new file mode 100644 index 000000000..fa3de772b --- /dev/null +++ b/lib/chewy/search/parameters/knn.rb @@ -0,0 +1,16 @@ +require 'chewy/search/parameters/storage' + +module Chewy + module Search + class Parameters + # Just a standard hash storage. Nothing to see here. + # + # @see Chewy::Search::Parameters::HashStorage + # @see Chewy::Search::Request#knn + # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/knn-search.html + class Knn < Storage + include HashStorage + end + end + end +end diff --git a/lib/chewy/search/parameters/storage.rb b/lib/chewy/search/parameters/storage.rb index 6fe00d601..e0effe4c3 100644 --- a/lib/chewy/search/parameters/storage.rb +++ b/lib/chewy/search/parameters/storage.rb @@ -35,7 +35,7 @@ def initialize(value = nil) # @param other [Chewy::Search::Parameters::Storage] any storage instance # @return [true, false] the result of comparision def ==(other) - super || other.class == self.class && other.value == value + super || (other.class == self.class && other.value == value) end # Replaces current value with normalized provided one. Doesn't diff --git a/lib/chewy/search/request.rb b/lib/chewy/search/request.rb index 687e8a093..27d5cdefe 100644 --- a/lib/chewy/search/request.rb +++ b/lib/chewy/search/request.rb @@ -20,7 +20,7 @@ class Request UNDEFINED = Class.new.freeze EVERFIELDS = %w[_index _type _id _parent _routing].freeze DELEGATED_METHODS = %i[ - query filter post_filter order reorder docvalue_fields + query filter post_filter knn order reorder docvalue_fields track_scores track_total_hits request_cache explain version profile search_type preference limit offset terminate_after timeout min_score source stored_fields search_after @@ -41,12 +41,12 @@ class Request EXTRA_STORAGES = %i[aggs suggest].freeze # An array of storage names that are changing the returned hist collection in any way. WHERE_STORAGES = %i[ - query filter post_filter none min_score rescore indices_boost collapse + query filter post_filter knn none min_score rescore indices_boost collapse ].freeze delegate :hits, :wrappers, :objects, :records, :documents, :object_hash, :record_hash, :document_hash, - :total, :max_score, :took, :timed_out?, to: :response + :total, :max_score, :took, :timed_out?, :terminated_early?, to: :response delegate :each, :size, :to_a, :[], to: :wrappers alias_method :to_ary, :to_a alias_method :total_count, :total @@ -520,7 +520,18 @@ def reorder(value, *values) # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/collapse-search-results.html # @param value [Hash] # @return [Chewy::Search::Request] - %i[request_cache search_type preference timeout limit offset terminate_after min_score ignore_unavailable collapse].each do |name| + # + # @!method knn(value) + # Replaces the value of the `knn` request part. 
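+      #   Like the other hash-backed parameters, repeated calls replace the previously provided value, and passing `nil` removes the `knn` part again (this behaviour is exercised together with `collapse` in the request spec changes below).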
+ # + # @example + # PlacesIndex.knn(field: :vector, query_vector: [4, 2], k: 5, num_candidates: 50) + # # => {:knn=>{"field"=>:vector, "query_vector"=>[4, 2], "k"=>5, "num_candidates"=>50}}}> + # @see Chewy::Search::Parameters::Knn + # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/knn-search.html + # @param value [Hash] + # @return [Chewy::Search::Request] + %i[request_cache search_type preference timeout limit offset terminate_after min_score ignore_unavailable collapse knn].each do |name| define_method name do |value| modify(name) { replace!(value) } end @@ -843,7 +854,7 @@ def count else Chewy.client.count(only(WHERE_STORAGES).render)['count'] end - rescue Elasticsearch::Transport::Transport::Errors::NotFound + rescue Elastic::Transport::Transport::Errors::NotFound 0 end @@ -880,7 +891,7 @@ def exists? def first(limit = UNDEFINED) request_limit = limit == UNDEFINED ? 1 : limit - if performed? && (request_limit <= size || size == total) + if performed? && (terminated_early? || request_limit <= size || size == total) limit == UNDEFINED ? wrappers.first : wrappers.first(limit) else result = except(EXTRA_STORAGES).limit(request_limit).to_a @@ -962,10 +973,22 @@ def pluck(*fields) # # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html # @note The result hash is different for different API used. - # @param refresh [true, false] field names + # @param refresh [true, false] Refreshes all shards involved in the delete by query + # @param wait_for_completion [true, false] wait for request completion or run it asynchronously + # and return task reference at `.tasks/task/${taskId}`. + # @param requests_per_second [Float] The throttle for this request in sub-requests per second + # @param scroll_size [Integer] Size of the scroll request that powers the operation + # @return [Hash] the result of query execution - def delete_all(refresh: true) - request_body = only(WHERE_STORAGES).render.merge(refresh: refresh) + def delete_all(refresh: true, wait_for_completion: nil, requests_per_second: nil, scroll_size: nil) + request_body = only(WHERE_STORAGES).render.merge( + { + refresh: refresh, + wait_for_completion: wait_for_completion, + requests_per_second: requests_per_second, + scroll_size: scroll_size + }.compact + ) ActiveSupport::Notifications.instrument 'delete_query.chewy', notification_payload(request: request_body) do request_body[:body] = {query: {match_all: {}}} if request_body[:body].empty? Chewy.client.delete_by_query(request_body) @@ -1012,7 +1035,7 @@ def perform(additional = {}) request_body = render.merge(additional) ActiveSupport::Notifications.instrument 'search_query.chewy', notification_payload(request: request_body) do Chewy.client.search(request_body) - rescue Elasticsearch::Transport::Transport::Errors::NotFound + rescue Elastic::Transport::Transport::Errors::NotFound {} end end diff --git a/lib/chewy/search/response.rb b/lib/chewy/search/response.rb index 0a5becb24..5409a113d 100644 --- a/lib/chewy/search/response.rb +++ b/lib/chewy/search/response.rb @@ -47,6 +47,13 @@ def timed_out? @timed_out ||= @body['timed_out'] end + # Has the request been terminated early? + # + # @return [true, false] + def terminated_early? + @terminated_early ||= @body['terminated_early'] + end + # The `suggest` response part. Returns empty hash if suggests # were not requested. 
# diff --git a/lib/chewy/search/scrolling.rb b/lib/chewy/search/scrolling.rb index 6074b3252..f0c738374 100644 --- a/lib/chewy/search/scrolling.rb +++ b/lib/chewy/search/scrolling.rb @@ -39,7 +39,8 @@ def scroll_batches(batch_size: Request::DEFAULT_BATCH_SIZE, scroll: Request::DEF hits = hits.first(last_batch_size) if last_batch_size != 0 && fetched >= total yield(hits) if hits.present? scroll_id = result['_scroll_id'] - break if fetched >= total + + break if result['terminated_early'] || fetched >= total result = perform_scroll(scroll: scroll, scroll_id: scroll_id) end diff --git a/lib/chewy/stash.rb b/lib/chewy/stash.rb index 2181ee492..d49957d50 100644 --- a/lib/chewy/stash.rb +++ b/lib/chewy/stash.rb @@ -28,12 +28,12 @@ def self.entries(since_time, only: []) # Cleans up all the journal entries until the specified time. If nothing is # specified - cleans up everything. # - # @param since_time [Time, DateTime] the time top boundary + # @param until_time [Time, DateTime] Clean everything before that date + # @param only [Chewy::Index, Array] indexes to clean up journal entries for - def self.clean(until_time = nil, only: []) + def self.clean(until_time = nil, only: [], delete_by_query_options: {}) scope = self.for(only) scope = scope.filter(range: {created_at: {lte: until_time}}) if until_time - scope.delete_all + scope.delete_all(**delete_by_query_options) end # Selects all the journal entries for the specified indices. diff --git a/lib/chewy/strategy.rb b/lib/chewy/strategy.rb index 83baadb24..a8c6c5dfa 100644 --- a/lib/chewy/strategy.rb +++ b/lib/chewy/strategy.rb @@ -8,6 +8,7 @@ require 'sidekiq' require 'chewy/strategy/sidekiq' require 'chewy/strategy/lazy_sidekiq' + require 'chewy/strategy/delayed_sidekiq' rescue LoadError nil end diff --git a/lib/chewy/strategy/delayed_sidekiq.rb b/lib/chewy/strategy/delayed_sidekiq.rb new file mode 100644 index 000000000..23b373b2d --- /dev/null +++ b/lib/chewy/strategy/delayed_sidekiq.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +module Chewy + class Strategy + class DelayedSidekiq < Sidekiq + require_relative 'delayed_sidekiq/scheduler' + + # Cleans up the redis sets used internally. Useful mainly in tests to avoid + # leaks and potential flaky tests. + def self.clear_timechunks! + ::Sidekiq.redis do |redis| + keys_to_delete = redis.keys("#{Scheduler::KEY_PREFIX}*") + + # Delete keys one by one + keys_to_delete.each do |key| + redis.del(key) + end + end + end + + def leave + @stash.each do |type, ids| + next if ids.empty? + + DelayedSidekiq::Scheduler.new(type, ids).postpone + end + end + end + end +end diff --git a/lib/chewy/strategy/delayed_sidekiq/scheduler.rb b/lib/chewy/strategy/delayed_sidekiq/scheduler.rb new file mode 100644 index 000000000..d931c338b --- /dev/null +++ b/lib/chewy/strategy/delayed_sidekiq/scheduler.rb @@ -0,0 +1,168 @@ +# frozen_string_literal: true + +require_relative '../../index' + +# The class is responsible for accumulating in redis the [type, ids] pairs +# that were requested to be reindexed during a `latency`-seconds window. +# The reindex job is scheduled to run after `latency` seconds; +# that job reads the accumulated [type, ids] from redis +# and reindexes them all at once. +module Chewy + class Strategy + class DelayedSidekiq + require_relative 'worker' + + LUA_SCRIPT = <<~LUA + local timechunk_key = KEYS[1] + local timechunks_key = KEYS[2] + local serialize_data = ARGV[1] + local at = ARGV[2] + local ttl = tonumber(ARGV[3]) + + local schedule_job = false + + -- Check if the 'sadd?'
method is available + if redis.call('exists', 'sadd?') == 1 then + redis.call('sadd?', timechunk_key, serialize_data) + else + redis.call('sadd', timechunk_key, serialize_data) + end + + -- Set expiration for timechunk_key + redis.call('expire', timechunk_key, ttl) + + -- Check if timechunk_key exists in the sorted set + if not redis.call('zrank', timechunks_key, timechunk_key) then + -- Add timechunk_key to the sorted set + redis.call('zadd', timechunks_key, at, timechunk_key) + -- Set expiration for timechunks_key + redis.call('expire', timechunks_key, ttl) + schedule_job = true + end + + return schedule_job + LUA + + class Scheduler + DEFAULT_TTL = 60 * 60 * 24 # in seconds + DEFAULT_LATENCY = 10 + DEFAULT_MARGIN = 2 + DEFAULT_QUEUE = 'chewy' + KEY_PREFIX = 'chewy:delayed_sidekiq' + FALLBACK_FIELDS = 'all' + FIELDS_IDS_SEPARATOR = ';' + IDS_SEPARATOR = ',' + + def initialize(type, ids, options = {}) + @type = type + @ids = ids + @options = options + end + + # the diagram: + # + # inputs: + # latency == 2 + # reindex_time = Time.current + # + # Parallel OR Sequential triggers of reindex: | What is going on in reindex store (Redis): + # -------------------------------------------------------------------------------------------------- + # | + # process 1 (reindex_time): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1] + # Schedule.new(CitiesIndex, [1]).postpone | chewy:delayed_sidekiq:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}] + # | & schedule a DelayedSidekiq::Worker at 1679347869 (at + 3) + # | it will zpop chewy:delayed_sidekiq:timechunks up to 1679347866 score and reindex all ids with zpoped keys + # | chewy:delayed_sidekiq:CitiesIndex:1679347866 + # | + # | + # process 2 (reindex_time): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2] + # Schedule.new(CitiesIndex, [2]).postpone | chewy:delayed_sidekiq:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}] + # | & do not schedule a new worker + # | + # | + # process 1 (reindex_time + (latency - 1).seconds): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2, 3] + # Schedule.new(CitiesIndex, [3]).postpone | chewy:delayed_sidekiq:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}] + # | & do not schedule a new worker + # | + # | + # process 2 (reindex_time + (latency + 1).seconds): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2, 3] + # Schedule.new(CitiesIndex, [4]).postpone | chewy:delayed_sidekiq:CitiesIndex:1679347868 = [4] + # | chewy:delayed_sidekiq:timechunks = [ + # | { score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"} + # | { score: 1679347868, "chewy:delayed_sidekiq:CitiesIndex:1679347868"} + # | ] + # | & schedule a DelayedSidekiq::Worker at 1679347871 (at + 3) + # | it will zpop chewy:delayed_sidekiq:timechunks up to 1679347868 score and reindex all ids with zpoped keys + # | chewy:delayed_sidekiq:CitiesIndex:1679347866 (in case of failed previous reindex), + # | chewy:delayed_sidekiq:CitiesIndex:1679347868 + def postpone + ::Sidekiq.redis do |redis| + # do the redis stuff in a single command to avoid concurrency issues + if redis.eval(LUA_SCRIPT, keys: [timechunk_key, timechunks_key], argv: [serialize_data, at, ttl]) + ::Sidekiq::Client.push( + 'queue' => sidekiq_queue, + 'at' => at + margin, + 'class' => Chewy::Strategy::DelayedSidekiq::Worker, + 'args' => [type_name, at] + ) + end + end + end + + private + + attr_reader :type, :ids, :options + + # this method returns predictable value 
that advances in `latency`-second steps; + # in other words, all calls within the same `latency`-second window return the same value + def at + @at ||= begin + schedule_at = latency.seconds.from_now.to_f + + (schedule_at - (schedule_at % latency)).to_i + end + end + + def fields + options[:update_fields].presence || [FALLBACK_FIELDS] + end + + def timechunks_key + "#{KEY_PREFIX}:#{type_name}:timechunks" + end + + def timechunk_key + "#{KEY_PREFIX}:#{type_name}:#{at}" + end + + def serialize_data + [ids.join(IDS_SEPARATOR), fields.join(IDS_SEPARATOR)].join(FIELDS_IDS_SEPARATOR) + end + + def type_name + type.name + end + + def latency + strategy_config.latency || DEFAULT_LATENCY + end + + def margin + strategy_config.margin || DEFAULT_MARGIN + end + + def ttl + strategy_config.ttl || DEFAULT_TTL + end + + def sidekiq_queue + Chewy.settings.dig(:sidekiq, :queue) || DEFAULT_QUEUE + end + + def strategy_config + type.strategy_config.delayed_sidekiq + end + end + end + end +end diff --git a/lib/chewy/strategy/delayed_sidekiq/worker.rb b/lib/chewy/strategy/delayed_sidekiq/worker.rb new file mode 100644 index 000000000..7b539c3ca --- /dev/null +++ b/lib/chewy/strategy/delayed_sidekiq/worker.rb @@ -0,0 +1,76 @@ +# frozen_string_literal: true + +module Chewy + class Strategy + class DelayedSidekiq + class Worker + include ::Sidekiq::Worker + + LUA_SCRIPT = <<~LUA + local type = ARGV[1] + local score = tonumber(ARGV[2]) + local prefix = ARGV[3] + local timechunks_key = prefix .. ":" .. type .. ":timechunks" + + -- Get timechunk_keys with scores less than or equal to the specified score + local timechunk_keys = redis.call('zrangebyscore', timechunks_key, '-inf', score) + + -- Get all members from the sets associated with the timechunk_keys + local members = {} + for _, timechunk_key in ipairs(timechunk_keys) do + local set_members = redis.call('smembers', timechunk_key) + for _, member in ipairs(set_members) do + table.insert(members, member) + end + end + + -- Remove timechunk_keys and their associated sets + for _, timechunk_key in ipairs(timechunk_keys) do + redis.call('del', timechunk_key) + end + + -- Remove timechunks with scores less than or equal to the specified score + redis.call('zremrangebyscore', timechunks_key, '-inf', score) + + return members + LUA + + def perform(type, score, options = {}) + options[:refresh] = !Chewy.disable_refresh_async if Chewy.disable_refresh_async + + ::Sidekiq.redis do |redis| + members = redis.eval(LUA_SCRIPT, keys: [], argv: [type, score, Scheduler::KEY_PREFIX]) + + # extract ids and fields & reindex the records + ids, fields = extract_ids_and_fields(members) + options[:update_fields] = fields if fields + + index = type.constantize + index.strategy_config.delayed_sidekiq.reindex_wrapper.call do + options.any? ?
index.import!(ids, **options) : index.import!(ids) + end + end + end + + private + + def extract_ids_and_fields(members) + ids = [] + fields = [] + + members.each do |member| + member_ids, member_fields = member.split(Scheduler::FIELDS_IDS_SEPARATOR).map do |v| + v.split(Scheduler::IDS_SEPARATOR) + end + ids |= member_ids + fields |= member_fields + end + + fields = nil if fields.include?(Scheduler::FALLBACK_FIELDS) + + [ids, fields] + end + end + end + end +end diff --git a/lib/chewy/version.rb b/lib/chewy/version.rb index 5b896f45c..6ba954467 100644 --- a/lib/chewy/version.rb +++ b/lib/chewy/version.rb @@ -1,3 +1,3 @@ module Chewy - VERSION = '7.2.6'.freeze + VERSION = '8.0.0-beta'.freeze end diff --git a/lib/tasks/chewy.rake b/lib/tasks/chewy.rake index 5cf4ef17c..636ffb871 100644 --- a/lib/tasks/chewy.rake +++ b/lib/tasks/chewy.rake @@ -57,6 +57,11 @@ namespace :chewy do Chewy::RakeHelper.update_mapping(name: args[:index_name]) end + desc 'Creates missing indexes' + task create_missing_indexes: :environment do + Chewy::RakeHelper.create_missing_indexes! + end + namespace :parallel do desc 'Parallel version of `rake chewy:reset`' task reset: :environment do |_task, args| @@ -87,6 +92,11 @@ namespace :chewy do end namespace :journal do + desc 'Create manually journal, useful when `skip_journal_creation_on_import` is used' + task create: :environment do |_task, _args| + Chewy::RakeHelper.journal_create + end + desc 'Applies changes that were done after the specified time for the specified indexes/types or all of them' task apply: :environment do |_task, args| Chewy::RakeHelper.journal_apply(**parse_journal_args(args.extras)) @@ -94,7 +104,13 @@ namespace :chewy do desc 'Removes journal records created before the specified timestamp for the specified indexes/types or all of them' task clean: :environment do |_task, args| - Chewy::RakeHelper.journal_clean(**parse_journal_args(args.extras)) + delete_options = Chewy::RakeHelper.delete_by_query_options_from_env(ENV) + Chewy::RakeHelper.journal_clean( + **[ + parse_journal_args(args.extras), + {delete_by_query_options: delete_options} + ].reduce({}, :merge) + ) end end end diff --git a/spec/chewy/elastic_client_spec.rb b/spec/chewy/elastic_client_spec.rb new file mode 100644 index 000000000..58cdc0cc7 --- /dev/null +++ b/spec/chewy/elastic_client_spec.rb @@ -0,0 +1,26 @@ +require 'spec_helper' + +describe Chewy::ElasticClient do + describe 'payload inspection' do + let(:filter) { instance_double('Proc') } + let!(:filter_previous_value) { Chewy.before_es_request_filter } + + before do + drop_indices + stub_index(:products) do + field :id, type: :integer + end + ProductsIndex.create + Chewy.before_es_request_filter = filter + end + + after do + Chewy.before_es_request_filter = filter_previous_value + end + + it 'call filter with the request body' do + expect(filter).to receive(:call).with(:search, [{body: {size: 0}, index: ['products']}], {}) + Chewy.client.search({index: ['products'], body: {size: 0}}).to_a + end + end +end diff --git a/spec/chewy/fields/base_spec.rb b/spec/chewy/fields/base_spec.rb index bd8149bd6..cf82bde2a 100644 --- a/spec/chewy/fields/base_spec.rb +++ b/spec/chewy/fields/base_spec.rb @@ -5,7 +5,7 @@ specify { expect(described_class.new('name', type: 'integer').options[:type]).to eq('integer') } describe '#compose' do - let(:field) { described_class.new(:name, value: ->(o) { o.value }) } + let(:field) { described_class.new(:name, value: lambda(&:value)) } specify { expect(field.compose(double(value: 'hello'))).to eq(name: 
'hello') } specify { expect(field.compose(double(value: %w[hello world]))).to eq(name: %w[hello world]) } @@ -23,7 +23,7 @@ context 'nested fields' do before do - field.children.push(described_class.new(:subname1, value: ->(o) { o.subvalue1 })) + field.children.push(described_class.new(:subname1, value: lambda(&:subvalue1))) field.children.push(described_class.new(:subname2, value: -> { subvalue2 })) field.children.push(described_class.new(:subname3)) end diff --git a/spec/chewy/fields/time_fields_spec.rb b/spec/chewy/fields/time_fields_spec.rb index 31aef9777..a9ddc8c45 100644 --- a/spec/chewy/fields/time_fields_spec.rb +++ b/spec/chewy/fields/time_fields_spec.rb @@ -1,7 +1,7 @@ require 'spec_helper' describe 'Time fields' do - before { Chewy.massacre } + before { drop_indices } before do stub_index(:posts) do diff --git a/spec/chewy/index/actions_spec.rb b/spec/chewy/index/actions_spec.rb index 0ca812d92..27d5682b4 100644 --- a/spec/chewy/index/actions_spec.rb +++ b/spec/chewy/index/actions_spec.rb @@ -1,7 +1,7 @@ require 'spec_helper' describe Chewy::Index::Actions do - before { Chewy.massacre } + before { drop_indices } before do stub_index :dummies @@ -78,12 +78,12 @@ specify do expect do DummiesIndex.create! - end.to raise_error(Elasticsearch::Transport::Transport::Errors::BadRequest).with_message(/already exists.*dummies/) + end.to raise_error(Elastic::Transport::Transport::Errors::BadRequest).with_message(/already exists.*dummies/) end specify do expect do DummiesIndex.create!('2013') - end.to raise_error(Elasticsearch::Transport::Transport::Errors::BadRequest).with_message(/Invalid alias name \[dummies\]/) + end.to raise_error(Elastic::Transport::Transport::Errors::BadRequest).with_message(/Invalid alias name \[dummies\]/) end end @@ -100,7 +100,7 @@ specify do expect do DummiesIndex.create!('2013') - end.to raise_error(Elasticsearch::Transport::Transport::Errors::BadRequest).with_message(/already exists.*dummies_2013/) + end.to raise_error(Elastic::Transport::Transport::Errors::BadRequest).with_message(/already exists.*dummies_2013/) end specify { expect(DummiesIndex.create!('2014')['acknowledged']).to eq(true) } @@ -190,11 +190,11 @@ end describe '.delete!' do - specify { expect { DummiesIndex.delete! }.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound) } + specify { expect { DummiesIndex.delete! }.to raise_error(Elastic::Transport::Transport::Errors::NotFound) } specify do expect do DummiesIndex.delete!('2013') - end.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound) + end.to raise_error(Elastic::Transport::Transport::Errors::NotFound) end context do @@ -610,7 +610,7 @@ specify 'with journal application' do cities p 'cities created1' - ::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base) + ActiveRecord::Base.connection.close if defined?(ActiveRecord::Base) [ parallel_update, Thread.new do @@ -619,7 +619,7 @@ p 'end reset1' end ].map(&:join) - ::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base) + ActiveRecord::Base.connection.reconnect! 
if defined?(ActiveRecord::Base) p 'expect1' expect(CitiesIndex::City.pluck(:_id, :name)).to contain_exactly(%w[1 NewName1], %w[2 Name2], %w[3 NewName3]) p 'end expect1' @@ -628,7 +628,7 @@ specify 'without journal application' do cities p 'cities created2' - ::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base) + ActiveRecord::Base.connection.close if defined?(ActiveRecord::Base) [ parallel_update, Thread.new do @@ -637,7 +637,7 @@ p 'end reset2' end ].map(&:join) - ::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base) + ActiveRecord::Base.connection.reconnect! if defined?(ActiveRecord::Base) p 'expect2' expect(CitiesIndex::City.pluck(:_id, :name)).to contain_exactly(%w[1 Name1], %w[2 Name2], %w[3 Name3]) p 'end expect2' @@ -768,7 +768,7 @@ .to receive(:clear_cache) .and_call_original expect { CitiesIndex.clear_cache({index: unexisted_index_name}) } - .to raise_error Elasticsearch::Transport::Transport::Errors::NotFound + .to raise_error Elastic::Transport::Transport::Errors::NotFound end end @@ -820,7 +820,7 @@ .to receive(:reindex) .and_call_original expect { CitiesIndex.reindex(source: unexisting_index, dest: dest_index_with_prefix) } - .to raise_error Elasticsearch::Transport::Transport::Errors::NotFound + .to raise_error Elastic::Transport::Transport::Errors::NotFound end end @@ -883,7 +883,7 @@ context 'index name' do specify do expect { CitiesIndex.update_mapping(unexisting_index, body_hash) } - .to raise_error Elasticsearch::Transport::Transport::Errors::NotFound + .to raise_error Elastic::Transport::Transport::Errors::NotFound end end diff --git a/spec/chewy/index/adapter/active_record_spec.rb b/spec/chewy/index/adapter/active_record_spec.rb index 4f2f8b9ab..062f17cb3 100644 --- a/spec/chewy/index/adapter/active_record_spec.rb +++ b/spec/chewy/index/adapter/active_record_spec.rb @@ -35,6 +35,68 @@ def rating specify { expect(described_class.new(City.where(rating: 10)).default_scope).to eq(City.where(rating: 10)) } end + describe '.new' do + context 'with logger' do + let(:test_logger) { Logger.new('/dev/null') } + let(:default_scope_behavior) { :warn } + + around do |example| + previous_logger = Chewy.logger + Chewy.logger = test_logger + + previous_default_scope_behavior = Chewy.config.import_scope_cleanup_behavior + Chewy.config.import_scope_cleanup_behavior = default_scope_behavior + + example.run + ensure + Chewy.logger = previous_logger + Chewy.config.import_scope_cleanup_behavior = previous_default_scope_behavior + end + + specify do + expect(test_logger).to receive(:warn) + described_class.new(City.order(:id)) + end + + specify do + expect(test_logger).to receive(:warn) + described_class.new(City.offset(10)) + end + + specify do + expect(test_logger).to receive(:warn) + described_class.new(City.limit(10)) + end + + context 'ignore import scope warning' do + let(:default_scope_behavior) { :ignore } + + specify do + expect(test_logger).not_to receive(:warn) + described_class.new(City.order(:id)) + end + + specify do + expect(test_logger).not_to receive(:warn) + described_class.new(City.offset(10)) + end + + specify do + expect(test_logger).not_to receive(:warn) + described_class.new(City.limit(10)) + end + end + + context 'raise exception on import scope with order/limit/offset' do + let(:default_scope_behavior) { :raise } + + specify { expect { described_class.new(City.order(:id)) }.to raise_error(Chewy::ImportScopeCleanupError) } + specify { expect { described_class.new(City.limit(10)) }.to raise_error(Chewy::ImportScopeCleanupError) 
} + specify { expect { described_class.new(City.offset(10)) }.to raise_error(Chewy::ImportScopeCleanupError) } + end + end + end + describe '#type_name' do specify { expect(described_class.new(City).type_name).to eq('city') } specify { expect(described_class.new(City.order(:id)).type_name).to eq('city') } diff --git a/spec/chewy/index/aliases_spec.rb b/spec/chewy/index/aliases_spec.rb index dc29e1189..ad5419c7d 100644 --- a/spec/chewy/index/aliases_spec.rb +++ b/spec/chewy/index/aliases_spec.rb @@ -1,7 +1,7 @@ require 'spec_helper' describe Chewy::Index::Aliases do - before { Chewy.massacre } + before { drop_indices } before { stub_index :dummies } diff --git a/spec/chewy/index/import/bulk_builder_spec.rb b/spec/chewy/index/import/bulk_builder_spec.rb index c4b21ae22..5fe598bed 100644 --- a/spec/chewy/index/import/bulk_builder_spec.rb +++ b/spec/chewy/index/import/bulk_builder_spec.rb @@ -17,7 +17,7 @@ def derived end describe Chewy::Index::Import::BulkBuilder do - before { Chewy.massacre } + before { drop_indices } subject { described_class.new(index, to_index: to_index, delete: delete, fields: fields) } let(:index) { CitiesIndex } @@ -62,6 +62,8 @@ def derived let(:to_index) { cities.first(2) } let(:delete) { [cities.last] } specify do + expect(subject).to receive(:data_for).with(cities.first).and_call_original + expect(subject).to receive(:data_for).with(cities.second).and_call_original expect(subject.bulk_body).to eq([ {index: {_id: 1, data: {'name' => 'City17', 'rating' => 42}}}, {index: {_id: 2, data: {'name' => 'City18', 'rating' => 42}}}, @@ -72,6 +74,8 @@ def derived context ':fields' do let(:fields) { %w[name] } specify do + expect(subject).to receive(:data_for).with(cities.first, fields: [:name]).and_call_original + expect(subject).to receive(:data_for).with(cities.second, fields: [:name]).and_call_original expect(subject.bulk_body).to eq([ {update: {_id: 1, data: {doc: {'name' => 'City17'}}}}, {update: {_id: 2, data: {doc: {'name' => 'City18'}}}}, @@ -128,7 +132,7 @@ def derived before do stub_index(:cities) do crutch :names do |collection| - collection.map { |item| [item.id, "Name#{item.id}"] }.to_h + collection.to_h { |item| [item.id, "Name#{item.id}"] } end field :name, value: ->(o, c) { c.names[o.id] } @@ -194,7 +198,7 @@ def derived index_scope Comment crutch :content_with_crutches do |collection| # collection here is a current batch of products - collection.map { |comment| [comment.id, "[crutches] #{comment.content}"] }.to_h + collection.to_h { |comment| [comment.id, "[crutches] #{comment.content}"] } end field :content @@ -212,7 +216,7 @@ def derived end def do_raw_index_comment(options:, data:) - CommentsIndex.client.index(options.merge(index: 'comments', type: '_doc', refresh: true, body: data)) + CommentsIndex.client.index(options.merge(index: 'comments', refresh: true, body: data)) end def raw_index_comment(comment) @@ -268,7 +272,7 @@ def root(comment) default_import_options raw_import: ->(hash) { SimpleComment.new(hash) } crutch :content_with_crutches do |collection| # collection here is a current batch of products - collection.map { |comment| [comment.id, "[crutches] #{comment.content}"] }.to_h + collection.to_h { |comment| [comment.id, "[crutches] #{comment.content}"] } end field :content diff --git a/spec/chewy/index/import/bulk_request_spec.rb b/spec/chewy/index/import/bulk_request_spec.rb index 0869804a1..bf33d2b16 100644 --- a/spec/chewy/index/import/bulk_request_spec.rb +++ b/spec/chewy/index/import/bulk_request_spec.rb @@ -1,7 +1,7 @@ require 'spec_helper' 
describe Chewy::Index::Import::BulkRequest do - before { Chewy.massacre } + before { drop_indices } subject { described_class.new(index, suffix: suffix, bulk_size: bulk_size, **bulk_options) } let(:suffix) {} diff --git a/spec/chewy/index/import/routine_spec.rb b/spec/chewy/index/import/routine_spec.rb index 5d1064b7b..55588e314 100644 --- a/spec/chewy/index/import/routine_spec.rb +++ b/spec/chewy/index/import/routine_spec.rb @@ -2,7 +2,7 @@ # TODO: add more specs here later describe Chewy::Index::Import::Routine do - before { Chewy.massacre } + before { drop_indices } before do stub_index(:cities) do field :name diff --git a/spec/chewy/index/import_spec.rb b/spec/chewy/index/import_spec.rb index ddd0cd76b..b200a4599 100644 --- a/spec/chewy/index/import_spec.rb +++ b/spec/chewy/index/import_spec.rb @@ -1,7 +1,7 @@ require 'spec_helper' describe Chewy::Index::Import do - before { Chewy.massacre } + before { drop_indices } before do stub_model(:city) @@ -60,6 +60,19 @@ def subscribe_notification CitiesIndex.import(dummy_city) end end + + context 'skip journal creation on import' do + before do + Chewy::Stash::Journal.create! + Chewy.config.settings[:skip_journal_creation_on_import] = true + end + after { Chewy.config.settings[:skip_journal_creation_on_import] = nil } + + specify do + expect(Chewy::Stash::Journal).not_to receive(:create!) + CitiesIndex.import(dummy_city, journal: true) + end + end end shared_examples 'importing' do @@ -191,10 +204,10 @@ def subscribe_notification end end - let(:mapper_parsing_exception) do + let(:document_parsing_exception) do { - 'type' => 'mapper_parsing_exception', - 'reason' => 'object mapping for [name] tried to parse field [name] as object, but found a concrete value' + 'type' => 'document_parsing_exception', + 'reason' => '[1:9] object mapping for [name] tried to parse field [name] as object, but found a concrete value' } end @@ -202,8 +215,8 @@ def subscribe_notification payload = subscribe_notification import dummy_cities, batch_size: 2 expect(payload).to eq(index: CitiesIndex, - errors: {index: {mapper_parsing_exception => %w[1 2 3]}}, - import: {index: 3}) + errors: {index: {document_parsing_exception => %w[1 2 3]}}, + import: {index: 3}) end end end @@ -257,8 +270,8 @@ def subscribe_notification expect(payload).to eq( errors: { index: {{ - 'type' => 'mapper_parsing_exception', - 'reason' => 'object mapping for [object] tried to parse field [object] as object, but found a concrete value' + 'type' => 'document_parsing_exception', + 'reason' => '[1:27] object mapping for [object] tried to parse field [object] as object, but found a concrete value' } => %w[2 4]} }, import: {index: 6}, @@ -280,8 +293,8 @@ def subscribe_notification expect(payload).to eq( errors: { index: {{ - 'type' => 'mapper_parsing_exception', - 'reason' => 'object mapping for [object] tried to parse field [object] as object, but found a concrete value' + 'type' => 'document_parsing_exception', + 'reason' => '[1:27] object mapping for [object] tried to parse field [object] as object, but found a concrete value' } => %w[2 4]} }, import: {index: 6}, @@ -306,8 +319,8 @@ def subscribe_notification expect(payload).to eq( errors: { index: {{ - 'type' => 'mapper_parsing_exception', - 'reason' => 'object mapping for [object] tried to parse field [object] as object, but found a concrete value' + 'type' => 'document_parsing_exception', + 'reason' => '[1:27] object mapping for [object] tried to parse field [object] as object, but found a concrete value' } => %w[2 4]} }, import: {index: 6}, @@ 
-370,8 +383,8 @@ def subscribe_notification # Full match doesn't work here. expect(payload[:errors][:update].keys).to match([ - hash_including('type' => 'document_missing_exception', 'reason' => '[_doc][1]: document missing'), - hash_including('type' => 'document_missing_exception', 'reason' => '[_doc][3]: document missing') + hash_including('type' => 'document_missing_exception', 'reason' => '[1]: document missing'), + hash_including('type' => 'document_missing_exception', 'reason' => '[3]: document missing') ]) expect(payload[:errors][:update].values).to eq([['1'], ['3']]) expect(imported_cities).to match_array([ @@ -418,8 +431,8 @@ def subscribe_notification expect(payload).to eq( errors: { update: {{ - 'type' => 'mapper_parsing_exception', - 'reason' => 'object mapping for [object] tried to parse field [object] as object, but found a concrete value' + 'type' => 'document_parsing_exception', + 'reason' => '[1:26] object mapping for [object] tried to parse field [object] as object, but found a concrete value' } => %w[2 4]} }, import: {index: 6}, @@ -554,7 +567,7 @@ def imported_comments before do stub_index(:cities) do crutch :names do |collection| - collection.map { |o| [o.name, "#{o.name}42"] }.to_h + collection.to_h { |o| [o.name, "#{o.name}42"] } end field :name, value: ->(o, c) { c.names[o.name] } field :rating diff --git a/spec/chewy/index/observe/callback_spec.rb b/spec/chewy/index/observe/callback_spec.rb index 825e3b405..84cd6219b 100644 --- a/spec/chewy/index/observe/callback_spec.rb +++ b/spec/chewy/index/observe/callback_spec.rb @@ -21,7 +21,7 @@ end context 'when executable is has arity 1' do - let(:executable) { ->(record) { record.population } } + let(:executable) { lambda(&:population) } it 'calls exectuable within context' do expect(callback.call(city)).to eq(city.population) diff --git a/spec/chewy/index/specification_spec.rb b/spec/chewy/index/specification_spec.rb index 68a34b8d5..cd1b3bd7b 100644 --- a/spec/chewy/index/specification_spec.rb +++ b/spec/chewy/index/specification_spec.rb @@ -1,7 +1,7 @@ require 'spec_helper' describe Chewy::Index::Specification do - before { Chewy.massacre } + before { drop_indices } let(:index1) do stub_index(:places) do @@ -46,7 +46,6 @@ specify do expect { specification1.lock! }.to change { Chewy::Stash::Specification.all.hits }.from([]).to([{ '_index' => 'chewy_specifications', - '_type' => '_doc', '_id' => 'places', '_score' => 1.0, '_source' => {'specification' => Base64.encode64({ @@ -62,7 +61,6 @@ specify do expect { specification5.lock! 
}.to change { Chewy::Stash::Specification.all.hits }.to([{ '_index' => 'chewy_specifications', - '_type' => '_doc', '_id' => 'places', '_score' => 1.0, '_source' => {'specification' => Base64.encode64({ @@ -71,7 +69,6 @@ }.to_json)} }, { '_index' => 'chewy_specifications', - '_type' => '_doc', '_id' => 'namespace/cities', '_score' => 1.0, '_source' => {'specification' => Base64.encode64({ diff --git a/spec/chewy/index/syncer_spec.rb b/spec/chewy/index/syncer_spec.rb index e71617f04..176cf047a 100644 --- a/spec/chewy/index/syncer_spec.rb +++ b/spec/chewy/index/syncer_spec.rb @@ -1,7 +1,7 @@ require 'spec_helper' describe Chewy::Index::Syncer, :orm do - before { Chewy.massacre } + before { drop_indices } before do stub_model(:city) stub_index(:cities) do diff --git a/spec/chewy/index_spec.rb b/spec/chewy/index_spec.rb index 57565861b..d96d1bf6f 100644 --- a/spec/chewy/index_spec.rb +++ b/spec/chewy/index_spec.rb @@ -177,7 +177,7 @@ def self.by_id; end context do before do - Chewy.massacre + drop_indices PlacesIndex.import!( double(colors: ['red']), double(colors: %w[red green]), diff --git a/spec/chewy/journal_spec.rb b/spec/chewy/journal_spec.rb index 3d518be09..e392f8aba 100644 --- a/spec/chewy/journal_spec.rb +++ b/spec/chewy/journal_spec.rb @@ -21,7 +21,7 @@ default_import_options journal: true end - Chewy.massacre + drop_indices Chewy.settings[:prefix] = 'some_prefix' Timecop.freeze(time) end @@ -145,7 +145,7 @@ def timestamp(time) end context do - before { Chewy.massacre } + before { drop_indices } before do stub_model(:city) do update_index 'cities', :self diff --git a/spec/chewy/minitest/helpers_spec.rb b/spec/chewy/minitest/helpers_spec.rb index 98fd0f08e..c411a14e4 100644 --- a/spec/chewy/minitest/helpers_spec.rb +++ b/spec/chewy/minitest/helpers_spec.rb @@ -10,14 +10,14 @@ def assert_includes(haystack, needle, _comment) expect(haystack).to include(needle) end - include ::Chewy::Minitest::Helpers + include Chewy::Minitest::Helpers def assert_equal(expected, actual, message) raise message unless expected == actual end before do - Chewy.massacre + drop_indices end before do @@ -32,14 +32,14 @@ def assert_equal(expected, actual, message) { '_index' => 'dummies', '_type' => '_doc', - '_id' => '1', + '_id' => '2', '_score' => 3.14, '_source' => source } ] end - let(:source) { {'name' => 'some_name'} } + let(:source) { {'name' => 'some_name', id: '2'} } let(:sources) { [source] } context 'mocks by raw response' do diff --git a/spec/chewy/minitest/search_index_receiver_spec.rb b/spec/chewy/minitest/search_index_receiver_spec.rb index 213d46de8..d93b97f41 100644 --- a/spec/chewy/minitest/search_index_receiver_spec.rb +++ b/spec/chewy/minitest/search_index_receiver_spec.rb @@ -24,6 +24,8 @@ def parse_request(request) SearchIndexReceiver.new end + let(:dummy_class) { Struct.new(:id) } + before do stub_index(:dummies) do root value: ->(_o) { {} } @@ -82,12 +84,12 @@ def parse_request(request) end specify 'validates that an object was indexed' do - dummy = OpenStruct.new(id: 1) + dummy = dummy_class.new(1) expect(receiver.indexed?(dummy, DummiesIndex)).to be(true) end specify 'doesn\'t validate than unindexed objects were indexed' do - dummy = OpenStruct.new(id: 2) + dummy = dummy_class.new(2) expect(receiver.indexed?(dummy, DummiesIndex)).to be(false) end end @@ -98,12 +100,12 @@ def parse_request(request) end specify 'validates than an object was deleted' do - dummy = OpenStruct.new(id: 1) + dummy = dummy_class.new(1) expect(receiver.deleted?(dummy, DummiesIndex)).to be(true) end specify 
'doesn\'t validate than undeleted objects were deleted' do - dummy = OpenStruct.new(id: 2) + dummy = dummy_class.new(2) expect(receiver.deleted?(dummy, DummiesIndex)).to be(false) end end diff --git a/spec/chewy/multi_search_spec.rb b/spec/chewy/multi_search_spec.rb index 8ca1e2cd2..2e7afaefb 100644 --- a/spec/chewy/multi_search_spec.rb +++ b/spec/chewy/multi_search_spec.rb @@ -2,7 +2,7 @@ require 'chewy/multi_search' describe Chewy::MultiSearch do - before { Chewy.massacre } + before { drop_indices } before do stub_model(:city) diff --git a/spec/chewy/rake_helper_spec.rb b/spec/chewy/rake_helper_spec.rb index d9754ef68..353164217 100644 --- a/spec/chewy/rake_helper_spec.rb +++ b/spec/chewy/rake_helper_spec.rb @@ -1,7 +1,8 @@ require 'spec_helper' +require 'rake' describe Chewy::RakeHelper, :orm do - before { Chewy.massacre } + before { drop_indices } before do described_class.instance_variable_set(:@journal_exists, journal_exists) @@ -104,10 +105,10 @@ expect { described_class.reset(only: [CitiesIndex], output: output) } .to update_index(CitiesIndex) expect(output.string).to include( - "############################################################\n"\ - "WARN: You are risking to lose some changes during the reset.\n" \ - " Please consider enabling journaling.\n" \ - ' See https://github.com/toptal/chewy#journaling' + "############################################################\n" \ + "WARN: You are risking to lose some changes during the reset.\n " \ + "Please consider enabling journaling.\n " \ + 'See https://github.com/toptal/chewy#journaling' ) end end @@ -426,6 +427,108 @@ described_class.journal_clean(except: CitiesIndex, output: output) expect(output.string).to match(Regexp.new(<<-OUTPUT, Regexp::MULTILINE)) \\ACleaned up 1 journal entries +Total: \\d+s\\Z + OUTPUT + end + + it 'executes asynchronously' do + output = StringIO.new + expect(Chewy.client).to receive(:delete_by_query).with( + { + body: {query: {match_all: {}}}, + index: ['chewy_journal'], + refresh: false, + requests_per_second: 10.0, + scroll_size: 200, + wait_for_completion: false + } + ).and_call_original + described_class.journal_clean( + output: output, + delete_by_query_options: { + wait_for_completion: false, + requests_per_second: 10.0, + scroll_size: 200 + } + ) + + expect(output.string).to match(Regexp.new(<<-OUTPUT, Regexp::MULTILINE)) +\\ATask to cleanup the journal has been created, [^\\n]* +Total: \\d+s\\Z + OUTPUT + end + + context 'execute "chewy:journal:clean" rake task' do + subject(:task) { Rake.application['chewy:journal:clean'] } + before do + Rake::DefaultLoader.new.load('lib/tasks/chewy.rake') + Rake::Task.define_task(:environment) + end + it 'does not raise error' do + expect { task.invoke }.to_not raise_error + end + end + end + + describe '.create_missing_indexes!' do + before do + [CountriesIndex, Chewy::Stash::Specification].map(&:create!) 
+ + # To avoid flaky issues when previous specs were run + expect(Chewy::Index).to receive(:descendants).and_return( + [ + UsersIndex, + CountriesIndex, + CitiesIndex, + Chewy::Stash::Specification, + Chewy::Stash::Journal + ] + ) + end + + specify do + output = StringIO.new + described_class.create_missing_indexes!(output: output) + expect(CitiesIndex.exists?).to be_truthy + expect(UsersIndex.exists?).to be_truthy + expect(Chewy::Stash::Journal.exists?).to be_falsey + expect(output.string).to match(Regexp.new(<<-OUTPUT, Regexp::MULTILINE)) +UsersIndex index successfully created +CitiesIndex index successfully created +Total: \\d+s\\Z + OUTPUT + end + + context 'when verbose' do + specify do + output = StringIO.new + described_class.create_missing_indexes!(output: output, env: {'VERBOSE' => '1'}) + expect(output.string).to match(Regexp.new(<<-OUTPUT, Regexp::MULTILINE)) +UsersIndex index successfully created +CountriesIndex already exists, skipping +CitiesIndex index successfully created +Chewy::Stash::Specification already exists, skipping +Total: \\d+s\\Z + OUTPUT + end + end + + context 'when journaling is enabled' do + before { Chewy.config.settings[:journal] = true } + after { Chewy.config.settings.delete(:journal) } + specify do + described_class.create_missing_indexes!(output: StringIO.new) + expect(Chewy::Stash::Journal.exists?).to be_truthy + end + end + end + + describe '.journal_create' do + specify do + output = StringIO.new + described_class.journal_create(output: output) + expect(Chewy::Stash::Journal.exists?).to be_truthy + expect(output.string).to match(Regexp.new(<<-OUTPUT, Regexp::MULTILINE)) Total: \\d+s\\Z OUTPUT end @@ -502,4 +605,52 @@ end end end + + describe '.delete_by_query_options_from_env' do + subject(:options) { described_class.delete_by_query_options_from_env(env) } + let(:env) do + { + 'WAIT_FOR_COMPLETION' => 'false', + 'REQUESTS_PER_SECOND' => '10', + 'SCROLL_SIZE' => '5000' + } + end + + it 'parses the options' do + expect(options).to eq( + wait_for_completion: false, + requests_per_second: 10.0, + scroll_size: 5000 + ) + end + + context 'with different boolean values' do + it 'parses the option correctly' do + %w[1 t true TRUE on ON].each do |v| + expect(described_class.delete_by_query_options_from_env({'WAIT_FOR_COMPLETION' => v})) + .to eq(wait_for_completion: true) + end + + %w[0 f false FALSE off OFF].each do |v| + expect(described_class.delete_by_query_options_from_env({'WAIT_FOR_COMPLETION' => v})) + .to eq(wait_for_completion: false) + end + end + end + + context 'with other env' do + let(:env) { {'SOME_ENV' => '123', 'REQUESTS_PER_SECOND' => '15'} } + + it 'parses only the options' do + expect(options).to eq(requests_per_second: 15.0) + end + end + end + + describe '.subscribed_task_stats' do + specify do + block_output = described_class.subscribed_task_stats(StringIO.new) { 'expected output' } + expect(block_output).to eq('expected output') + end + end end diff --git a/spec/chewy/rspec/helpers_spec.rb b/spec/chewy/rspec/helpers_spec.rb index 0b3938dcc..1d2e6cfd3 100644 --- a/spec/chewy/rspec/helpers_spec.rb +++ b/spec/chewy/rspec/helpers_spec.rb @@ -1,7 +1,7 @@ require 'spec_helper' describe :rspec_helper do - include ::Chewy::Rspec::Helpers + include Chewy::Rspec::Helpers before do stub_model(:city) diff --git a/spec/chewy/rspec/update_index_spec.rb b/spec/chewy/rspec/update_index_spec.rb index 5982f4c51..8054bcce6 100644 --- a/spec/chewy/rspec/update_index_spec.rb +++ b/spec/chewy/rspec/update_index_spec.rb @@ -9,7 +9,7 @@ end before do - 
Chewy.massacre + drop_indices DummiesIndex.create! end diff --git a/spec/chewy/runtime_spec.rb b/spec/chewy/runtime_spec.rb index edc85231b..e8cc457d3 100644 --- a/spec/chewy/runtime_spec.rb +++ b/spec/chewy/runtime_spec.rb @@ -3,7 +3,7 @@ describe Chewy::Runtime do describe '.version' do specify { expect(described_class.version).to be_a(described_class::Version) } - specify { expect(described_class.version).to be >= '7.0' } - specify { expect(described_class.version).to be < '8.0' } + specify { expect(described_class.version).to be >= '8.0' } + specify { expect(described_class.version).to be < '9.0' } end end diff --git a/spec/chewy/search/loader_spec.rb b/spec/chewy/search/loader_spec.rb index 1bfde065a..31dfff1b5 100644 --- a/spec/chewy/search/loader_spec.rb +++ b/spec/chewy/search/loader_spec.rb @@ -1,7 +1,7 @@ require 'spec_helper' describe Chewy::Search::Loader do - before { Chewy.massacre } + before { drop_indices } before do stub_model(:city) diff --git a/spec/chewy/search/pagination/kaminari_examples.rb b/spec/chewy/search/pagination/kaminari_examples.rb index ccb033bfd..aa86c9700 100644 --- a/spec/chewy/search/pagination/kaminari_examples.rb +++ b/spec/chewy/search/pagination/kaminari_examples.rb @@ -1,7 +1,7 @@ require 'spec_helper' shared_examples :kaminari do |request_base_class| - before { Chewy.massacre } + before { drop_indices } before do stub_index(:products) do @@ -24,7 +24,7 @@ let(:data) { Array.new(10) { |i| {id: i.next.to_s, name: "Name#{i.next}", age: 10 * i.next}.stringify_keys! } } before { ProductsIndex.import!(data.map { |h| double(h) }) } - before { allow(::Kaminari.config).to receive_messages(default_per_page: 3) } + before { allow(Kaminari.config).to receive_messages(default_per_page: 3) } describe '#per, #page' do specify { expect(search.map { |e| e.attributes.except(*except_fields) }).to match_array(data) } diff --git a/spec/chewy/search/pagination/kaminari_spec.rb b/spec/chewy/search/pagination/kaminari_spec.rb index c75eacaa8..bd117d4d6 100644 --- a/spec/chewy/search/pagination/kaminari_spec.rb +++ b/spec/chewy/search/pagination/kaminari_spec.rb @@ -6,7 +6,7 @@ let(:data) { Array.new(12) { |i| {id: i.next.to_s, name: "Name#{i.next}", age: 10 * i.next}.stringify_keys! 
} } before { ProductsIndex.import!(data.map { |h| double(h) }) } - before { allow(::Kaminari.config).to receive_messages(default_per_page: 17) } + before { allow(Kaminari.config).to receive_messages(default_per_page: 17) } specify { expect(search.objects.class).to eq(Kaminari::PaginatableArray) } specify { expect(search.objects.total_count).to eq(12) } diff --git a/spec/chewy/search/parameters/knn_spec.rb b/spec/chewy/search/parameters/knn_spec.rb new file mode 100644 index 000000000..cc4f45f70 --- /dev/null +++ b/spec/chewy/search/parameters/knn_spec.rb @@ -0,0 +1,5 @@ +require 'chewy/search/parameters/hash_storage_examples' + +describe Chewy::Search::Parameters::Knn do + it_behaves_like :hash_storage, :knn +end diff --git a/spec/chewy/search/request_spec.rb b/spec/chewy/search/request_spec.rb index f8ea8d1ea..b73b99161 100644 --- a/spec/chewy/search/request_spec.rb +++ b/spec/chewy/search/request_spec.rb @@ -1,7 +1,7 @@ require 'spec_helper' describe Chewy::Search::Request do - before { Chewy.massacre } + before { drop_indices } before do stub_index(:products) do @@ -314,14 +314,16 @@ end end - describe '#collapse' do - specify { expect(subject.collapse(foo: {bar: 42}).render[:body]).to include(collapse: {'foo' => {bar: 42}}) } - specify do - expect(subject.collapse(foo: {bar: 42}).collapse(moo: {baz: 43}).render[:body]) - .to include(collapse: {'moo' => {baz: 43}}) + %i[collapse knn].each do |name| + describe "##{name}" do + specify { expect(subject.send(name, foo: {bar: 42}).render[:body]).to include(name => {'foo' => {bar: 42}}) } + specify do + expect(subject.send(name, foo: {bar: 42}).send(name, moo: {baz: 43}).render[:body]) + .to include(name => {'moo' => {baz: 43}}) + end + specify { expect(subject.send(name, foo: {bar: 42}).send(name, nil).render[:body]).to be_blank } + specify { expect { subject.send(name, foo: {bar: 42}) }.not_to change { subject.render } } end - specify { expect(subject.collapse(foo: {bar: 42}).collapse(nil).render[:body]).to be_blank } - specify { expect { subject.collapse(foo: {bar: 42}) }.not_to change { subject.render } } end describe '#docvalue_fields' do @@ -817,6 +819,31 @@ request: {index: ['products'], body: {query: {match: {name: 'name3'}}}, refresh: false} ) end + + it 'delete records asynchronously' do + outer_payload = nil + ActiveSupport::Notifications.subscribe('delete_query.chewy') do |_name, _start, _finish, _id, payload| + outer_payload = payload + end + subject.query(match: {name: 'name3'}).delete_all( + refresh: false, + wait_for_completion: false, + requests_per_second: 10.0, + scroll_size: 2000 + ) + expect(outer_payload).to eq( + index: ProductsIndex, + indexes: [ProductsIndex], + request: { + index: ['products'], + body: {query: {match: {name: 'name3'}}}, + refresh: false, + wait_for_completion: false, + requests_per_second: 10.0, + scroll_size: 2000 + } + ) + end end describe '#response=' do diff --git a/spec/chewy/search/response_spec.rb b/spec/chewy/search/response_spec.rb index 7b291288d..3cf5830d2 100644 --- a/spec/chewy/search/response_spec.rb +++ b/spec/chewy/search/response_spec.rb @@ -1,7 +1,7 @@ require 'spec_helper' describe Chewy::Search::Response, :orm do - before { Chewy.massacre } + before { drop_indices } before do stub_model(:city) @@ -39,7 +39,7 @@ specify { expect(subject.hits).to all be_a(Hash) } specify do expect(subject.hits.flat_map(&:keys).uniq) - .to match_array(%w[_id _index _type _score _source sort]) + .to match_array(%w[_id _index _score _source sort]) end context do diff --git 
a/spec/chewy/search/scrolling_spec.rb b/spec/chewy/search/scrolling_spec.rb index 4dfe68941..003d899c7 100644 --- a/spec/chewy/search/scrolling_spec.rb +++ b/spec/chewy/search/scrolling_spec.rb @@ -1,7 +1,7 @@ require 'spec_helper' describe Chewy::Search::Scrolling, :orm do - before { Chewy.massacre } + before { drop_indices } before do stub_model(:city) diff --git a/spec/chewy/search_spec.rb b/spec/chewy/search_spec.rb index d7cafa40d..b8ead283e 100644 --- a/spec/chewy/search_spec.rb +++ b/spec/chewy/search_spec.rb @@ -1,7 +1,7 @@ require 'spec_helper' describe Chewy::Search do - before { Chewy.massacre } + before { drop_indices } before do stub_index(:products) diff --git a/spec/chewy/stash_spec.rb b/spec/chewy/stash_spec.rb index 2d2774ff5..fc0df7487 100644 --- a/spec/chewy/stash_spec.rb +++ b/spec/chewy/stash_spec.rb @@ -5,7 +5,7 @@ def fetch_deleted_number(response) response['deleted'] || response['_indices']['_all']['deleted'] end - before { Chewy.massacre } + before { drop_indices } before do stub_model(:city) diff --git a/spec/chewy/strategy/active_job_spec.rb b/spec/chewy/strategy/active_job_spec.rb index 71d9563d1..7fda880d4 100644 --- a/spec/chewy/strategy/active_job_spec.rb +++ b/spec/chewy/strategy/active_job_spec.rb @@ -1,6 +1,6 @@ require 'spec_helper' -if defined?(::ActiveJob) +if defined?(ActiveJob) describe Chewy::Strategy::ActiveJob do around do |example| active_job_settings = Chewy.settings[:active_job] @@ -9,12 +9,12 @@ Chewy.settings[:active_job] = active_job_settings end before(:all) do - ::ActiveJob::Base.logger = Chewy.logger + ActiveJob::Base.logger = Chewy.logger end before do - ::ActiveJob::Base.queue_adapter = :test - ::ActiveJob::Base.queue_adapter.enqueued_jobs.clear - ::ActiveJob::Base.queue_adapter.performed_jobs.clear + ActiveJob::Base.queue_adapter = :test + ActiveJob::Base.queue_adapter.enqueued_jobs.clear + ActiveJob::Base.queue_adapter.performed_jobs.clear end before do @@ -39,7 +39,7 @@ Chewy.strategy(:active_job) do [city, other_city].map(&:save!) end - enqueued_job = ::ActiveJob::Base.queue_adapter.enqueued_jobs.first + enqueued_job = ActiveJob::Base.queue_adapter.enqueued_jobs.first expect(enqueued_job[:job]).to eq(Chewy::Strategy::ActiveJob::Worker) expect(enqueued_job[:queue]).to eq('low') end @@ -48,12 +48,12 @@ Chewy.strategy(:active_job) do [city, other_city].map(&:save!) end - enqueued_job = ::ActiveJob::Base.queue_adapter.enqueued_jobs.first + enqueued_job = ActiveJob::Base.queue_adapter.enqueued_jobs.first expect(enqueued_job[:queue]).to eq('low') end specify do - ::ActiveJob::Base.queue_adapter = :inline + ActiveJob::Base.queue_adapter = :inline expect { [city, other_city].map(&:save!) } .to update_index(CitiesIndex, strategy: :active_job) .and_reindex(city, other_city).only diff --git a/spec/chewy/strategy/delayed_sidekiq_spec.rb b/spec/chewy/strategy/delayed_sidekiq_spec.rb new file mode 100644 index 000000000..03ead7755 --- /dev/null +++ b/spec/chewy/strategy/delayed_sidekiq_spec.rb @@ -0,0 +1,225 @@ +require 'spec_helper' + +if defined?(Sidekiq) + require 'sidekiq/testing' + require 'redis' + + describe Chewy::Strategy::DelayedSidekiq do + around do |example| + Chewy.strategy(:bypass) { example.run } + end + + before do + redis = Redis.new + allow(Sidekiq).to receive(:redis).and_yield(redis) + Sidekiq::Worker.clear_all + described_class.clear_timechunks! 
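+        # clear_timechunks! (added in lib/chewy/strategy/delayed_sidekiq.rb above) wipes the chewy:delayed_sidekiq:* redis keys so accumulated timechunk state does not leak between examples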
+ end + + before do + stub_model(:city) do + update_index('cities') { self } + end + + stub_uuid_model(:user) do + update_index('users') { self } + end + + stub_index(:cities) do + index_scope City + end + + stub_index(:users) do + index_scope User + end + end + + let(:city) { City.create!(name: 'hello') } + let(:other_city) { City.create!(name: 'world') } + let(:user) { User.create!(name: 'John') } + let(:other_user) { User.create!(name: 'Jane') } + + it 'does not trigger immediate reindex due to it`s async nature' do + expect { [city, other_city].map(&:save!) } + .not_to update_index(CitiesIndex, strategy: :delayed_sidekiq) + end + + it "respects 'refresh: false' options" do + allow(Chewy).to receive(:disable_refresh_async).and_return(true) + expect(CitiesIndex).to receive(:import!).with(match_array([city.id.to_s, other_city.id.to_s]), refresh: false) + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id, other_city.id]) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + + it 'works with models with string primary key' do + expect(UsersIndex).to receive(:import!).with(match_array([user.id, other_user.id])) + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(UsersIndex, [user.id, other_user.id]) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + + context 'with default config' do + it 'does schedule a job that triggers reindex with default options' do + Timecop.freeze do + expect(Sidekiq::Client).to receive(:push).with( + hash_including( + 'queue' => 'chewy', + 'at' => expected_at_time.to_i, + 'class' => Chewy::Strategy::DelayedSidekiq::Worker, + 'args' => ['CitiesIndex', an_instance_of(Integer)] + ) + ).and_call_original + + expect($stdout).not_to receive(:puts) + + Sidekiq::Testing.inline! do + expect { [city, other_city].map(&:save!) } + .to update_index(CitiesIndex, strategy: :delayed_sidekiq) + .and_reindex(city, other_city).only + end + end + end + + def expected_at_time + target = described_class::Scheduler::DEFAULT_LATENCY.seconds.from_now.to_i + target - (target % described_class::Scheduler::DEFAULT_LATENCY) + described_class::Scheduler::DEFAULT_MARGIN.seconds + end + end + + context 'with custom config' do + before do + CitiesIndex.strategy_config( + delayed_sidekiq: { + reindex_wrapper: lambda { |&reindex| + puts 'hello' + reindex.call + }, + margin: 5, + latency: 60 + } + ) + end + + it 'respects :strategy_config options' do + Timecop.freeze do + expect(Sidekiq::Client).to receive(:push).with( + hash_including( + 'queue' => 'chewy', + 'at' => (60.seconds.from_now.change(sec: 0) + 5.seconds).to_i, + 'class' => Chewy::Strategy::DelayedSidekiq::Worker, + 'args' => ['CitiesIndex', an_instance_of(Integer)] + ) + ).and_call_original + + expect($stdout).to receive(:puts).with('hello') # check that reindex_wrapper works + + Sidekiq::Testing.inline! do + expect { [city, other_city].map(&:save!) 
} .to update_index(CitiesIndex, strategy: :delayed_sidekiq) + .and_reindex(city, other_city).only + end + end + end + end + + context 'two reindex calls within the timewindow' do + it 'accumulates all ids and does the reindex one time' do + Timecop.freeze do + expect(CitiesIndex).to receive(:import!).with(match_array([city.id.to_s, other_city.id.to_s])).once + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id]) + scheduler.postpone + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id]) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + end + + context 'one call with update_fields, another one without update_fields' do + it 'does reindex of all fields' do + Timecop.freeze do + expect(CitiesIndex).to receive(:import!).with(match_array([city.id.to_s, other_city.id.to_s])).once + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id], update_fields: ['name']) + scheduler.postpone + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id]) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + end + end + + context 'both calls with different update fields' do + it 'does reindex with union of fields' do + Timecop.freeze do + expect(CitiesIndex).to receive(:import!).with(match_array([city.id.to_s, other_city.id.to_s]), update_fields: match_array(%w[name description])).once + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id], update_fields: ['name']) + scheduler.postpone + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id], update_fields: ['description']) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + end + end + end + + context 'two calls within different timewindows' do + it 'does two separate reindexes' do + Timecop.freeze do + expect(CitiesIndex).to receive(:import!).with([city.id.to_s]).once + expect(CitiesIndex).to receive(:import!).with([other_city.id.to_s]).once + Timecop.travel(20.seconds.ago) do + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id]) + scheduler.postpone + end + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id]) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + end + end + + context 'first call has update_fields' do + it 'does first reindex with the expected update_fields and second without update_fields' do + Timecop.freeze do + expect(CitiesIndex).to receive(:import!).with([city.id.to_s], update_fields: ['name']).once + expect(CitiesIndex).to receive(:import!).with([other_city.id.to_s]).once + Timecop.travel(20.seconds.ago) do + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id], update_fields: ['name']) + scheduler.postpone + end + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id]) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + end + end + + context 'both calls have update_fields option' do + it 'does both reindexes with their expected update_fields option' do + Timecop.freeze do + expect(CitiesIndex).to receive(:import!).with([city.id.to_s], update_fields: ['name']).once + expect(CitiesIndex).to receive(:import!).with([other_city.id.to_s], update_fields: ['description']).once + Timecop.travel(20.seconds.ago) do + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id], update_fields:
['name']) + scheduler.postpone + end + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id], update_fields: ['description']) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + end + end + + describe '#clear_delayed_sidekiq_timechunks test helper' do + it 'clears redis from the timechunk sorted sets to avoid leak between tests' do + timechunks_set = -> { Sidekiq.redis { |redis| redis.zrange('chewy:delayed_sidekiq:CitiesIndex:timechunks', 0, -1) } } + + expect { CitiesIndex.import!([1], strategy: :delayed_sidekiq) } + .to change { timechunks_set.call.size }.by(1) + + expect { Chewy::Strategy::DelayedSidekiq.clear_timechunks! } + .to change { timechunks_set.call.size }.to(0) + end + end + end +end diff --git a/spec/chewy/strategy/lazy_sidekiq_spec.rb b/spec/chewy/strategy/lazy_sidekiq_spec.rb index e221c67ab..4078fb1a3 100644 --- a/spec/chewy/strategy/lazy_sidekiq_spec.rb +++ b/spec/chewy/strategy/lazy_sidekiq_spec.rb @@ -1,6 +1,6 @@ require 'spec_helper' -if defined?(::Sidekiq) +if defined?(Sidekiq) require 'sidekiq/testing' describe Chewy::Strategy::LazySidekiq do @@ -10,7 +10,7 @@ Chewy.strategy(:bypass) { example.run } Chewy.settings[:sidekiq] = sidekiq_settings end - before { ::Sidekiq::Worker.clear_all } + before { Sidekiq::Worker.clear_all } context 'strategy' do before do @@ -32,14 +32,14 @@ end it 'updates indices asynchronously on record save' do - expect(::Sidekiq::Client).to receive(:push) + expect(Sidekiq::Client).to receive(:push) .with(hash_including( 'class' => Chewy::Strategy::LazySidekiq::IndicesUpdateWorker, 'queue' => 'low' )) .and_call_original .once - ::Sidekiq::Testing.inline! do + Sidekiq::Testing.inline! do expect { [city, other_city].map(&:save!) } .to update_index(CitiesIndex, strategy: :lazy_sidekiq) .and_reindex(city, other_city).only @@ -47,12 +47,12 @@ end it 'updates indices asynchronously with falling back to sidekiq strategy on record destroy' do - expect(::Sidekiq::Client).not_to receive(:push) + expect(Sidekiq::Client).not_to receive(:push) .with(hash_including( 'class' => Chewy::Strategy::LazySidekiq::IndicesUpdateWorker, 'queue' => 'low' )) - expect(::Sidekiq::Client).to receive(:push) + expect(Sidekiq::Client).to receive(:push) .with(hash_including( 'class' => Chewy::Strategy::Sidekiq::Worker, 'queue' => 'low', @@ -60,7 +60,7 @@ )) .and_call_original .once - ::Sidekiq::Testing.inline! do + Sidekiq::Testing.inline! do expect { [city, other_city].map(&:destroy) }.to update_index(CitiesIndex, strategy: :sidekiq) end end @@ -71,7 +71,7 @@ expect(other_city).to receive(:run_chewy_callbacks).and_call_original expect do - ::Sidekiq::Testing.inline! do + Sidekiq::Testing.inline! do Chewy::Strategy::LazySidekiq::IndicesUpdateWorker.new.perform({'City' => [city.id, other_city.id]}) end end.to update_index(CitiesIndex).and_reindex(city, other_city).only @@ -88,7 +88,7 @@ expect(other_city).to receive(:run_chewy_callbacks).and_call_original expect do - ::Sidekiq::Testing.inline! do + Sidekiq::Testing.inline! do Chewy::Strategy::LazySidekiq::IndicesUpdateWorker.new.perform({'City' => [city.id, other_city.id]}) end end.to update_index(CitiesIndex).and_reindex(city, other_city).only.no_refresh @@ -97,7 +97,7 @@ end context 'integration' do - around { |example| ::Sidekiq::Testing.inline! { example.run } } + around { |example| Sidekiq::Testing.inline! 
{ example.run } } let(:update_condition) { true } diff --git a/spec/chewy/strategy/sidekiq_spec.rb b/spec/chewy/strategy/sidekiq_spec.rb index 943ac6f7e..13ca55e90 100644 --- a/spec/chewy/strategy/sidekiq_spec.rb +++ b/spec/chewy/strategy/sidekiq_spec.rb @@ -1,6 +1,6 @@ require 'spec_helper' -if defined?(::Sidekiq) +if defined?(Sidekiq) require 'sidekiq/testing' describe Chewy::Strategy::Sidekiq do @@ -10,7 +10,7 @@ Chewy.strategy(:bypass) { example.run } Chewy.settings[:sidekiq] = sidekiq_settings end - before { ::Sidekiq::Worker.clear_all } + before { Sidekiq::Worker.clear_all } before do stub_model(:city) do update_index('cities') { self } @@ -30,8 +30,8 @@ end specify do - expect(::Sidekiq::Client).to receive(:push).with(hash_including('queue' => 'low')).and_call_original - ::Sidekiq::Testing.inline! do + expect(Sidekiq::Client).to receive(:push).with(hash_including('queue' => 'low')).and_call_original + Sidekiq::Testing.inline! do expect { [city, other_city].map(&:save!) } .to update_index(CitiesIndex, strategy: :sidekiq) .and_reindex(city, other_city).only diff --git a/spec/chewy/strategy_spec.rb b/spec/chewy/strategy_spec.rb index 25a2344bd..817e2dfc3 100644 --- a/spec/chewy/strategy_spec.rb +++ b/spec/chewy/strategy_spec.rb @@ -1,7 +1,7 @@ require 'spec_helper' describe Chewy::Strategy do - before { Chewy.massacre } + before { drop_indices } subject(:strategy) { Chewy::Strategy.new } describe '#current' do diff --git a/spec/chewy_spec.rb b/spec/chewy_spec.rb index 4dcf2718a..e946aecca 100644 --- a/spec/chewy_spec.rb +++ b/spec/chewy_spec.rb @@ -31,8 +31,8 @@ end end - describe '.massacre' do - before { Chewy.massacre } + xdescribe '.massacre' do + before { drop_indices } before do allow(Chewy).to receive_messages(configuration: Chewy.configuration.merge(prefix: 'prefix1')) @@ -40,7 +40,7 @@ allow(Chewy).to receive_messages(configuration: Chewy.configuration.merge(prefix: 'prefix2')) stub_index(:developers).create! - Chewy.massacre + drop_indices allow(Chewy).to receive_messages(configuration: Chewy.configuration.merge(prefix: 'prefix1')) end @@ -57,18 +57,21 @@ before do Chewy.current[:chewy_client] = nil - allow(Chewy).to receive_messages(configuration: {transport_options: {proc: faraday_block}}) + end + + specify do + expect(Chewy).to receive_messages(configuration: {transport_options: {proc: faraday_block}}) - allow(::Elasticsearch::Client).to receive(:new).with(expected_client_config) do |*_args, &passed_block| + expect(Elasticsearch::Client).to receive(:new).with(expected_client_config) do |*_args, &passed_block| # RSpec's `with(..., &block)` was used previously, but doesn't actually do # any verification of the passed block (even of its presence). expect(passed_block.source_location).to eq(faraday_block.source_location) mock_client end - end - its(:client) { is_expected.to eq(mock_client) } + expect(Chewy.client).to be_a(Chewy::ElasticClient) + end after { Chewy.current[:chewy_client] = initial_client } end @@ -81,7 +84,8 @@ # To avoid flaky issues when previous specs were run allow(Chewy::Index).to receive(:descendants).and_return([CitiesIndex, PlacesIndex]) - Chewy.massacre + CitiesIndex.delete + PlacesIndex.delete end specify do @@ -108,7 +112,7 @@ expect(CitiesIndex.exists?).to eq true expect(PlacesIndex.exists?).to eq true - expect { Chewy.create_indices! }.to raise_error(Elasticsearch::Transport::Transport::Errors::BadRequest) + expect { Chewy.create_indices! 
}.to raise_error(Elastic::Transport::Transport::Errors::BadRequest) end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 48e901d6c..c67ff0f88 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -9,7 +9,7 @@ require 'timecop' -Kaminari::Hooks.init if defined?(::Kaminari::Hooks) +Kaminari::Hooks.init if defined?(Kaminari::Hooks) require 'support/fail_helpers' require 'support/class_helpers' @@ -28,6 +28,32 @@ } } +# To work with security enabled: +# +# user = ENV['ES_USER'] || 'elastic' +# password = ENV['ES_PASSWORD'] || '' +# ca_cert = ENV['ES_CA_CERT'] || './tmp/http_ca.crt' +# +# Chewy.settings.merge!( +# user: user, +# password: password, +# transport_options: { +# ssl: { +# ca_file: ca_cert +# } +# } +# ) + +# Low-level substitute for the now-obsolete Chewy.massacre +def drop_indices + response = Chewy.client.cat.indices + indices = response.body.lines.map { |line| line.split[2] } + return if indices.blank? + + Chewy.client.indices.delete(index: indices) + Chewy.wait_for_status +end + # Chewy.transport_logger = Logger.new(STDERR) RSpec.configure do |config| diff --git a/spec/support/active_record.rb b/spec/support/active_record.rb index d081e5be4..e64660563 100644 --- a/spec/support/active_record.rb +++ b/spec/support/active_record.rb @@ -6,17 +6,16 @@ ActiveRecord::Base.raise_in_transactional_callbacks = true end -ActiveRecord::Base.connection.execute("DROP TABLE IF EXISTS 'countries'") -ActiveRecord::Base.connection.execute("DROP TABLE IF EXISTS 'cities'") -ActiveRecord::Base.connection.execute("DROP TABLE IF EXISTS 'locations'") -ActiveRecord::Schema.define do +def create_countries_table create_table :countries do |t| t.column :name, :string t.column :country_code, :string t.column :rating, :integer t.column :updated_at, :datetime end +end +def create_cities_table create_table :cities do |t| t.column :country_id, :integer t.column :name, :string @@ -25,13 +24,17 @@ t.column :rating, :integer t.column :updated_at, :datetime end +end +def create_locations_table create_table :locations do |t| t.column :city_id, :integer t.column :lat, :string t.column :lon, :string end +end +def create_comments_table create_table :comments do |t| t.column :content, :string t.column :comment_type, :string @@ -40,6 +43,26 @@ end end +def create_users_table + create_table :users, id: false do |t| + t.column :id, :string + t.column :name, :string + end +end + +ActiveRecord::Base.connection.execute("DROP TABLE IF EXISTS 'countries'") +ActiveRecord::Base.connection.execute("DROP TABLE IF EXISTS 'cities'") +ActiveRecord::Base.connection.execute("DROP TABLE IF EXISTS 'locations'") +ActiveRecord::Base.connection.execute("DROP TABLE IF EXISTS 'comments'") +ActiveRecord::Base.connection.execute("DROP TABLE IF EXISTS 'users'") +ActiveRecord::Schema.define do + create_countries_table + create_cities_table + create_locations_table + create_comments_table + create_users_table +end + module ActiveRecordClassHelpers extend ActiveSupport::Concern @@ -73,6 +96,14 @@ def expects_no_query(except: nil, &block) def stub_model(name, superclass = nil, &block) stub_class(name, superclass || ActiveRecord::Base, &block) end + + def stub_uuid_model(name, superclass = nil, &block) + stub_class(name, superclass || ActiveRecord::Base) do + before_create { self.id = SecureRandom.uuid } + define_singleton_method(:primary_key, -> { 'id' }) + class_eval(&block) + end + end end RSpec.configure do |config|
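# Illustrative sketch (not part of the diff above): one way the new spec helpers
# might be combined — `drop_indices` from spec_helper.rb in place of the now-disabled
# Chewy.massacre, and `stub_uuid_model` from spec/support/active_record.rb for a
# string/UUID primary key. The index/model names and the :atomic strategy usage are
# assumptions mirroring patterns from delayed_sidekiq_spec.rb, not part of this change set.
require 'spec_helper'

describe 'indexing a UUID-keyed model', :orm do
  before { drop_indices }

  before do
    stub_uuid_model(:user) do
      update_index('users') { self }
    end

    stub_index(:users) do
      index_scope User
    end
  end

  let(:user) { User.create!(name: 'John') }

  specify do
    # Records touched inside an atomic strategy block are imported once on block exit.
    expect { Chewy.strategy(:atomic) { user } }
      .to update_index(UsersIndex).and_reindex(user).only
  end
end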