diff --git a/src/database/engine.rs b/src/database/engine.rs
index 3975d3d9a7dac7966b72c0623231d0fefa97e01a..6c5b5bd21dfd97752cd23446a051325f3c2a382f 100644
--- a/src/database/engine.rs
+++ b/src/database/engine.rs
@@ -49,7 +49,7 @@ pub(crate) fn open(server: &Arc<Server>) -> Result<Arc<Self>> {
 
 		let mut db_env = Env::new().or_else(or_else)?;
 		let row_cache = Cache::new_lru_cache(row_cache_capacity_bytes);
-		let db_opts = db_options(config, &mut db_env, &row_cache, col_cache.get("primary").expect("cache"));
+		let db_opts = db_options(config, &mut db_env, &row_cache, col_cache.get("primary").expect("cache"))?;
 
 		let load_time = std::time::Instant::now();
 		if config.rocksdb_repair {
@@ -63,9 +63,15 @@ pub(crate) fn open(server: &Arc<Server>) -> Result<Arc<Self>> {
 			.collect::<BTreeSet<_>>();
 
 		debug!("Opening {} column family descriptors in database", cfs.len());
+		let cfopts = cfs
+			.iter()
+			.map(|name| cf_options(config, name, db_opts.clone(), &mut col_cache))
+			.collect::<Result<Vec<_>>>()?;
+
 		let cfds = cfs
 			.iter()
-			.map(|name| ColumnFamilyDescriptor::new(name, cf_options(config, name, db_opts.clone(), &mut col_cache)))
+			.zip(cfopts.into_iter())
+			.map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
 			.collect::<Vec<_>>();
 
 		debug!("Opening database...");
@@ -102,7 +108,7 @@ pub(crate) fn open_cf(&self, name: &str) -> Result<Arc<BoundColumnFamily<'_>>> {
 			debug!("Creating new column family in database: {name}");
 
 			let mut col_cache = self.col_cache.write().expect("locked");
-			let opts = cf_options(&self.server.config, name, self.opts.clone(), &mut col_cache);
+			let opts = cf_options(&self.server.config, name, self.opts.clone(), &mut col_cache)?;
 			if let Err(e) = self.db.create_cf(name, &opts) {
 				error!(?name, "Failed to create new column family: {e}");
 				return or_else(e);
diff --git a/src/database/opts.rs b/src/database/opts.rs
index 8791e77834d994845075919dcd60cacb3c82889b..890e0188fe30e74fd7abb24967b5356b9e4aaff3 100644
--- a/src/database/opts.rs
+++ b/src/database/opts.rs
@@ -1,6 +1,6 @@
-use std::{cmp, collections::HashMap};
+use std::{cmp, collections::HashMap, convert::TryFrom};
 
-use conduit::{utils, Config};
+use conduit::{utils, Config, Result};
 use rocksdb::{
 	statistics::StatsLevel, BlockBasedOptions, Cache, DBCompactionStyle, DBCompressionType, DBRecoveryMode, Env,
 	LogLevel, Options, UniversalCompactOptions, UniversalCompactionStopStyle,
@@ -11,8 +11,7 @@
 /// resulting value. Note that we require special per-column options on some
 /// columns, therefor columns should only be opened after passing this result
 /// through cf_options().
-pub(crate) fn db_options(config: &Config, env: &mut Env, row_cache: &Cache, col_cache: &Cache) -> Options {
-	const MIN_PARALLELISM: usize = 2;
+pub(crate) fn db_options(config: &Config, env: &mut Env, row_cache: &Cache, col_cache: &Cache) -> Result<Options> {
 	const DEFAULT_STATS_LEVEL: StatsLevel = if cfg!(debug_assertions) {
 		StatsLevel::ExceptDetailedTimers
 	} else {
@@ -25,14 +24,9 @@ pub(crate) fn db_options(config: &Config, env: &mut Env, row_cache: &Cache, col_
 	set_logging_defaults(&mut opts, config);
 
 	// Processing
-	let threads = if config.rocksdb_parallelism_threads == 0 {
-		cmp::max(MIN_PARALLELISM, utils::available_parallelism())
-	} else {
-		cmp::max(MIN_PARALLELISM, config.rocksdb_parallelism_threads)
-	};
-
-	opts.set_max_background_jobs(threads.try_into().unwrap());
-	opts.set_max_subcompactions(threads.try_into().unwrap());
+	opts.set_enable_pipelined_write(true);
+	opts.set_max_background_jobs(num_threads::<i32>(config)?);
+	opts.set_max_subcompactions(num_threads::<u32>(config)?);
 	opts.set_max_file_opening_threads(0);
 	if config.rocksdb_compaction_prio_idle {
 		env.lower_thread_pool_cpu_priority();
@@ -100,13 +94,15 @@ pub(crate) fn db_options(config: &Config, env: &mut Env, row_cache: &Cache, col_
 	});
 
 	opts.set_env(env);
-	opts
+	Ok(opts)
 }
 
 /// Adjust options for the specific column by name. Provide the result of
 /// db_options() as the argument to this function and use the return value in
 /// the arguments to open the specific column.
-pub(crate) fn cf_options(cfg: &Config, name: &str, mut opts: Options, cache: &mut HashMap<String, Cache>) -> Options {
+pub(crate) fn cf_options(
+	cfg: &Config, name: &str, mut opts: Options, cache: &mut HashMap<String, Cache>,
+) -> Result<Options> {
 	// Columns with non-default compaction options
 	match name {
 		"backupid_algorithm"
@@ -169,7 +165,7 @@ pub(crate) fn cf_options(cfg: &Config, name: &str, mut opts: Options, cache: &mu
 		&_ => {},
 	}
 
-	opts
+	Ok(opts)
 }
 
 fn set_logging_defaults(opts: &mut Options, config: &Config) {
@@ -346,3 +342,15 @@ fn table_options(_config: &Config) -> BlockBasedOptions {
 
 	opts
 }
+
+fn num_threads<T: TryFrom<usize>>(config: &Config) -> Result<T> {
+	const MIN_PARALLELISM: usize = 2;
+
+	let requested = if config.rocksdb_parallelism_threads != 0 {
+		config.rocksdb_parallelism_threads
+	} else {
+		utils::available_parallelism()
+	};
+
+	utils::math::try_into::<T, usize>(cmp::max(MIN_PARALLELISM, requested))
+}