概述

并发编程是现代编程的重要组成部分。Rust 通过所有权和类型系统提供了无数据竞争的并发编程能力。Rust 的并发模型基于消息传递和共享状态。

线程

创建线程

use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("子线程 hi {}次", i);
            thread::sleep(Duration::from_millis(1));
        }
    });
    
    for i in 1..5 {
        println!("主线程 hi {}次", i);
        thread::sleep(Duration::from_millis(1));
    }
}

等待线程完成

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("子线程 hi {}次", i);
            thread::sleep(Duration::from_millis(1));
        }
    });
    
    for i in 1..5 {
        println!("主线程 hi {}次", i);
        thread::sleep(Duration::from_millis(1));
    }
    
    handle.join().unwrap();
}

move 闭包

use std::thread;

fn main() {
    let v = vec![1, 2, 3];
    
    let handle = thread::spawn(move || {
        println!("这是向量: {:?}", v);
    });
    
    handle.join().unwrap();
}

消息传递

通道(Channel)

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    
    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
    
    let received = rx.recv().unwrap();
    println!("收到: {}", received);
}

发送多个值

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];
        
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    
    for received in rx {
        println!("收到: {}", received);
    }
}

多个生产者

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    
    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];
        
        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    
    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];
        
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    
    for received in rx {
        println!("收到: {}", received);
    }
}

共享状态

互斥锁(Mutex)

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);
    
    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }
    
    println!("m = {:?}", m);
}

多线程共享 Mutex

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    println!("结果: {}", *counter.lock().unwrap());
}

RwLock 读写锁

use std::sync::RwLock;
use std::thread;

fn main() {
    let lock = RwLock::new(5);
    
    // 多个读取
    {
        let r1 = lock.read().unwrap();
        let r2 = lock.read().unwrap();
        println!("读取1: {}, 读取2: {}", r1, r2);
    }
    
    // 单个写入
    {
        let mut w = lock.write().unwrap();
        *w += 1;
        println!("写入: {}", w);
    }
}

原子类型

基本原子操作

use std::sync::atomic::{AtomicI32, Ordering};
use std::thread;

fn main() {
    let counter = AtomicI32::new(0);
    let mut handles = vec![];
    
    for _ in 0..10 {
        let counter = counter.clone();
        let handle = thread::spawn(move || {
            counter.fetch_add(1, Ordering::SeqCst);
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    println!("结果: {}", counter.load(Ordering::SeqCst));
}

原子比较和交换

use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

fn main() {
    let active = AtomicBool::new(false);
    
    let handle = thread::spawn(|| {
        for _ in 0..10 {
            if active.load(Ordering::Acquire) {
                println!("活跃状态");
                break;
            }
            thread::sleep(Duration::from_millis(100));
        }
    });
    
    thread::sleep(Duration::from_millis(500));
    active.store(true, Ordering::Release);
    
    handle.join().unwrap();
}

Sync 和 Send Trait

Send Trait

// 大多数类型都实现了 Send Trait
// 但 Rc<T> 没有实现 Send
// Arc<T> 实现了 Send

use std::rc::Rc;
use std::thread;

fn main() {
    let rc = Rc::new(5);
    
    // 错误:Rc<T> 不能在线程间传递
    // let handle = thread::spawn(move || {
    //     println!("rc = {}", rc);
    // });
}

Sync Trait

// Arc<T> 实现了 Sync
// Rc<T> 没有实现 Sync

use std::sync::Arc;
use std::thread;

fn main() {
    let arc = Arc::new(5);
    let arc_clone = arc.clone();
    
    let handle = thread::spawn(move || {
        println!("arc = {}", arc_clone);
    });
    
    handle.join().unwrap();
}

实际应用示例

简单的线程池

use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();
                println!("Worker {} 执行任务", id);
                job();
            }
        });
        
        Worker {
            id,
            thread: Some(thread),
        }
    }
}

struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

impl ThreadPool {
    fn new(size: usize) -> ThreadPool {
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);
        
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
        
        ThreadPool { workers, sender }
    }
    
    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("关闭 worker {}", worker.id);
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

fn main() {
    let pool = ThreadPool::new(4);
    
    for i in 0..8 {
        pool.execute(move || {
            println!("任务 {} 正在处理", i);
            thread::sleep(std::time::Duration::from_millis(100));
            println!("任务 {} 完成", i);
        });
    }
    
    thread::sleep(std::time::Duration::from_secs(1));
}

生产者-消费者模式

use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

fn producer(tx: mpsc::Sender<i32>, id: i32) {
    for i in 0..5 {
        let value = id * 10 + i;
        println!("生产者 {} 生产: {}", id, value);
        tx.send(value).unwrap();
        thread::sleep(Duration::from_millis(100));
    }
}

fn consumer(rx: Arc<Mutex<mpsc::Receiver<i32>>>, id: i32) {
    loop {
        let value = rx.lock().unwrap().recv();
        match value {
            Ok(v) => println!("消费者 {} 消费: {}", id, v),
            Err(_) => break,
        }
        thread::sleep(Duration::from_millis(150));
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let rx = Arc::new(Mutex::new(rx));
    
    // 创建生产者
    for i in 0..2 {
        let tx = tx.clone();
        thread::spawn(move || producer(tx, i));
    }
    
    // 创建消费者
    for i in 0..3 {
        let rx = Arc::clone(&rx);
        thread::spawn(move || consumer(rx, i));
    }
    
    thread::sleep(Duration::from_secs(3));
}

并行计算

use std::sync::{Arc, Mutex};
use std::thread;

fn parallel_sum(numbers: Vec<i32>, num_threads: usize) -> i32 {
    let chunk_size = (numbers.len() + num_threads - 1) / num_threads;
    let result = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    
    for chunk in numbers.chunks(chunk_size) {
        let chunk = chunk.to_vec();
        let result = Arc::clone(&result);
        
        let handle = thread::spawn(move || {
            let sum: i32 = chunk.iter().sum();
            let mut result = result.lock().unwrap();
            *result += sum;
        });
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    *result.lock().unwrap()
}

fn main() {
    let numbers: Vec<i32> = (1..=100).collect();
    let sum = parallel_sum(numbers, 4);
    println!("并行计算结果: {}", sum);
}

总结

本教程详细介绍了 Rust 的并发编程:

  1. 线程

    • 使用 thread::spawn 创建线程
    • 使用 join 等待线程完成
    • 使用 move 闭包捕获变量
  2. 消息传递

    • 使用通道(channel)在线程间传递消息
    • 多个生产者和消费者
    • 发送和接收多个值
  3. 共享状态

    • 使用 Mutex 实现互斥访问
    • 使用 Arc 在线程间共享所有权
    • 使用 RwLock 实现读写锁
  4. 原子类型

    • 使用 Atomic* 类型进行原子操作
    • 原子比较和交换
    • 内存排序
  5. Sync 和 Send Trait

    • 控制 Types 在线程间的传递
    • 理解哪些类型是线程安全的
  6. 实际应用

    • 线程池
    • 生产者-消费者模式
    • 并行计算

Rust 的并发编程模型通过所有权和类型系统保证了内存安全,防止了数据竞争。合理使用线程、通道和共享状态可以编写出高效、安全的并发程序。

下一步

在下一教程中,我们将学习 Rust 的项目实战,通过一个完整的项目展示如何综合运用 Rust 的各种特性。我们将了解:

  • 项目结构设计

  • 模块组织

  • 错误处理

  • 测试

  • 文档编写

  • 发布和部署

继续学习 Rust,掌握这门强大语言的更多特性!