Skip to content

Commit 23f7a8c

Browse files
committed
Bug 1639018 - Change TaskRunnable::dispatch to take owned runnables. r=froydnj
This matches how the `Dispatch(already_AddRefed<nsIRunnable>)` overloads work in C++: `Dispatch` takes ownership of the runnable, and leaks it if dispatch fails—because the thread manager is shutting down, for instance. This avoids a race where a runnable can be released on either the owning or target thread. Rust doesn't allow arbitrary `Self` types yet (see rust-lang/rust#44874), so we need to change `dispatch` and `dispatch_with_options` to be associated methods. Differential Revision: https://phabricator.services.mozilla.com/D75858
1 parent caa028e commit 23f7a8c

File tree

11 files changed

+66
-41
lines changed

11 files changed

+66
-41
lines changed

security/manager/ssl/cert_storage/src/lib.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1164,7 +1164,7 @@ impl CertStorage {
11641164
));
11651165
let thread = try_ns!(self.thread.lock());
11661166
let runnable = try_ns!(TaskRunnable::new("HasPriorData", task));
1167-
try_ns!(runnable.dispatch(&*thread));
1167+
try_ns!(TaskRunnable::dispatch(runnable, &*thread));
11681168
NS_OK
11691169
}
11701170

@@ -1226,7 +1226,7 @@ impl CertStorage {
12261226
));
12271227
let thread = try_ns!(self.thread.lock());
12281228
let runnable = try_ns!(TaskRunnable::new("SetRevocations", task));
1229-
try_ns!(runnable.dispatch(&*thread));
1229+
try_ns!(TaskRunnable::dispatch(runnable, &*thread));
12301230
NS_OK
12311231
}
12321232

@@ -1305,7 +1305,7 @@ impl CertStorage {
13051305
));
13061306
let thread = try_ns!(self.thread.lock());
13071307
let runnable = try_ns!(TaskRunnable::new("SetCRLiteState", task));
1308-
try_ns!(runnable.dispatch(&*thread));
1308+
try_ns!(TaskRunnable::dispatch(runnable, &*thread));
13091309
NS_OK
13101310
}
13111311

@@ -1351,7 +1351,7 @@ impl CertStorage {
13511351
));
13521352
let thread = try_ns!(self.thread.lock());
13531353
let runnable = try_ns!(TaskRunnable::new("SetFullCRLiteFilter", task));
1354-
try_ns!(runnable.dispatch(&*thread));
1354+
try_ns!(TaskRunnable::dispatch(runnable, &*thread));
13551355
NS_OK
13561356
}
13571357

@@ -1417,7 +1417,7 @@ impl CertStorage {
14171417
));
14181418
let thread = try_ns!(self.thread.lock());
14191419
let runnable = try_ns!(TaskRunnable::new("AddCerts", task));
1420-
try_ns!(runnable.dispatch(&*thread));
1420+
try_ns!(TaskRunnable::dispatch(runnable, &*thread));
14211421
NS_OK
14221422
}
14231423

@@ -1445,7 +1445,7 @@ impl CertStorage {
14451445
));
14461446
let thread = try_ns!(self.thread.lock());
14471447
let runnable = try_ns!(TaskRunnable::new("RemoveCertsByHashes", task));
1448-
try_ns!(runnable.dispatch(&*thread));
1448+
try_ns!(TaskRunnable::dispatch(runnable, &*thread));
14491449
NS_OK
14501450
}
14511451

services/fxaccounts/rust-bridge/firefox-accounts-bridge/src/punt/task.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,11 @@ impl PuntTask {
372372
let runnable = TaskRunnable::new(self.name, Box::new(self))?;
373373
// `may_block` schedules the task on the I/O thread pool, since we
374374
// expect most operations to wait on I/O.
375-
runnable.dispatch_with_options(target, DispatchOptions::default().may_block(true))?;
375+
TaskRunnable::dispatch_with_options(
376+
runnable,
377+
target,
378+
DispatchOptions::default().may_block(true),
379+
)?;
376380
Ok(())
377381
}
378382

services/sync/golden_gate/src/log.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ impl Log for LogSink {
118118
};
119119
let _ =
120120
TaskRunnable::new("extension_storage_sync::Logger::log", Box::new(task))
121-
.and_then(|r| r.dispatch(logger.owning_thread()));
121+
.and_then(|r| TaskRunnable::dispatch(r, logger.owning_thread()));
122122
}
123123
Err(_) => {}
124124
}

services/sync/golden_gate/src/task.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,11 @@ where
196196
let runnable = TaskRunnable::new(self.ferry.name(), Box::new(self))?;
197197
// `may_block` schedules the task on the I/O thread pool, since we
198198
// expect most operations to wait on I/O.
199-
runnable.dispatch_with_options(target, DispatchOptions::default().may_block(true))?;
199+
TaskRunnable::dispatch_with_options(
200+
runnable,
201+
target,
202+
DispatchOptions::default().may_block(true),
203+
)?;
200204
Ok(())
201205
}
202206
}
@@ -339,7 +343,11 @@ where
339343
/// Dispatches the task to the given thread `target`.
340344
pub fn dispatch(self, target: &nsIEventTarget) -> Result<()> {
341345
let runnable = TaskRunnable::new(Self::name(), Box::new(self))?;
342-
runnable.dispatch_with_options(target, DispatchOptions::default().may_block(true))?;
346+
TaskRunnable::dispatch_with_options(
347+
runnable,
348+
target,
349+
DispatchOptions::default().may_block(true),
350+
)?;
343351
Ok(())
344352
}
345353
}

toolkit/components/bitsdownload/src/bits_interface/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ impl BitsService {
130130
let runnable = TaskRunnable::new(task_runnable_name, task).map_err(|rv| {
131131
BitsTaskError::from_nsresult(FailedToConstructTaskRunnable, action, Pretask, rv)
132132
})?;
133-
runnable.dispatch(&command_thread).map_err(|rv| {
133+
TaskRunnable::dispatch(runnable, &command_thread).map_err(|rv| {
134134
BitsTaskError::from_nsresult(FailedToDispatchRunnable, action, Pretask, rv)
135135
})
136136
}

toolkit/components/extensions/storage/webext_storage_bridge/src/area.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,11 @@ impl StorageSyncArea {
7777
let task = PuntTask::new(Arc::downgrade(&*self.store()?), punt, callback)?;
7878
let runnable = TaskRunnable::new(name, Box::new(task))?;
7979
// `may_block` schedules the runnable on a dedicated I/O pool.
80-
runnable
81-
.dispatch_with_options(self.queue.coerce(), DispatchOptions::new().may_block(true))?;
80+
TaskRunnable::dispatch_with_options(
81+
runnable,
82+
self.queue.coerce(),
83+
DispatchOptions::new().may_block(true),
84+
)?;
8285
Ok(())
8386
}
8487

@@ -240,15 +243,8 @@ impl StorageSyncArea {
240243
Some(store) => {
241244
// Interrupt any currently-running statements.
242245
store.interrupt();
243-
// If dispatching the runnable fails, we'll drop the store and
244-
// close its database connection on the main thread. This is a
245-
// last resort, and can also happen if the last `RefPtr` to this
246-
// storage area is released without calling `teardown`. In that
247-
// case, the destructor for `self.store` will run, which
248-
// automatically closes its database connection. mozStorage's
249-
// `Connection::Release` also falls back to closing the
250-
// connection on the main thread if it can't dispatch to the
251-
// background thread.
246+
// If dispatching the runnable fails, we'll leak the store
247+
// without closing its database connection.
252248
teardown(&self.queue, store, callback)?;
253249
}
254250
None => return Err(Error::AlreadyTornDown),
@@ -264,7 +260,11 @@ fn teardown(
264260
) -> Result<()> {
265261
let task = TeardownTask::new(store, callback)?;
266262
let runnable = TaskRunnable::new(TeardownTask::name(), Box::new(task))?;
267-
runnable.dispatch_with_options(queue.coerce(), DispatchOptions::new().may_block(true))?;
263+
TaskRunnable::dispatch_with_options(
264+
runnable,
265+
queue.coerce(),
266+
DispatchOptions::new().may_block(true),
267+
)?;
268268
Ok(())
269269
}
270270

toolkit/components/kvstore/src/lib.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ impl KeyValueService {
134134
nsCString::from(name),
135135
));
136136

137-
TaskRunnable::new("KVService::GetOrCreate", task)?.dispatch(thread)
137+
TaskRunnable::dispatch(TaskRunnable::new("KVService::GetOrCreate", task)?, thread)
138138
}
139139
}
140140

@@ -182,7 +182,7 @@ impl KeyValueDatabase {
182182

183183
let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
184184

185-
TaskRunnable::new("KVDatabase::Put", task)?.dispatch(thread)
185+
TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Put", task)?, thread)
186186
}
187187

188188
xpcom_method!(
@@ -220,7 +220,7 @@ impl KeyValueDatabase {
220220

221221
let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
222222

223-
TaskRunnable::new("KVDatabase::WriteMany", task)?.dispatch(thread)
223+
TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::WriteMany", task)?, thread)
224224
}
225225

226226
xpcom_method!(
@@ -247,7 +247,7 @@ impl KeyValueDatabase {
247247

248248
let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
249249

250-
TaskRunnable::new("KVDatabase::Get", task)?.dispatch(thread)
250+
TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Get", task)?, thread)
251251
}
252252

253253
xpcom_method!(
@@ -264,7 +264,7 @@ impl KeyValueDatabase {
264264

265265
let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
266266

267-
TaskRunnable::new("KVDatabase::Has", task)?.dispatch(thread)
267+
TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Has", task)?, thread)
268268
}
269269

270270
xpcom_method!(
@@ -281,7 +281,7 @@ impl KeyValueDatabase {
281281

282282
let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
283283

284-
TaskRunnable::new("KVDatabase::Delete", task)?.dispatch(thread)
284+
TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Delete", task)?, thread)
285285
}
286286

287287
xpcom_method!(
@@ -297,7 +297,7 @@ impl KeyValueDatabase {
297297

298298
let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
299299

300-
TaskRunnable::new("KVDatabase::Clear", task)?.dispatch(thread)
300+
TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Clear", task)?, thread)
301301
}
302302

303303
xpcom_method!(
@@ -324,7 +324,7 @@ impl KeyValueDatabase {
324324

325325
let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
326326

327-
TaskRunnable::new("KVDatabase::Enumerate", task)?.dispatch(thread)
327+
TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Enumerate", task)?, thread)
328328
}
329329
}
330330

toolkit/components/places/bookmark_sync/src/driver.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ impl dogear::Driver for Driver {
100100
"bookmark_sync::Driver::record_telemetry_event",
101101
Box::new(task),
102102
)
103-
.and_then(|r| r.dispatch(progress.owning_thread()));
103+
.and_then(|r| TaskRunnable::dispatch(r, progress.owning_thread()));
104104
}
105105
}
106106
}
@@ -140,7 +140,7 @@ impl Log for Logger {
140140
message,
141141
};
142142
let _ = TaskRunnable::new("bookmark_sync::Logger::log", Box::new(task))
143-
.and_then(|r| r.dispatch(logger.owning_thread()));
143+
.and_then(|r| TaskRunnable::dispatch(r, logger.owning_thread()));
144144
}
145145
Err(_) => {}
146146
}

toolkit/components/places/bookmark_sync/src/merger.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ impl SyncedBookmarksMerger {
108108
"bookmark_sync::SyncedBookmarksMerger::merge",
109109
Box::new(task),
110110
)?;
111-
runnable.dispatch(&async_thread)?;
111+
TaskRunnable::dispatch(runnable, &async_thread)?;
112112
let op = MergeOp::new(controller);
113113
Ok(RefPtr::new(op.coerce()))
114114
}

toolkit/components/xulstore/src/persist.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ pub(crate) fn persist(key: String, value: Option<String>) -> XULStoreResult<()>
176176
.ok_or(XULStoreError::Unavailable)?
177177
.get_ref()
178178
.ok_or(XULStoreError::Unavailable)?;
179-
TaskRunnable::new("XULStore::Persist", task)?.dispatch(thread)?;
179+
TaskRunnable::dispatch(TaskRunnable::new("XULStore::Persist", task)?, thread)?;
180180
}
181181

182182
// Now insert the key/value pair into the map. The unwrap() call here

xpcom/rust/moz_task/src/lib.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -183,17 +183,30 @@ impl TaskRunnable {
183183
}))
184184
}
185185

186+
/// Dispatches this task runnable to an event target with the default
187+
/// options.
186188
#[inline]
187-
pub fn dispatch(&self, target_thread: &nsIEventTarget) -> Result<(), nsresult> {
188-
self.dispatch_with_options(target_thread, DispatchOptions::default())
189+
pub fn dispatch(this: RefPtr<Self>, target: &nsIEventTarget) -> Result<(), nsresult> {
190+
Self::dispatch_with_options(this, target, DispatchOptions::default())
189191
}
190192

193+
/// Dispatches this task runnable to an event target, like a thread or a
194+
/// task queue, with the given options.
195+
///
196+
/// Note that this is an associated function, not a method, because it takes
197+
/// an owned reference to the runnable, and must be called like
198+
/// `TaskRunnable::dispatch_with_options(runnable, options)` and *not*
199+
/// `runnable.dispatch_with_options(options)`.
200+
///
201+
/// ### Safety
202+
///
203+
/// This function leaks the runnable if dispatch fails.
191204
pub fn dispatch_with_options(
192-
&self,
193-
target_thread: &nsIEventTarget,
205+
this: RefPtr<Self>,
206+
target: &nsIEventTarget,
194207
options: DispatchOptions,
195208
) -> Result<(), nsresult> {
196-
unsafe { target_thread.DispatchFromScript(self.coerce(), options.flags()) }.to_result()
209+
unsafe { target.DispatchFromScript(this.coerce(), options.flags()) }.to_result()
197210
}
198211

199212
xpcom_method!(run => Run());
@@ -205,7 +218,7 @@ impl TaskRunnable {
205218
Ok(_) => {
206219
assert!(!is_current_thread(&self.original_thread));
207220
self.task.run();
208-
self.dispatch(&self.original_thread)
221+
Self::dispatch(RefPtr::new(self), &self.original_thread)
209222
}
210223
Err(_) => {
211224
assert!(is_current_thread(&self.original_thread));

0 commit comments

Comments
 (0)