Skip to content
This repository was archived by the owner on Aug 25, 2023. It is now read-only.
Open
2 changes: 1 addition & 1 deletion BlockV/Core/Data Pool/DataPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public final class DataPool {
// unable to load from disk
printBV(error: "[DataPool] Unable to load region state from disk. " + err.localizedDescription)

}.then { _ -> Guarantee<Void> in
}.then { _ -> Guarantee<[Any]> in

// start sync'ing region data with the server
return region.synchronize()
Expand Down
2 changes: 1 addition & 1 deletion BlockV/Core/Data Pool/Helpers/Vatom+Containment.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ extension VatomModel {
if inventoryVatom == nil {
// inspect the child region (owner & unowned)
return DataPool.children(parentID: self.id)
.getAllStable()
.synchronize()
.map { $0 as! [VatomModel] } // swiftlint:disable:this force_cast

} else {
Expand Down
3 changes: 3 additions & 0 deletions BlockV/Core/Data Pool/Regions/Region+Notifications.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import Foundation
/// Possible events
public enum RegionEvent: String {

/// TRiggered when a cached version of the data is available.
case loadedFromCache = "region.loadedFromCache"

/// Triggered when any data in the region changes. This also indicates that there is no longer an error.
case updated = "region.updated"

Expand Down
136 changes: 102 additions & 34 deletions BlockV/Core/Data Pool/Regions/Region.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,36 @@ import PromiseKit
/// - Persistance.
public class Region {

// MARK: - Enums

enum RegionError: Error {
case failedParsingResponse
case failedParsingObject
}

/// Serial io queue.
// MARK: - Dispatch Queues

/// Serial i/o queue.
///
/// Each subclass with have it's own io queue (the label does not dictate uniqueness).
let ioQueue = DispatchQueue(label: "io.blockv.sdk.datapool-io", qos: .utility)
let ioQueue = DispatchQueue(label: "io.blockv.sdk.datapool-io",
qos: .userInitiated)

/// Concurrent worker queue.
///
/// Use to process mappings and transfromations.
let workerQueue = DispatchQueue(label: "io.blockv.sdk.datapool-worker",
qos: .userInteractive,
attributes: .concurrent)

/// Concurrent synchronization.
///
/// Used exclusively for object synchronization queue (serial write, concurrent read).
let syncQueue = DispatchQueue(label: "io.blockv.sdk.object-sync",
qos: .userInitiated,
attributes: .concurrent)

// MARK: - Initialization

/// Constructor
required init(descriptor: Any) throws { }
Expand All @@ -63,13 +84,24 @@ public class Region {

/// `true` if this region contains temporary objects which should not be cached to disk, `false` otherwise.
let noCache = false

private var _objects: [String: DataObject] = [:]

/// All objects currently in our cache.
private(set) var objects: [String: DataObject] = [:]
private(set) var objects: [String: DataObject] {
get { // sync read (concurrent queue)
return syncQueue.sync { _objects }
}
set {
syncQueue.sync(flags: .barrier) { _objects = newValue }
}
}

/// `true` if data in this region is in sync with the backend.
public internal(set) var synchronized = false {
didSet {
dispatchPrecondition(condition: DispatchPredicate.onQueue(.main))

if synchronized {
self.emit(.stabalized, userInfo: [:])
} else {
Expand All @@ -91,19 +123,30 @@ public class Region {
public fileprivate(set) var closed = false

/// Re-synchronizes the region by manually fetching objects from the server again.
public func forceSynchronize() -> Guarantee<Void> {
public func forceSynchronize() -> Guarantee<[Any]> {

dispatchPrecondition(condition: DispatchPredicate.onQueue(.main))

self.synchronized = false
return self.synchronize()
}

public var isSynchronizing: Bool {
return !(_syncPromise == nil)
}

/// Currently executing synchronization promise. `nil` if there is no synchronization underway.
private var _syncPromise: Guarantee<Void>?
private var _syncPromise: Guarantee<[Any]>?

/// Attempts to stablaize the region by querying the backend for all data.
///
/// If the region is already stable, local data is returned.
///
/// - Returns: Promise which resolves when complete.
@discardableResult
public func synchronize() -> Guarantee<Void> {
public func synchronize() -> Guarantee<[Any]> {

dispatchPrecondition(condition: DispatchPredicate.onQueue(.main))

self.emit(.synchronizing, userInfo: [:])

Expand All @@ -114,18 +157,24 @@ public class Region {

// remove pending error
self.error = nil
self.emit(.updated) //FIXME: Why is this update broadcast?
// self.emit(.updated) //FIXME: This seems an odd place for an `.update` call

// stop if already in sync
if synchronized {
return Guarantee()

Guarantee { resolver in
workerQueue.async {
resolver(self.getAll())
}
}

}

// ask the subclass to load it's data
printBV(info: "[DataPool > Region] Starting synchronization for region \(self.stateKey)")

// load objects
_syncPromise = self.load().map { ids -> Void in
_syncPromise = self.load().then { ids -> Guarantee<[Any]> in

/*
The subclass is expected to call the add method as it finds object, and then, once
Expand All @@ -137,18 +186,26 @@ public class Region {
if let ids = ids {
self.diffedRemove(ids: ids)
}

// data is up to date
self.synchronized = true
self._syncPromise = nil
printBV(info: "[DataPool > Region] Region '\(self.stateKey)' is now in sync!")

return Guarantee { resolver in
self.workerQueue.async {
resolver(self.getAll())
DispatchQueue.main.async {
// data is up to date
self.synchronized = true
self._syncPromise = nil
printBV(info: "[DataPool > Region] Region '\(self.stateKey)' is now in sync!")
}
}
}

}.recover { err in
// error handling, notify listeners of an error
self._syncPromise = nil
self.error = err
printBV(error: "[DataPool > Region] Unable to load: " + err.localizedDescription)
self.emit(.error, userInfo: ["error": err])
return Guarantee.value([]) //FIXME: Surely a better approach is to return the errors from data pool?
}

// return promise
Expand All @@ -170,6 +227,8 @@ public class Region {
/// Stop and destroy this region. Subclasses can override this to do stuff on close.
public func close() {

dispatchPrecondition(condition: DispatchPredicate.onQueue(.main))

// notify data pool we have closed
DataPool.removeRegion(region: self)
// we're closed
Expand All @@ -192,6 +251,8 @@ public class Region {
///
/// - Parameter objects: The objects to add
func add(objects: [DataObject]) {

dispatchPrecondition(condition: DispatchPredicate.onQueue(.main))

// go through each object
for obj in objects {
Expand Down Expand Up @@ -223,10 +284,6 @@ public class Region {

}

// emit event
//FIXME: Why was this being broadcast?
// self.emit(.objectUpdated, userInfo: ["id": obj.id])

}

// Notify updated
Expand All @@ -243,8 +300,10 @@ public class Region {

/// Updates data objects within our pool.
///
/// - Parameter objects: The list of changes to perform to our data objects.
/// - Parameter objects: The list of changes to perform to our data objects.
func update(objects: [DataObjectUpdateRecord], source: Source? = nil) {

dispatchPrecondition(condition: DispatchPredicate.onQueue(.main))

// batch emit events, so if a object is updated multiple times, only one event is sent
var changedIDs = Set<String>()
Expand Down Expand Up @@ -314,6 +373,8 @@ public class Region {
///
/// - Parameter ids: The IDs of objects to remove
func remove(ids: [String]) {

dispatchPrecondition(condition: DispatchPredicate.onQueue(.main))

// remove all data objects with the specified IDs
var didUpdate = false
Expand Down Expand Up @@ -354,21 +415,19 @@ public class Region {
return object
}

/// Returns all the objects within this region. Waits until the region is stable first.
///
/// - Returns: Array of objects. Check the region-specific map() function to see what types are returned.
public func getAllStable() -> Guarantee<[Any]> {

// synchronize now
return self.synchronize().map {
return self.getAll()
}

}
// /// Returns all the objects within this region. Waits until the region is stable first.
// ///
// /// - Returns: Array of objects. Check the region-specific map() function to see what types are returned.
// public func getAllStable() -> Guarantee<[Any]> { //FIXME: This function is no longer needed, rather call `synchronize` directly.
//
// // synchronize now
// return self.synchronize()
//
// }

/// Returns all the objects within this region. Does NOT wait until the region is stable first.
public func getAll() -> [Any] {

// create array of all items
var items: [Any] = []
for object in objects.values {
Expand Down Expand Up @@ -399,9 +458,8 @@ public class Region {

/// Returns an object within this region by it's ID. Waits until the region is stable first.
public func getStable(id: String) -> Guarantee<Any?> {

// synchronize now
return self.synchronize().map {

return self.synchronize().map { _ in
// get item
return self.get(id: id)
}
Expand Down Expand Up @@ -448,6 +506,8 @@ public class Region {
/// Load objects from local storage.
func loadFromCache() -> Promise<Void> {

dispatchPrecondition(condition: DispatchPredicate.onQueue(.main))

return Promise { (resolver: Resolver) in

ioQueue.async {
Expand Down Expand Up @@ -493,6 +553,8 @@ public class Region {
DispatchQueue.main.async {
// add objects
self.add(objects: cleanObjects)

self.emit(.loadedFromCache)
}

// done
Expand All @@ -510,6 +572,8 @@ public class Region {

/// Saves the region to local storage.
func save() {

dispatchPrecondition(condition: DispatchPredicate.onQueue(.main))

// cancel the pending save task
if saveTask != nil {
Expand Down Expand Up @@ -567,6 +631,8 @@ public class Region {
/// - value: The new value
/// - Returns: An undo function
func preemptiveChange(id: String, keyPath: String, value: Any) -> UndoFunction {

dispatchPrecondition(condition: DispatchPredicate.onQueue(.main))

// get object. If it doesn't exist, do nothing and return an undo function which does nothing.
guard let object = objects[id], object.data != nil else {
Expand Down Expand Up @@ -608,6 +674,8 @@ public class Region {
/// - Parameter id: The object ID to remove
/// - Returns: An undo function
func preemptiveRemove(id: String) -> UndoFunction {

dispatchPrecondition(condition: DispatchPredicate.onQueue(.main))

// remove object
guard let removedObject = objects.removeValue(forKey: id) else {
Expand Down
Loading