13
13
//===----------------------------------------------------------------------===//
14
14
15
15
import Logging
16
+ import NIOConcurrencyHelpers
16
17
import NIOCore
17
18
import NIOEmbedded
18
19
import NIOHTTP1
@@ -833,10 +834,11 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
833
834
)
834
835
try channel. connect ( to: . init( ipAddress: " 127.0.0.1 " , port: 80 ) ) . wait ( )
835
836
836
- let request = MockHTTPExecutableRequest ( )
837
837
// non empty body is important to trigger this bug as we otherwise finish the request in a single flush
838
- request. requestFramingMetadata. body = . fixedSize( 1 )
839
- request. raiseErrorIfUnimplementedMethodIsCalled = false
838
+ let request = MockHTTPExecutableRequest (
839
+ framingMetadata: RequestFramingMetadata ( connectionClose: false , body: . fixedSize( 1 ) ) ,
840
+ raiseErrorIfUnimplementedMethodIsCalled: false
841
+ )
840
842
channel. writeAndFlush ( request, promise: nil )
841
843
XCTAssertEqual ( request. events. map ( \. kind) , [ . willExecuteRequest, . requestHeadSent] )
842
844
}
@@ -897,34 +899,43 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
897
899
}
898
900
}
899
901
900
- class TestBackpressureWriter {
902
+ final class TestBackpressureWriter : Sendable {
901
903
let eventLoop : EventLoop
902
904
903
905
let parts : Int
904
906
905
907
var finishFuture : EventLoopFuture < Void > { self . finishPromise. futureResult }
906
908
private let finishPromise : EventLoopPromise < Void >
907
- private( set) var written : Int = 0
908
909
909
- private var channelIsWritable : Bool = false
910
+ private struct State {
911
+ var written = 0
912
+ var channelIsWritable = false
913
+ }
914
+
915
+ var written : Int {
916
+ self . state. value. written
917
+ }
918
+
919
+ private let state : NIOLoopBoundBox < State >
910
920
911
921
init ( eventLoop: EventLoop , parts: Int ) {
912
922
self . eventLoop = eventLoop
913
923
self . parts = parts
914
-
924
+ self . state = . makeBoxSendingValue ( State ( ) , eventLoop : eventLoop )
915
925
self . finishPromise = eventLoop. makePromise ( of: Void . self)
916
926
}
917
927
918
928
func start( writer: HTTPClient . Body . StreamWriter , expectedErrors: [ HTTPClientError ] = [ ] ) -> EventLoopFuture < Void > {
929
+ @Sendable
919
930
func recursive( ) {
920
931
XCTAssert ( self . eventLoop. inEventLoop)
921
- XCTAssert ( self . channelIsWritable)
922
- if self . written == self . parts {
932
+ XCTAssert ( self . state . value . channelIsWritable)
933
+ if self . state . value . written == self . parts {
923
934
self . finishPromise. succeed ( ( ) )
924
935
} else {
925
936
self . eventLoop. execute {
926
937
let future = writer. write ( . byteBuffer( . init( bytes: [ 0 , 1 ] ) ) )
927
- self . written += 1
938
+ self . state . value . written += 1
928
939
future. whenComplete { result in
929
940
switch result {
930
941
case . success:
@@ -951,36 +962,35 @@ class TestBackpressureWriter {
951
962
}
952
963
953
964
func writabilityChanged( _ newValue: Bool ) {
954
- self . channelIsWritable = newValue
965
+ self . state . value . channelIsWritable = newValue
955
966
}
956
967
}
957
968
958
- class ResponseBackpressureDelegate : HTTPClientResponseDelegate {
969
+ final class ResponseBackpressureDelegate : HTTPClientResponseDelegate {
959
970
typealias Response = Void
960
971
961
- enum State {
972
+ enum State : Sendable {
962
973
case consuming( EventLoopPromise < Void > )
963
974
case waitingForRemote( CircularBuffer < EventLoopPromise < ByteBuffer ? > > )
964
975
case buffering( ( ByteBuffer ? , EventLoopPromise < Void > ) ? )
965
976
case done
966
977
}
967
978
968
979
let eventLoop : EventLoop
969
- private var state : State = . buffering ( nil )
980
+ private let state : NIOLoopBoundBox < State >
970
981
971
982
init ( eventLoop: EventLoop ) {
972
983
self . eventLoop = eventLoop
973
-
974
- self . state = . consuming( self . eventLoop. makePromise ( of: Void . self) )
984
+ self . state = . makeBoxSendingValue( . consuming( eventLoop. makePromise ( of: Void . self) ) , eventLoop: eventLoop)
975
985
}
976
986
977
987
func next( ) -> EventLoopFuture < ByteBuffer ? > {
978
- switch self . state {
988
+ switch self . state. value {
979
989
case . consuming( let backpressurePromise) :
980
990
var promiseBuffer = CircularBuffer < EventLoopPromise < ByteBuffer ? > > ( )
981
991
let newPromise = self . eventLoop. makePromise ( of: ByteBuffer ? . self)
982
992
promiseBuffer. append ( newPromise)
983
- self . state = . waitingForRemote( promiseBuffer)
993
+ self . state. value = . waitingForRemote( promiseBuffer)
984
994
backpressurePromise. succeed ( ( ) )
985
995
return newPromise. futureResult
986
996
@@ -991,18 +1001,18 @@ class ResponseBackpressureDelegate: HTTPClientResponseDelegate {
991
1001
)
992
1002
let promise = self . eventLoop. makePromise ( of: ByteBuffer ? . self)
993
1003
promiseBuffer. append ( promise)
994
- self . state = . waitingForRemote( promiseBuffer)
1004
+ self . state. value = . waitingForRemote( promiseBuffer)
995
1005
return promise. futureResult
996
1006
997
1007
case . buffering( . none) :
998
1008
var promiseBuffer = CircularBuffer < EventLoopPromise < ByteBuffer ? > > ( )
999
1009
let promise = self . eventLoop. makePromise ( of: ByteBuffer ? . self)
1000
1010
promiseBuffer. append ( promise)
1001
- self . state = . waitingForRemote( promiseBuffer)
1011
+ self . state. value = . waitingForRemote( promiseBuffer)
1002
1012
return promise. futureResult
1003
1013
1004
1014
case . buffering( . some( ( let buffer, let promise) ) ) :
1005
- self . state = . buffering( nil )
1015
+ self . state. value = . buffering( nil )
1006
1016
promise. succeed ( ( ) )
1007
1017
return self . eventLoop. makeSucceededFuture ( buffer)
1008
1018
@@ -1012,7 +1022,7 @@ class ResponseBackpressureDelegate: HTTPClientResponseDelegate {
1012
1022
}
1013
1023
1014
1024
func didReceiveHead( task: HTTPClient . Task < Void > , _ head: HTTPResponseHead ) -> EventLoopFuture < Void > {
1015
- switch self . state {
1025
+ switch self . state. value {
1016
1026
case . consuming( let backpressurePromise) :
1017
1027
return backpressurePromise. futureResult
1018
1028
@@ -1025,7 +1035,7 @@ class ResponseBackpressureDelegate: HTTPClientResponseDelegate {
1025
1035
}
1026
1036
1027
1037
func didReceiveBodyPart( task: HTTPClient . Task < Void > , _ buffer: ByteBuffer ) -> EventLoopFuture < Void > {
1028
- switch self . state {
1038
+ switch self . state. value {
1029
1039
case . waitingForRemote( var promiseBuffer) :
1030
1040
assert (
1031
1041
!promiseBuffer. isEmpty,
@@ -1034,18 +1044,18 @@ class ResponseBackpressureDelegate: HTTPClientResponseDelegate {
1034
1044
let promise = promiseBuffer. removeFirst ( )
1035
1045
if promiseBuffer. isEmpty {
1036
1046
let newBackpressurePromise = self . eventLoop. makePromise ( of: Void . self)
1037
- self . state = . consuming( newBackpressurePromise)
1047
+ self . state. value = . consuming( newBackpressurePromise)
1038
1048
promise. succeed ( buffer)
1039
1049
return newBackpressurePromise. futureResult
1040
1050
} else {
1041
- self . state = . waitingForRemote( promiseBuffer)
1051
+ self . state. value = . waitingForRemote( promiseBuffer)
1042
1052
promise. succeed ( buffer)
1043
1053
return self . eventLoop. makeSucceededVoidFuture ( )
1044
1054
}
1045
1055
1046
1056
case . buffering( . none) :
1047
1057
let promise = self . eventLoop. makePromise ( of: Void . self)
1048
- self . state = . buffering( ( buffer, promise) )
1058
+ self . state. value = . buffering( ( buffer, promise) )
1049
1059
return promise. futureResult
1050
1060
1051
1061
case . buffering( . some) :
@@ -1059,15 +1069,15 @@ class ResponseBackpressureDelegate: HTTPClientResponseDelegate {
1059
1069
}
1060
1070
1061
1071
func didFinishRequest( task: HTTPClient . Task < Void > ) throws {
1062
- switch self . state {
1072
+ switch self . state. value {
1063
1073
case . waitingForRemote( let promiseBuffer) :
1064
1074
for promise in promiseBuffer {
1065
1075
promise. succeed ( . none)
1066
1076
}
1067
- self . state = . done
1077
+ self . state. value = . done
1068
1078
1069
1079
case . buffering( . none) :
1070
- self . state = . done
1080
+ self . state. value = . done
1071
1081
1072
1082
case . done, . consuming:
1073
1083
preconditionFailure ( " Invalid state: \( self . state) " )
@@ -1093,7 +1103,7 @@ class ReadEventHitHandler: ChannelOutboundHandler {
1093
1103
}
1094
1104
}
1095
1105
1096
- final class FailEndHandler : ChannelOutboundHandler {
1106
+ final class FailEndHandler : ChannelOutboundHandler , Sendable {
1097
1107
typealias OutboundIn = HTTPClientRequestPart
1098
1108
typealias OutboundOut = HTTPClientRequestPart
1099
1109
0 commit comments