Basic multi-threading (#4366)

This commit is contained in:
Laurenz 2024-06-10 15:28:40 +02:00 committed by GitHub
parent a68a241570
commit 7fa86eed0e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 163 additions and 56 deletions

View File

@ -217,6 +217,11 @@ pub struct SharedArgs {
/// Arguments related to storage of packages in the system
#[clap(flatten)]
pub package_storage_args: PackageStorageArgs,
/// Number of parallel jobs spawned during compilation,
/// defaults to number of CPUs.
#[clap(long, short)]
pub jobs: Option<usize>,
}
/// Arguments related to where packages are stored in the system.

View File

@ -56,6 +56,11 @@ pub struct SystemWorld {
impl SystemWorld {
/// Create a new system world.
pub fn new(command: &SharedArgs) -> Result<Self, WorldCreationError> {
// Set up the thread pool.
if let Some(jobs) = command.jobs {
rayon::ThreadPoolBuilder::new().num_threads(jobs).build_global().ok();
}
// Resolve the system-global input path.
let input = match &command.input {
Input::Stdin => None,

View File

@ -145,6 +145,7 @@ pub fn deferred_image(image: Image) -> (Deferred<EncodedImage>, Option<ColorSpac
/// whether the image has color.
///
/// Skips the alpha channel as that's encoded separately.
#[typst_macros::time(name = "encode raster image")]
fn encode_raster_image(image: &RasterImage) -> (Vec<u8>, Filter, bool) {
let dynamic = image.dynamic();
let channel_count = dynamic.color().channel_count();
@ -169,6 +170,7 @@ fn encode_raster_image(image: &RasterImage) -> (Vec<u8>, Filter, bool) {
}
/// Encode an image's alpha channel if present.
#[typst_macros::time(name = "encode alpha")]
fn encode_alpha(raster: &RasterImage) -> (Vec<u8>, Filter) {
let pixels: Vec<_> = raster
.dynamic()
@ -179,6 +181,7 @@ fn encode_alpha(raster: &RasterImage) -> (Vec<u8>, Filter) {
}
/// Encode an SVG into a chunk of PDF objects.
#[typst_macros::time(name = "encode svg")]
fn encode_svg(svg: &SvgImage) -> (Chunk, Ref) {
svg2pdf::to_chunk(svg.tree(), svg2pdf::ConversionOptions::default())
}

View File

@ -5,6 +5,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use comemo::{Track, Tracked, TrackedMut, Validate};
use ecow::EcoVec;
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use crate::diag::{SourceDiagnostic, SourceResult};
use crate::foundations::{Styles, Value};
@ -44,6 +45,46 @@ impl Engine<'_> {
}
}
}
/// Runs tasks on the engine in parallel.
pub fn parallelize<P, I, T, U, F>(&mut self, iter: P, f: F) -> impl Iterator<Item = U>
where
P: IntoIterator<IntoIter = I>,
I: Iterator<Item = T>,
T: Send,
U: Send,
F: Fn(&mut Engine, T) -> U + Send + Sync,
{
let Engine { world, introspector, traced, ref route, .. } = *self;
// We collect into a vector and then call `into_par_iter` instead of
// using `par_bridge` because it does not retain the ordering.
let work: Vec<T> = iter.into_iter().collect();
// Work in parallel.
let mut pairs: Vec<(U, Sink)> = Vec::with_capacity(work.len());
work.into_par_iter()
.map(|value| {
let mut sink = Sink::new();
let mut engine = Engine {
world,
introspector,
traced,
sink: sink.track_mut(),
route: route.clone(),
};
(f(&mut engine, value), sink)
})
.collect_into_vec(&mut pairs);
// Apply the subsinks to the outer sink.
for (_, sink) in &mut pairs {
let sink = std::mem::take(sink);
self.sink.extend(sink.delayed, sink.warnings, sink.values);
}
pairs.into_iter().map(|(output, _)| output)
}
}
/// May hold a span that is currently under inspection.
@ -143,6 +184,22 @@ impl Sink {
self.values.push((value, styles));
}
}
/// Extend from another sink.
fn extend(
&mut self,
delayed: EcoVec<SourceDiagnostic>,
warnings: EcoVec<SourceDiagnostic>,
values: EcoVec<(Value, Option<Styles>)>,
) {
self.delayed.extend(delayed);
for warning in warnings {
self.warn(warning);
}
if let Some(remaining) = Self::MAX_VALUES.checked_sub(self.values.len()) {
self.values.extend(values.into_iter().take(remaining));
}
}
}
/// The route the engine took during compilation. This is used to detect

View File

@ -13,7 +13,7 @@ use crate::foundations::{
Packed, Resolve, Smart, StyleChain, Value,
};
use crate::introspection::{
Counter, CounterDisplayElem, CounterKey, Locator, ManualPageCounter,
Counter, CounterDisplayElem, CounterKey, Locator, ManualPageCounter, SplitLocator,
};
use crate::layout::{
Abs, AlignElem, Alignment, Axes, ColumnsElem, Dir, Frame, HAlignment, Length,
@ -348,14 +348,13 @@ impl Packed<PageElem> {
/// a fragment consisting of multiple frames, one per output page of this
/// page run.
#[typst_macros::time(name = "page", span = self.span())]
pub fn layout(
&self,
pub fn layout<'a>(
&'a self,
engine: &mut Engine,
locator: Locator,
styles: StyleChain,
page_counter: &mut ManualPageCounter,
locator: Locator<'a>,
styles: StyleChain<'a>,
extend_to: Option<Parity>,
) -> SourceResult<Vec<Page>> {
) -> SourceResult<PageLayout<'a>> {
let mut locator = locator.split();
// When one of the lengths is infinite the page fits its content along
@ -382,14 +381,6 @@ impl Packed<PageElem> {
.resolve(styles)
.relative_to(size);
// Determine the binding.
let binding =
self.binding(styles)
.unwrap_or_else(|| match TextElem::dir_in(styles) {
Dir::LTR => Binding::Left,
_ => Binding::Right,
});
// Realize columns.
let mut child = self.body().clone();
let columns = self.columns(styles);
@ -405,27 +396,70 @@ impl Packed<PageElem> {
regions.root = true;
// Layout the child.
let mut frames = child
let frames = child
.layout(engine, locator.next(&self.span()), styles, regions)?
.into_frames();
Ok(PageLayout {
page: self,
locator,
styles,
extend_to,
area,
margin,
two_sided,
frames,
})
}
}
/// A prepared layout of a page run that can be finalized with access to the
/// page counter.
pub struct PageLayout<'a> {
page: &'a Packed<PageElem>,
locator: SplitLocator<'a>,
styles: StyleChain<'a>,
extend_to: Option<Parity>,
area: Size,
margin: Sides<Abs>,
two_sided: bool,
frames: Vec<Frame>,
}
impl PageLayout<'_> {
/// Finalize the layout with access to the next page counter.
#[typst_macros::time(name = "finalize page", span = self.page.span())]
pub fn finalize(
mut self,
engine: &mut Engine,
page_counter: &mut ManualPageCounter,
) -> SourceResult<Vec<Page>> {
let styles = self.styles;
// Align the child to the pagebreak's parity.
// Check for page count after adding the pending frames
if extend_to
.is_some_and(|p| !p.matches(page_counter.physical().get() + frames.len()))
{
if self.extend_to.is_some_and(|p| {
!p.matches(page_counter.physical().get() + self.frames.len())
}) {
// Insert empty page after the current pages.
let size = area.map(Abs::is_finite).select(area, Size::zero());
frames.push(Frame::hard(size));
let size = self.area.map(Abs::is_finite).select(self.area, Size::zero());
self.frames.push(Frame::hard(size));
}
let fill = self.fill(styles);
let foreground = self.foreground(styles);
let background = self.background(styles);
let header_ascent = self.header_ascent(styles);
let footer_descent = self.footer_descent(styles);
let numbering = self.numbering(styles);
let number_align = self.number_align(styles);
let fill = self.page.fill(styles);
let foreground = self.page.foreground(styles);
let background = self.page.background(styles);
let header_ascent = self.page.header_ascent(styles);
let footer_descent = self.page.footer_descent(styles);
let numbering = self.page.numbering(styles);
let number_align = self.page.number_align(styles);
let binding =
self.page
.binding(styles)
.unwrap_or_else(|| match TextElem::dir_in(styles) {
Dir::LTR => Binding::Left,
_ => Binding::Right,
});
// Construct the numbering (for header or footer).
let numbering_marginal = numbering.as_ref().map(|numbering| {
@ -440,7 +474,7 @@ impl Packed<PageElem> {
both,
)
.pack()
.spanned(self.span());
.spanned(self.page.span());
// We interpret the Y alignment as selecting header or footer
// and then ignore it for aligning the actual number.
@ -451,8 +485,8 @@ impl Packed<PageElem> {
counter
});
let header = self.header(styles);
let footer = self.footer(styles);
let header = self.page.header(styles);
let footer = self.page.footer(styles);
let (header, footer) = if matches!(number_align.y(), Some(OuterVAlignment::Top)) {
(
header.as_ref().unwrap_or(&numbering_marginal),
@ -466,16 +500,16 @@ impl Packed<PageElem> {
};
// Post-process pages.
let mut pages = Vec::with_capacity(frames.len());
for mut frame in frames {
let mut pages = Vec::with_capacity(self.frames.len());
for mut frame in self.frames {
// The padded width of the page's content without margins.
let pw = frame.width();
// If two sided, left becomes inside and right becomes outside.
// Thus, for left-bound pages, we want to swap on even pages and
// for right-bound pages, we want to swap on odd pages.
let mut margin = margin;
if two_sided && binding.swap(page_counter.physical()) {
let mut margin = self.margin;
if self.two_sided && binding.swap(page_counter.physical()) {
std::mem::swap(&mut margin.left, &mut margin.right);
}
@ -511,7 +545,7 @@ impl Packed<PageElem> {
let sub = content
.clone()
.styled(AlignElem::set_alignment(align))
.layout(engine, locator.next(&content.span()), styles, pod)?
.layout(engine, self.locator.next(&content.span()), styles, pod)?
.into_frame();
if ptr::eq(marginal, header) || ptr::eq(marginal, background) {

View File

@ -79,29 +79,32 @@ impl Packed<DocumentElem> {
locator: Locator,
styles: StyleChain,
) -> SourceResult<Document> {
let mut pages = Vec::with_capacity(self.children().len());
let mut page_counter = ManualPageCounter::new();
let children = self.children();
let mut iter = children.chain(&styles).peekable();
let mut peekable = children.chain(&styles).peekable();
let mut locator = locator.split();
while let Some((child, styles)) = iter.next() {
if let Some(page) = child.to_packed::<PageElem>() {
let extend_to = iter
.peek()
.and_then(|(next, _)| *next.to_packed::<PageElem>()?.clear_to()?);
let run = page.layout(
engine,
locator.next(&page.span()),
styles,
&mut page_counter,
extend_to,
)?;
pages.extend(run);
} else {
bail!(child.span(), "unexpected document child");
}
let iter = std::iter::from_fn(|| {
let (child, styles) = peekable.next()?;
let extend_to = peekable
.peek()
.and_then(|(next, _)| *next.to_packed::<PageElem>()?.clear_to()?);
let locator = locator.next(&child.span());
Some((child, styles, extend_to, locator))
});
let layouts =
engine.parallelize(iter, |engine, (child, styles, extend_to, locator)| {
if let Some(page) = child.to_packed::<PageElem>() {
page.layout(engine, locator, styles, extend_to)
} else {
bail!(child.span(), "unexpected document child");
}
});
let mut page_counter = ManualPageCounter::new();
let mut pages = Vec::with_capacity(self.children().len());
for result in layouts {
pages.extend(result?.finalize(engine, &mut page_counter)?);
}
Ok(Document {