How to process fetched results off the main thread? #17

Closed
mayurdhaka-suryasoftware opened this issue Jan 12, 2018 · 11 comments

mayurdhaka-suryasoftware commented Jan 12, 2018

Hey!

I'm using a piece of code that looks something like this:

dbQueue.rx.changes(in: [Project.all()])
  .observeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
  .map { db in
       let foo = Row.fetchAll(db, mySQLQuery)
   ...
  }

I get a crash that reads 'Database was not used on the correct thread'. From what I understand, I am using the db object on a separate thread than the one it was initiated on (dbQueue).
But I wanted to know if there is a way to achieve what I'm trying to do because the work I'm doing in my map block is something I'd like to get off the main thread.

Any help is appreciated, thanks!

groue (Collaborator) commented Jan 12, 2018

This issue is about RxGRDB v0.7.0, and contains outdated information. Please refer to the Scheduling documentation chapter for fresh information about RxGRDB scheduling.

Original comments below


Hello @mayurdhaka-suryasoftware

> I get a crash that reads 'Database was not used on the correct thread'. From what I understand, I am using the db object on a separate thread than the one it was initiated on (dbQueue).

You understand well. And GRDB absolutely prevents database connections from being used outside of their dedicated dispatch queue with a fatal error. There's no escape to this rule, which is the golden rule of GRDB thread-safety.
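
To make the rule concrete, here is a minimal sketch of one way such a check can be built with plain Dispatch. It is not GRDB's implementation: `GuardedConnection`, its queue label, and its methods are hypothetical names used only for illustration. The idea is to tag the dedicated queue with a queue-specific value, then refuse any access that does not run on that queue.

```swift
import Dispatch

// Hypothetical stand-in for a database connection guarded by its own queue.
// This is an illustration only, not GRDB code.
final class GuardedConnection {
    private let queue = DispatchQueue(label: "guarded.connection")
    private let key = DispatchSpecificKey<Bool>()

    init() {
        // Tag the queue so that code can later ask "am I running on it?"
        queue.setSpecific(key: key, value: true)
    }

    /// True only when called from a block running on the protected queue.
    var isOnProtectedQueue: Bool {
        return DispatchQueue.getSpecific(key: key) == true
    }

    /// Runs `body` on the protected queue and blocks the caller meanwhile.
    func sync<T>(_ body: () throws -> T) rethrows -> T {
        return try queue.sync(execute: body)
    }

    /// Traps when called off the protected queue, like the crash reported above.
    func fetch() -> [String] {
        precondition(isOnProtectedQueue, "Database was not used on the correct thread")
        return ["row"]
    }
}

let connection = GuardedConnection()
let insideQueue = connection.sync { connection.isOnProtectedQueue }
let outsideQueue = connection.isOnProtectedQueue
let rows = connection.sync { connection.fetch() }
```

Calling `connection.fetch()` directly, outside of `sync`, would hit the precondition and stop the program, which mirrors the fatal error described in the question.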

> But I wanted to know if there is a way to achieve what I'm trying to do because the work I'm doing in my map block is something I'd like to get off the main thread.

That's a very interesting question, one that deserves a clear explanation.

Let's first "fix" your sample code by fetching from the dedicated dispatch queue:

dbQueue.rx
  .changes(in: [Project.all()])
  .map { db in
    let foo = try Row.fetchAll(db, mySQLQuery)
    ...
  }

Let's try to find out on which thread the fetch happens.

dbQueue.rx.changes is documented to notify of changes on the "database writer dispatch queue". You are using a DatabaseQueue, so this dispatch queue is a unique serial dispatch queue dedicated to this database.

We thus know on which dispatch queue we are. But we don't know yet if the main thread is blocked, or not. And we remember that you want to avoid blocking the main thread. So let's keep on investigating.

dbQueue.rx.changes emits its elements as soon as a transaction has committed relevant database changes. If that database transaction has been performed from the main thread, then the fetch indeed blocks the main thread:

// From the main thread, synchronously execute a database transaction
dbQueue.inTransaction { db in
  try Project(...).insert(db)
  try ...
  return .commit // Triggers dbQueue.rx.changes
}

// Still on the main thread. Now the transaction has been committed, and
// the observable has fetched its results.

On the other side, if the transaction is executed from some other thread, then the main thread is not blocked at all:

// On some thread which is not the main thread
dbQueue.inTransaction { db in
  try Project(...).insert(db)
  try ...
  return .commit // Triggers dbQueue.rx.changes
}

// Still on some thread, which was blocked until the observable would
// fetch its results. The main thread did not see anything.

We now understand that the main thread may, or may not be blocked, depending on the threads that use the dbQueue.
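
The ordering described above can be sketched without any database. The following is a toy model under stated assumptions, not RxGRDB code: `ToyWriter`, `onCommit`, and `EventLog` are hypothetical names. The writer owns a serial queue, runs transactions synchronously on it, and notifies observers before the synchronous call returns, so whichever thread started the transaction waits for the observers.

```swift
import Foundation

// Thread-safe event recorder, used only to show the order of events.
final class EventLog: @unchecked Sendable {
    private let lock = NSLock()
    private var storage: [String] = []
    func append(_ event: String) { lock.lock(); storage.append(event); lock.unlock() }
    var events: [String] { lock.lock(); defer { lock.unlock() }; return storage }
}

// Hypothetical writer: one serial queue, observers notified synchronously at commit.
final class ToyWriter {
    private let queue = DispatchQueue(label: "toy.writer")
    private var observers: [() -> Void] = []

    func onCommit(_ observer: @escaping () -> Void) {
        queue.sync { observers.append(observer) }
    }

    func inTransaction(_ updates: () -> Void) {
        queue.sync {
            updates()
            // "Commit": observers run here, before sync returns to the caller.
            for observer in observers { observer() }
        }
    }
}

let log = EventLog()
let writer = ToyWriter()
writer.onCommit { log.append("observer fetches") }
writer.inTransaction { log.append("transaction body") }
log.append("caller resumes")
```

In this model the caller only resumes after the observer has run. If the caller is the main thread, the main thread pays for the fetch; if it is a background thread, the main thread is not involved.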

And that's the absolute best you can achieve with DatabaseQueue.

So how do you absolutely prevent the main thread from being blocked?

Replace DatabaseQueue with DatabasePool.

A database pool is dedicated to efficient multi-threading, unlike a database queue which is designed to be as simple as possible. A database pool is a little more involved than a database queue, so please have a look at the reading list at the bottom of this answer. But a database pool opens new multi-threading horizons. This is precisely its job.

This is how a database pool solves your issue:

let processingQueue = DispatchQueue(label: "processing", qos: .userInitiated)
processingQueue.async { // or sync, you choose
    dbPool.rx
        .changeTokens(in: [Project.all()])
        .mapFetch(resultQueue: processingQueue) { db in
            // fetch
            return try Row.fetchAll(db, mySQLQuery)
        }
        .subscribe(onNext: { foo in
            // process fetched results
            ...
        })
}

Yes that's a big rewrite (and by writing it, I realize that I may have to improve that - suggestions are welcome). But please bear with me:

First, the main thread no longer has to wait for the fetch to complete after it has performed a transaction:

// From the main thread, synchronously execute a database transaction
dbPool.writeInTransaction { db in
  try Project(...).insert(db)
  try ...
  return .commit // Triggers dbPool.rx.changeTokens
}

// Still on the main thread. Now the transaction has been committed, and
// the observable concurrently fetches and processes its results off
// the main thread.

Second, the main thread is not blocked when it reads from the database, even if the observable is fetching its own results:

// From the main thread: not blocked by the eventual fetch performed by the observable
dbPool.read { db in
  // fetch something
}
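
As a rough analogy for reads that do not wait for each other, here is a small Dispatch-only sketch. It assumes a concurrent queue as a stand-in for a pool of readers, which is an assumption for illustration and not a description of how DatabasePool is built. Each read refuses to finish until the other one has started, so the pair can only complete if both are in flight at the same time.

```swift
import Foundation

// Stand-in for a pool of readers (illustrative assumption only).
let readers = DispatchQueue(label: "toy.readers", attributes: .concurrent)

let readAStarted = DispatchSemaphore(value: 0)
let readBStarted = DispatchSemaphore(value: 0)
let bothReads = DispatchGroup()

// Read A: announces itself, then waits for read B to start.
readers.async(group: bothReads) {
    readAStarted.signal()
    readBStarted.wait()
}

// Read B: announces itself, then waits for read A to start.
readers.async(group: bothReads) {
    readBStarted.signal()
    readAStarted.wait()
}

// Succeeds only if the two reads overlapped in time.
let outcome = bothReads.wait(timeout: .now() + 5)
```

With a serial queue in place of `readers`, the first read would wait forever for the second one and the group would time out. That is the toy equivalent of a reader being blocked by another access.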

Third, you are now guaranteed that your processing of the fetched data happens off the main thread (in the "processing queue").

For this to work well, you must think harder about dispatch queues. RxGRDB may improve in the future, but right now, this is how you avoid blocking the main thread:

// This serial dispatch queue is required for RxGRDB to guarantee the correct
// ordering of change notifications off the main thread.
let processingQueue = DispatchQueue(label: "processing", qos: .userInitiated)

// Observable must be subscribed on the processing queue:
processingQueue.async { // or sync, you choose
    
    dbPool.rx
        
        // Use `changeTokens` (not `changes`)
        .changeTokens(in: [Project.all()])
        
        // Use `mapFetch` (not `map`), and ask to get results
        // on the processing queue.
        .mapFetch(resultQueue: processingQueue) { db in
            // Fetching itself does not happen on the processing queue, but
            // on a database reading dispatch queue. You can process fetched
            // results here, but you may be better off returning the fetched
            // results for later processing on the processing queue.
            return try Row.fetchAll(db, mySQLQuery)
        }
        
        .subscribe(onNext: { foo in
            // Now we're on the processing queue: you can process fetched results
            ...
        })
}
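
The division of labour in the commented code above (fetch on a reading queue, process on the processing queue) can be sketched with plain Dispatch. This is only an analogy under stated assumptions: `fetchThenDeliver`, `makeQueue`, `currentQueueLabel`, and `EventLog` are hypothetical helpers, and nothing here reproduces how `mapFetch(resultQueue:)` is implemented.

```swift
import Foundation

// Thread-safe event recorder, used only to show where each step runs.
final class EventLog: @unchecked Sendable {
    private let lock = NSLock()
    private var storage: [String] = []
    func append(_ event: String) { lock.lock(); storage.append(event); lock.unlock() }
    var events: [String] { lock.lock(); defer { lock.unlock() }; return storage }
}

// Queues are tagged with their label so that code can report where it runs.
let queueLabel = DispatchSpecificKey<String>()

func makeQueue(_ label: String, attributes: DispatchQueue.Attributes = []) -> DispatchQueue {
    let queue = DispatchQueue(label: label, attributes: attributes)
    queue.setSpecific(key: queueLabel, value: label)
    return queue
}

func currentQueueLabel() -> String {
    return DispatchQueue.getSpecific(key: queueLabel) ?? "untagged"
}

let readerQueue = makeQueue("reader", attributes: .concurrent)
let processingQueue = makeQueue("processing")

/// Hypothetical analogue of "fetch on a reader, deliver on a result queue".
func fetchThenDeliver(on resultQueue: DispatchQueue,
                      fetch: @escaping () -> [Int],
                      deliver: @escaping ([Int]) -> Void) {
    readerQueue.async {
        let rows = fetch()                      // runs on the reader queue
        resultQueue.async { deliver(rows) }     // runs on the result queue
    }
}

let log = EventLog()
let done = DispatchSemaphore(value: 0)

fetchThenDeliver(
    on: processingQueue,
    fetch: {
        log.append("fetch on \(currentQueueLabel())")
        return [1, 2, 3]
    },
    deliver: { rows in
        log.append("process \(rows.count) rows on \(currentQueueLabel())")
        done.signal()
    })

done.wait() // only so that this script does not exit early
```

Neither step runs on the thread that called `fetchThenDeliver`, which is the property the question asks for.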

If you want a reading list:

groue changed the title from "Circumventing 'Database was not used on the correct thread.' errors" to "How to process fetched results off the main thread?" Jan 12, 2018
groue added the question label Jan 12, 2018

groue (Collaborator) commented Jan 12, 2018

It is worth noting that while database pools do allow a very precise and efficient use of threads, most applications do not need that.

A good general rule is that before fixing a performance problem, one first has to experience it, and run benchmarks in order to precisely locate the bottleneck.

So: even though my answer above does provide a robust solution to your question, I suggest you keep using a database queue. Yes, the main thread will be blocked sometimes. But for how long? A couple of milliseconds is not a big deal. SQLite is very fast, and GRDB is fast. Also, remember that the main thread is not blocked if the database queue is used from some background thread (look for "On the other side..." in my previous comment).

I'm there if you have any other question.

groue (Collaborator) commented Jan 12, 2018

Here is a third answer. This will help me write a documentation chapter about the main thread eventually 😉

There's more to say about plain simple database queues.

After RxGRDB notifies changes, one generally fetches and processes results.

Sometimes one can fetch, and then process. Sometimes fetching and processing have to happen together. But what is slow and should be done off the main thread? Is it the fetch? Is it the processing? Is it both?

Often the fetch is quite fast. When fetching and processing can be separated, you can accept that fetches may shortly block the main thread, and perform the slow processing of the fetched results off the main thread:

// Example 1
request.rx
    .fetchAll(in: dbQueue)
    .observeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .map { fetchedResults in
        // process off the main thread
        ...
    }

// Example 2
dbQueue.rx
    .changeTokens(in: [Project.all()])
    .mapFetch { db in return <fetched results> }
    .observeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .map { fetchedResults in
        // process off the main thread
        ...
    }

// General pattern
<Observable of fetched values>
    .observeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .map { fetchedResults in
        // process off the main thread
        ...
    }

Finally, I suggest that you read the RxGRDB documentation again, and focus on the synchronizedStart (default true) and resultQueue (default main queue) parameters.

That's all for now :-)

mayurdhaka-suryasoftware (Author) commented

Hey @groue

Thanks as always for taking the time to answer what's going on in great detail.

I get the general picture of what's going on but I'm also going through the reading material you've provided to refresh my memory and get a better understanding of the way GRDB works.

Meanwhile, I've been facing a related problem with something you mentioned in your answers. You wrote:

> dbQueue.rx.changes emits its elements as soon as a transaction has committed relevant database changes. If that database transaction has been performed from the main thread, then the fetch indeed blocks the main thread

I've observed this happening in my app, which led to a couple of bugs. Here's what I did:

  1. At some point in my app, I subscribed to a list of projects like so:
dbQueue.rx.changes(in: [Project.all()])
   .map {
     // Do my mapping work
   }
   .subscribe(onNext: { foo in
     // Update list of projects with foo
   })
  2. From another part of my app, I have a piece of code like so:
dbQueue.inTransaction { db in
  try Project(...).insert(db)
  return .commit // Triggers dbQueue.rx.changes
}

As you said, if I write to the database with my code in point 2. on the main thread, the observable also triggers on the main thread.
But, what I found was, if I write to the database on a background thread, the observable is triggered on the background thread too? So if I change the code in point 2. to be:

DispatchQueue(label: "Foo", qos: .userInitiated).async {
    dbQueue.inTransaction { db in
        try Project(...).insert(db)
         return .commit // Triggers dbQueue.rx.changes
    }
}

then the observable is also triggered on the DispatchQueue identified as Foo.

This doesn't make sense to me because I'm using dbQueue to subscribe to the observable, and I'm using dbQueue to write to the database too. So why does the queue I write to the database from also end up being the queue on which the observable is triggered? And what is the sequence of steps GRDB/RxGRDB executes to ensure this thread consistency?
My best guess is dbQueue isn't really a regular DispatchQueue? Clearly I'm missing something here. It would be very helpful if you could elaborate why this happens?
(PS: I got around this problem by explicitly subscribing on the main queue with observeOn.)

groue (Collaborator) commented Jan 15, 2018

> Hey @groue
>
> Thanks as always for taking the time to answer what's going on in great detail.

You're welcome! I'm glad you have opened this issue. It is very useful. It will help improving both ergonomics and documentation of the library.

> I get the general picture of what's going on but I'm also going through the reading material you've provided to refresh my memory and get a better understanding of the way GRDB works.

Appreciated :-) Scheduling is a touchy subject indeed.

> Meanwhile, I've been facing a related problem with something you mentioned in your answers. You wrote:
>
> > dbQueue.rx.changes emits its elements as soon as a transaction has committed relevant database changes. If that database transaction has been performed from the main thread, then the fetch indeed blocks the main thread
>
> I've observed this happening in my app, which led to a couple of bugs.

Bugs? OK, I read carefully:

> Here's what I did:
>
>   1. At some point in my app, I subscribed to a list of projects like so:
> dbQueue.rx.changes(in: [Project.all()])
>    .map {
>      // Do my mapping work
>    }
>    .subscribe(onNext: { foo in
>      // Update list of projects with foo
>    })
>   2. From another part of my app, I have a piece of code like so:
> dbQueue.inTransaction { db in
>   try Project(...).insert(db)
>   return .commit // Triggers dbQueue.rx.changes
> }
>
> As you said, if I write to the database with my code in point 2. on the main thread, the observable also triggers on the main thread.

Yes.

> But, what I found was, if I write to the database on a background thread, the observable is triggered on the background thread too? So if I change the code in point 2. to be:
>
> DispatchQueue(label: "Foo", qos: .userInitiated).async {
>     dbQueue.inTransaction { db in
>         try Project(...).insert(db)
>         return .commit // Triggers dbQueue.rx.changes
>     }
> }
>
> then the observable is also triggered on the DispatchQueue identified as Foo.

Yes. The dbQueue.rx.changes observable is triggered on transactions commits, synchronously.

Quoting the documentation of this method:

> All elements are emitted on the database writer dispatch queue, serialized with all database updates.

We'll see later that this definition makes changes an observable that may not be the one you need. But let's explain a bit more first.

What is this "database writer dispatch queue"? To understand, let's quote database queue documentation:

> A database queue can be used from any thread. The inDatabase and inTransaction methods are synchronous, and block the current thread until your database statements are executed in a protected dispatch queue. They safely serialize the database accesses.

The "database writer dispatch queue" is the same queue as the "protected dispatch queue" of the above paragraph.

Let's analyse your sample code:

DispatchQueue(label: "Foo", qos: .userInitiated).async {
    // We're in "Foo" here.
    dbQueue.inTransaction { db in
        // We're in the database "protected dispatch queue" here
        ...
        return .commit // (1)
    }
}

On (1), the dbQueue.rx.changes observable is triggered, from the database protected dispatch queue. Meanwhile the Foo queue is blocked, and waits for the whole dbQueue.inTransaction method to complete.
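
The same situation can be reproduced with two plain dispatch queues. This is a sketch under stated assumptions: the queue named "protected" stands in for the database's protected dispatch queue, and `makeQueue`, `currentQueueLabel`, and `EventLog` are hypothetical helpers. Each queue is tagged with its label so that the code can report which queue it is running in.

```swift
import Foundation

// Thread-safe event recorder, used only to show where each step runs.
final class EventLog: @unchecked Sendable {
    private let lock = NSLock()
    private var storage: [String] = []
    func append(_ event: String) { lock.lock(); storage.append(event); lock.unlock() }
    var events: [String] { lock.lock(); defer { lock.unlock() }; return storage }
}

let queueLabel = DispatchSpecificKey<String>()

func makeQueue(_ label: String) -> DispatchQueue {
    let queue = DispatchQueue(label: label)
    queue.setSpecific(key: queueLabel, value: label)
    return queue
}

func currentQueueLabel() -> String {
    return DispatchQueue.getSpecific(key: queueLabel) ?? "untagged"
}

let fooQueue = makeQueue("Foo")
let protectedQueue = makeQueue("protected") // stand-in for the database queue

let log = EventLog()
let done = DispatchSemaphore(value: 0)

fooQueue.async {
    log.append("caller starts on \(currentQueueLabel())")
    protectedQueue.sync {
        // Stand-in for the transaction body and the notification at commit.
        log.append("observer notified on \(currentQueueLabel())")
    }
    // Foo was blocked until the sync block above had finished.
    log.append("caller resumes on \(currentQueueLabel())")
    done.signal()
}

done.wait() // only so that this script does not exit early
```

One detail may explain the original observation: Dispatch usually executes a `sync` block on the calling thread as an optimization. The block therefore runs in the protected queue, yet a debugger can show it on the thread that was serving Foo.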

> This doesn't make sense to me because I'm using dbQueue to subscribe to the observable, and I'm using dbQueue to write to the database too. So why does the queue I write to the database from also end up being the queue on which the observable is triggered? And what is the sequence of steps GRDB/RxGRDB executes to ensure this thread consistency?

I hope that it is more clear now. There is no bug: everything behaves exactly as expected.

But this is not the behavior you expect. I thus suggest you keep on reading.

> My best guess is dbQueue isn't really a regular DispatchQueue? Clearly I'm missing something here. It would be very helpful if you could elaborate why this happens?
> (PS: I got around this problem by explicitly subscribing on the main queue with observeOn.)

You guessed right.

Now, maybe dbQueue.changes(in:).map() is not the reactive method your app needs. Instead, I suggest you look at an observable of fetched values. There are specific ones, such as request.fetchAll(in:). And there is the most general of all: dbQueue.changeTokens(in:).mapFetch().

All observables of fetched values emit their values on a single dispatch queue (which defaults to the main queue).

Project.all().rx
    .fetchAll(in: dbQueue)
    .subscribe(onNext: { (projects: [Project]) in
        // In the main queue: use projects here
    })

All observables of fetched values can eventually be reduced to the general dbQueue.changeTokens(in:).mapFetch(), which makes it more clear how various dispatch queues are involved:

dbQueue.rx
    .changeTokens(in: [Project.all()])
    .mapFetch { db -> [Project] in
        // In the database "protected dispatch queue":
        // Do your mapping work. For example:
        let projects = try Project.fetchAll(db)
        return projects
    }
    .subscribe(onNext: { (projects: [Project]) in
        // In the main queue: use projects here
    })

By using observables of fetched values, you enjoy observables that easily emit in the dispatch queue you need. RxGRDB does all the hard scheduling job for you.

To be clear, you just can not reproduce the same behavior with the changes method:

// Almost the same, but not quite:
dbQueue.rx
    .changes(in: [Project.all()])
    .map { db -> [Project] in
        // In the database "protected dispatch queue":
        // Do your mapping work. For example:
        let projects = try Project.fetchAll(db)
        return projects
    }
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { (projects: [Project]) in
        // In the main queue: use projects here
    })

There are two differences:

  1. With changes.map, the first element is emitted asynchronously on subscription. Fetching observables emit their first element synchronously on subscription.
  2. changes.map prevents RxGRDB from performing efficient scheduling when one uses a database pool (you can ignore this one since you are using a database queue).

The first point can be very important for many applications. For example, apps often want to synchronously update a view when it is shown, not after.
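
The difference between a synchronous and an asynchronous first element can be shown with a small Dispatch-only sketch. The names are hypothetical (`subscribeWithSyncStart`, `subscribeWithAsyncStart`, `EventLog`), and a serial queue labelled "ui" stands in for the main queue; none of this is RxGRDB code.

```swift
import Foundation

// Thread-safe event recorder, used only to show the order of events.
final class EventLog: @unchecked Sendable {
    private let lock = NSLock()
    private var storage: [String] = []
    func append(_ event: String) { lock.lock(); storage.append(event); lock.unlock() }
    var events: [String] { lock.lock(); defer { lock.unlock() }; return storage }
}

// Serial queue standing in for the main queue.
let uiQueue = DispatchQueue(label: "ui")
let log = EventLog()

/// Delivers the initial value before returning to the subscriber.
func subscribeWithSyncStart(_ onNext: (String) -> Void) {
    onNext("initial value")
}

/// Schedules the initial value: it arrives after the subscriber has moved on.
func subscribeWithAsyncStart(on queue: DispatchQueue,
                             _ onNext: @escaping @Sendable (String) -> Void) {
    queue.async { onNext("initial value") }
}

uiQueue.sync {
    subscribeWithSyncStart { log.append("sync start: \($0)") }
    log.append("sync start: subscribe returned")

    subscribeWithAsyncStart(on: uiQueue) { log.append("async start: \($0)") }
    log.append("async start: subscribe returned")
}

// Let the block scheduled above run before inspecting the log.
uiQueue.sync {}
```

With the synchronous start, the value is available before `subscribe` returns, so a view can be filled in before it appears. With the asynchronous start, the code that follows `subscribe` runs first, and the view is briefly shown without its data.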

I hope I have convinced you that observables of fetched values are a neat tool. If you still think that you need to use changes, then please tell me more about what you want to achieve, so that I can adjust my advice.

For a finishing note, I must talk briefly of PR #18 which will ship shortly. We've seen above that fetching observables of RxGRDB emit their fetched values on the main queue by default. This won't change. What will change is the way one builds fetching observables that emit their fetched values on a different queue.

groue (Collaborator) commented Jan 16, 2018

A last finishing note. I believe you'd be happy with:

// Only track projects
Project.all().rx
    .fetchAll(in: dbQueue)
    .observeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .map { projects in
        // process projects off the main thread
        ...
    }

// Most general form
dbQueue.rx
    .changeTokens(in: [Project.all()])
    .mapFetch { db in return <fetched results> }
    .observeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .map { fetchedResults in
        // process fetched results off the main thread
        ...
    }

If not, I need more information.

groue (Collaborator) commented Jan 18, 2018

@mayurdhaka-suryasoftware

RxGRDB v0.8.0 is out (release notes). It has a much more robust way to schedule database notifications.

And it especially ships with a brand new Scheduling documentation chapter. It is the definitive guide about RxGRDB scheduling.

I close this issue, which mostly contains outdated information now. Please open a new issue if you have any more questions!

groue closed this as completed Jan 18, 2018

mayurdzk commented Jan 19, 2018

Hey @groue once again, thanks a bunch for the replies!

> Bugs? OK, I read carefully:

Sorry, I should've been more clear that the bugs were in my code, due to an unclear understanding of how my code was working--not with RxGRDB. 😃

As for the dbQueue queue switching issue, I've been reading through some of the documentation lately and I found this piece right in the documentation of dbQueue:

> The inDatabase and inTransaction methods are synchronous, and block the current thread until your database statements are executed in a protected dispatch queue.

As it says, the current thread is blocked, and the closure is executed. Since I now understand that the observable fires the instant my inTransaction method is called, it follows that I will be on the same queue as where I initiated the write on. I understand this clearly now.

I suppose the confusing bit for me was the name dbQueue: I was confusing it with a regular DispatchQueue that I have access to, and assuming that writing to the database, no matter what queue I write from, would never have any effect on the observables.

As for my issue of getting the observable to execute the map/mapFetch bit off the main queue, I found a way to perform those changes using the subscribe(on:) operator. This way, all the mapping of the observable, into a new type (from Data to ViewData, say) happens off the main thread I use and the observation of the final ViewData objects happens on the main thread--which is exactly what I needed. 🎉

Also, we have switched over to using dbPools due to our discussions, and the concurrent reads they give us are a huge gain in the snappiness of the app.

Thanks a lot for taking the time to answer these questions. Can't wait to read the new documentation.
Cheers! 😃🎉

groue (Collaborator) commented Jan 19, 2018

Glad to hear that! Database Pool is the jewel of GRDB.

It requires somewhat more database skills than database queues. I especially want to stress that most database pool writes should happen through writeInTransaction. When one uses plain dbPool.write { ... }, one can inadvertently expose inconsistent database states to readers. The classic example is:

// SAFE CONCURRENCY
try dbPool.writeInTransaction { db in
    try Credit(destinationAccount, amount).insert(db)
    try Debit(sourceAccount, amount).insert(db)
    return .commit
}

// UNSAFE CONCURRENCY
try dbPool.write { db in
    try Credit(destinationAccount, amount).insert(db)
    try Debit(sourceAccount, amount).insert(db)
}
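
Why the second form is unsafe can be shown with a toy model that involves no database at all. Outside an explicit transaction, SQLite commits each statement on its own, so a concurrent reader may run between the two inserts. In the sketch below, `Ledger`, `ToyStore`, and `visibleStates` are hypothetical names used only for illustration; the store simply records every state that a reader could observe.

```swift
// Toy ledger: consistent when total credits equal total debits.
struct Ledger: Equatable {
    var credits = 0
    var debits = 0
    var isBalanced: Bool { return credits == debits }
}

typealias Statement = (inout Ledger) -> Void

// Hypothetical store that records every state a concurrent reader could see.
final class ToyStore {
    private var current = Ledger()
    private(set) var visibleStates: [Ledger] = [Ledger()]

    /// No transaction: each statement becomes visible as soon as it has run.
    func write(_ statements: [Statement]) {
        for statement in statements {
            statement(&current)
            visibleStates.append(current)
        }
    }

    /// One transaction: readers only see the state after the final commit.
    func writeInTransaction(_ statements: [Statement]) {
        for statement in statements { statement(&current) }
        visibleStates.append(current)
    }
}

let transfer: [Statement] = [
    { $0.credits += 10 },  // credit the destination account
    { $0.debits += 10 },   // debit the source account
]

let safeStore = ToyStore()
safeStore.writeInTransaction(transfer)

let unsafeStore = ToyStore()
unsafeStore.write(transfer)
```

Every state in `safeStore.visibleStates` is balanced. `unsafeStore.visibleStates` contains an intermediate state where the credit exists without its matching debit, which is exactly what a reader of a database pool could fetch in the unsafe case.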

More info & details at the GRDB Concurrency Guide. And happy RxSwift/GRDB/RxGRDB combo!

mayurdzk commented

@groue Yep, we're using writeInTransaction everywhere. Although I'm curious to know why GRDB supports write at all, given that a) it might be unsafe and b) writeInTransaction exists?

groue (Collaborator) commented Jan 19, 2018

I sometimes ask myself the very same question 😅 And I thus guess you're right. write has to turn into unsafeWrite, and join the unsafe methods of DatabaseWriter.

groue added a commit to groue/GRDB.swift that referenced this issue Jan 20, 2018