@@ -49,29 +49,64 @@ kj::Promise<void> ContainerStreamSharedState::waitForMessage() {
49
49
return kj::mv (paf.promise );
50
50
}
51
51
52
+ void ContainerAsyncStream::shutdownWrite () {
53
+ KJ_DBG (" SHUTDOWN_WRITE" );
54
+ service->shutdown_write ();
55
+ }
56
+
52
57
kj::Promise<size_t > ContainerAsyncStream::tryRead (void * buffer, size_t minBytes, size_t maxBytes) {
53
58
KJ_DBG (" TRY_READ" );
54
59
KJ_IF_SOME (consumed, sharedState->tryRead (buffer, minBytes, maxBytes)) {
55
- return consumed;
60
+ co_return consumed;
56
61
}
57
62
58
63
if (minBytes == 0 ) {
59
- return minBytes;
64
+ co_return minBytes;
60
65
}
61
66
62
- return sharedState->waitForMessage ().then (
63
- [this , buffer, minBytes, maxBytes]() -> kj::Promise<size_t > {
64
- return tryRead (buffer, minBytes, maxBytes);
65
- });
67
+ co_await sharedState->waitForMessage ();
68
+ co_return co_await tryRead (buffer, minBytes, maxBytes);
69
+ }
70
+
71
+ kj::Promise<void > ContainerAsyncStream::write (kj::ArrayPtr<const kj::byte> buffer) {
72
+ KJ_DBG (" WRITE" );
73
+ if (service->write_data (buffer.as <Rust>())) {
74
+ return kj::READY_NOW;
75
+ } else {
76
+ KJ_DBG (" WRITE FAILED" );
77
+ return KJ_EXCEPTION (DISCONNECTED, " Write failed: stream is disconnected" );
78
+ }
79
+ }
80
+
81
+ kj::Promise<void > ContainerAsyncStream::write (
82
+ kj::ArrayPtr<const kj::ArrayPtr<const kj::byte>> pieces) {
83
+ KJ_DBG (" WRITE_ALL" );
84
+ for (auto piece: pieces) {
85
+ if (!service->write_data (piece.as <Rust>())) {
86
+ KJ_DBG (" WRITE_ALL FAILED" );
87
+ return KJ_EXCEPTION (DISCONNECTED, " Write failed: stream is disconnected" );
88
+ }
89
+ }
90
+ KJ_DBG (" WRITE_ALL FINISHED" );
91
+ return kj::READY_NOW;
92
+ }
93
+
94
+ kj::Promise<void > ContainerAsyncStream::whenWriteDisconnected () {
95
+ // TODO(now): this is wrong, the returned promise should fulfill when the write end disconnects.
96
+ // as written this will only return a fulfilled promise if the write end already disconnected.
97
+ if (service->is_write_disconnected ()) {
98
+ return kj::READY_NOW;
99
+ }
100
+ return kj::NEVER_DONE;
66
101
}
67
102
68
103
kj::Own<ContainerAsyncStream> createContainerRpcStream (
69
104
kj::StringPtr address, kj::StringPtr containerName) {
70
- auto sharedState = kj::heap<ContainerStreamSharedState>();
71
- ContainerStreamSharedState* ptr = sharedState.get ();
105
+ auto sharedState = kj::rc<ContainerStreamSharedState>();
72
106
73
- rust::container::MessageCallback callback = [ptr](::rust::Slice<const uint8_t > message) {
74
- ptr->enqueueMessage (message);
107
+ rust::container::MessageCallback callback = [sharedState = sharedState.addRef ()](
108
+ ::rust::Slice<const uint8_t > message) mutable {
109
+ sharedState->enqueueMessage (message);
75
110
};
76
111
77
112
auto service = rust::container::new_service (address.cStr (), containerName.cStr (), callback);
0 commit comments