Skip to content

Commit ab1fc3c

Browse files
authored
AsyncStreamConsumer should stream DataRows. (#316)
1 parent f636f59 commit ab1fc3c

File tree

1 file changed

+22
-19
lines changed

1 file changed

+22
-19
lines changed

Sources/PostgresNIO/New/PostgresRowSequence.swift

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,24 @@ extension PostgresRowSequence {
4343

4444
let _internal: _Internal
4545

46+
let lookupTable: [String: Int]
47+
let columns: [RowDescription.Column]
48+
4649
init(consumer: AsyncStreamConsumer) {
4750
self._internal = _Internal(consumer: consumer)
51+
self.lookupTable = consumer.lookupTable
52+
self.columns = consumer.columns
4853
}
4954

5055
public mutating func next() async throws -> PostgresRow? {
51-
try await self._internal.next()
56+
if let dataRow = try await self._internal.next() {
57+
return PostgresRow(
58+
data: dataRow,
59+
lookupTable: self.lookupTable,
60+
columns: columns
61+
)
62+
}
63+
return nil
5264
}
5365

5466
final class _Internal {
@@ -62,7 +74,7 @@ extension PostgresRowSequence {
6274
self.consumer.iteratorDeinitialized()
6375
}
6476

65-
func next() async throws -> PostgresRow? {
77+
func next() async throws -> DataRow? {
6678
try await self.consumer.next()
6779
}
6880
}
@@ -111,12 +123,7 @@ final class AsyncStreamConsumer {
111123

112124
switch receiveAction {
113125
case .succeed(let continuation, let data, signalDemandTo: let source):
114-
let row = PostgresRow(
115-
data: data,
116-
lookupTable: self.lookupTable,
117-
columns: self.columns
118-
)
119-
continuation.resume(returning: row)
126+
continuation.resume(returning: data)
120127
source?.demand()
121128

122129
case .none:
@@ -175,7 +182,7 @@ final class AsyncStreamConsumer {
175182
}
176183
}
177184

178-
func next() async throws -> PostgresRow? {
185+
func next() async throws -> DataRow? {
179186
self.lock.lock()
180187
switch self.state.next() {
181188
case .returnNil:
@@ -185,11 +192,7 @@ final class AsyncStreamConsumer {
185192
case .returnRow(let data, signalDemandTo: let source):
186193
self.lock.unlock()
187194
source?.demand()
188-
return PostgresRow(
189-
data: data,
190-
lookupTable: self.lookupTable,
191-
columns: self.columns
192-
)
195+
return data
193196

194197
case .throwError(let error):
195198
self.lock.unlock()
@@ -216,7 +219,7 @@ extension AsyncStreamConsumer {
216219
private enum UpstreamState {
217220
enum DemandState {
218221
case canAskForMore
219-
case waitingForMore(CheckedContinuation<PostgresRow?, Error>?)
222+
case waitingForMore(CheckedContinuation<DataRow?, Error>?)
220223
}
221224

222225
case initialized
@@ -395,7 +398,7 @@ extension AsyncStreamConsumer {
395398
case none
396399
}
397400

398-
mutating func next(for continuation: CheckedContinuation<PostgresRow?, Error>) -> NextSlowPathAction {
401+
mutating func next(for continuation: CheckedContinuation<DataRow?, Error>) -> NextSlowPathAction {
399402
switch self.upstreamState {
400403
case .initialized:
401404
preconditionFailure()
@@ -422,7 +425,7 @@ extension AsyncStreamConsumer {
422425
}
423426

424427
enum ReceiveAction {
425-
case succeed(CheckedContinuation<PostgresRow?, Error>, DataRow, signalDemandTo: PSQLRowStream?)
428+
case succeed(CheckedContinuation<DataRow?, Error>, DataRow, signalDemandTo: PSQLRowStream?)
426429
case none
427430
}
428431

@@ -462,8 +465,8 @@ extension AsyncStreamConsumer {
462465
}
463466

464467
enum CompletionResult {
465-
case succeed(CheckedContinuation<PostgresRow?, Error>)
466-
case fail(CheckedContinuation<PostgresRow?, Error>, Error)
468+
case succeed(CheckedContinuation<DataRow?, Error>)
469+
case fail(CheckedContinuation<DataRow?, Error>, Error)
467470
case none
468471
}
469472

0 commit comments

Comments
 (0)