Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions app/api/entities/envelope_download.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,24 @@ module API
module Entities
# Presenter for EnvelopeDownload
class EnvelopeDownload < Grape::Entity
expose :id,
documentation: { type: 'string', desc: 'ID (in UUID format)' }
expose :display_status, as: :status,
documentation: { type: 'string', desc: 'Status of download' }

expose :status,
documentation: { type: 'string', desc: 'Status of download' }
expose :enqueued_at,
documentation: { type: 'string', desc: 'When the download was enqueued' },
if: ->(object) { object.pending? }

expose :finished_at,
documentation: { type: 'string', desc: 'When the download finished' },
if: ->(object) { object.finished? }

expose :started_at,
documentation: { type: 'string', desc: 'When the download started' },
if: ->(object) { object.in_progress? }

expose :url,
documentation: { type: 'string', desc: 'AWS S3 URL' }
documentation: { type: 'string', desc: 'AWS S3 URL' },
if: ->(object) { object.finished? }
end
end
end
51 changes: 28 additions & 23 deletions app/api/v1/envelopes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
require 'v1/single_envelope'
require 'v1/revisions'
require 'v1/envelope_events'
require 'download_envelopes_job'

module API
module V1
Expand Down Expand Up @@ -62,10 +63,36 @@ class Envelopes < MountableAPI
type: params[:metadata_only] ? :metadata_only : :full
end

include API::V1::EnvelopeEvents

desc 'Gives general info about the envelopes'
get(:info) { envelopes_info }

include API::V1::EnvelopeEvents
resources :download do
before do
authenticate!
authorize Envelope, :index?

@envelope_download = current_community.envelope_download ||
current_community.create_envelope_download!
end

desc 'Returns the envelope download'
get do
present @envelope_download, with: API::Entities::EnvelopeDownload
end

desc 'Starts an envelope download'
post do
@envelope_download.update!(
enqueued_at: Time.current,
status: :pending
)

DownloadEnvelopesJob.perform_later(@envelope_download.id)
present @envelope_download, with: API::Entities::EnvelopeDownload
end
end

route_param :envelope_id do
after_validation do
Expand All @@ -86,28 +113,6 @@ class Envelopes < MountableAPI
include API::V1::SingleEnvelope
include API::V1::Revisions
end

resources :downloads do
before do
authenticate!
end

desc 'Returns the download object with the given ID'
get ':id' do
authorize Envelope, :index?

envelope_download = current_user_community.envelope_downloads.find(params[:id])
present envelope_download, with: API::Entities::EnvelopeDownload
end

desc 'Starts new envelope download'
post do
authorize Envelope, :index?

present current_user_community.envelope_downloads.create!,
with: API::Entities::EnvelopeDownload
end
end
end
end
end
Expand Down
53 changes: 3 additions & 50 deletions app/jobs/download_envelopes_job.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
require 'entities/envelope'
require 'download_envelopes'
require 'envelope_download'

# Create a ZIP archive contaning all of the envelopes from a certain community,
Expand All @@ -10,56 +10,9 @@ def perform(envelope_download_id)
envelope_download = EnvelopeDownload.find_by(id: envelope_download_id)
return unless envelope_download

envelope_download.update!(
internal_error_backtrace: [],
internal_error_message: nil,
started_at: Time.current
)

envelope_download.url = upload_to_s3(envelope_download)
DownloadEnvelopes.call(envelope_download:)
rescue StandardError => e
Airbrake.notify(e, envelope_download_id:)
envelope_download&.internal_error_backtrace = e.backtrace
envelope_download&.internal_error_message = e.message
ensure
envelope_download&.update!(finished_at: Time.current)
end

private

def bucket
ENV.fetch('ENVELOPE_DOWNLOADS_BUCKET')
end

def create_zip_archive(envelope_download)
envelopes = envelope_download.envelopes.includes(
:envelope_community, :organization, :publishing_organization
)

file_path = MR.root_path.join(SecureRandom.hex)

Zip::OutputStream.open(file_path) do |stream|
envelopes.find_each do |envelope|
stream.put_next_entry("#{envelope.envelope_ceterms_ctid}.json")
stream.puts(API::Entities::Envelope.represent(envelope).to_json)
end
end

file_path
end

def region
ENV.fetch('AWS_REGION')
end

def upload_to_s3(envelope_download)
community = envelope_download.envelope_community.name
key = "#{community}_#{Time.current.to_i}_#{SecureRandom.hex}.zip"
path = create_zip_archive(envelope_download)
object = Aws::S3::Resource.new(region:).bucket(bucket).object(key)
object.upload_file(path)
object.public_url
ensure
File.delete(path)
raise e
end
end
2 changes: 1 addition & 1 deletion app/models/envelope_community.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class EnvelopeCommunity < ActiveRecord::Base
include AttributeNormalizer

has_one :envelope_community_config
has_many :envelope_downloads
has_one :envelope_download
has_many :envelopes
has_many :envelope_resources, through: :envelopes
has_many :indexed_envelope_resources
Expand Down
24 changes: 8 additions & 16 deletions app/models/envelope_download.rb
Original file line number Diff line number Diff line change
@@ -1,25 +1,17 @@
require 'download_envelopes_job'

# Stores the status and AWS S3 URL of an asynchronously performed envelope download
class EnvelopeDownload < ActiveRecord::Base
belongs_to :envelope_community
has_many :envelopes, -> { not_deleted }, through: :envelope_community

after_commit :enqueue_job, on: :create

def status
if finished_at?
return internal_error_message? ? 'failed' : 'finished'
elsif started_at?
return 'in progress'
end

'pending'
end
enum :status, {
finished: 'finished',
in_progress: 'in_progress',
pending: 'pending'
}

private
def display_status
return 'failed' if internal_error_message?

def enqueue_job
DownloadEnvelopesJob.perform_later(id)
status
end
end
2 changes: 1 addition & 1 deletion app/models/extensions/ce_registry_resources.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def ce_registry?
end

def self.generate_ctid
"urn:ctid:#{SecureRandom.uuid}"
"ce-#{SecureRandom.uuid}"
end
end
end
149 changes: 149 additions & 0 deletions app/services/download_envelopes.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# Dumps an envelope community's envelopes into a ZIP archive and uploads it to S3
class DownloadEnvelopes # rubocop:todo Metrics/ClassLength
attr_reader :envelope_download, :updated_at

delegate :envelope_community, to: :envelope_download

def initialize(envelope_download)
@envelope_download = envelope_download
@updated_at = envelope_download.started_at
end

def self.call(envelope_download:)
new(envelope_download).run
end

def bucket
ENV.fetch('ENVELOPE_DOWNLOADS_BUCKET')
end

def create_or_update_entries
FileUtils.mkdir_p(dirname)

log('Adding recently published envelopes into the dump')

published_envelopes.find_each do |envelope|
File.write(
File.join(dirname, "#{envelope.envelope_ceterms_ctid}.json"),
API::Entities::Envelope.represent(envelope).to_json
)
end
end

def dirname
@dirname ||= [
envelope_community.name,
Time.current.to_i,
SecureRandom.hex
].join('_')
end

def download_file # rubocop:todo Metrics/AbcSize
return unless envelope_download.url?

log("Downloading the existing dump from #{envelope_download.url}")

File.open(filename, 'wb') do |file|
URI.parse(envelope_download.url).open do |data|
file.write(data.read)
end
end

log("Unarchiving the downloaded dump into #{dirname}")
system("unzip -qq #{filename} -d #{dirname}", exception: true)
rescue StandardError => e
Airbrake.notify(e)
end

def destroy_envelope_events
@deleted_envelope_ctids = envelope_community
.versions
.where(event: 'destroy')
.where('created_at >= ?', updated_at)
end

def filename
@filename ||= "#{dirname}.zip"
end

def log(message)
MR.logger.info(message)
end

def published_envelopes
@published_envelopes = begin
envelopes = envelope_community
.envelopes
.not_deleted
.includes(:envelope_community, :organization, :publishing_organization)

envelopes.where!('updated_at >= ?', updated_at) if updated_at
envelopes
end
end

def region
ENV.fetch('AWS_REGION')
end

def remove_entries
log('Removing recently deleted envelopes from the dump')

destroy_envelope_events.select(:id, :envelope_ceterms_ctid).find_each do |event|
FileUtils.remove_file(
File.join(dirname, "#{event.envelope_ceterms_ctid}.json"),
true
)
end
end

def run # rubocop:todo Metrics/AbcSize, Metrics/MethodLength
envelope_download.update!(
internal_error_backtrace: [],
internal_error_message: nil,
started_at: Time.current,
status: :in_progress
)

envelope_download.with_lock do
if up_to_date?
log('The dump is up to date.')
return
end

download_file
create_or_update_entries
remove_entries
envelope_download.url = upload_file
rescue StandardError => e
Airbrake.notify(e)
envelope_download&.internal_error_backtrace = e.backtrace
envelope_download&.internal_error_message = e.message
ensure
log('Deleting intermediate files.')
FileUtils.rm_rf(dirname)
FileUtils.rm_f(filename)
envelope_download.update!(finished_at: Time.current, status: :finished)
log('Finished.')
end
end

def up_to_date?
published_envelopes.none? && destroy_envelope_events.none?
end

def upload_file
log('Archiving the updated dump.')

system(
"find #{dirname} -type f -print | zip -FSjqq #{filename} -@",
exception: true
)

log('Uploading the updated dump to S3.')

object = Aws::S3::Resource.new(region:).bucket(bucket).object(filename)
object.upload_file(filename)
object.public_url
end
end
2 changes: 1 addition & 1 deletion config/database.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ default: &default
database: <%= ENV['POSTGRESQL_DATABASE'] %>
password: <%= ENV['POSTGRESQL_PASSWORD'] %>
port: <%= ENV.fetch('POSTGRESQL_PORT', 5432) %>
pool: <%= ENV.fetch('SIDEKIQ_CONCURRENCY', 10) %>
pool: <%= ENV.fetch('SIDEKIQ_CONCURRENCY', 10).to_i + 1 %>
encoding: utf8

development:
Expand Down
Loading
Loading