1use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::context::JSContext;
10use js::jsval::UndefinedValue;
11use js::realm::CurrentRealm;
12use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
13use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto_and_cx};
14
15use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriterMethods;
16use crate::dom::bindings::error::{Error, ErrorToJsval};
17use crate::dom::bindings::reflector::DomGlobal;
18use crate::dom::bindings::root::{DomRoot, MutNullableDom};
19use crate::dom::globalscope::GlobalScope;
20use crate::dom::promise::Promise;
21use crate::dom::stream::writablestream::WritableStream;
22
23#[dom_struct]
25pub struct WritableStreamDefaultWriter {
26 reflector_: Reflector,
27
28 #[conditional_malloc_size_of]
29 ready_promise: RefCell<Rc<Promise>>,
30
31 #[conditional_malloc_size_of]
33 closed_promise: RefCell<Rc<Promise>>,
34
35 stream: MutNullableDom<WritableStream>,
37}
38
39impl WritableStreamDefaultWriter {
40 fn new_inherited(cx: &mut CurrentRealm) -> WritableStreamDefaultWriter {
43 WritableStreamDefaultWriter {
44 reflector_: Reflector::new(),
45 stream: Default::default(),
46 closed_promise: RefCell::new(Promise::new_in_realm(cx)),
47 ready_promise: RefCell::new(Promise::new_in_realm(cx)),
48 }
49 }
50
51 pub(crate) fn new(
52 cx: &mut CurrentRealm,
53 global: &GlobalScope,
54 proto: Option<SafeHandleObject>,
55 ) -> DomRoot<WritableStreamDefaultWriter> {
56 reflect_dom_object_with_proto_and_cx(
57 Box::new(WritableStreamDefaultWriter::new_inherited(cx)),
58 global,
59 proto,
60 cx,
61 )
62 }
63
64 pub(crate) fn setup(&self, cx: &mut JSContext, stream: &WritableStream) -> Result<(), Error> {
67 if stream.is_locked() {
69 return Err(Error::Type(c"Stream is locked".to_owned()));
70 }
71
72 self.stream.set(Some(stream));
74
75 stream.set_writer(Some(self));
77
78 if stream.is_writable() {
82 if !stream.close_queued_or_in_flight() && stream.get_backpressure() {
85 } else {
88 self.ready_promise.borrow().resolve_native(cx, &());
91 }
92
93 return Ok(());
96 }
97
98 if stream.is_erroring() {
100 rooted!(&in(cx) let mut error = UndefinedValue());
101 stream.get_stored_error(error.handle_mut());
102
103 let ready_promise = self.ready_promise.borrow();
107 ready_promise.reject_native(cx, &error.handle());
108 ready_promise.set_promise_is_handled(cx);
109
110 return Ok(());
113 }
114
115 if stream.is_closed() {
117 self.ready_promise.borrow().resolve_native(cx, &());
120
121 self.closed_promise.borrow().resolve_native(cx, &());
124 return Ok(());
125 }
126
127 assert!(stream.is_errored());
130
131 rooted!(&in(cx) let mut error = UndefinedValue());
133 stream.get_stored_error(error.handle_mut());
134
135 let ready_promise = self.ready_promise.borrow();
139 ready_promise.reject_native(cx, &error.handle());
140 ready_promise.set_promise_is_handled(cx);
141
142 let ready_promise = self.closed_promise.borrow();
146 ready_promise.reject_native(cx, &error.handle());
147 ready_promise.set_promise_is_handled(cx);
148
149 Ok(())
150 }
151
152 pub(crate) fn reject_closed_promise_with_stored_error(
153 &self,
154 cx: &mut JSContext,
155 error: &SafeHandleValue,
156 ) {
157 self.closed_promise.borrow().reject_native(cx, error);
158 }
159
160 pub(crate) fn set_close_promise_is_handled(&self, cx: &mut JSContext) {
161 self.closed_promise.borrow().set_promise_is_handled(cx);
162 }
163
164 pub(crate) fn set_ready_promise(&self, promise: Rc<Promise>) {
165 *self.ready_promise.borrow_mut() = promise;
166 }
167
168 pub(crate) fn resolve_ready_promise_with_undefined(&self, cx: &mut JSContext) {
169 self.ready_promise.borrow().resolve_native(cx, &());
170 }
171
172 pub(crate) fn resolve_closed_promise_with_undefined(&self, cx: &mut JSContext) {
173 self.closed_promise.borrow().resolve_native(cx, &());
174 }
175
176 pub(crate) fn ensure_ready_promise_rejected(
178 &self,
179 cx: &mut JSContext,
180 global: &GlobalScope,
181 error: SafeHandleValue,
182 ) {
183 let ready_promise = self.ready_promise.borrow().clone();
184
185 if ready_promise.is_pending() {
187 ready_promise.reject_native(cx, &error);
189
190 ready_promise.set_promise_is_handled(cx);
192 } else {
193 let promise = Promise::new_rejected(cx, global, error);
195
196 promise.set_promise_is_handled(cx);
198 *self.ready_promise.borrow_mut() = promise;
199 }
200 }
201
202 fn ensure_closed_promise_rejected(
204 &self,
205 cx: &mut JSContext,
206 global: &GlobalScope,
207 error: SafeHandleValue,
208 ) {
209 let closed_promise = self.closed_promise.borrow().clone();
210
211 if closed_promise.is_pending() {
213 closed_promise.reject_native(cx, &error);
215
216 closed_promise.set_promise_is_handled(cx);
218 } else {
219 let promise = Promise::new_rejected(cx, global, error);
221
222 promise.set_promise_is_handled(cx);
224 *self.closed_promise.borrow_mut() = promise;
225 }
226 }
227
228 fn abort(
230 &self,
231 cx: &mut CurrentRealm,
232 global: &GlobalScope,
233 reason: SafeHandleValue,
234 ) -> Rc<Promise> {
235 let Some(stream) = self.stream.get() else {
237 unreachable!("Stream should be set.");
239 };
240
241 stream.abort(cx, global, reason)
243 }
244
245 fn close(&self, cx: &mut JSContext, global: &GlobalScope) -> Rc<Promise> {
247 let Some(stream) = self.stream.get() else {
249 unreachable!("Stream should be set.");
251 };
252
253 stream.close(cx, global)
255 }
256
257 pub(crate) fn write(
259 &self,
260 cx: &mut JSContext,
261 global: &GlobalScope,
262 chunk: SafeHandleValue,
263 ) -> Rc<Promise> {
264 let Some(stream) = self.stream.get() else {
266 unreachable!("Stream should be set.");
268 };
269
270 let Some(controller) = stream.get_controller() else {
273 unreachable!("Controller should be set.");
274 };
275
276 let chunk_size = controller.get_chunk_size(cx, global, chunk);
278
279 if !self
282 .stream
283 .get()
284 .is_some_and(|current_stream| current_stream == stream)
285 {
286 let promise = Promise::new(cx, global);
287 promise.reject_error(
288 cx,
289 Error::Type(c"Stream is not equal to writer stream".to_owned()),
290 );
291 return promise;
292 }
293
294 if stream.is_errored() {
297 rooted!(&in(cx) let mut error = UndefinedValue());
299 stream.get_stored_error(error.handle_mut());
300 let promise = Promise::new(cx, global);
301 promise.reject_native(cx, &error.handle());
302 return promise;
303 }
304
305 if stream.close_queued_or_in_flight() || stream.is_closed() {
308 let promise = Promise::new(cx, global);
311 promise.reject_error(
312 cx,
313 Error::Type(c"Stream has been closed, or has close queued or in-flight".to_owned()),
314 );
315 return promise;
316 }
317
318 if stream.is_erroring() {
320 rooted!(&in(cx) let mut error = UndefinedValue());
322 stream.get_stored_error(error.handle_mut());
323 let promise = Promise::new(cx, global);
324 promise.reject_native(cx, &error.handle());
325 return promise;
326 }
327
328 assert!(stream.is_writable());
330
331 let promise = stream.add_write_request(cx, global);
333
334 controller.write(cx, global, chunk, chunk_size);
336
337 promise
339 }
340
341 pub(crate) fn release(&self, cx: &mut JSContext, global: &GlobalScope) {
343 let Some(stream) = self.stream.get() else {
345 unreachable!("Stream should be set.");
347 };
348
349 assert!(stream.get_writer().is_some_and(|writer| &*writer == self));
351
352 let released_error = Error::Type(c"Writer has been released".to_owned());
354
355 rooted!(&in(cx) let mut error = UndefinedValue());
357 released_error.to_jsval(cx, global, error.handle_mut());
358
359 self.ensure_ready_promise_rejected(cx, global, error.handle());
361
362 self.ensure_closed_promise_rejected(cx, global, error.handle());
364
365 stream.set_writer(None);
367
368 self.stream.set(None);
370 }
371
372 pub(crate) fn close_with_error_propagation(
374 &self,
375 cx: &mut JSContext,
376 global: &GlobalScope,
377 ) -> Rc<Promise> {
378 let Some(stream) = self.stream.get() else {
380 unreachable!("Stream should be set.");
382 };
383
384 if stream.close_queued_or_in_flight() || stream.is_closed() {
390 let promise = Promise::new(cx, global);
392 promise.resolve_native(cx, &());
393 return promise;
394 }
395
396 if stream.is_errored() {
398 rooted!(&in(cx) let mut error = UndefinedValue());
400 stream.get_stored_error(error.handle_mut());
401 let promise = Promise::new(cx, global);
402 promise.reject_native(cx, &error.handle());
403 return promise;
404 }
405
406 assert!(stream.is_writable() || stream.is_erroring());
408
409 self.close(cx, global)
411 }
412
413 pub(crate) fn get_stream(&self) -> Option<DomRoot<WritableStream>> {
414 self.stream.get()
415 }
416}
417
418impl WritableStreamDefaultWriterMethods<crate::DomTypeHolder> for WritableStreamDefaultWriter {
419 fn Closed(&self) -> Rc<Promise> {
421 return self.closed_promise.borrow().clone();
423 }
424
425 fn GetDesiredSize(&self) -> Result<Option<f64>, Error> {
427 let Some(stream) = self.stream.get() else {
429 return Err(Error::Type(c"Stream is undefined".to_owned()));
430 };
431
432 Ok(stream.get_desired_size())
434 }
435
436 fn Ready(&self) -> Rc<Promise> {
438 return self.ready_promise.borrow().clone();
440 }
441
442 fn Abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) -> Rc<Promise> {
444 let global = GlobalScope::from_current_realm(cx);
445
446 if self.stream.get().is_none() {
448 let promise = Promise::new(cx, &global);
450 promise.reject_error(cx, Error::Type(c"Stream is undefined".to_owned()));
451 return promise;
452 }
453
454 self.abort(cx, &global, reason)
456 }
457
458 fn Close(&self, cx: &mut CurrentRealm) -> Rc<Promise> {
460 let global = GlobalScope::from_current_realm(cx);
461 let promise = Promise::new(cx, &global);
462
463 let Some(stream) = self.stream.get() else {
465 promise.reject_error(cx, Error::Type(c"Stream is undefined".to_owned()));
468 return promise;
469 };
470
471 if stream.close_queued_or_in_flight() {
473 promise.reject_error(
475 cx,
476 Error::Type(c"Stream has closed queued or in-flight".to_owned()),
477 );
478 return promise;
479 }
480
481 self.close(cx, &global)
482 }
483
484 fn ReleaseLock(&self, cx: &mut JSContext) {
486 let Some(stream) = self.stream.get() else {
488 return;
490 };
491
492 assert!(stream.get_writer().is_some());
494
495 let global = self.global();
496
497 self.release(cx, &global);
499 }
500
501 fn Write(&self, cx: &mut CurrentRealm, chunk: SafeHandleValue) -> Rc<Promise> {
503 let global = GlobalScope::from_current_realm(cx);
504
505 if self.stream.get().is_none() {
507 let promise = Promise::new(cx, &global);
509 promise.reject_error(cx, Error::Type(c"Stream is undefined".to_owned()));
510 return promise;
511 }
512
513 self.write(cx, &global, chunk)
515 }
516
517 fn Constructor(
519 cx: &mut CurrentRealm,
520 global: &GlobalScope,
521 proto: Option<SafeHandleObject>,
522 stream: &WritableStream,
523 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
524 let writer = WritableStreamDefaultWriter::new(cx, global, proto);
525
526 writer.setup(cx, stream)?;
528
529 Ok(writer)
530 }
531}