这是Rust编码挑战系列的第二篇文章,我们将学习计算机科学基础知识并编写一些实际的代码以制作可用的程序。在这个挑战中,我们将学习如何在Rust中实现一个简单的单词计数命令。首先,以下是我们的程序需要执行的两个主要功能:
计算给定文本输入中的单词数量
计算给定文本输入中特定单词的出现数量
满足此功能确实很简单。基本上,我们循环通过,使文本进行统计,并总结数字,容易,对吗?但这是解决这一挑战的警告,输入文本有1000万行随机文本,这似乎不是客观地说,因为它仅大约是700mb的文本,但是在某种程度上,我认为开始学习一些基本优化是一个很好的数字。关于生锈的很酷的事情是,我们不需要太多的经验,也不需要太低级别的细节来使我们的程序表现。通过做这项挑战,我们将学习一种新的数据结构,专门用于处理大型文件,利用CPU功率进行并行处理,高效的块,以及象征性的,不安全的操作,借用的数据以及Rust中可重复使用的功能。因此,让我们解决这个问题!
天真的方法(30秒)
这是一个简单的程序,我们可以首先提出解决这个单词计数练习:
pub fn read_file (file_path : & str ) - > String { let mut file = File :: open (file_path) . expect ( "Fail to open the file" ); let mut content = String :: new (); file . read_to_string ( &mut content) . expect ( "Fail to read the file" ); content}pub fn count_words_simple (text : String ) - > usize { text . lines () . flat_map ( | line | line . split_whitespace ()) . count ()}
我们首先将所有文件内容加载到内存中,然后有一个很小的功能来进行单词计数作业。在这里,我们循环遍历每条线,将每一行均匀地单词,然后计算它们。结果是它正确计算了我们1000万行输入的单词数量,但在30秒内。基于UNIX的OS的本机wc命令几乎立即输出相同的结果。让我们进行一些改进!
对于read_file函数,您认为我们需要以某种方式调整它吗?首先,我们可以测量将1000万行文件加载到内存中所需的时间。只是这样:
pub fn read_file (file_path : & str ) - > String { let mut file = File :: open (file_path) . expect ( "Fail to open the file" ); let mut content = String :: new (); file . read_to_string ( &mut content) . expect ( "Fail to read the file" ); content}pub fn main () { let start = Instant :: now (); let file_path = "/path/to/the/input/file.txt" let content = read_file (file_path); println! ( "Total time read file: {:?}" , start . elapsed ()); }
总时间阅读文件:137.034ms
事实证明,我们的read_file的性能已经令人印象深刻,只需大约137毫秒即可加载我的计算机上的所有文件内容。 File::open通过系统调用,例如Unix中的read() ,建立了与我们应用程序中文件的连接。实际工作发生在file.read_to_string(...)中,内部生锈使用缓冲区为此目的,因此,内存中的缓冲区中的缓冲区不是读取文件字节字节,而是存储数据的直接位置。可能会出现一个问题,为什么当2个目的地之间有更多层次的层次时,我们必须从文件中转到存储器中的缓冲区,最后到达应用程序。
因为每次读取文件数据时,它都需要执行“系统调用”,然后这样做,CPU需要从用户模式转换为内核模式,并且上下文转换确实很昂贵。因此,缓冲区的想法是通过使用内存中的临时位置来减少系统调用的数量来存储数据,可以调节缓冲区的大小以最大程度地提高性能,典型的缓冲区大小将从4KB到4MB。因此,例如,我们只能对每个字节发出系统调用,而是每个4MB一次。如果我们看一些数字,这种方法的优点将是巨大的。
例如,我们有一个100MB文件,每个系统呼叫的费用为200N。忽略内存访问延迟和其他因素,那么我们需要等待的时间将是:
100 million bytes × 200 ns / system call = 100_000_000 * 200 ns = 20 seconds
但是,随着缓冲区的大小为4KB,我们必须等待的时间将大大减少:
100 MB / 4 MB = 25 MB - > 25_000 system calls 25_000 * 200 ns = 5_000_000 ns = 5 ms
第一个优化
让我们注意count_word_simple功能,因为这花费了大部分时间。我们可以在这里做什么?第一个直觉是利用可用的CPU功率,我们可以将文本分解成块,并平行计数每个块,最后将它们汇总,听起来确实像是一小片Map-reduce!首先,我们编写一个函数来计数块中的单词:
fn count_words_in_chunk_1 (chunk : & [ String ]) - > usize { chunk . iter () . flat_map ( | line | line . split_whitespace ()) . count ()}
然后,我们拥有进行并行操作的主要功能:
pub fn count_word_parallel_1 (text : String ) - > usize { let num_threads = std :: thread :: available_parallelism () . unwrap () . get (); let lines : Vec < String > = text . lines () . map ( | line | line . to_string ()) . collect (); let lines = Arc :: new (lines); let lines_per_threads = (lines . len () + num_threads - 1 ) / num_threads; let total_word_count = Arc :: new ( AtomicUsize :: new ( 0 )); let mut handles = Vec :: new (); for i in 0 .. num_threads { let start = lines_per_threads * i; let len = lines . len (); let end = min (start + lines_per_threads, lines . len ()); if start < end { let total_word_count_clone = total_word_count . clone (); let lines_clone = Arc :: clone ( & lines); let handle = thread :: spawn ( move || { let chunk = & lines_clone[start .. end]; let count = count_words_in_chunk_1 (chunk); total_word_count_clone . fetch_add (count, Ordering :: Relaxed ); }); handles . push (handle); } } for handle in handles { handle . join () . unwrap (); } total_word_count . load ( Ordering :: Relaxed )}
结果?从30秒开始,这是我们的新指标:
Total words : 85039300 , elapsed time : 7. 608273208s
在研究我们的单词计数功能进一步改进之前,让我们分解一些我们在这里遇到的一些新概念。首先,我们将文本转换为线路向量,并且由于线之间将在不同的线程之间共享线路,因此,为了安全地访问线条,我们将其包装到ARC数据结构中,该弧数据结构代表“原子上参考计数”,因此我们将块的工作平均分配给可用线程。如果我们使用lines.len() / num_threads分配工作负载,则所有工人都将具有相等的块进行处理,但是提醒,这意味着最后的某些线条将毫无根据。这就是为什么我们需要采用(lines.len() + num_threads - 1) / num_threads来覆盖输入中的所有行。
接下来,我们产生许多线程,每个线程独立地在块中工作。注意线程:: Spawn语法和MOVE关键字。 move关键字将关键字移至Worker线程后的封闭式内部的所有内容,这意味着对total_word_count_clone的引用在主线程中不再有效,因为它已“移动”到工作线程。要确切地了解它的含义,这里有一个不会编译的代码:
let lines_clone = Arc :: clone ( & lines);let handle = thread :: spawn ( move || { let chunk = & lines_clone[start .. end]; let count = count_words_in_chunk_1 (chunk); total_word_count_clone . fetch_add (count, Ordering :: Relaxed );});// Attempt to use lines_clone here would cause a compile errorlet lines_ref = lines_clone;
让我们命名为主线程main_thread和产卵线程worker_thread ,这是一些可视化:
Before the Move : After the Move :+---------------------+ +---------------------+| Main Thread | | Main Thread || | | || +---------------+ | | +---------------+ || | lines_clone | | | | lines_clone | | | | ( Valid ) | | | | ( Invalid ) | || +---------------+ | | +---------------+ || | | || +---------------+ | | +---------------+ || | lines_ref | | | | lines_ref | | | | ( Valid ) | | | | ( Valid ) | || +---------------+ | | +---------------+ |+---------------------+ +---------------------+ | | | | | | | move | V V+-----------------------------+ +-----------------------------+| Worker Thread | | Worker Thread || | | || +---------------------+ | | +---------------------+ || | lines_clone | | | | lines_clone | | | | ( Now owned) | | | | ( Now owned) | || +---------------------+ | | +---------------------+ || | | |+-----------------------------+ +-----------------------------+
并发概念
弧和原子量的组合是以螺纹安全方式递增计数器的正确方法之一。在这里,ARC使您可以将数据共享到多个线程,而不会过多地将完全相同的数据复制到每个线程中,而不是使用Arc::clone(&lines)而不是使用lines.clone() ,这在内存使用情况和计算功率方面效率很低,因为我们必须分配更多内存并复制更多的数据,并复制数据。只需考虑一下包装器的弧形即可到达一个值,并且可以在线程之间安全共享。
与其使用Mutex (也是标准库中的数据结构)充当锁定的数据结构,我们只需要使用AtomicUsize ,因为我们只对总数数字感兴趣,而是保证了fetch_add函数是原子质的,因此它可以正确地完成其工作,并且在同一时间有效地效率地启动了任何锁定或同步。
但是,解释Ordering::Relaxed可能是一个挑战,让我尝试以一种简单的方式制定它。在多线程应用程序中,多个线程可以同时执行操作,如果有一个线程读取值x的线程和其他一些线程同时写入值x的线程怎么办?存在潜在的数据竞赛,因为X值在编写之前可能会读取,因为执行顺序从情况到情况下的执行顺序并不总是相同的,如果线程之间没有订购保证,则有时可以在书面之前看到该值是在书写之前读取的,并且有时是由于CPU优化而读取了某些因素,因为它是在读取之前读取的。例如,如果2个线程同时读取和编写一个值,则执行订单之一可能是:
+-------------------------+| Thread A ( Writer ) ||------------------------|| 1 . Lock || 2 . Write 42 || 3 . Unlock |+-------------------------++-------------------------+| Thread B ( Reader ) ||------------------------|| 4 . Lock || 5 . Read (expect 42 ) || 6 . Unlock |+-------------------------+
这是我们期望的理想情况,在这里,我们有一个共享变量持有整数值,并且在第一次编写后将读取值42。但这是执行命令的另一种可能性:
+-------------------------+| Thread A ( Writer ) ||------------------------|| 1 . Lock || 2 . Unlock || 3 . Write 42 |+-------------------------++-------------------------+| Thread B ( Reader ) ||------------------------|| 4 . Lock || 5 . Read (sees 0 , not 42 || 6 . Unlock |+-------------------------+
在这种情况下,我们并不幸运,CPU已重新排序说明,并且在时间线程B读取它时,更新值不可见。这就是为什么Ordering概念发挥作用的原因,如果使用Ordering::Release ,则可以保证2个线程正在同时读取和写作,写操作将在读取操作之前进行。在我们的单词计数示例中,因为我们的每个工人都独立计算块,并且之间没有直接的交流,这就是为什么我们使用Ordering::Relaxed原因,因为此枚举没有保留任何指示顺序,只是原子操作。
在函数count_word_parallel_1末尾, handle.join()阻止主线程,直到每个工人中的计算完成。最后,我们加载计算值并将其返回。
进一步优化
在count_word_parallel_1的一开始,我们将文本转换为线路向量:
fn count_word_parallel_1 (text : String ) - > usize {let lines : Vec < String > = text . lines () . map ( | line | line . to_string ()) . collect ();...}
有多个问题,首先我们需要在所有线路上循环,然后将每条线转换为拥有的字符串,然后将它们收集到矢量中,当我们的输入文本很大时,这是一个昂贵的操作。相反,我们可以完全删除此计算并直接与输入字符串一起使用,类似的内容:
fn count_word_parallel_2 (text : & str ) - > usize {// what to do here? }
现在,我们需要弄清楚在引擎盖下进行大块的方法,文本是一个字符的数组,我们可以创建一个辅助函数,以获取输入文本,我们想要的块数量,最后它将返回借入的字符串的向量(不拥有一个):
fn get_chunks (text : & str , partitions : usize ) - > Vec < & str > { let mut end_index = 0 ; let mut chunks = Vec :: with_capacity (partitions); for i in 0 .. partitions { let start_index = if i == 0 { 0 } else { end_index + 1 }; end_index = get_end_index (text, i, start_index, partitions); if start_index < text . len () { let chunk = & text[start_index .. end_index]; chunks . push (chunk); } } chunks}fn get_end_index (text : & str , i : usize , start_index : usize , partitions : usize ) - > usize { let chunk_size = text . len () / partitions; let bytes = text . as_bytes (); let mut end_index = start_index + chunk_size; if end_index >= text . len () || i == partitions - 1 { return text . len (); } while end_index < text . len () && bytes[end_index] != b' ' { end_index += 1 ; } end_index}
我们使用的分区等于我们的计算机的可用工人数量,因此,对于每个分区,我们需要计算起始索引和结束索引的正确索引,这里唯一的约束是我们不在单词中间开始或结束,这就是为什么我们具有get_end_index功能来帮助我们在这种情况下为我们提供帮助。
您可以在这里进行的一个观察是,我们直接使用text.as_bytes()而不是text.chars() ,为什么更好?因为当chars()需要将UTF-8字节值转换为有效字符值时(不是字符文字,而是代表它们的4字节Unicode标量)时,它的成本更高。但是,使用as_bytes() ,没有复杂的解码!空间字符是使用单个字节编码的,这就是为什么我们只需要检查每个字节是否是空格字符,如果不是空间字符,那么我们不断将end_index , b' ' ,只需转换字符文字,此处的空间字符在utf-8格式中的字节表示中。
然后,我们将count_word_in_chunk_1修改一点点,从使用自有的字符串数据到借用一个:
fn count_words_in_chunk_1 (chunk : & str ) - > usize { chunk . lines () . flat_map ( | line | line . split_whitespace ()) . count ()}
让我们看看经过一些优化后,我们取得了多少改进:
Total words : 85039300 , elapsed time : 5. 268985375s
大约有2.25秒的差异,相当不错的改进。返回count_word_in_chunk_1 ,我们可以进一步改善此功能,而无需通过这样的事情牺牲正确性:
fn count_word_in_chunk_2 (chunk : & str ) - > usize { let mut count = 0 ; let mut in_word = false ; let bytes = chunk . as_bytes (); for byte in bytes { // Check for any ASCII whitespace (space, newline, tab, etc.) if byte . is_ascii_whitespace () { in_word = false ; } else { if ! in_word { count += 1 ; in_word = true ; } } } count}
在这里,我们应用了与get_chunks函数中使用的相同技术,并远离lines()和flat_map()因为这些函数会创建额外的迭代器和内存分配,请记住,在这里我们只考虑ascii whitespace,Unicode可能会有其他一些。
现在,让我们看看我们的word_count函数的最终结果:
Total words : 85039300 , elapsed time : 1. 22284s
1000万行的文本在仅1.2秒内而没有太多地介入低级细节,这就是我们通过一些基本优化实现的目标。
单词出现
接下来,我们将创建另一个功能,该功能获取输入文件和一个单词并返回该单词的出现数量,在这里我们可以应用与count_word函数所做的相同技术:
使用缓冲区逐步读取文件
直接使用借入的数据,而不是为拥有的一个分配内存
避免立即开销,有效地将文本分成大块
同时处理所有块,计算每个块中的出现数量
最后,总结所有发生的情况。
让我们定义我们的count_word_occurrences_in_chunk :
fn count_word_occurrences_in_chunk (chunk : & str , word : & str ) - > usize { let mut count = 0 ; let word_len = word . len (); let mut start = 0 ; let chunk_bytes = chunk . as_bytes (); while let Some (pos) = chunk[start .. ] . find (word) { let end = start + pos + word_len; if start + pos == 0 || chunk_bytes[start + pos - 1 ] . is_ascii_whitespace () && (end == chunk . len () || chunk_bytes[end] . is_ascii_whitespace ()) { count += 1 ; } start += pos + word_len; } count}
首先,查看此代码可能会有些困惑。让我们分解我们在这里拥有的东西,我们仍然像往常一样将块转换为字节。在段循环的内部,我们一直在搜索这个词,如果找到一个单词,我们需要计算左字符串的下一个开始位置,例如chunk[start...] ,我们有end = start + pos + word_len而不是end = start + word_len因为这里的start位置此处的启动位置在这里不是start + pos at pos he at pos y ost y ost y ost y ost y ot pos chunk[start..] chunk[0..] 。我们只需在匹配的第一个字母之前检查索引是否是一个空间,然后递增计数器,最后返回计数。
但是,由于我们一直在研究第二个功能,因此注意到我们只需要创建一个额外的函数count_word_occurrences_in_chunk ,并且可以重复使用旧逻辑!现在,我们准备创建可重复使用的功能,该功能可以在我们需要执行的两个任务中使用:
fn count_in_parallel < F >(text : & str , count_fn : F ) - > usizewhere F : Fn ( & str ) - > usize + Send + Sync + ' static ,{ let num_threads : usize = thread :: available_parallelism () . unwrap () . get (); let total_word_count : Arc < AtomicUsize > = Arc :: new ( AtomicUsize :: new ( 0 )); let count_fn = Arc :: new (count_fn); let chunks = get_chunks (text, num_threads); thread :: scope ( | scope | { for i in 0 .. chunks . len () { let total_word_count_clone = Arc :: clone ( & total_word_count); let count_fn_clone = Arc :: new ( & count_fn); let chunk = chunks[i]; scope . spawn ( move || { let count = count_fn_clone (chunk); total_word_count_clone . fetch_add (count, Ordering :: Relaxed ); }); } }); total_word_count . load ( Ordering :: Relaxed )}
我们传递的第一个参数是文本输入,第二个参数是我们可以传递的自定义函数,具体取决于我们需要执行的任务。在这里,基本上意味着我们通过的自定义功能需要采用字符串slice (&str)并返回usize , Send和Sync是重要的特征,可确保我们的count_fn函数可以在多个线程中安全地访问,并且'static寿命可确保该功能可以在整个程序的整个过程中使用。
pub fn count_words (text : & str ) - > usize { count_in_parallel (text, count_words_in_chunk)}pub fn count_word_occurrences (text : & str , word : String ) - > usize { count_in_parallel (text, move | chunk | { count_word_occurrences_in_chunk (chunk, & word) })}
让我们看一下执行第二个任务时的性能(即计数单词出现):
./ wc - command rccwc - wo rust / Users / learntocodetogether / rcc / wc - command / input / random_10m_lines . txt`Total occurrences of 'rust' : 8502920 , elapsed time : 1. 749215542s
刚刚超过1.7秒!
奖金:MMAP数据结构
代表内存映射文件的mmap是处理大文件时的有效数据结构,并且需要随机访问。文件内容将直接映射到内存中,没有任何系统调用之间,文件数据并未像file.read_to_string()那样将文件数据完全充分加载到内存中,而只能检索文件的部分,因此,当大文件打开时,将大大降低内存使用情况。
我们首先需要导入依赖性:
// Cargo.toml[dependencies]memmap2 = "0.9.5"
然后,我们可以按以下方式使用它:
pub fn read_file_as_string (file_path : & str ) - > String { let file = File :: open (file_path) . expect ( "Path is not valid" ); let mmap = unsafe { MmapOptions :: new () . map ( & file) . expect ( "Cannot map from file" ) }; let text = unsafe { str :: from_utf8_unchecked ( & mmap) . to_string () }; text}
Rust以内存安全而闻名,但是语言本身并不是100%的记忆安全,因为我们仍然可以执行不安全的操作。不安全的操作是那些访问内存位置的操作,例如,在其中不应该使用原始/空指针。在我们的情况下, mmap操作是不安全的,因为如果同时编写文件时映射文件,或者文件格式可以是UTF-8格式的某些内容,在这种情况下,它将导致意外行为并将崩溃程序。
可以在此处找到此挑战的完整实施:https://github.com/namvdo/rust-coding-challenges/tree/master/master/challenges/wc-command