Rust 并发编程详解
概述
并发编程是现代编程的重要组成部分。Rust 通过所有权和类型系统提供了无数据竞争的并发编程能力。Rust 的并发模型基于消息传递和共享状态。
线程
创建线程
use std::thread;
use std::time::Duration;
fn main() {
thread::spawn(|| {
for i in 1..10 {
println!("子线程 hi {}次", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("主线程 hi {}次", i);
thread::sleep(Duration::from_millis(1));
}
}
等待线程完成
use std::thread;
use std::time::Duration;
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("子线程 hi {}次", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("主线程 hi {}次", i);
thread::sleep(Duration::from_millis(1));
}
handle.join().unwrap();
}
move 闭包
use std::thread;
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(move || {
println!("这是向量: {:?}", v);
});
handle.join().unwrap();
}
消息传递
通道(Channel)
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("收到: {}", received);
}
发送多个值
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("收到: {}", received);
}
}
多个生产者
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("收到: {}", received);
}
}
共享状态
互斥锁(Mutex)
use std::sync::Mutex;
fn main() {
let m = Mutex::new(5);
{
let mut num = m.lock().unwrap();
*num = 6;
}
println!("m = {:?}", m);
}
多线程共享 Mutex
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("结果: {}", *counter.lock().unwrap());
}
RwLock 读写锁
use std::sync::RwLock;
use std::thread;
fn main() {
let lock = RwLock::new(5);
// 多个读取
{
let r1 = lock.read().unwrap();
let r2 = lock.read().unwrap();
println!("读取1: {}, 读取2: {}", r1, r2);
}
// 单个写入
{
let mut w = lock.write().unwrap();
*w += 1;
println!("写入: {}", w);
}
}
原子类型
基本原子操作
use std::sync::atomic::{AtomicI32, Ordering};
use std::thread;
fn main() {
let counter = AtomicI32::new(0);
let mut handles = vec![];
for _ in 0..10 {
let counter = counter.clone();
let handle = thread::spawn(move || {
counter.fetch_add(1, Ordering::SeqCst);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("结果: {}", counter.load(Ordering::SeqCst));
}
原子比较和交换
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;
fn main() {
let active = AtomicBool::new(false);
let handle = thread::spawn(|| {
for _ in 0..10 {
if active.load(Ordering::Acquire) {
println!("活跃状态");
break;
}
thread::sleep(Duration::from_millis(100));
}
});
thread::sleep(Duration::from_millis(500));
active.store(true, Ordering::Release);
handle.join().unwrap();
}
Sync 和 Send Trait
Send Trait
// 大多数类型都实现了 Send Trait
// 但 Rc<T> 没有实现 Send
// Arc<T> 实现了 Send
use std::rc::Rc;
use std::thread;
fn main() {
let rc = Rc::new(5);
// 错误:Rc<T> 不能在线程间传递
// let handle = thread::spawn(move || {
// println!("rc = {}", rc);
// });
}
Sync Trait
// Arc<T> 实现了 Sync
// Rc<T> 没有实现 Sync
use std::sync::Arc;
use std::thread;
fn main() {
let arc = Arc::new(5);
let arc_clone = arc.clone();
let handle = thread::spawn(move || {
println!("arc = {}", arc_clone);
});
handle.join().unwrap();
}
实际应用示例
简单的线程池
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
type Job = Box<dyn FnOnce() + Send + 'static>;
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {} 执行任务", id);
job();
}
});
Worker {
id,
thread: Some(thread),
}
}
}
struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
impl ThreadPool {
fn new(size: usize) -> ThreadPool {
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("关闭 worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
fn main() {
let pool = ThreadPool::new(4);
for i in 0..8 {
pool.execute(move || {
println!("任务 {} 正在处理", i);
thread::sleep(std::time::Duration::from_millis(100));
println!("任务 {} 完成", i);
});
}
thread::sleep(std::time::Duration::from_secs(1));
}
生产者-消费者模式
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;
fn producer(tx: mpsc::Sender<i32>, id: i32) {
for i in 0..5 {
let value = id * 10 + i;
println!("生产者 {} 生产: {}", id, value);
tx.send(value).unwrap();
thread::sleep(Duration::from_millis(100));
}
}
fn consumer(rx: Arc<Mutex<mpsc::Receiver<i32>>>, id: i32) {
loop {
let value = rx.lock().unwrap().recv();
match value {
Ok(v) => println!("消费者 {} 消费: {}", id, v),
Err(_) => break,
}
thread::sleep(Duration::from_millis(150));
}
}
fn main() {
let (tx, rx) = mpsc::channel();
let rx = Arc::new(Mutex::new(rx));
// 创建生产者
for i in 0..2 {
let tx = tx.clone();
thread::spawn(move || producer(tx, i));
}
// 创建消费者
for i in 0..3 {
let rx = Arc::clone(&rx);
thread::spawn(move || consumer(rx, i));
}
thread::sleep(Duration::from_secs(3));
}
并行计算
use std::sync::{Arc, Mutex};
use std::thread;
fn parallel_sum(numbers: Vec<i32>, num_threads: usize) -> i32 {
let chunk_size = (numbers.len() + num_threads - 1) / num_threads;
let result = Arc::new(Mutex::new(0));
let mut handles = vec![];
for chunk in numbers.chunks(chunk_size) {
let chunk = chunk.to_vec();
let result = Arc::clone(&result);
let handle = thread::spawn(move || {
let sum: i32 = chunk.iter().sum();
let mut result = result.lock().unwrap();
*result += sum;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
*result.lock().unwrap()
}
fn main() {
let numbers: Vec<i32> = (1..=100).collect();
let sum = parallel_sum(numbers, 4);
println!("并行计算结果: {}", sum);
}
总结
本教程详细介绍了 Rust 的并发编程:
-
线程:
- 使用
thread::spawn创建线程 - 使用
join等待线程完成 - 使用
move闭包捕获变量
- 使用
-
消息传递:
- 使用通道(channel)在线程间传递消息
- 多个生产者和消费者
- 发送和接收多个值
-
共享状态:
- 使用
Mutex实现互斥访问 - 使用
Arc在线程间共享所有权 - 使用
RwLock实现读写锁
- 使用
-
原子类型:
- 使用
Atomic*类型进行原子操作 - 原子比较和交换
- 内存排序
- 使用
-
Sync 和 Send Trait:
- 控制 Types 在线程间的传递
- 理解哪些类型是线程安全的
-
实际应用:
- 线程池
- 生产者-消费者模式
- 并行计算
Rust 的并发编程模型通过所有权和类型系统保证了内存安全,防止了数据竞争。合理使用线程、通道和共享状态可以编写出高效、安全的并发程序。
下一步
在下一教程中,我们将学习 Rust 的项目实战,通过一个完整的项目展示如何综合运用 Rust 的各种特性。我们将了解:
-
项目结构设计
-
模块组织
-
错误处理
-
测试
-
文档编写
-
发布和部署
继续学习 Rust,掌握这门强大语言的更多特性!

